文章

2025/5/24 线程池 BY C++

1. 核心功能与设计目标

  • 目的:管理一个线程队列,用于异步执行提交的任务,避免频繁创建和销毁线程的开销。
  • 主要操作
    • 初始化指定数量的工作线程。
    • 向任务队列中添加任务 (enqueue)。
    • 工作线程从队列中取出任务并执行。
    • 等待所有已提交任务完成 (waitAllTasks)。
    • 安全地停止并销毁线程池 (~ThreadPool)。

2. 构造函数 ThreadPool(size_t numThreads)

  • 初始化成员变量
    • taskCount (已提交且未完成的任务数) 初始化为 0。
    • stop (停止标志) 初始化为 false
  • 创建工作线程
    • 循环 numThreads 次,创建 std::thread 对象。
    • 每个线程执行一个 lambda 函数,该函数是工作线程的核心逻辑。
    • threads.emplace_back(...): 高效地在 std::vector<std::thread> 末尾构造线程对象。

3. 工作线程逻辑 (在构造函数的 lambda 中)

  • 无限循环while (true),线程持续运行直到被明确停止。
  • 任务获取与同步
    1. std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁 mutex,保护任务队列 tasks、停止标志 stop 和条件变量 cv
    2. cv.wait(lock, [this] { return stop || !tasks.empty(); }):
      • 等待条件:如果 stopfalse (未停止) 且 tasks 为空 (无任务),则线程释放 lock 并在此阻塞,等待被唤醒。
      • 唤醒后检查:被唤醒后,重新获取 lock 并再次检查条件。如果条件满足 (即 stoptruetasks 非空),则继续执行。
    3. 停止处理if (stop) { return; }
      • 如果线程被唤醒是因为 stop 变为 true (通常在析构时),则线程直接返回,结束执行。
    4. 获取任务
      • task = tasks.front();
      • tasks.pop();
      • 此时 lock 仍然持有,安全地从队列中取出任务。
  • 任务执行
    • lock 在离开作用域时自动释放。
    • task();: 执行取出的任务。这部分在锁之外执行,允许其他线程访问任务队列。
  • 任务完成后的处理
    1. std::unique_lock<std::mutex> lock(mutex): 再次获取主互斥锁 mutex
    2. taskCount--;: 任务完成,递减计数器。
    3. if (taskCount == 0) { cv.notify_all(); }:
      • 如果 taskCount 减到 0,意味着所有已入队的任务都已完成。
      • 唤醒所有可能在 waitAllTasks 中等待的线程。

4. 任务入队 enqueue(Func &&func, Args &&...args)

  • 模板化:接受任意可调用对象 Func 及其参数 Args...
  • 任务计数增加
    1. std::unique_lock<std::mutex> lock(mutexCount): 获取 mutexCount 锁。
    2. taskCount++;: 增加任务计数。
    3. lock 自动释放。
  • 任务封装与入队
    1. std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁 mutex
    2. tasks.emplace([func, args...] { func(args...); });:
      • 创建一个新的 lambda 函数,捕获 funcargs
      • 这个新的 lambda (类型为 std::function<void()>) 被添加到 tasks 队列的末尾。
    3. lock 自动释放。
  • 通知工作线程
    • cv.notify_one();: 唤醒一个(可能)正在等待任务的工作线程。

5. 等待所有任务完成 waitAllTasks()

  • 同步等待
    1. std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁 mutex
    2. cv.wait(lock, [this] { return taskCount == 0; }):
      • 等待条件:如果 taskCount 不为 0,则释放 lock 并在此阻塞。
      • 唤醒后检查:当被工作线程(在任务完成后且 taskCount 变为 0 时)通过 cv.notify_all() 唤醒时,重新获取 lock 并检查 taskCount == 0。如果为 true,则继续。
    3. lock 自动释放,函数返回。

6. 析构函数 ~ThreadPool()

  • 设置停止标志
    1. std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁 mutex
    2. stop = true;: 设置停止标志。
    3. lock 自动释放。
  • 唤醒所有工作线程
    • cv.notify_all();: 唤醒所有可能因任务队列为空而阻塞的工作线程。它们醒来后会检查到 stop == true 并退出。
  • 等待线程结束
    • for (auto &thread : threads) { thread.join(); }: 遍历所有工作线程对象,并对每个线程调用 join()。这会阻塞析构函数的执行,直到相应的工作线程完全结束。确保资源被正确清理。

7. 关键成员变量

  • size_t taskCount: 追踪当前队列中和正在执行的任务总数。用于 waitAllTasks
  • bool stop: 标志位,通知工作线程停止运行。
  • std::mutex mutex: 主互斥锁,保护对 tasks 队列、stop 标志以及 cv 的访问。也用于保护 taskCount 的递减操作。
  • std::mutex mutexCount: 一个独立的互斥锁,专门用于保护 taskCount 的递增操作。
  • std::condition_variable cv: 条件变量,用于工作线程等待任务,以及 waitAllTasks 等待任务完成。
  • std::vector<std::thread> threads: 存储所有工作线程对象的容器。
  • std::queue<std::function<void()>> tasks: 存储待执行任务的任务队列。任务被封装为 std::function<void()>

8. 同步机制分析

  • mutexcv:构成了核心的生产者-消费者模型。
    • enqueue 是生产者,将任务放入 tasks 队列,并通过 cv.notify_one() 唤醒消费者。
    • 工作线程是消费者,等待 cv,当 tasks 非空或 stop 为真时被唤醒,然后从 tasks 取出任务。
  • mutexCount:单独用于保护 taskCount 的自增操作。
    • 这是一个值得注意的设计点。通常,如果 taskCount 的所有修改都由同一个互斥锁(例如主 mutex)保护,可以简化设计。这里使用两个互斥锁,taskCount++mutexCount 下,而 taskCount--mutex 下。这需要仔细确保逻辑正确性,避免死锁(在此实现中,由于锁的获取顺序和范围,看起来没有直接的死锁风险,但增加了复杂性)。
  • stop 标志:用于优雅地关闭线程池。当 stoptrue 时,工作线程在 cv.wait 返回后会检测到此标志并退出。
  • taskCountcv in waitAllTaskswaitAllTasks 使用 mutexcv 来等待 taskCount 变为0。当最后一个任务完成时,执行该任务的工作线程会递减 taskCount 并通过 cv.notify_all() 唤醒 waitAllTasks

9.代码部分展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// thread pool

class ThreadPool {

public:

ThreadPool(size_t numThreads) : taskCount(0), stop(false) {

for (int i = 0; i < numThreads; i++) {

threads.emplace_back([this] {

while (true) {

std::function<void()> task;

  

{

std::unique_lock<std::mutex> lock(mutex);

cv.wait(lock, [this] { return stop || !tasks.empty(); });

if (stop) {

return;

}

task = tasks.front();

tasks.pop();

}

task();

{

std::unique_lock<std::mutex> lock(mutex);

taskCount--;

if (taskCount == 0) {

cv.notify_all();

}

}

}

});

}

}

  

void waitAllTasks() {

std::unique_lock<std::mutex> lock(mutex);

cv.wait(lock, [this] { return taskCount == 0; });

}

  

template <typename Func, typename... Args>

void enqueue(Func &&func, Args &&...args) {

{

std::unique_lock<std::mutex> lock(mutexCount);

taskCount++;

}

  

{

std::unique_lock<std::mutex> lock(mutex);

tasks.emplace([func, args...] { func(args...); });

}

cv.notify_one();

}

  

~ThreadPool() {

{

std::unique_lock<std::mutex> lock(mutex);

stop = true;

}

cv.notify_all();

for (auto &thread : threads) {

thread.join();

}

}

  

private:

size_t taskCount; // num of tasks

bool stop; // signol of stop

std::mutex mutex;

std::mutex mutexCount;

std::condition_variable cv;

  

std::vector<std::thread> threads;

std::queue<std::function<void()>> tasks;

};

本文使用Gemini 2.5 pro生成笔记部分.

不得不说真的很方便,笔记部分清晰又明朗,适合我这种超级懒的人.

😋

本文由作者按照 CC BY 4.0 进行授权