Ractor

ZaynPei Lv6

Reactor 模式,也称为“反应器模式”或“分发者模式”(Dispatcher Pattern),是一种用于处理并发服务请求事件驱动设计模式。它的核心思想是将所有事件(如 I/O 事件、定时器事件等)的监听分发任务交给一个中心化的组件Reactor),当事件发生时,Reactor 负责将事件分发给预先注册的、特定的处理程序Event Handler)来进行处理

这种模式特别适用于需要高效处理大量并发、短时连接的 I/O 密集型应用,例如网络服务器。

它是一种设计模式,而IO多路复用+线程池则是Reactor模式的一种具体实现技术。 这里中心化的组件就是epoll, 而预先注册的处理程序就是线程池中的线程。

组件

Reactor 模式主要由以下五个组件构成:

  • 句柄(Handle): 代表一个可以产生事件的 I/O 资源。在网络编程中,这通常是一个文件描述符(File Descriptor, fd),例如一个 socket 连接。它是操作系统内核用来唯一标识 I/O 资源的整数。

  • 同步事件多路分发器(Synchronous Event Demultiplexer): 这是 Reactor 模式的引擎。它是一个系统调用,能够同时监听多个句柄上的事件。这个调用是同步阻塞的,即程序会在这里等待,直到至少有一个句柄上发生了它所关心的事件。

    • 为何需要它:这是避免忙轮询(busy-waiting)的关键。如果没有它,程序就需要不断地轮询所有句柄,极大地浪费 CPU 资源。
    • 常见实现:Linux 系统下的 select、poll、epoll,以及 BSD/macOS 下的 kqueue。其中 epoll 是目前 Linux 上性能最高的实现, 因此下面主要以 epoll 为例进行说明。
  • 事件处理器接口(Event Handler): 定义了一系列用于处理不同类型事件的方法(回调函数)的接口。例如,handle_read()、handle_write()、handle_error() 等。它是一个抽象的基类,具体的业务逻辑由其子类实现。

  • 具体事件处理器(Concrete Event Handler): 实现了事件处理器接口的类。它负责执行与特定事件相关的具体业务逻辑。例如,一个 HttpEventHandler 在 handle_read() 方法中可能会读取 HTTP 请求数据、解析请求头,然后在 handle_write() 中发送 HTTP 响应。例如在网络模型中, 有至少下列两种具体处理器:

    • Acceptor (接收器):一种特殊的 Handler,它只与监听套接字 (listen_sockfd) 绑定。它的唯一职责就是处理“新连接”事件,即调用 accept(),然后创建一个新的 Connection Handler 来处理这个新连接。
    • Connection Handler (连接处理器):与客户端套接字 (client_sockfd) 绑定。它负责处理这个连接上的所有读写事件:解析请求、执行业务逻辑、准备并发送响应。
    • 在具体实现中, 这些函数会在一个新的线程中执行, 以避免阻塞主事件循环。
  • 反应器(Reactor): 模式的中心组件,扮演着“分发者”的角色。它主要负责以下工作:

    • 注册与注销:提供接口,让应用程序可以将一个事件处理器和其关心的句柄绑定(注册)到 Reactor 上,或是在不需要时解绑(注销)。
    • 运行事件循环(Event Loop):Reactor 内部包含一个循环。在每次循环中,它会调用“同步事件多路分发器”来等待事件的发生。
    • 事件分发:当“同步事件多路分发器”返回时,表明有事件发生。Reactor 会根据返回的句柄和事件类型,查找之前注册的事件处理器,并调用其对应的方法来处理事件。

工作流程

下面是 Reactor 模式的典型工作流程,以一个网络服务器为例:

  1. 初始化:服务器启动,创建一个 Reactor 对象和一个用于监听新连接的句柄(listen_fd)。

  2. 注册处理器:服务器创建一个 AcceptorEventHandler(一个具体的事件处理器),并将其与 listen_fdREAD 事件一起注册到 Reactor 中。这是在告诉 Reactor,如果 listen_fd 上有新连接请求(表现为可读事件),就调用 AcceptorEventHandler 的处理方法。

  3. 启动事件循环:服务器调用 Reactor 的 event_loop() 方法,启动事件循环。

  4. 等待事件:Reactor 在循环中调用 epoll_wait()(同步事件多路分发器),阻塞程序,等待事件发生。

  5. 事件发生与分发:

    • 一个客户端发起连接,listen_fd 变为可读。
    • epoll_wait() 从阻塞中返回,并告知 Reactor listen_fd 上有 READ 事件。
    • Reactor 查找注册表,发现这个事件应该由 AcceptorEventHandler 处理。
    • Reactor 调用 AcceptorEventHandlerhandle_read() 方法。
  6. 处理连接事件:AcceptorEventHandler 的 handle_read() 方法内部调用 accept() 函数,接受新的客户端连接,并获得一个新的句柄 client_fd。然后,它会创建一个新的 ConnectionEventHandler(用于处理与该客户端的数据交互),并将其与 client_fd 上的 READ 或 WRITE 事件一起注册到同一个 Reactor 中。

  7. 处理数据事件:

    • 如果客户端发送数据,client_fd 变为可读。
    • epoll_wait() 再次返回,告知 Reactor client_fd 上有 READ 事件。
    • Reactor 分发事件给 ConnectionEventHandler,调用其 handle_read() 方法来读取和处理数据。
  8. 循环往复:Reactor 不断重复第 4 到第 7 步,持续地等待和分发事件。

Reactor 的几种演进模型

根据线程模型的不同,Reactor 模式可以分为以下几种:

alt text

单线程 Reactor 模型: 所有操作(接受连接、事件分发、I/O 读写、业务逻辑处理)都在同一个线程中完成。

优点在于实现简单,没有多线程带来的锁竞争上下文切换开销; 逻辑清晰,所有状态都在一个线程内管理。

缺点是无法利用多核 CPU 的性能。如果某个业务逻辑处理非常耗时(例如数据库查询),会阻塞整个事件循环,导致其他所有客户端的请求都无法被及时响应。

适用于业务逻辑非常简单、快速的场景。例如,Redis 就是一个典型的单线程 Reactor 模型,因为它的绝大部分操作都是内存操作,速度极快。

alt text

多线程 Reactor 模型: 一个主线程(Reactor 线程)负责监听和分发事件(accept、demultiplex、dispatch), 业务逻辑的处理(非 I/O 操作)则交给一个工作线程池(Worker Thread Pool)来执行。

主线程在接收到 I/O 事件后,将数据读取出来,封装成一个任务,然后提交给工作线程池。工作线程池中的线程执行完业务逻辑后,可能会将结果交还给主线程去发送。

优点是充分利用了多核 CPU 的计算能力, 将耗时的业务逻辑与 I/O 处理分离,避免了 I/O 线程的阻塞。

缺点是引入了多线程,需要处理线程安全问题(如共享数据的同步)。而且主线程仍然是性能瓶颈,因为它需要处理所有连接的 I/O 事件

alt text

主从 Reactor 模型(Main-Sub Reactor): 这是对多线程模型的一种优化,也是现代高性能网络框架(如 Netty)普遍采用的模型。

其 Reactor 部分由两部分组成:

  • 主 Reactor(Main Reactor):一个独立的线程,只负责一件事——监听并接受新的客户端连接 (accept)。

  • 从 Reactor(Sub Reactors):一个或多个独立的线程,每个从 Reactor 都有自己的事件多路分发器(epoll实例), 负责处理已连接客户端的 I/O 事件(读写)。

也就是说, 主 Reactor 接受新连接后,不自己处理,而是通过某种负载均衡策略(如轮询)将这个新的连接分配给一个从 Reactor。被选中的从 Reactor 将这个连接的句柄加入到自己的事件监听集合中。之后,这个连接上所有的 I/O 事件(读、写)都由这个从 Reactor 来负责处理。

而业务逻辑可以由从 Reactor 线程自己执行,也可以再分发给另外的工作线程池

优点:职责单一, 主 Reactor 只管连接,从 Reactor 只管 I/O,职责清晰; 扩展性强, 可以通过增加从 Reactor 的数量来线性地提升处理能力,充分利用多核 CPU; 性能优异:主 Reactor 不会因为处理 I/O 而成为瓶颈,从 Reactors 之间也基本没有数据竞争。

代表实现是Netty、Memcached 等。

单线程 Reactor 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/*
* 程序名:reactor_server.cpp
* 功能:一个完整的、基于 Epoll ET模式 + 非阻塞IO 的 Reactor 模型服务器。
* 业务:一个 Echo 服务器,将客户端发来的消息转为大写后返回。
*/
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <cerrno>
#include <cctype>

// 前向声明 Reactor 类,因为 EventHandler 中会用到它
class Reactor;

// 抽象事件处理器基类 (Handler)
// 提供统一的接口,让不同类型的事件(监听 socket、客户端 socket)都能被 Reactor 调度。
// 所有具体的事件处理器(Acceptor, ConnectionHandler)都继承自它。
class EventHandler {
public:
virtual ~EventHandler() {}
// 纯虚函数,由子类实现,用于处理具体的事件
virtual void handle_event(uint32_t events) = 0;
// 纯虚函数,由子类实现,用于获取该处理器关联的文件描述符
virtual int get_fd() const = 0;
};

// Reactor 核心类 (反应器/调度器)
// 负责管理 epoll 实例,运行事件循环,并将事件分发给对应的 EventHandler。
class Reactor {
private:
int epoll_fd; // epoll 实例的文件描述符
// 使用哈希表存储 fd 到其对应 EventHandler 指针的映射
std::unordered_map<int, EventHandler*> handlers;

public:
// 构造函数:创建 epoll 实例
Reactor() {
epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
}

// 析构函数:关闭 epoll 文件描述符
~Reactor() {
close(epoll_fd);
}

// 注册事件处理器
void register_handler(EventHandler* handler, uint32_t events) {
int fd = handler->get_fd();
struct epoll_event ev; // 作用是描述要监听的事件, epoll_event的结构是:
// struct epoll_event {
// uint32_t events; /* Epoll events (bit mask) */
// epoll_data_t data; /* User data variable */
// };
ev.events = events;
ev.data.ptr = handler; // 核心:将 handler 指针存入 data.ptr,实现 fd 和 handler 的绑定

if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { // Epoll得以监听该 fd, 关心可读事件 (EPOLLIN)和边缘触发 (EPOLLET)
perror("epoll_ctl: add");
return;
}
handlers[fd] = handler; // 将 fd 和 handler 的映射关系存入哈希表
}

// 修改事件处理器监听的事件
void modify_handler(EventHandler* handler, uint32_t events) {
int fd = handler->get_fd();
struct epoll_event ev;
ev.events = events;
ev.data.ptr = handler;

if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev) < 0) {
perror("epoll_ctl: mod");
}
}

// 移除事件处理器
void remove_handler(EventHandler* handler) {
int fd = handler->get_fd();
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
perror("epoll_ctl: del");
}
handlers.erase(fd); // 从哈希表中移除映射关系
// 注意:这里由调用者负责 delete handler 对象,Reactor 本身不管理其生命周期
}

// 事件循环 (Event Loop)
void event_loop() {
std::vector<struct epoll_event> ready_events(1024); // 用于存储就绪事件的数组
while (true) {
// 阻塞等待事件发生
int n = epoll_wait(epoll_fd, ready_events.data(), ready_events.size(), -1);
if (n < 0) {
if (errno == EINTR) continue; // 被信号中断,继续等待
perror("epoll_wait");
break;
}

// 遍历所有就绪的事件
for (int i = 0; i < n; ++i) {
// 从 data.ptr 中取回 handler 指针,并进行类型转换
EventHandler* handler = static_cast<EventHandler*>(ready_events[i].data.ptr);
// 调用 handler 的方法处理事件
handler->handle_event(ready_events[i].events);
}
}
}
};

// 工具函数:设置文件描述符为非阻塞模式
bool set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl: F_GETFL");
return false;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl: F_SETFL");
return false;
}
return true;
}

// 具体事件处理器:处理已连接客户端的读写事件 (Connection Handler)
class ConnectionHandler : public EventHandler {
private:
int fd; // 客户端 socket fd
Reactor* reactor; // 指向 Reactor 的指针,用于修改或移除自身
std::string read_buffer; // 读缓冲区
std::string write_buffer; // 写缓冲区

public:
ConnectionHandler(int cfd, Reactor* r) : fd(cfd), reactor(r) {}

~ConnectionHandler() {
close(fd);
}

int get_fd() const override { return fd; }

// 事件处理入口
void handle_event(uint32_t events) override {
if (events & (EPOLLHUP | EPOLLERR)) { // 发生挂起或错误
handle_close();
return;
}
if (events & EPOLLIN) { // 可读事件
handle_read();
}
if (events & EPOLLOUT) { // 可写事件
handle_write();
}
}

private:
// 处理读事件
void handle_read() {
char buf[1024];
ssize_t n;
// ET 模式,必须循环读取直到返回 EAGAIN
while ((n = read(fd, buf, sizeof(buf))) > 0) {
read_buffer.append(buf, n);
}

if (n == 0) { // 客户端关闭连接
handle_close();
} else if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { // 真正的错误
perror("read");
handle_close();
}
}

// 收到数据后,进行业务处理并准备发送
if (!read_buffer.empty()) {
// 业务逻辑:将接收到的字符转为大写
for (char &c : read_buffer) {
c = toupper(c);
}
write_buffer += read_buffer; // 将处理后的数据放入写缓冲区
read_buffer.clear();
// 数据已准备好,注册写事件,以便在 socket 可写时发送
reactor->modify_handler(this, EPOLLIN | EPOLLOUT | EPOLLET);
}
}

// 处理写事件
void handle_write() {
ssize_t n;
// 循环写入,直到写缓冲区为空或 socket 发送缓冲区满
while (!write_buffer.empty()) {
n = write(fd, write_buffer.c_str(), write_buffer.length());
if (n > 0) {
write_buffer.erase(0, n); // 从写缓冲区中移除已发送的数据
} else if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { // 真正的错误
perror("write");
handle_close();
}
break; // 发送缓冲区满了,等待下一次 EPOLLOUT 通知
}
}

// 如果数据都写完了,就不再关心写事件,避免 epoll_wait 忙轮询
if (write_buffer.empty()) {
reactor->modify_handler(this, EPOLLIN | EPOLLET);
}
}

// 处理关闭连接
void handle_close() {
std::cout << "Client " << fd << " disconnected." << std::endl;
reactor->remove_handler(this); // 从 Reactor 中移除自己
delete this; // 自我销毁,释放资源
}
};

// 具体事件处理器:处理新连接的接收事件 (Acceptor)
class Acceptor : public EventHandler {
private:
int listen_fd; // 监听 socket fd
Reactor* reactor; // 指向 Reactor 的指针

public:
Acceptor(int lfd, Reactor* r) : listen_fd(lfd), reactor(r) {}

int get_fd() const override { return listen_fd; }

// 处理新连接事件
void handle_event(uint32_t events) override {
if (events & EPOLLIN) {
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd;
// ET 模式,必须循环 accept 直到返回 EAGAIN
while ((client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &len)) > 0) {
std::cout << "Accepted connection from " << inet_ntoa(client_addr.sin_addr)
<< ":" << ntohs(client_addr.sin_port) << std::endl;

set_nonblocking(client_fd); // 将新连接设置为非阻塞

// 为新连接创建一个 ConnectionHandler 并注册到 Reactor
ConnectionHandler* handler = new ConnectionHandler(client_fd, reactor);
reactor->register_handler(handler, EPOLLIN | EPOLLET); // 初始只关心读事件
}
if (client_fd == -1 && (errno != EAGAIN && errno != EWOULDBLOCK)) {
perror("accept");
}
}
}
};

// 主函数
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <port>\n";
return 1;
}

// 1. 创建、绑定、监听 socket
int port = atoi(argv[1]);
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0) { perror("socket"); return 1; }

int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(port);

if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
perror("bind"); return 1;
}

if (listen(listen_fd, SOMAXCONN) < 0) {
perror("listen"); return 1;
}

// 2. 将监听 socket 设置为非阻塞,配合 ET 模式
set_nonblocking(listen_fd);

// -------------3. 初始化 Reactor 和 Acceptor--------------
Reactor reactor;
Acceptor acceptor(listen_fd, &reactor);

// 4. 将 Acceptor 注册到 Reactor,监听新连接事件
reactor.register_handler(&acceptor, EPOLLIN | EPOLLET);

// 5. 启动服务器
std::cout << "Reactor Server is running on port " << port << "..." << std::endl;
reactor.event_loop(); // 进入事件循环

close(listen_fd);
return 0;
}

多线程 Reactor 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
/*
* 程序名:reactor_server_multithread.cpp
* 功能:一个基于 Epoll ET + 非阻塞IO 的多线程 Reactor 模型服务器。
* I/O 线程负责读写,业务逻辑交由独立的线程池处理。
* 业务:一个 Echo 服务器,将客户端发来的消息转为大写后返回。
*/
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <cstring>
#include <cstdlib>
#include <unistd.h> // POSIX 标准操作系统 API,如 read, write, close
#include <sys/socket.h> // 套接字编程 API
#include <sys/epoll.h> // Epoll API
#include <netinet/in.h> // Internet 地址族
#include <arpa/inet.h> // IP 地址转换函数
#include <fcntl.h> // 文件控制,如设置非阻塞
#include <cerrno> // 错误码
#include <cctype> // 字符处理函数,如 toupper
#include <memory> // 智能指针
#include <thread> // 线程库
#include <mutex> // 互斥锁
#include <condition_variable> // 条件变量
#include <queue> // 队列
#include <functional> // std::function

// --- 线程池实现 ---
// 一个简单的固定大小的线程池
class ThreadPool {
private:
std::vector<std::thread> workers; // 存储工作线程的容器
std::queue<std::function<void()>> tasks; // 任务队列,存储待执行的任务
std::mutex queue_mutex; // 保护任务队列的互斥锁
std::condition_variable condition; // 条件变量,用于唤醒等待的线程
bool stop; // 线程池停止标志

public:
// 构造函数,创建指定数量的线程
ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i) {
// emplace_back 直接在容器末尾构造线程对象
workers.emplace_back([this] {
// 工作线程的无限循环
while (true) {
std::function<void()> task;
{
// 使用 unique_lock 管理互斥锁
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待条件:线程池停止 或 任务队列不为空
// wait 会原子地解锁并阻塞,被唤醒后重新加锁并检查条件
// 最终线程全部阻塞在这里等待任务到来
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });

// 如果线程池停止且任务队列为空,则线程退出
if (this->stop && this->tasks.empty()) return;

// 从队列中取出一个任务
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 执行任务(此时锁已释放,不阻塞其他线程取任务)
task();
}
});
}
}

// 提交任务到线程池
template<class F>
void submit_task(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 如果线程池已停止,则不允许提交新任务
if (stop) throw std::runtime_error("submit on stopped ThreadPool");
// 将任务添加到队列
tasks.emplace(std::forward<F>(f));
}
// 唤醒一个正在等待的线程来处理任务
condition.notify_one();
}

// 析构函数,优雅地关闭线程池
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true; // 设置停止标志
}
// 唤醒所有线程,让他们能够检查到 stop 标志并退出
condition.notify_all();
// 等待所有工作线程执行完毕
for (std::thread &worker : workers) {
worker.join();
}
}
};


// --- Reactor 框架 ---

class Reactor;

// 事件处理器接口(抽象基类)
class EventHandler {
public:
virtual ~EventHandler() {}
// 事件处理函数,由子类实现
virtual void handle_event(uint32_t events) = 0;
// 获取文件描述符
virtual int get_fd() const = 0;
};

// Reactor 类,负责事件的分发
class Reactor {
private:
int epoll_fd; // epoll 实例的文件描述符
// 使用智能指针管理 EventHandler 的生命周期
// 键是文件描述符,值是对应的事件处理器
std::unordered_map<int, std::shared_ptr<EventHandler>> handlers;
std::mutex mtx; // 保护 handlers 的互斥锁,防止多线程并发修改

public:
Reactor() {
epoll_fd = epoll_create1(0); // 创建 epoll 实例
if (epoll_fd < 0) { perror("epoll_create1"); exit(EXIT_FAILURE); }
}
~Reactor() { close(epoll_fd); }

// 注册、修改、移除 handler 都需要加锁以保证线程安全
void register_handler(std::shared_ptr<EventHandler> handler, uint32_t events) {
std::lock_guard<std::mutex> lock(mtx); // 自动加锁和解锁
int fd = handler->get_fd();
struct epoll_event ev;
ev.events = events; // 设置要监听的事件
ev.data.ptr = handler.get(); // 存储裸指针,用于快速查找

if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
perror("epoll_ctl: add");
return;
}
handlers[fd] = handler; // 将 handler 的所有权交给 map
}

void modify_handler(std::shared_ptr<EventHandler> handler, uint32_t events) {
// modify_handler 可能会被工作线程调用,所以必须是线程安全的
std::lock_guard<std::mutex> lock(mtx);
int fd = handler->get_fd();
struct epoll_event ev;
ev.events = events;
ev.data.ptr = handler.get();

if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev) < 0) {
perror("epoll_ctl: mod");
}
}

void remove_handler(std::shared_ptr<EventHandler> handler) {
std::lock_guard<std::mutex> lock(mtx);
int fd = handler->get_fd();
// 从 epoll 中移除监听
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
perror("epoll_ctl: del");
}
// 从 map 中移除 handler,shared_ptr 的引用计数会减少
handlers.erase(fd);
}

// 事件循环,Reactor 的核心
void event_loop() {
std::vector<struct epoll_event> ready_events(1024);
while (true) {
// 等待事件发生,-1 表示无限等待
int n = epoll_wait(epoll_fd, ready_events.data(), ready_events.size(), -1);
if (n < 0) {
if (errno == EINTR) continue; // 被信号中断,继续等待
perror("epoll_wait");
break;
}

// 遍历所有就绪的事件
for (int i = 0; i < n; ++i) {
// 从 epoll_event 中获取之前存储的裸指针
EventHandler* handler_ptr = static_cast<EventHandler*>(ready_events[i].data.ptr);
std::shared_ptr<EventHandler> handler;
{
// 从 map 中获取 shared_ptr,保证在处理事件期间对象不会被销毁
// 这是一个关键的线程安全操作
std::lock_guard<std::mutex> lock(mtx);
auto it = handlers.find(handler_ptr->get_fd());
if (it != handlers.end()) {
handler = it->second; // 复制 shared_ptr,增加引用计数
}
}

if (handler) {
// 调用对应的事件处理函数
handler->handle_event(ready_events[i].events);
}
}
}
}
};

// 工具函数:设置文件描述符为非阻塞模式
bool set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) { perror("fcntl: F_GETFL"); return false; }
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { perror("fcntl: F_SETFL"); return false; }
return true;
}

// 连接处理器,负责处理单个客户端连接的读写事件
class ConnectionHandler : public EventHandler, public std::enable_shared_from_this<ConnectionHandler> {
private:
int fd; // 客户端连接的 socket fd
Reactor* reactor; // 指向所属 Reactor 的指针
ThreadPool* thread_pool; // 指向业务线程池的指针
std::string read_buffer; // 读缓冲区
std::string write_buffer; // 写缓冲区
std::mutex mtx; // 保护 write_buffer,因为工作线程和I/O线程都可能访问它

public:
ConnectionHandler(int cfd, Reactor* r, ThreadPool* tp) : fd(cfd), reactor(r), thread_pool(tp) {}
~ConnectionHandler() { close(fd); } // 析构时关闭 socket

int get_fd() const override { return fd; }

// 事件分发
void handle_event(uint32_t events) override {
if (events & (EPOLLHUP | EPOLLERR)) { handle_close(); return; } // 错误或挂起
if (events & EPOLLIN) { handle_read(); } // 可读事件
if (events & EPOLLOUT) { handle_write(); } // 可写事件
}

private:
// 处理读事件
void handle_read() {
char buf[1024];
ssize_t n;
// ET 模式,必须循环读取直到返回 EAGAIN 或 EWOULDBLOCK
while ((n = read(fd, buf, sizeof(buf))) > 0) {
read_buffer.append(buf, n);
}

if (n == 0) { // 对端关闭连接
handle_close();
} else if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { // 发生真实错误
perror("read");
handle_close();
}
}

if (!read_buffer.empty()) {
// 将业务逻辑提交给线程池
std::string data = std::move(read_buffer); // 移动数据,避免拷贝
read_buffer.clear();

// 使用 shared_from_this() 获取自身的 shared_ptr,安全地传递给工作线程
auto self = shared_from_this();
thread_pool->submit_task([self, data] {
self->process_and_send(data);
});
}
}

// 该函数由工作线程执行,处理业务逻辑并准备发送数据
void process_and_send(std::string data) {
// 1. 执行业务逻辑(转为大写)
for (char &c : data) {
c = toupper(c);
}

// 2. 将结果放入写缓冲区,并注册写事件
{
std::lock_guard<std::mutex> lock(mtx);
write_buffer += data;
}
// 通知 Reactor,该连接现在对写事件也感兴趣
reactor->modify_handler(shared_from_this(), EPOLLIN | EPOLLOUT | EPOLLET);
}

// 处理写事件
void handle_write() {
std::lock_guard<std::mutex> lock(mtx); // 写操作前加锁
ssize_t n;
// ET 模式,循环写入
while (!write_buffer.empty()) {
n = write(fd, write_buffer.c_str(), write_buffer.length());
if (n > 0) {
write_buffer.erase(0, n); // 从缓冲区移除已发送的数据
} else if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { // 真实错误
perror("write");
handle_close();
}
break; // 如果是 EAGAIN,则等待下一次 EPOLLOUT 通知
}
}

// 如果写缓冲区已空,说明数据已全部发送,取消对写事件的关注
if (write_buffer.empty()) {
reactor->modify_handler(shared_from_this(), EPOLLIN | EPOLLET);
}
}

// 处理连接关闭
void handle_close() {
std::cout << "Client " << fd << " disconnected." << std::endl;
reactor->remove_handler(shared_from_this());
// 对象由 shared_ptr 自动管理,当所有 shared_ptr 失效时,对象会被销毁
}
};

// 连接接收器,负责接受新的客户端连接
class Acceptor : public EventHandler, public std::enable_shared_from_this<Acceptor> {
private:
int listen_fd; // 监听 socket
Reactor* reactor; // 指向所属 Reactor
ThreadPool* thread_pool; // 指向线程池

public:
Acceptor(int lfd, Reactor* r, ThreadPool* tp) : listen_fd(lfd), reactor(r), thread_pool(tp) {}
int get_fd() const override { return listen_fd; }

// 只处理读事件(即新连接到来)
void handle_event(uint32_t events) override {
if (events & EPOLLIN) {
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd;
// ET 模式,循环 accept 直到返回 EAGAIN
while ((client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &len)) > 0) {
std::cout << "Accepted connection from " << inet_ntoa(client_addr.sin_addr)
<< ":" << ntohs(client_addr.sin_port) << std::endl;

set_nonblocking(client_fd); // 将新连接设置为非阻塞

// 为新连接创建一个 ConnectionHandler
auto handler = std::make_shared<ConnectionHandler>(client_fd, reactor, thread_pool);
// 将新连接注册到 Reactor,监听读事件
reactor->register_handler(handler, EPOLLIN | EPOLLET);
}
if (client_fd == -1 && (errno != EAGAIN && errno != EWOULDBLOCK)) {
perror("accept");
}
}
}
};

int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <port>\n";
return 1;
}

int port = atoi(argv[1]);
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0) { perror("socket"); return 1; }

// 设置地址重用,方便服务器快速重启
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网络接口
serv_addr.sin_port = htons(port);

if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
perror("bind"); return 1;
}

if (listen(listen_fd, SOMAXCONN) < 0) {
perror("listen"); return 1;
}

set_nonblocking(listen_fd); // 将监听 socket 设置为非阻塞

// --------- 初始化 Reactor, ThreadPool, Acceptor ---------
Reactor reactor;
// 获取硬件支持的线程数,创建相应数量的工作线程
unsigned int num_threads = std::thread::hardware_concurrency();
ThreadPool pool(num_threads > 0 ? num_threads : 4); // 如果获取失败,默认4个

// 使用智能指针管理 Acceptor 的生命周期, 注意和单线程版本的区别
auto acceptor = std::make_shared<Acceptor>(listen_fd, &reactor, &pool);

// 将 Acceptor 注册到 Reactor,监听新连接事件
reactor.register_handler(acceptor, EPOLLIN | EPOLLET);

std::cout << "Multi-threaded Reactor Server is running on port " << port << " with " << num_threads << " worker threads..." << std::endl;
reactor.event_loop(); // 主线程进入事件循环,成为 I/O 线程

close(listen_fd);
return 0;
}

主从 Reactor 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
/*
* 程序名:reactor_server_full.cpp
* 功能:一个完整的主从 Reactor + 线程池模型服务器。
* Main Reactor (主线程) 负责 accept 连接并分发。
* Sub Reactors (I/O 线程) 负责网络 I/O 读写。
* Worker Thread Pool (工作线程) 负责业务逻辑处理。
* 业务:一个 Echo 服务器,将客户端发来的消息转为大写后返回。
*/
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <cerrno>
#include <cctype>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <atomic>

// --- 线程池实现 (与多线程 Reactor 版本相同) ---
class ThreadPool {
private:
std::vector<std::thread> workers; // 存储工作线程的容器
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 保护任务队列的互斥锁
std::condition_variable condition; // 用于唤醒等待任务的线程的条件变量
bool stop; // 线程池停止标志

public:
// 构造函数:创建指定数量的工作线程
ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
// 工作线程的执行循环
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待条件:线程池停止 或 任务队列不为空
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
// 如果线程池已停止且任务队列为空,则线程可以安全退出
if (this->stop && this->tasks.empty()) return;
// 从队列中获取一个任务
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 执行任务(在锁之外,避免阻塞其他工作线程)
task();
}
});
}
}

// 提交任务到线程池
template<class F>
void submit_task(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) throw std::runtime_error("submit on stopped ThreadPool");
tasks.emplace(std::forward<F>(f));
}
// 唤醒一个等待的线程来处理新任务
condition.notify_one();
}

// 析构函数:优雅地关闭线程池
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true; // 设置停止标志
}
condition.notify_all(); // 唤醒所有线程,让他们检查到 stop 标志并退出
for (std::thread &worker : workers) {
worker.join(); // 等待所有线程执行完毕
}
}
};


// --- 前向声明 ---
class SubReactor;
class EventHandler;

// 工具函数:设置文件描述符为非阻塞模式
bool set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) { perror("fcntl: F_GETFL"); return false; }
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { perror("fcntl: F_SETFL"); return false; }
return true;
}

// --- 事件处理器基类 (接口) ---
class EventHandler {
public:
virtual ~EventHandler() {}
// 纯虚函数,处理事件的接口,由子类实现
virtual void handle_event(uint32_t events) = 0;
// 纯虚函数,获取文件描述符的接口
virtual int get_fd() const = 0;
};

// --- 连接处理器 (负责处理单个客户端连接的读写,由 SubReactor 管理) ---
class ConnectionHandler : public EventHandler, public std::enable_shared_from_this<ConnectionHandler> {
private:
int fd; // 客户端连接的 socket fd
SubReactor* owner_reactor; // 指向管理它的 SubReactor
ThreadPool* thread_pool; // 指向业务逻辑处理的线程池
std::string read_buffer; // 读缓冲区
std::string write_buffer; // 写缓冲区
std::mutex mtx; // 保护 write_buffer,因为它会被 I/O 线程和工作线程访问

public:
ConnectionHandler(int cfd, SubReactor* reactor, ThreadPool* pool)
: fd(cfd), owner_reactor(reactor), thread_pool(pool) {}
~ConnectionHandler() { close(fd); } // 析构时自动关闭 socket

int get_fd() const override { return fd; }

// 事件分发器,根据 epoll 返回的事件类型调用不同的处理函数
void handle_event(uint32_t events) override;

private:
void handle_read(); // 处理读事件(在 I/O 线程中执行)
void handle_write(); // 处理写事件(在 I/O 线程中执行)
void handle_close(); // 处理关闭事件(在 I/O 线程中执行)
void process_data(std::string data); // 处理业务逻辑(在工作线程中执行)
};

// --- SubReactor (运行在 I/O 线程中,负责处理网络 I/O) ---
class SubReactor {
private:
int epoll_fd; // 每个 SubReactor 拥有自己的 epoll 实例
int wakeup_fd; // 用于主线程唤醒此 Reactor 的 eventfd
ThreadPool* thread_pool; // 指向共享的业务线程池
std::unordered_map<int, std::shared_ptr<EventHandler>> handlers; // 管理此 Reactor 上的所有事件处理器
std::thread worker_thread; // 运行 event_loop 的 I/O 线程
std::mutex mtx; // 保护 handlers 的互斥锁

// 用于暂存主线程发来的新连接 fd 的队列
std::vector<int> pending_fds;
std::mutex pending_fds_mtx; // 保护 pending_fds 的互斥锁

public:
SubReactor(ThreadPool* pool) : thread_pool(pool) {
epoll_fd = epoll_create1(0);
if (epoll_fd < 0) { perror("SubReactor epoll_create1"); exit(EXIT_FAILURE); }

// 创建一个 eventfd 用于跨线程唤醒
wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (wakeup_fd < 0) { perror("SubReactor eventfd"); exit(EXIT_FAILURE); }

// 为 wakeup_fd 创建一个专门的处理器,以便在 epoll 中监听它
struct WakeupHandler : EventHandler {
SubReactor* self;
WakeupHandler(SubReactor* s) : self(s) {}
int get_fd() const override { return self->wakeup_fd; }
void handle_event(uint32_t events) override {
uint64_t one;
// 读取 eventfd 的数据,清除事件通知,否则会一直触发
ssize_t n = read(self->wakeup_fd, &one, sizeof(one));
}
};
// 将 wakeup_handler 注册到 epoll 中,监听读事件
register_handler(std::make_shared<WakeupHandler>(this), EPOLLIN);
}

~SubReactor() {
close(epoll_fd);
close(wakeup_fd);
}

// 启动 I/O 线程,开始事件循环
void start() {
worker_thread = std::thread(&SubReactor::event_loop, this);
}

// 等待 I/O 线程结束
void join() {
if(worker_thread.joinable()) worker_thread.join();
}

// 在 SubReactor 自己的 I/O 线程中,将一个新连接注册到 epoll
void add_new_connection(int client_fd) {
set_nonblocking(client_fd);
// 创建 ConnectionHandler,并传递线程池指针
auto handler = std::make_shared<ConnectionHandler>(client_fd, this, thread_pool);
register_handler(handler, EPOLLIN | EPOLLET); // 监听读事件和边缘触发
}

// 供 MainReactor 调用的接口,用于投递新连接(跨线程)
void post_new_connection(int fd) {
{
// 将新连接的 fd 放入待处理队列
std::lock_guard<std::mutex> lock(pending_fds_mtx);
pending_fds.push_back(fd);
}
// 向 wakeup_fd 写入数据,以唤醒阻塞在 epoll_wait 的 I/O 线程
uint64_t one = 1;
ssize_t n = write(wakeup_fd, &one, sizeof(one));
if (n != sizeof(one)) {
perror("SubReactor write to wakeup_fd");
}
}

// 注册事件处理器到 epoll(线程安全)
void register_handler(std::shared_ptr<EventHandler> handler, uint32_t events) {
std::lock_guard<std::mutex> lock(mtx);
int fd = handler->get_fd();
struct epoll_event ev;
ev.events = events;
ev.data.ptr = handler.get();
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { perror("SubReactor epoll_ctl: add"); return; }
handlers[fd] = handler;
}

// 修改 epoll 中已注册的事件(线程安全)
void modify_handler(std::shared_ptr<EventHandler> handler, uint32_t events) {
std::lock_guard<std::mutex> lock(mtx);
int fd = handler->get_fd();
struct epoll_event ev;
ev.events = events;
ev.data.ptr = handler.get();
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev) < 0) { perror("SubReactor epoll_ctl: mod"); }
}

// 从 epoll 中移除事件处理器(线程安全)
void remove_handler(std::shared_ptr<EventHandler> handler) {
std::lock_guard<std::mutex> lock(mtx);
int fd = handler->get_fd();
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) { perror("SubReactor epoll_ctl: del"); }
handlers.erase(fd);
}

// 事件循环,运行在 I/O 线程中
void event_loop() {
std::vector<struct epoll_event> ready_events(1024);
while (true) {
int n = epoll_wait(epoll_fd, ready_events.data(), ready_events.size(), -1);
if (n < 0) { if (errno == EINTR) continue; perror("SubReactor epoll_wait"); break; }

// 检查是否有从主线程发来的新连接需要处理
{
std::vector<int> new_fds;
std::lock_guard<std::mutex> lock(pending_fds_mtx);
new_fds.swap(pending_fds); // 高效交换,减少锁内时间
for (int fd : new_fds) {
add_new_connection(fd); // 注册新连接
}
}

// 遍历处理所有就绪的 I/O 事件
for (int i = 0; i < n; ++i) {
EventHandler* handler_ptr = static_cast<EventHandler*>(ready_events[i].data.ptr);
// 如果是唤醒事件,其 handler 已经处理过了,这里跳过
if (handler_ptr->get_fd() == wakeup_fd) continue;

// 从 map 中安全地获取 shared_ptr,确保 handler 在处理期间不会被销毁
std::shared_ptr<EventHandler> handler;
{
std::lock_guard<std::mutex> lock(mtx);
auto it = handlers.find(handler_ptr->get_fd());
if (it != handlers.end()) handler = it->second;
}
if (handler) handler->handle_event(ready_events[i].events);
}
}
}
};

// --- ConnectionHandler 成员函数实现 ---
void ConnectionHandler::handle_event(uint32_t events) {
if (events & (EPOLLHUP | EPOLLERR)) { handle_close(); return; }
if (events & EPOLLIN) { handle_read(); }
if (events & EPOLLOUT) { handle_write(); }
}

void ConnectionHandler::handle_read() {
char buf[1024];
ssize_t n;
// ET 模式,必须循环读取直到返回 EAGAIN
while ((n = read(fd, buf, sizeof(buf))) > 0) {
read_buffer.append(buf, n);
}
if (n == 0) { handle_close(); }
else if (n < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("read"); handle_close(); }}

// 如果读取到了数据,则将其提交给线程池处理
if (!read_buffer.empty()) {
std::string data = std::move(read_buffer);
read_buffer.clear();
// 使用 shared_from_this() 获取自身的 shared_ptr,安全地传递给工作线程
auto self = shared_from_this();
thread_pool->submit_task([self, data] {
self->process_data(data);
});
}
}

// 此函数由工作线程执行
void ConnectionHandler::process_data(std::string data) {
// 1. 执行耗时的业务逻辑(这里是转为大写)
for (char &c : data) {
c = toupper(c);
}
// 2. 将处理结果放入写缓冲区(需要加锁保护)
{
std::lock_guard<std::mutex> lock(mtx);
write_buffer += data;
}
// 3. 通知 I/O 线程(SubReactor),该连接现在有数据要写
// 通过修改 epoll 监听事件,增加对 EPOLLOUT 的关注
owner_reactor->modify_handler(shared_from_this(), EPOLLIN | EPOLLOUT | EPOLLET);
}

// 此函数由 I/O 线程执行
void ConnectionHandler::handle_write() {
std::lock_guard<std::mutex> lock(mtx); // 保护 write_buffer
ssize_t n;
// ET 模式,循环写入直到缓冲区为空或遇到 EAGAIN
while (!write_buffer.empty()) {
n = write(fd, write_buffer.c_str(), write_buffer.length());
if (n > 0) { write_buffer.erase(0, n); }
else if (n < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("write"); handle_close(); } break; }
}
// 如果写缓冲区已空,取消对写事件的关注,避免 CPU 空转
if (write_buffer.empty()) {
owner_reactor->modify_handler(shared_from_this(), EPOLLIN | EPOLLET);
}
}

void ConnectionHandler::handle_close() {
std::cout << "Client " << fd << " disconnected." << std::endl;
owner_reactor->remove_handler(shared_from_this());
}

// --- Acceptor (属于 MainReactor,负责接受新连接并分发) ---
class Acceptor : public EventHandler {
private:
int listen_fd;
std::vector<SubReactor*>& sub_reactors; // 持有所有 SubReactor 的指针
std::atomic<size_t> next_sub_reactor_idx; // 用于轮询分发新连接的原子索引

public:
Acceptor(int lfd, std::vector<SubReactor*>& subs) : listen_fd(lfd), sub_reactors(subs), next_sub_reactor_idx(0) {}
int get_fd() const override { return listen_fd; }
void handle_event(uint32_t events) override {
if (events & EPOLLIN) {
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd;
// ET 模式,循环 accept 直到返回 EAGAIN
while ((client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &len)) > 0) {
std::cout << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << ":" << ntohs(client_addr.sin_port) << std::endl;
// 使用轮询(Round-Robin)策略选择一个 SubReactor
size_t idx = next_sub_reactor_idx.fetch_add(1) % sub_reactors.size();
// 将新连接的 fd 投递给选中的 SubReactor
sub_reactors[idx]->post_new_connection(client_fd);
}
if (client_fd == -1 && (errno != EAGAIN && errno != EWOULDBLOCK)) { perror("accept"); }
}
}
};

// --- MainReactor (运行在主线程,只负责 accept) ---
class MainReactor {
private:
int epoll_fd;
std::unordered_map<int, std::shared_ptr<EventHandler>> handlers; // 只管理 Acceptor
public:
MainReactor() {
epoll_fd = epoll_create1(0);
if (epoll_fd < 0) { perror("MainReactor epoll_create1"); exit(EXIT_FAILURE); }
}
~MainReactor() { close(epoll_fd); }
void register_handler(std::shared_ptr<EventHandler> handler, uint32_t events) {
int fd = handler->get_fd();
struct epoll_event ev;
ev.events = events;
ev.data.ptr = handler.get();
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { perror("MainReactor epoll_ctl: add"); }
handlers[fd] = handler;
}
// 主 Reactor 的事件循环,非常简单,只处理 Acceptor 的事件
void event_loop() {
std::vector<struct epoll_event> ready_events(16);
while (true) {
int n = epoll_wait(epoll_fd, ready_events.data(), ready_events.size(), -1);
for (int i = 0; i < n; ++i) {
EventHandler* handler = static_cast<EventHandler*>(ready_events[i].data.ptr);
handler->handle_event(ready_events[i].events);
}
}
}
};

// --- 主函数 ---
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <port>\n";
return 1;
}

int port = atoi(argv[1]);
int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (listen_fd < 0) { perror("socket"); return 1; }
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(port);
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) { perror("bind"); return 1; }
if (listen(listen_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }

unsigned int num_io_threads = std::thread::hardware_concurrency();
if (num_io_threads == 0) num_io_threads = 4;

// 1. 创建 Worker 线程池
ThreadPool pool(num_io_threads * 2); // 通常工作线程数可以多于 I/O 线程

// 2. 创建 SubReactors 并启动它们的线程
std::vector<std::unique_ptr<SubReactor>> sub_reactors_pool; // 使用 unique_ptr 管理生命周期
std::vector<SubReactor*> sub_reactors_ptr_pool; // 存储裸指针,方便传递
for (unsigned int i = 0; i < num_io_threads; ++i) {
// 将线程池指针传递给每个 SubReactor
auto sub_reactor = std::make_unique<SubReactor>(&pool);
sub_reactors_ptr_pool.push_back(sub_reactor.get());
sub_reactor->start(); // 启动 I/O 线程
sub_reactors_pool.push_back(std::move(sub_reactor));
}

// 3. 创建 MainReactor 和 Acceptor
MainReactor main_reactor;
auto acceptor = std::make_shared<Acceptor>(listen_fd, sub_reactors_ptr_pool);
main_reactor.register_handler(acceptor, EPOLLIN | EPOLLET);

std::cout << "Full Reactor Server is running on port " << port
<< " with " << num_io_threads << " I/O threads and " << num_io_threads * 2 << " worker threads..." << std::endl;

// 4. MainReactor 进入事件循环 (阻塞主线程,使其成为 Acceptor 线程)
main_reactor.event_loop();

// 等待所有子线程结束(实际上 event_loop 是死循环,这里不会执行到)
for(auto& sub : sub_reactors_pool) {
sub->join();
}

close(listen_fd);
return 0;
}

主 Reactor (Main Reactor):1 个线程。

  • 它只负责一件事:bind、listen,并 epoll_wait 在 listen_fd 上。
  • 它的 handle_event 只调用 accept()。
  • 它不处理任何 read/write。

从 Reactors (Sub-Reactors):N 个线程(通常等于 CPU 核心数)。

  • 每个“从 Reactor” 线程都有自己的 epoll 实例和自己的事件循环。
  • 当主 Reactor accept 一个新连接 client_fd 后,它不会把这个 client_fd 注册到自己的 epoll 里。
  • 相反,它会(例如通过轮询)选择一个“从 Reactor” 线程,并把这个 client_fd “移交” 过去。
  • “从 Reactor” 线程收到这个 client_fd 后,将其注册到自己的 epoll 实例中,开始监听 EPOLLIN 和 EPOLLOUT。

这样, accept 的负载被主 Reactor 承担。所有 read 和 write 的负载被平均分配到了 N 个“从 Reactor” 线程上。

这就把 I/O 操作本身的 CPU 开销分散到了多个 CPU 核心上,实现了真正的 I/O 并行处理。

proactor 模式

Proactor 模式,也称为“主动器模式”,是另一种处理并发 I/O 事件的设计模式。如果说 Reactor 是基于“就绪通知”(Readiness Notification)的模式,那么 Proactor 则是基于“完成通知”(Completion Notification)的模式。

它的核心思想是:应用程序发起一个异步 I/O 操作,然后立即继续执行其他任务;当这个 I/O 操作由操作系统(OS)在后台完成后,操作系统会通知应用程序,应用程序再对操作的结果进行处理。

Proactor 模式通常包含以下几个关键角色:

  • 句柄(Handle): 与 Reactor 中一样,代表一个 I/O 资源,如 socket 文件描述符。

  • 异步操作处理器(Asynchronous Operation Processor): 这是 Proactor 模式的引擎,通常由操作系统内核提供。它负责执行异步 I/O 操作,并在操作完成后,将结果放入一个完成事件队列中。

  • 异步操作(Asynchronous Operation): 指应用程序发起的非阻塞 I/O 请求,例如 async_read 或 async_write。这个请求包含了执行 I/O 所需的所有信息,如句柄、数据缓冲区、操作类型等。

  • 完成处理器(Completion Handler): 这是一个由应用程序定义的回调函数或对象,用于处理已完成的异步 I/O 操作的结果。它包含了 I/O 操作成功或失败后的业务逻辑。

  • 主动器(Proactor): 模式的中心组件,扮演“事件完成分发者”的角色。它在一个独立的线程中运行,主要负责:从“异步操作处理器”的完成事件队列中获取已完成的事件, 以及根据完成的事件,调用与之关联的“完成处理器”的相应方法。这个过程通常被称为“分发”(Dispatch)。

  • 发起者(Initiator): 应用程序的主体部分。它负责创建异步操作和完成处理器,并通过调用异步 I/O 接口来发起操作。发起后,它不会等待操作完成。

典型工作流程

Proactor 模式的典型工作流程如下:

  1. 发起操作:应用程序(Initiator)调用一个异步 I/O 接口(如 async_read)来发起一个操作。在调用时,它会提供:要操作的句柄(socket_fd), 一个用于存放数据的缓冲区(buffer)。一个完成处理器(CompletionHandler),用于在操作完成后被回调。

  2. 操作托管给 OS:调用立即返回,应用程序线程不会被阻塞。这个 I/O 请求被交给操作系统内核(Asynchronous Operation Processor)。

  3. OS 执行 I/O:操作系统内核在后台独立地执行这个读操作。它会等待数据到达,并将数据从内核空间直接拷贝到应用程序提供的 buffer 中。

  4. 等待完成通知:与此同时,Proactor 在自己的事件循环中阻塞等待,监听内核的完成事件队列。

  5. I/O 完成:当数据被成功拷贝到 buffer 中后,操作系统内核会将一个“完成事件”放入完成队列。

  6. Proactor 被唤醒:Proactor 检测到完成队列中有事件,从阻塞状态被唤醒。

  7. 分发事件:Proactor 从队列中取出完成事件,并根据事件信息(例如是哪个句柄上的什么操作完成了)找到对应的 CompletionHandler。

  8. 执行回调:Proactor 调用 CompletionHandler 的回调方法。

  9. 处理数据:CompletionHandler 在其回调方法中执行业务逻辑。此时,数据已经位于它之前提供的 buffer 中,可以直接使用,无需再进行 read 调用。

Reactor vs Proactor 对比

对比维度 Reactor 模式 (同步 I/O) Proactor 模式 (异步 I/O)
核心思想 就绪通知 (Readiness Notification) 完成通知 (Completion Notification)
通知内容 “文件描述符 fd 已准备好进行读/写操作。” “你发起的读/写操作已经完成。”
谁执行 I/O 应用程序线程。收到通知后,调用 read()/write() 操作系统内核。应用程序只需发起请求,内核负责所有 I/O
数据流 收到通知后,应用程序主动从内核缓冲区读取数据 内核直接将数据读到应用程序缓冲区,通知时数据已备好
编程模型 同步事件处理,应用程序主动执行 I/O 操作 异步事件处理,应用程序被动等待 I/O 完成通知
生活比喻 我告诉你菜备好了,你来炒 我把菜炒好了端给你,你直接吃
平台适用性 非常适合 Linux 的 epoll 机制 非常适合 Windows 的 IOCP 机制

Proactor 模式的优缺点

优点:

更高的并发性:I/O 操作由内核完成,不会占用或阻塞应用线程,使得应用线程可以专注于业务逻辑,理论上能达到更高的性能和并发。

简化的应用逻辑:应用程序的逻辑被清晰地分离到“发起操作”和“处理结果”两个部分,代码通常更简洁,避免了复杂的 I/O 就绪状态管理。

CPU 缓存友好:在数据拷贝方面,异步模型可能提供更优化的路径,减少 CPU 缓存失效。

缺点与挑战:

平台依赖性强:真正的 Proactor 模式严重依赖操作系统提供的底层异步 I/O(AIO)支持。Windows 通过 IOCP 提供了完美的 Proactor 模型支持。而Linux虽然有 AIO 接口,但长期以来对网络 socket 的支持不完善或效率不高。因此,在 Linux 上,高性能网络编程几乎都采用基于 epoll 的 Reactor 模式。

实现复杂:底层的 Proactor 模式实现起来非常复杂,需要深入理解操作系统内核机制。

调试困难:基于回调的异步编程模型可能导致“回调地狱”(Callback Hell),内存管理和状态跟踪也比同步模型更复杂,调试难度较大。

在这里还需要指明的是, proactor 模式将IO操作交给操作系统内核, 引发了与Reactor相比执行上下文的不同:

  • Reactor:当 epoll 通知你的应用程序“可以读了”,你的应用程序线程需要发起一个 read() 系统调用。这个调用会导致一次上下文切换(从用户态切换到内核态),由内核将数据从它的缓冲区拷贝到你的用户态缓冲区,然后再进行一次上下文切换(从内核态返回用户态)。这个过程虽然很快,但上下文切换是有成本的。

  • Proactor:你发起异步 async_read 后,这个请求就交给了内核。内核会在后台处理这一切——等待数据、从网卡 DMA 到内核缓冲区、再拷贝到你指定的用户态缓冲区。整个过程都在内核空间完成,直到全部结束后,才通过一次完成通知回到用户态。它用一次更“重”的内核内部处理,换取了多次用户态/内核态之间切换的开销

因此, Proactor 模式消耗的资源总量可能并不少,但它通过减少上下文切换和更有效地利用硬件(如 DMA),使得资源消耗的方式更高效,从而将 CPU 从繁琐的 I/O 等待和数据拷贝中解放出来

而且, Proactor 模式在某些场景下还可以带来额外的性能提升:

  • 更高的 CPU 缓存命中率:由于应用线程不必亲自处理 I/O,它可以持续地执行业务逻辑计算。这使得 CPU 的指令缓存和数据缓存更可能保持“热”状态,从而提高计算密集型业务的性能。而在 Reactor 模式中,I/O 代码和业务逻辑代码交替执行,可能会污染 CPU 缓存。

  • 更优的线程调度:在 Proactor 模式下,应用线程池里的线程只在 I/O 完成后才被唤醒去处理业务逻辑,线程的利用率非常高。而在 Reactor 模式中,I/O 线程既要处理 I/O 也要处理业务逻辑(或分发任务),职责相对不纯粹。

总结

Proactor 模式是一种理想的、性能极高的并发设计模式,它将 I/O 操作的负担完全交给了操作系统,从而解放了应用程序。它的设计哲学是“让专业的人做专业的事”,操作系统负责 I/O,应用程序负责业务。

然而,由于操作系统的支持度不同,Proactor 模式在实践中的应用远没有 Reactor 模式广泛。在 Windows 服务器开发中,基于 IOCP 的 Proactor 是主流选择。但在 Linux 世界,基于 epoll 的 Reactor 模式(特别是主从 Reactor 模型) 才是构建高性能网络服务的王者。

有趣的是,很多现代网络库(如 Boost.Asio)会在上层提供 Proactor 风格的异步 API,但在 Linux 底层,它们常常是通过 Reactor (epoll) + 线程池来模拟 Proactor 的行为。