2025/5/24 线程池 BY C++
1. 核心功能与设计目标
- 目的:管理一个线程队列,用于异步执行提交的任务,避免频繁创建和销毁线程的开销。
- 主要操作:
- 初始化指定数量的工作线程。
- 向任务队列中添加任务 (
enqueue)。 - 工作线程从队列中取出任务并执行。
- 等待所有已提交任务完成 (
waitAllTasks)。 - 安全地停止并销毁线程池 (
~ThreadPool)。
2. 构造函数 ThreadPool(size_t numThreads)
- 初始化成员变量:
taskCount(已提交且未完成的任务数) 初始化为 0。stop(停止标志) 初始化为false。
- 创建工作线程:
- 循环
numThreads次,创建std::thread对象。 - 每个线程执行一个 lambda 函数,该函数是工作线程的核心逻辑。
threads.emplace_back(...): 高效地在std::vector<std::thread>末尾构造线程对象。
- 循环
3. 工作线程逻辑 (在构造函数的 lambda 中)
- 无限循环:
while (true),线程持续运行直到被明确停止。 - 任务获取与同步:
std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁mutex,保护任务队列tasks、停止标志stop和条件变量cv。cv.wait(lock, [this] { return stop || !tasks.empty(); }):- 等待条件:如果
stop为false(未停止) 且tasks为空 (无任务),则线程释放lock并在此阻塞,等待被唤醒。 - 唤醒后检查:被唤醒后,重新获取
lock并再次检查条件。如果条件满足 (即stop为true或tasks非空),则继续执行。
- 等待条件:如果
- 停止处理:
if (stop) { return; }- 如果线程被唤醒是因为
stop变为true(通常在析构时),则线程直接返回,结束执行。
- 如果线程被唤醒是因为
- 获取任务:
task = tasks.front();tasks.pop();- 此时
lock仍然持有,安全地从队列中取出任务。
- 任务执行:
lock在离开作用域时自动释放。task();: 执行取出的任务。这部分在锁之外执行,允许其他线程访问任务队列。
- 任务完成后的处理:
std::unique_lock<std::mutex> lock(mutex): 再次获取主互斥锁mutex。taskCount--;: 任务完成,递减计数器。if (taskCount == 0) { cv.notify_all(); }:- 如果
taskCount减到 0,意味着所有已入队的任务都已完成。 - 唤醒所有可能在
waitAllTasks中等待的线程。
- 如果
4. 任务入队 enqueue(Func &&func, Args &&...args)
- 模板化:接受任意可调用对象
Func及其参数Args...。 - 任务计数增加:
std::unique_lock<std::mutex> lock(mutexCount): 获取mutexCount锁。taskCount++;: 增加任务计数。lock自动释放。
- 任务封装与入队:
std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁mutex。tasks.emplace([func, args...] { func(args...); });:- 创建一个新的 lambda 函数,捕获
func和args。 - 这个新的 lambda (类型为
std::function<void()>) 被添加到tasks队列的末尾。
- 创建一个新的 lambda 函数,捕获
lock自动释放。
- 通知工作线程:
cv.notify_one();: 唤醒一个(可能)正在等待任务的工作线程。
5. 等待所有任务完成 waitAllTasks()
- 同步等待:
std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁mutex。cv.wait(lock, [this] { return taskCount == 0; }):- 等待条件:如果
taskCount不为 0,则释放lock并在此阻塞。 - 唤醒后检查:当被工作线程(在任务完成后且
taskCount变为 0 时)通过cv.notify_all()唤醒时,重新获取lock并检查taskCount == 0。如果为true,则继续。
- 等待条件:如果
lock自动释放,函数返回。
6. 析构函数 ~ThreadPool()
- 设置停止标志:
std::unique_lock<std::mutex> lock(mutex): 获取主互斥锁mutex。stop = true;: 设置停止标志。lock自动释放。
- 唤醒所有工作线程:
cv.notify_all();: 唤醒所有可能因任务队列为空而阻塞的工作线程。它们醒来后会检查到stop == true并退出。
- 等待线程结束:
for (auto &thread : threads) { thread.join(); }: 遍历所有工作线程对象,并对每个线程调用join()。这会阻塞析构函数的执行,直到相应的工作线程完全结束。确保资源被正确清理。
7. 关键成员变量
size_t taskCount: 追踪当前队列中和正在执行的任务总数。用于waitAllTasks。bool stop: 标志位,通知工作线程停止运行。std::mutex mutex: 主互斥锁,保护对tasks队列、stop标志以及cv的访问。也用于保护taskCount的递减操作。std::mutex mutexCount: 一个独立的互斥锁,专门用于保护taskCount的递增操作。std::condition_variable cv: 条件变量,用于工作线程等待任务,以及waitAllTasks等待任务完成。std::vector<std::thread> threads: 存储所有工作线程对象的容器。std::queue<std::function<void()>> tasks: 存储待执行任务的任务队列。任务被封装为std::function<void()>。
8. 同步机制分析
mutex和cv:构成了核心的生产者-消费者模型。enqueue是生产者,将任务放入tasks队列,并通过cv.notify_one()唤醒消费者。- 工作线程是消费者,等待
cv,当tasks非空或stop为真时被唤醒,然后从tasks取出任务。
mutexCount:单独用于保护taskCount的自增操作。- 这是一个值得注意的设计点。通常,如果
taskCount的所有修改都由同一个互斥锁(例如主mutex)保护,可以简化设计。这里使用两个互斥锁,taskCount++在mutexCount下,而taskCount--在mutex下。这需要仔细确保逻辑正确性,避免死锁(在此实现中,由于锁的获取顺序和范围,看起来没有直接的死锁风险,但增加了复杂性)。
- 这是一个值得注意的设计点。通常,如果
stop标志:用于优雅地关闭线程池。当stop为true时,工作线程在cv.wait返回后会检测到此标志并退出。taskCount和cvinwaitAllTasks:waitAllTasks使用mutex和cv来等待taskCount变为0。当最后一个任务完成时,执行该任务的工作线程会递减taskCount并通过cv.notify_all()唤醒waitAllTasks。
9.代码部分展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// thread pool
class ThreadPool {
public:
ThreadPool(size_t numThreads) : taskCount(0), stop(false) {
for (int i = 0; i < numThreads; i++) {
threads.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop) {
return;
}
task = tasks.front();
tasks.pop();
}
task();
{
std::unique_lock<std::mutex> lock(mutex);
taskCount--;
if (taskCount == 0) {
cv.notify_all();
}
}
}
});
}
}
void waitAllTasks() {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this] { return taskCount == 0; });
}
template <typename Func, typename... Args>
void enqueue(Func &&func, Args &&...args) {
{
std::unique_lock<std::mutex> lock(mutexCount);
taskCount++;
}
{
std::unique_lock<std::mutex> lock(mutex);
tasks.emplace([func, args...] { func(args...); });
}
cv.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(mutex);
stop = true;
}
cv.notify_all();
for (auto &thread : threads) {
thread.join();
}
}
private:
size_t taskCount; // num of tasks
bool stop; // signol of stop
std::mutex mutex;
std::mutex mutexCount;
std::condition_variable cv;
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
};
本文使用Gemini 2.5 pro生成笔记部分.
不得不说真的很方便,笔记部分清晰又明朗,适合我这种超级懒的人.
😋
本文由作者按照
CC BY 4.0
进行授权