线程池 (Thread Pool)
是一种并发设计模式,它预先创建并维护一组可复用的工作线程,而不是在需要时才创建新线程。这些线程“池化”在一起,等待执行分配给它们的任务。
它的核心思想是将“任务的提交”与“任务的执行”解耦。应用程序的各个部分可以将任务提交给线程池,而无需关心任务具体由哪个线程执行、何时执行。
一个典型的线程池包含以下几个关键部分: - 任务队列
(std::queue<TaskType>):一个用于存放待执行任务的缓冲区
-
它必须是线程安全的。这通常通过一个互斥锁
(std::mutex) 和一个条件变量 (std::condition_variable)
来实现, 这正是我们之前讨论的生产者-消费者模型的完美应用. -
任务提交者是生产者,将任务放入队列。 -
工作线程是消费者,从队列中取出任务执行。 - 任务类型
(TaskType):为了让线程池能执行任意类型的任务,通常使用
std::function<void()>
来包装任务。这使得线程池可以接受普通函数、Lambda
表达式、成员函数等任何可调用对象。 - 工作线程
(std::vector<std::thread>):一组预先创建的、可以循环执行任务的线程。
- 线程池管理器
(ThreadPool):一个类,用于封装整个线程池的逻辑,
负责创建、管理和销毁线程池本身,并向任务队列中添加新任务。
为什么要使用线程池
为了理解线程池的价值,我们首先要看看不使用线程池的原始做法有什么问题。原始做法是:“需要时创建,用完后销毁”。
1 2 3 4 5
| void some_task() { }
std::thread t(some_task); t.join();
|
这种做法存在两个致命的缺陷:
- 高昂的资源开销:
- 创建开销:线程的创建和销毁是重量级操作,需要调用操作系统内核
API,分配和回收线程栈等内存资源,这个过程相对耗时。
- 上下文切换开销:如果任务数量巨大,短时间内创建大量线程,会导致
CPU 在这些线程之间频繁进行上下文切换,这会消耗大量 CPU
时间,反而降低了程序的整体性能。
- 资源耗尽风险:操作系统能够创建的线程数量是有限的。如果不加限制地为每个任务都创建一个线程,当并发请求量激增时,很容易耗尽系统内存和线程资源,最终导致程序崩溃或系统瘫痪。
线程池正是为了解决以上两个问题而生的。
工作流程
初始化:
- 创建一个线程池对象,并指定线程数量(例如,CPU 核心数)。
- 线程池立即创建指定数量的工作线程。
- 每个工作线程都启动并进入一个无限循环,尝试从任务队列中获取任务。
等待任务:由于任务队列初始为空,所有工作线程都会在条件变量上
wait(),进入阻塞状态,等待新任务的到来。它们不消耗 CPU 时间。
提交任务:外部代码(例如 main 函数)调用线程池的 enqueue()
方法,传入一个任务。
- enqueue() 方法会获取任务队列的锁,将任务 push
进队列,然后释放锁。
- 之后,它调用条件变量的
notify_one(),唤醒一个正在等待的工作线程。
执行任务:
- 被唤醒的线程从 wait() 返回,重新获取锁,从队列中 pop
一个任务,然后释放锁。
- 线程开始执行取出的任务。任务执行完毕后,线程不会退出,而是返回到无限循环的开始,再次尝试从任务队列获取下一个任务,如果队列为空,则再次进入等待状态。
销毁:
- 当程序希望关闭线程池时,会调用 shutdown() 方法(或者析构函数)。
- 管理器设置一个停止标志位,并调用 notify_all()
唤醒所有工作线程。
- 工作线程被唤醒后,检查到停止标志,于是退出无限循环。
- 管理器 join() 所有工作线程,等待它们全部安全退出。
线程池示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| #include<iostream> #include<vector> #include<queue> #include<mutex> #include<thread> #include<condition_variable> #include<functional> #include<atomic> #include<memory> #include<future> #include<stdexcept>
class ThreadPool{ private: std::vector<std::thread> workers; std::queue<std::function<void()>> task_queue; std::mutex queue_mutex; std::condition_variable condition; std::atomic<bool> stop; public: ThreadPool(size_t threads): stop(false){ if(threads==0){ throw std::runtime_error("ThreadPool's size must be > 0"); } for(size_t i=0;i<threads;i++){ workers.emplace_back([this]{ while(true){ std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this]{ return this->stop || !this->task_queue.empty(); }); if(this->stop&&this->task_queue.empty()) return; task = std::move(this->task_queue.front()); this->task_queue.pop(); } task(); } }); } } ~ThreadPool(){ { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for(auto& worker: workers){ worker.join(); } }
template<class F, class... Args> auto enqueue(F&& f, Args&&... args)->std::future<std::invoke_result_t<F, Args...>>{ using return_type = std::invoke_result_t<F, Args...>; auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if(stop){ throw std::runtime_error("enqueue on stopped ThreadPool"); } task_queue.emplace([task](){ (*task)(); }); condition.notify_one(); return res; } } };
|