线程池

ZaynPei Lv6

线程池 (Thread Pool) 是一种并发设计模式,它预先创建并维护一组可复用的工作线程,而不是在需要时才创建新线程。这些线程“池化”在一起,等待执行分配给它们的任务。

它的核心思想是将“任务的提交”与“任务的执行”解耦。应用程序的各个部分可以将任务提交给线程池,而无需关心任务具体由哪个线程执行、何时执行。

一个典型的线程池包含以下几个关键部分: - 任务队列 (std::queue<TaskType>):一个用于存放待执行任务的缓冲区 - 它必须是线程安全的。这通常通过一个互斥锁 (std::mutex) 和一个条件变量 (std::condition_variable) 来实现, 这正是我们之前讨论的生产者-消费者模型的完美应用. - 任务提交者是生产者,将任务放入队列。 - 工作线程是消费者,从队列中取出任务执行。 - 任务类型 (TaskType):为了让线程池能执行任意类型的任务,通常使用 std::function<void()> 来包装任务。这使得线程池可以接受普通函数、Lambda 表达式、成员函数等任何可调用对象。 - 工作线程 (std::vector<std::thread>):一组预先创建的、可以循环执行任务的线程。 - 线程池管理器 (ThreadPool):一个类,用于封装整个线程池的逻辑, 负责创建、管理和销毁线程池本身,并向任务队列中添加新任务。

为什么要使用线程池

为了理解线程池的价值,我们首先要看看不使用线程池的原始做法有什么问题。原始做法是:“需要时创建,用完后销毁”。

1
2
3
4
5
void some_task() { /* ... */ }

// 每当有一个新任务,就创建一个新线程
std::thread t(some_task);
t.join(); // 或者 t.detach();
这种做法存在两个致命的缺陷:

  • 高昂的资源开销:
    • 创建开销:线程的创建和销毁是重量级操作,需要调用操作系统内核 API,分配和回收线程栈等内存资源,这个过程相对耗时。
    • 上下文切换开销:如果任务数量巨大,短时间内创建大量线程,会导致 CPU 在这些线程之间频繁进行上下文切换,这会消耗大量 CPU 时间,反而降低了程序的整体性能。
  • 资源耗尽风险:操作系统能够创建的线程数量是有限的。如果不加限制地为每个任务都创建一个线程,当并发请求量激增时,很容易耗尽系统内存和线程资源,最终导致程序崩溃或系统瘫痪。

线程池正是为了解决以上两个问题而生的。

工作流程

  1. 初始化:

    • 创建一个线程池对象,并指定线程数量(例如,CPU 核心数)。
    • 线程池立即创建指定数量的工作线程。
    • 每个工作线程都启动并进入一个无限循环,尝试从任务队列中获取任务。
  2. 等待任务:由于任务队列初始为空,所有工作线程都会在条件变量上 wait(),进入阻塞状态,等待新任务的到来。它们不消耗 CPU 时间。

  3. 提交任务:外部代码(例如 main 函数)调用线程池的 enqueue() 方法,传入一个任务。

    • enqueue() 方法会获取任务队列的锁,将任务 push 进队列,然后释放锁。
    • 之后,它调用条件变量的 notify_one(),唤醒一个正在等待的工作线程。
  4. 执行任务:

    • 被唤醒的线程从 wait() 返回,重新获取锁,从队列中 pop 一个任务,然后释放锁。
    • 线程开始执行取出的任务。任务执行完毕后,线程不会退出,而是返回到无限循环的开始,再次尝试从任务队列获取下一个任务,如果队列为空,则再次进入等待状态。
  5. 销毁:

    • 当程序希望关闭线程池时,会调用 shutdown() 方法(或者析构函数)。
    • 管理器设置一个停止标志位,并调用 notify_all() 唤醒所有工作线程。
    • 工作线程被唤醒后,检查到停止标志,于是退出无限循环。
    • 管理器 join() 所有工作线程,等待它们全部安全退出。

线程池示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include<iostream>
#include<vector>
#include<queue>
#include<mutex>
#include<thread>
#include<condition_variable>
#include<functional>
#include<atomic>
#include<memory>
#include<future>
#include<stdexcept>

class ThreadPool{
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> task_queue;
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic<bool> stop;
public:
ThreadPool(size_t threads): stop(false){
if(threads==0){
throw std::runtime_error("ThreadPool's size must be > 0");
}
for(size_t i=0;i<threads;i++){
workers.emplace_back([this]{ // 这是每个工作线程一生都在执行的循环。
while(true){ // 线程启动后就永远在这个循环里,等待任务
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this]{
return this->stop || !this->task_queue.empty();
});
if(this->stop&&this->task_queue.empty()) return;
task = std::move(this->task_queue.front());
this->task_queue.pop();
}
task();
}
});
}
}
~ThreadPool(){
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(auto& worker: workers){
worker.join();
}
}


template<class F, class... Args>
// 可以接受任何函数和任意数量的参数,并返回一个 future,以便调用者将来可以获取该函数的结果。
// 模板参数在函数中使用, 所以是万能引用
// decltype 会推导表达式 f(args...)(即“调用函数f并传入参数args”)的返回类型。
auto enqueue(F&& f, Args&&... args)->std::future<std::invoke_result_t<F, Args...>>{
using return_type = std::invoke_result_t<F, Args...>; // C++17 的一个类型萃取 (Type Trait), 推导函数 F 和参数 Args... 的返回类型。
auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
// std::bind 会将函数 f 和它的参数 args... “绑定” 在一起,生成一个新的、不需要参数的可调用对象。
// std::packaged_task<return_type()>是一个包装器,可以将任何可调用对象(如函数、lambda 表达式等)封装成一个任务,并允许我们通过 std::future 获取该任务的结果。
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop){
throw std::runtime_error("enqueue on stopped ThreadPool");
}
task_queue.emplace([task](){
(*task)();
});
condition.notify_one();
return res;
}
}
};


// int main() {
// // 1. 创建一个有 4 个工作线程的线程池
// ThreadPool pool(4);

// // 2. 提交任务,并获取 future
// auto future1 = pool.enqueue([](int b) {
// std::this_thread::sleep_for(std::chrono::seconds(1));
// return b * 2;
// }, 42); // 42 是参数
// auto future2 = pool.enqueue([]() {
// std::this_thread::sleep_for(std::chrono::seconds(2));
// return std::string("Hello");
// });
// // 3. 主线程可以做其他事...
// std::cout << "Main thread is doing other work..." << std::endl;
// // 4. (阻塞) 等待并获取结果
// std::string result2 = future2.get();
// std::cout << "Result 2: " << result2 << std::endl;
// int result1 = future1.get();
// std::cout << "Result 1: " << result1 << std::endl;
// // 5. (线程池在此处析构,自动 join 所有线程)
// return 0;
// }