线程池

ZaynPei Lv6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <chrono>
#include <string>

// 使用 std::function 重新定义任务,使其可以接受任何可调用对象(函数、lambda等)
using Task = std::function<void()>;

// 一个线程安全的、阻塞的任务队列
class SafeTaskQueue {
private:
std::queue<Task> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
bool m_shutdown = false;

public:
// 添加任务到队列
void addTask(Task task) {
// 使用 lock_guard 自动管理锁的生命周期
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(std::move(task));
// 唤醒一个可能正在等待的线程
m_cond.notify_one();
}

// 从队列中取出一个任务
Task takeTask() {
// unique_lock 功能比 lock_guard 更强大,可以配合 condition_variable 使用
std::unique_lock<std::mutex> lock(m_mutex);
// wait 会阻塞当前线程,直到条件满足或被唤醒
// 使用 lambda 表达式作为等待条件,可以防止伪唤醒
m_cond.wait(lock, [this] { return m_shutdown || !m_queue.empty(); });

if (m_shutdown && m_queue.empty()) {
return nullptr;
}

Task task = std::move(m_queue.front());
m_queue.pop();
return task;
}

// 关闭队列,唤醒所有等待的线程
void shutdown() {
{
std::lock_guard<std::mutex> lock(m_mutex);
m_shutdown = true;
}
m_cond.notify_all();
}

// 获取队列大小(主要用于manager)
size_t size() {
std::lock_guard<std::mutex> lock(m_mutex);
return m_queue.size();
}
};


class ThreadPool {
private:
int m_minNum;
int m_maxNum;
std::atomic<int> m_busyNum;
std::atomic<int> m_aliveNum;
std::atomic<bool> m_shutdown;

std::vector<std::thread> m_workers;
std::thread m_manager;
SafeTaskQueue m_taskQ;

// 工作线程的任务函数
void worker() {
while (!m_shutdown) {
Task task = m_taskQ.takeTask();
// 如果取出的任务为空函数,说明是时候退出了
if (task == nullptr) {
break;
}

m_busyNum++;
task(); // 执行任务
m_busyNum--;
}
m_aliveNum--;
std::cout << "Thread " << std::this_thread::get_id() << " exiting..." << std::endl;
}

// 管理者线程的任务函数
void manager() {
while (!m_shutdown) {
std::this_thread::sleep_for(std::chrono::seconds(5));

int queueSize = m_taskQ.size();
int liveNum = m_aliveNum.load();
int busyNum = m_busyNum.load();

// 扩容:任务数 > 存活数 && 存活数 < 最大数
const int NUMBER = 2;
if (queueSize > liveNum && liveNum < m_maxNum) {
int count = 0;
// 一次最多创建 NUMBER 个线程
for (int i = 0; i < m_maxNum && count < NUMBER && m_aliveNum < m_maxNum; ++i) {
m_workers.emplace_back(&ThreadPool::worker, this);
m_aliveNum++;
count++;
}
}

// 缩容:忙碌数*2 < 存活数 && 存活数 > 最小数
// (注意:这是一个简化的缩容逻辑,实际工程中需要更复杂的机制来安全地 join 线程)
if (busyNum * 2 < liveNum && liveNum > m_minNum) {
// 在这个简化模型中,我们不再实现动态缩容,因为安全地终止和join一个正在运行的std::thread比较复杂。
// 生产环境的线程池通常在析构时统一清理。
}
}
}

public:
ThreadPool(int minNum, int maxNum)
: m_minNum(minNum), m_maxNum(maxNum), m_busyNum(0), m_aliveNum(0), m_shutdown(false) {

m_aliveNum = minNum;
for (int i = 0; i < minNum; ++i) {
// emplace_back 直接在 vector 中构造线程对象
m_workers.emplace_back(&ThreadPool::worker, this);
}

// 创建管理者线程
m_manager = std::thread(&ThreadPool::manager, this);
}

~ThreadPool() {
m_shutdown = true;
m_manager.join(); // 等待管理者线程退出

m_taskQ.shutdown(); // 通知所有工作线程准备退出

for (auto& t : m_workers) {
if (t.joinable()) {
t.join(); // 等待所有工作线程执行完毕并退出
}
}
}

// 添加任务
template<class F, class... Args>
void addTask(F&& f, Args&&... args) {
if (m_shutdown) return;
// 使用 std::bind 和完美转发来创建任务
Task task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
m_taskQ.addTask(std::move(task));
}

int getBusyNumber() const { return m_busyNum; }
int getAliveNumber() const { return m_aliveNum; }
};


// ================== 测试代码 ==================
void my_task_function(int id, int sleep_ms) {
std::cout << "Task " << id << " is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
}

int main() {
ThreadPool pool(3, 10);

// 添加多个任务
for (int i = 0; i < 20; ++i) {
pool.addTask(my_task_function, i, 500);
}

std::cout << "All tasks have been added." << std::endl;

// 等待一段时间观察线程池活动
std::this_thread::sleep_for(std::chrono::seconds(10));

std::cout << "Main thread is preparing to shutdown." << std::endl;

// 析构函数会自动被调用,完成线程池的优雅关闭
return 0;
}
On this page
线程池