Reactor 模式,也称为“反应器模式”或“分发者模式”(Dispatcher
Pattern),是一种用于处理并发服务请求 的事件驱动设计模式 。它的核心思想是将所有事件(如
I/O
事件、定时器事件等)的监听 和分发 任务交给一个中心化的组件 (Reactor ),当事件发生时,Reactor
负责将事件分发给预先注册的、特定的处理程序 (Event
Handler )来进行处理 。
这种模式特别适用于需要高效处理大量并发、短时连接的 I/O
密集 型应用,例如网络服务器。
它是一种设计模式,而IO多路复用+线程池则是Reactor模式的一种具体实现技术。
这里中心化的组件就是epoll, 而预先注册的处理程序就是线程池中的线程。
组件
Reactor 模式主要由以下五个组件构成:
句柄(Handle): 代表一个可以产生事件的 I/O
资源 。在网络编程中,这通常是一个文件描述符 (File
Descriptor, fd),例如一个 socket 连接。它是操作系统内核用来唯一标识 I/O
资源的整数。
同步事件多路分发器 (Synchronous Event
Demultiplexer): 这是 Reactor
模式的引擎。它是一个系统调用,能够同时监听多个句柄上的事件 。这个调用是同步阻塞 的,即程序会在这里等待,直到至少有一个句柄上发生了它所关心的事件。
为何需要它:这是避免忙轮询(busy-waiting)的关键。如果没有它,程序就需要不断地轮询所有句柄,极大地浪费
CPU 资源。
常见实现:Linux 系统下的 select、poll、epoll,以及 BSD/macOS 下的
kqueue。其中 epoll 是目前 Linux 上性能最高的实现, 因此下面主要以
epoll 为例进行说明。
事件处理器 接口(Event Handler):
定义了一系列用于处理不同类型事件 的方法(回调函数 )的接口。例如,handle_read()、handle_write()、handle_error()
等。它是一个抽象的基类,具体的业务逻辑由其子类实现。
具体事件处理器(Concrete Event Handler):
实现了事件处理器接口的类。它负责执行与特定事件相关的具体业务逻辑 。例如,一个
HttpEventHandler 在 handle_read() 方法中可能会读取 HTTP
请求数据、解析请求头,然后在 handle_write() 中发送 HTTP
响应。例如在网络模型中, 有至少下列两种具体处理器:
Acceptor (接收器):一种特殊的
Handler,它只与监听套接字 (listen_sockfd)
绑定。它的唯一职责就是处理“新连接”事件,即调用
accept(),然后创建一个新的 Connection Handler 来处理这个新连接。
Connection Handler
(连接处理器):与客户端套接字 (client_sockfd)
绑定。它负责处理这个连接上的所有读写事件:解析请求、执行业务逻辑、准备并发送响应。
在具体实现中, 这些函数会在一个新的线程中执行,
以避免阻塞主事件循环。
反应器 (Reactor):
模式的中心组件,扮演着“分发者”的角色。它主要负责以下工作:
注册与注销 :提供接口,让应用程序可以将一个事件处理器和其关心的句柄绑定(注册)到
Reactor 上,或是在不需要时解绑(注销)。
运行事件循环 (Event Loop):Reactor
内部包含一个循环。在每次循环中,它会调用“同步事件多路分发器”来等待事件的发生。
事件分发 :当“同步事件多路分发器”返回时,表明有事件发生。Reactor
会根据返回的句柄和事件类型,查找之前注册的事件处理器,并调用其对应的方法来处理事件。
工作流程
下面是 Reactor 模式的典型工作流程,以一个网络服务器为例:
初始化:服务器启动,创建一个 Reactor
对象 和一个用于监听新连接的句柄 (listen_fd)。
注册处理器:服务器创建一个
AcceptorEventHandler (一个具体的事件处理器),并将其与
listen_fd 上 READ
事件一起注册到 Reactor 中 。这是在告诉 Reactor,如果
listen_fd 上有新连接请求(表现为可读事件),就调用 AcceptorEventHandler
的处理方法。
启动事件循环:服务器调用 Reactor 的 event_loop()
方法,启动事件循环。
等待事件:Reactor 在循环中调用
epoll_wait() (同步事件多路分发器),阻塞程序,等待事件发生。
事件发生与分发:
一个客户端发起连接,listen_fd 变为可读。
epoll_wait() 从阻塞中返回,并告知 Reactor listen_fd 上有 READ
事件。
Reactor 查找注册表,发现这个事件应该由 AcceptorEventHandler
处理。
Reactor 调用 AcceptorEventHandler
的 handle_read() 方法。
处理连接事件:AcceptorEventHandler 的 handle_read() 方法内部调用
accept() 函数,接受新的客户端连接,并获得一个新的句柄
client_fd。然后,它会创建一个新的
ConnectionEventHandler (用于处理与该客户端的数据交互),并将其与
client_fd 上的 READ 或 WRITE 事件一起注册 到同一个
Reactor 中。
处理数据事件:
如果客户端发送数据,client_fd 变为可读。
epoll_wait() 再次返回,告知 Reactor client_fd 上有 READ 事件。
Reactor 分发事件给 ConnectionEventHandler,调用其 handle_read()
方法来读取和处理数据。
循环往复:Reactor 不断重复第 4 到第 7
步,持续地等待和分发事件。
Reactor 的几种演进模型
根据线程模型的不同,Reactor 模式可以分为以下几种:
alt text
单线程 Reactor 模型 :
所有操作(接受连接、事件分发、I/O
读写、业务逻辑处理)都在同一个线程中完成。
优点在于实现简单,没有多线程带来的锁竞争 和上下文切换开销 ;
逻辑清晰,所有状态都在一个线程内管理。
缺点是无法利用多核 CPU
的性能。如果某个业务逻辑处理非常耗时(例如数据库查询),会阻塞整个事件循环,导致其他所有客户端的请求都无法被及时响应。
适用于业务逻辑非常简单、快速的场景。例如,Redis
就是一个典型的单线程 Reactor
模型,因为它的绝大部分操作都是内存操作,速度极快。
alt text
多线程 Reactor 模型 :
一个主线程 (Reactor
线程)负责监听和分发事件 (accept、demultiplex、dispatch),
业务逻辑 的处理(非 I/O
操作)则交给一个工作线程池 (Worker Thread
Pool)来执行。
主线程在接收到 I/O
事件后,将数据读取出来,封装 成一个任务 ,然后提交给工作线程池。工作线程池中的线程执行完业务逻辑后,可能会将结果交还给主线程去发送。
优点是充分利用了多核 CPU 的计算能力, 将耗时的业务逻辑与 I/O
处理分离,避免了 I/O 线程的阻塞。
缺点是引入了多线程,需要处理线程安全 问题(如共享数据的同步)。而且主线程仍然是性能瓶颈 ,因为它需要处理所有连接的
I/O 事件 。
alt text
主从 Reactor 模型 (Main-Sub Reactor):
这是对多线程模型的一种优化,也是现代高性能网络框架(如
Netty)普遍采用的模型。
其 Reactor 部分由两部分组成:
主 Reactor (Main
Reactor):一个独立的线程,只负责一件事——监听并接受新的客户端连接
(accept)。
从 Reactor (Sub
Reactors):一个或多个独立的线程,每个从 Reactor
都有自己的事件多路分发器(epoll实例), 负责处理已连接客户端的 I/O
事件 (读写)。
也就是说, 主 Reactor
接受新连接后,不自己处理,而是通过某种负载均衡策略(如轮询)将这个新的连接 分配给一个从
Reactor 。被选中的从 Reactor
将这个连接的句柄加入到自己的事件监听集合中。之后,这个连接上所有的 I/O
事件(读、写)都由这个从 Reactor 来负责处理。
而业务逻辑可以由从 Reactor
线程自己执行,也可以再分发给另外的工作线程池 。
优点:职责单一, 主 Reactor 只管连接,从 Reactor 只管 I/O,职责清晰;
扩展性强, 可以通过增加从 Reactor
的数量 来线性地提升处理能力,充分利用多核 CPU; 性能优异:主
Reactor 不会因为处理 I/O 而成为瓶颈,从 Reactors
之间也基本没有数据竞争。
代表实现是Netty、Memcached 等。
单线程 Reactor 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 #include <iostream> #include <string> #include <vector> #include <unordered_map> #include <cstring> #include <cstdlib> #include <unistd.h> #include <sys/socket.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <cerrno> #include <cctype> class Reactor ;class EventHandler {public : virtual ~EventHandler () {} virtual void handle_event (uint32_t events) = 0 ; virtual int get_fd () const = 0 ; }; class Reactor {private : int epoll_fd; std::unordered_map<int , EventHandler*> handlers; public : Reactor () { epoll_fd = epoll_create1 (0 ); if (epoll_fd < 0 ) { perror ("epoll_create1" ); exit (EXIT_FAILURE); } } ~Reactor () { close (epoll_fd); } void register_handler (EventHandler* handler, uint32_t events) { int fd = handler->get_fd (); struct epoll_event ev; ev.events = events; ev.data.ptr = handler; if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0 ) { perror ("epoll_ctl: add" ); return ; } handlers[fd] = handler; } void modify_handler (EventHandler* handler, uint32_t events) { int fd = handler->get_fd (); struct epoll_event ev; ev.events = events; ev.data.ptr = handler; if (epoll_ctl (epoll_fd, EPOLL_CTL_MOD, fd, &ev) < 0 ) { perror ("epoll_ctl: mod" ); } } void remove_handler (EventHandler* handler) { int fd = handler->get_fd (); if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, NULL ) < 0 ) { perror ("epoll_ctl: del" ); } handlers.erase (fd); } void event_loop () { std::vector<struct epoll_event> ready_events (1024 ) ; while (true ) { int n = epoll_wait (epoll_fd, ready_events.data (), ready_events.size (), -1 ); if (n < 0 ) { if (errno == EINTR) continue ; perror ("epoll_wait" ); break ; } for (int i = 0 ; i < n; ++i) { EventHandler* handler = static_cast <EventHandler*>(ready_events[i].data.ptr); handler->handle_event (ready_events[i].events); } } } }; bool set_nonblocking (int fd) { int flags = fcntl (fd, F_GETFL, 0 ); if (flags == -1 ) { perror ("fcntl: F_GETFL" ); return false ; } if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1 ) { perror ("fcntl: F_SETFL" ); return false ; } return true ; } class ConnectionHandler : public EventHandler {private : int fd; Reactor* reactor; std::string read_buffer; std::string write_buffer; public : ConnectionHandler (int cfd, Reactor* r) : fd (cfd), reactor (r) {} ~ConnectionHandler () { close (fd); } int get_fd () const override { return fd; } void handle_event (uint32_t events) override { if (events & (EPOLLHUP | EPOLLERR)) { handle_close (); return ; } if (events & EPOLLIN) { handle_read (); } if (events & EPOLLOUT) { handle_write (); } } private : void handle_read () { char buf[1024 ]; ssize_t n; while ((n = read (fd, buf, sizeof (buf))) > 0 ) { read_buffer.append (buf, n); } if (n == 0 ) { handle_close (); } else if (n < 0 ) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror ("read" ); handle_close (); } } if (!read_buffer.empty ()) { for (char &c : read_buffer) { c = toupper (c); } write_buffer += read_buffer; read_buffer.clear (); reactor->modify_handler (this , EPOLLIN | EPOLLOUT | EPOLLET); } } void handle_write () { ssize_t n; while (!write_buffer.empty ()) { n = write (fd, write_buffer.c_str (), write_buffer.length ()); if (n > 0 ) { write_buffer.erase (0 , n); } else if (n < 0 ) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror ("write" ); handle_close (); } break ; } } if (write_buffer.empty ()) { reactor->modify_handler (this , EPOLLIN | EPOLLET); } } void handle_close () { std::cout << "Client " << fd << " disconnected." << std::endl; reactor->remove_handler (this ); delete this ; } }; class Acceptor : public EventHandler {private : int listen_fd; Reactor* reactor; public : Acceptor (int lfd, Reactor* r) : listen_fd (lfd), reactor (r) {} int get_fd () const override { return listen_fd; } void handle_event (uint32_t events) override { if (events & EPOLLIN) { struct sockaddr_in client_addr; socklen_t len = sizeof (client_addr); int client_fd; while ((client_fd = accept (listen_fd, (struct sockaddr*)&client_addr, &len)) > 0 ) { std::cout << "Accepted connection from " << inet_ntoa (client_addr.sin_addr) << ":" << ntohs (client_addr.sin_port) << std::endl; set_nonblocking (client_fd); ConnectionHandler* handler = new ConnectionHandler (client_fd, reactor); reactor->register_handler (handler, EPOLLIN | EPOLLET); } if (client_fd == -1 && (errno != EAGAIN && errno != EWOULDBLOCK)) { perror ("accept" ); } } } }; int main (int argc, char * argv[]) { if (argc != 2 ) { std::cerr << "Usage: " << argv[0 ] << " <port>\n" ; return 1 ; } int port = atoi (argv[1 ]); int listen_fd = socket (AF_INET, SOCK_STREAM, 0 ); if (listen_fd < 0 ) { perror ("socket" ); return 1 ; } int opt = 1 ; setsockopt (listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); struct sockaddr_in serv_addr; memset (&serv_addr, 0 , sizeof (serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl (INADDR_ANY); serv_addr.sin_port = htons (port); if (bind (listen_fd, (struct sockaddr*)&serv_addr, sizeof (serv_addr)) < 0 ) { perror ("bind" ); return 1 ; } if (listen (listen_fd, SOMAXCONN) < 0 ) { perror ("listen" ); return 1 ; } set_nonblocking (listen_fd); Reactor reactor; Acceptor acceptor (listen_fd, &reactor) ; reactor.register_handler (&acceptor, EPOLLIN | EPOLLET); std::cout << "Reactor Server is running on port " << port << "..." << std::endl; reactor.event_loop (); close (listen_fd); return 0 ; }
多线程 Reactor 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 #include <iostream> #include <string> #include <vector> #include <unordered_map> #include <cstring> #include <cstdlib> #include <unistd.h> #include <sys/socket.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <cerrno> #include <cctype> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <functional> class ThreadPool {private : std::vector<std::thread> workers; std::queue<std::function<void ()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; public : ThreadPool (size_t threads) : stop (false ) { for (size_t i = 0 ; i < threads; ++i) { workers.emplace_back ([this ] { while (true ) { std::function<void ()> task; { std::unique_lock<std::mutex> lock (this ->queue_mutex); this ->condition.wait (lock, [this ] { return this ->stop || !this ->tasks.empty (); }); if (this ->stop && this ->tasks.empty ()) return ; task = std::move (this ->tasks.front ()); this ->tasks.pop (); } task (); } }); } } template <class F> void submit_task (F&& f) { { std::unique_lock<std::mutex> lock (queue_mutex) ; if (stop) throw std::runtime_error ("submit on stopped ThreadPool" ); tasks.emplace (std::forward<F>(f)); } condition.notify_one (); } ~ThreadPool () { { std::unique_lock<std::mutex> lock (queue_mutex) ; stop = true ; } condition.notify_all (); for (std::thread &worker : workers) { worker.join (); } } }; class Reactor ;class EventHandler {public : virtual ~EventHandler () {} virtual void handle_event (uint32_t events) = 0 ; virtual int get_fd () const = 0 ; }; class Reactor {private : int epoll_fd; std::unordered_map<int , std::shared_ptr<EventHandler>> handlers; std::mutex mtx; public : Reactor () { epoll_fd = epoll_create1 (0 ); if (epoll_fd < 0 ) { perror ("epoll_create1" ); exit (EXIT_FAILURE); } } ~Reactor () { close (epoll_fd); } void register_handler (std::shared_ptr<EventHandler> handler, uint32_t events) { std::lock_guard<std::mutex> lock (mtx) ; int fd = handler->get_fd (); struct epoll_event ev; ev.events = events; ev.data.ptr = handler.get (); if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0 ) { perror ("epoll_ctl: add" ); return ; } handlers[fd] = handler; } void modify_handler (std::shared_ptr<EventHandler> handler, uint32_t events) { std::lock_guard<std::mutex> lock (mtx) ; int fd = handler->get_fd (); struct epoll_event ev; ev.events = events; ev.data.ptr = handler.get (); if (epoll_ctl (epoll_fd, EPOLL_CTL_MOD, fd, &ev) < 0 ) { perror ("epoll_ctl: mod" ); } } void remove_handler (std::shared_ptr<EventHandler> handler) { std::lock_guard<std::mutex> lock (mtx) ; int fd = handler->get_fd (); if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, NULL ) < 0 ) { perror ("epoll_ctl: del" ); } handlers.erase (fd); } void event_loop () { std::vector<struct epoll_event> ready_events (1024 ) ; while (true ) { int n = epoll_wait (epoll_fd, ready_events.data (), ready_events.size (), -1 ); if (n < 0 ) { if (errno == EINTR) continue ; perror ("epoll_wait" ); break ; } for (int i = 0 ; i < n; ++i) { EventHandler* handler_ptr = static_cast <EventHandler*>(ready_events[i].data.ptr); std::shared_ptr<EventHandler> handler; { std::lock_guard<std::mutex> lock (mtx) ; auto it = handlers.find (handler_ptr->get_fd ()); if (it != handlers.end ()) { handler = it->second; } } if (handler) { handler->handle_event (ready_events[i].events); } } } } }; bool set_nonblocking (int fd) { int flags = fcntl (fd, F_GETFL, 0 ); if (flags == -1 ) { perror ("fcntl: F_GETFL" ); return false ; } if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1 ) { perror ("fcntl: F_SETFL" ); return false ; } return true ; } class ConnectionHandler : public EventHandler, public std::enable_shared_from_this<ConnectionHandler> {private : int fd; Reactor* reactor; ThreadPool* thread_pool; std::string read_buffer; std::string write_buffer; std::mutex mtx; public : ConnectionHandler (int cfd, Reactor* r, ThreadPool* tp) : fd (cfd), reactor (r), thread_pool (tp) {} ~ConnectionHandler () { close (fd); } int get_fd () const override { return fd; } void handle_event (uint32_t events) override { if (events & (EPOLLHUP | EPOLLERR)) { handle_close (); return ; } if (events & EPOLLIN) { handle_read (); } if (events & EPOLLOUT) { handle_write (); } } private : void handle_read () { char buf[1024 ]; ssize_t n; while ((n = read (fd, buf, sizeof (buf))) > 0 ) { read_buffer.append (buf, n); } if (n == 0 ) { handle_close (); } else if (n < 0 ) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror ("read" ); handle_close (); } } if (!read_buffer.empty ()) { std::string data = std::move (read_buffer); read_buffer.clear (); auto self = shared_from_this (); thread_pool->submit_task ([self, data] { self->process_and_send (data); }); } } void process_and_send (std::string data) { for (char &c : data) { c = toupper (c); } { std::lock_guard<std::mutex> lock (mtx) ; write_buffer += data; } reactor->modify_handler (shared_from_this (), EPOLLIN | EPOLLOUT | EPOLLET); } void handle_write () { std::lock_guard<std::mutex> lock (mtx) ; ssize_t n; while (!write_buffer.empty ()) { n = write (fd, write_buffer.c_str (), write_buffer.length ()); if (n > 0 ) { write_buffer.erase (0 , n); } else if (n < 0 ) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror ("write" ); handle_close (); } break ; } } if (write_buffer.empty ()) { reactor->modify_handler (shared_from_this (), EPOLLIN | EPOLLET); } } void handle_close () { std::cout << "Client " << fd << " disconnected." << std::endl; reactor->remove_handler (shared_from_this ()); } }; class Acceptor : public EventHandler, public std::enable_shared_from_this<Acceptor> {private : int listen_fd; Reactor* reactor; ThreadPool* thread_pool; public : Acceptor (int lfd, Reactor* r, ThreadPool* tp) : listen_fd (lfd), reactor (r), thread_pool (tp) {} int get_fd () const override { return listen_fd; } void handle_event (uint32_t events) override { if (events & EPOLLIN) { struct sockaddr_in client_addr; socklen_t len = sizeof (client_addr); int client_fd; while ((client_fd = accept (listen_fd, (struct sockaddr*)&client_addr, &len)) > 0 ) { std::cout << "Accepted connection from " << inet_ntoa (client_addr.sin_addr) << ":" << ntohs (client_addr.sin_port) << std::endl; set_nonblocking (client_fd); auto handler = std::make_shared <ConnectionHandler>(client_fd, reactor, thread_pool); reactor->register_handler (handler, EPOLLIN | EPOLLET); } if (client_fd == -1 && (errno != EAGAIN && errno != EWOULDBLOCK)) { perror ("accept" ); } } } }; int main (int argc, char * argv[]) { if (argc != 2 ) { std::cerr << "Usage: " << argv[0 ] << " <port>\n" ; return 1 ; } int port = atoi (argv[1 ]); int listen_fd = socket (AF_INET, SOCK_STREAM, 0 ); if (listen_fd < 0 ) { perror ("socket" ); return 1 ; } int opt = 1 ; setsockopt (listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); struct sockaddr_in serv_addr; memset (&serv_addr, 0 , sizeof (serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl (INADDR_ANY); serv_addr.sin_port = htons (port); if (bind (listen_fd, (struct sockaddr*)&serv_addr, sizeof (serv_addr)) < 0 ) { perror ("bind" ); return 1 ; } if (listen (listen_fd, SOMAXCONN) < 0 ) { perror ("listen" ); return 1 ; } set_nonblocking (listen_fd); Reactor reactor; unsigned int num_threads = std::thread::hardware_concurrency (); ThreadPool pool (num_threads > 0 ? num_threads : 4 ) ; auto acceptor = std::make_shared <Acceptor>(listen_fd, &reactor, &pool); reactor.register_handler (acceptor, EPOLLIN | EPOLLET); std::cout << "Multi-threaded Reactor Server is running on port " << port << " with " << num_threads << " worker threads..." << std::endl; reactor.event_loop (); close (listen_fd); return 0 ; }
主从 Reactor 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 #include <iostream> #include <string> #include <vector> #include <unordered_map> #include <cstring> #include <cstdlib> #include <unistd.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <cerrno> #include <cctype> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <functional> #include <atomic> class ThreadPool {private : std::vector<std::thread> workers; std::queue<std::function<void ()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; public : ThreadPool (size_t threads) : stop (false ) { for (size_t i = 0 ; i < threads; ++i) { workers.emplace_back ([this ] { while (true ) { std::function<void ()> task; { std::unique_lock<std::mutex> lock (this ->queue_mutex); this ->condition.wait (lock, [this ] { return this ->stop || !this ->tasks.empty (); }); if (this ->stop && this ->tasks.empty ()) return ; task = std::move (this ->tasks.front ()); this ->tasks.pop (); } task (); } }); } } template <class F> void submit_task (F&& f) { { std::unique_lock<std::mutex> lock (queue_mutex) ; if (stop) throw std::runtime_error ("submit on stopped ThreadPool" ); tasks.emplace (std::forward<F>(f)); } condition.notify_one (); } ~ThreadPool () { { std::unique_lock<std::mutex> lock (queue_mutex) ; stop = true ; } condition.notify_all (); for (std::thread &worker : workers) { worker.join (); } } }; class SubReactor ;class EventHandler ;bool set_nonblocking (int fd) { int flags = fcntl (fd, F_GETFL, 0 ); if (flags == -1 ) { perror ("fcntl: F_GETFL" ); return false ; } if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1 ) { perror ("fcntl: F_SETFL" ); return false ; } return true ; } class EventHandler {public : virtual ~EventHandler () {} virtual void handle_event (uint32_t events) = 0 ; virtual int get_fd () const = 0 ; }; class ConnectionHandler : public EventHandler, public std::enable_shared_from_this<ConnectionHandler> {private : int fd; SubReactor* owner_reactor; ThreadPool* thread_pool; std::string read_buffer; std::string write_buffer; std::mutex mtx; public : ConnectionHandler (int cfd, SubReactor* reactor, ThreadPool* pool) : fd (cfd), owner_reactor (reactor), thread_pool (pool) {} ~ConnectionHandler () { close (fd); } int get_fd () const override { return fd; } void handle_event (uint32_t events) override ; private : void handle_read () ; void handle_write () ; void handle_close () ; void process_data (std::string data) ; }; class SubReactor {private : int epoll_fd; int wakeup_fd; ThreadPool* thread_pool; std::unordered_map<int , std::shared_ptr<EventHandler>> handlers; std::thread worker_thread; std::mutex mtx; std::vector<int > pending_fds; std::mutex pending_fds_mtx; public : SubReactor (ThreadPool* pool) : thread_pool (pool) { epoll_fd = epoll_create1 (0 ); if (epoll_fd < 0 ) { perror ("SubReactor epoll_create1" ); exit (EXIT_FAILURE); } wakeup_fd = eventfd (0 , EFD_NONBLOCK | EFD_CLOEXEC); if (wakeup_fd < 0 ) { perror ("SubReactor eventfd" ); exit (EXIT_FAILURE); } struct WakeupHandler : EventHandler { SubReactor* self; WakeupHandler (SubReactor* s) : self (s) {} int get_fd () const override { return self->wakeup_fd; } void handle_event (uint32_t events) override { uint64_t one; ssize_t n = read (self->wakeup_fd, &one, sizeof (one)); } }; register_handler (std::make_shared <WakeupHandler>(this ), EPOLLIN); } ~SubReactor () { close (epoll_fd); close (wakeup_fd); } void start () { worker_thread = std::thread (&SubReactor::event_loop, this ); } void join () { if (worker_thread.joinable ()) worker_thread.join (); } void add_new_connection (int client_fd) { set_nonblocking (client_fd); auto handler = std::make_shared <ConnectionHandler>(client_fd, this , thread_pool); register_handler (handler, EPOLLIN | EPOLLET); } void post_new_connection (int fd) { { std::lock_guard<std::mutex> lock (pending_fds_mtx) ; pending_fds.push_back (fd); } uint64_t one = 1 ; ssize_t n = write (wakeup_fd, &one, sizeof (one)); if (n != sizeof (one)) { perror ("SubReactor write to wakeup_fd" ); } } void register_handler (std::shared_ptr<EventHandler> handler, uint32_t events) { std::lock_guard<std::mutex> lock (mtx) ; int fd = handler->get_fd (); struct epoll_event ev; ev.events = events; ev.data.ptr = handler.get (); if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0 ) { perror ("SubReactor epoll_ctl: add" ); return ; } handlers[fd] = handler; } void modify_handler (std::shared_ptr<EventHandler> handler, uint32_t events) { std::lock_guard<std::mutex> lock (mtx) ; int fd = handler->get_fd (); struct epoll_event ev; ev.events = events; ev.data.ptr = handler.get (); if (epoll_ctl (epoll_fd, EPOLL_CTL_MOD, fd, &ev) < 0 ) { perror ("SubReactor epoll_ctl: mod" ); } } void remove_handler (std::shared_ptr<EventHandler> handler) { std::lock_guard<std::mutex> lock (mtx) ; int fd = handler->get_fd (); if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, NULL ) < 0 ) { perror ("SubReactor epoll_ctl: del" ); } handlers.erase (fd); } void event_loop () { std::vector<struct epoll_event> ready_events (1024 ) ; while (true ) { int n = epoll_wait (epoll_fd, ready_events.data (), ready_events.size (), -1 ); if (n < 0 ) { if (errno == EINTR) continue ; perror ("SubReactor epoll_wait" ); break ; } { std::vector<int > new_fds; std::lock_guard<std::mutex> lock (pending_fds_mtx) ; new_fds.swap (pending_fds); for (int fd : new_fds) { add_new_connection (fd); } } for (int i = 0 ; i < n; ++i) { EventHandler* handler_ptr = static_cast <EventHandler*>(ready_events[i].data.ptr); if (handler_ptr->get_fd () == wakeup_fd) continue ; std::shared_ptr<EventHandler> handler; { std::lock_guard<std::mutex> lock (mtx) ; auto it = handlers.find (handler_ptr->get_fd ()); if (it != handlers.end ()) handler = it->second; } if (handler) handler->handle_event (ready_events[i].events); } } } }; void ConnectionHandler::handle_event (uint32_t events) { if (events & (EPOLLHUP | EPOLLERR)) { handle_close (); return ; } if (events & EPOLLIN) { handle_read (); } if (events & EPOLLOUT) { handle_write (); } } void ConnectionHandler::handle_read () { char buf[1024 ]; ssize_t n; while ((n = read (fd, buf, sizeof (buf))) > 0 ) { read_buffer.append (buf, n); } if (n == 0 ) { handle_close (); } else if (n < 0 ) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror ("read" ); handle_close (); }} if (!read_buffer.empty ()) { std::string data = std::move (read_buffer); read_buffer.clear (); auto self = shared_from_this (); thread_pool->submit_task ([self, data] { self->process_data (data); }); } } void ConnectionHandler::process_data (std::string data) { for (char &c : data) { c = toupper (c); } { std::lock_guard<std::mutex> lock (mtx) ; write_buffer += data; } owner_reactor->modify_handler (shared_from_this (), EPOLLIN | EPOLLOUT | EPOLLET); } void ConnectionHandler::handle_write () { std::lock_guard<std::mutex> lock (mtx) ; ssize_t n; while (!write_buffer.empty ()) { n = write (fd, write_buffer.c_str (), write_buffer.length ()); if (n > 0 ) { write_buffer.erase (0 , n); } else if (n < 0 ) { if (errno != EAGAIN && errno != EWOULDBLOCK) { perror ("write" ); handle_close (); } break ; } } if (write_buffer.empty ()) { owner_reactor->modify_handler (shared_from_this (), EPOLLIN | EPOLLET); } } void ConnectionHandler::handle_close () { std::cout << "Client " << fd << " disconnected." << std::endl; owner_reactor->remove_handler (shared_from_this ()); } class Acceptor : public EventHandler {private : int listen_fd; std::vector<SubReactor*>& sub_reactors; std::atomic<size_t > next_sub_reactor_idx; public : Acceptor (int lfd, std::vector<SubReactor*>& subs) : listen_fd (lfd), sub_reactors (subs), next_sub_reactor_idx (0 ) {} int get_fd () const override { return listen_fd; } void handle_event (uint32_t events) override { if (events & EPOLLIN) { struct sockaddr_in client_addr; socklen_t len = sizeof (client_addr); int client_fd; while ((client_fd = accept (listen_fd, (struct sockaddr*)&client_addr, &len)) > 0 ) { std::cout << "Accepted connection from " << inet_ntoa (client_addr.sin_addr) << ":" << ntohs (client_addr.sin_port) << std::endl; size_t idx = next_sub_reactor_idx.fetch_add (1 ) % sub_reactors.size (); sub_reactors[idx]->post_new_connection (client_fd); } if (client_fd == -1 && (errno != EAGAIN && errno != EWOULDBLOCK)) { perror ("accept" ); } } } }; class MainReactor {private : int epoll_fd; std::unordered_map<int , std::shared_ptr<EventHandler>> handlers; public : MainReactor () { epoll_fd = epoll_create1 (0 ); if (epoll_fd < 0 ) { perror ("MainReactor epoll_create1" ); exit (EXIT_FAILURE); } } ~MainReactor () { close (epoll_fd); } void register_handler (std::shared_ptr<EventHandler> handler, uint32_t events) { int fd = handler->get_fd (); struct epoll_event ev; ev.events = events; ev.data.ptr = handler.get (); if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0 ) { perror ("MainReactor epoll_ctl: add" ); } handlers[fd] = handler; } void event_loop () { std::vector<struct epoll_event> ready_events (16 ) ; while (true ) { int n = epoll_wait (epoll_fd, ready_events.data (), ready_events.size (), -1 ); for (int i = 0 ; i < n; ++i) { EventHandler* handler = static_cast <EventHandler*>(ready_events[i].data.ptr); handler->handle_event (ready_events[i].events); } } } }; int main (int argc, char * argv[]) { if (argc != 2 ) { std::cerr << "Usage: " << argv[0 ] << " <port>\n" ; return 1 ; } int port = atoi (argv[1 ]); int listen_fd = socket (AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0 ); if (listen_fd < 0 ) { perror ("socket" ); return 1 ; } int opt = 1 ; setsockopt (listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); struct sockaddr_in serv_addr; memset (&serv_addr, 0 , sizeof (serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl (INADDR_ANY); serv_addr.sin_port = htons (port); if (bind (listen_fd, (struct sockaddr*)&serv_addr, sizeof (serv_addr)) < 0 ) { perror ("bind" ); return 1 ; } if (listen (listen_fd, SOMAXCONN) < 0 ) { perror ("listen" ); return 1 ; } unsigned int num_io_threads = std::thread::hardware_concurrency (); if (num_io_threads == 0 ) num_io_threads = 4 ; ThreadPool pool (num_io_threads * 2 ) ; std::vector<std::unique_ptr<SubReactor>> sub_reactors_pool; std::vector<SubReactor*> sub_reactors_ptr_pool; for (unsigned int i = 0 ; i < num_io_threads; ++i) { auto sub_reactor = std::make_unique <SubReactor>(&pool); sub_reactors_ptr_pool.push_back (sub_reactor.get ()); sub_reactor->start (); sub_reactors_pool.push_back (std::move (sub_reactor)); } MainReactor main_reactor; auto acceptor = std::make_shared <Acceptor>(listen_fd, sub_reactors_ptr_pool); main_reactor.register_handler (acceptor, EPOLLIN | EPOLLET); std::cout << "Full Reactor Server is running on port " << port << " with " << num_io_threads << " I/O threads and " << num_io_threads * 2 << " worker threads..." << std::endl; main_reactor.event_loop (); for (auto & sub : sub_reactors_pool) { sub->join (); } close (listen_fd); return 0 ; }
主 Reactor (Main Reactor):1 个线程。
它只负责一件事:bind、listen,并 epoll_wait 在 listen_fd 上。
它的 handle_event 只调用 accept()。
它不处理任何 read/write。
从 Reactors (Sub-Reactors):N 个线程(通常等于 CPU 核心数)。
每个“从 Reactor” 线程都有自己的 epoll 实例和自己的事件循环。
当主 Reactor accept 一个新连接 client_fd 后,它不会把这个 client_fd
注册到自己的 epoll 里。
相反,它会(例如通过轮询)选择一个“从 Reactor” 线程,并把这个
client_fd “移交” 过去。
“从 Reactor” 线程收到这个 client_fd 后,将其注册到自己的 epoll
实例中,开始监听 EPOLLIN 和 EPOLLOUT。
这样, accept 的负载被主 Reactor 承担。所有 read 和 write
的负载被平均分配到了 N 个“从 Reactor” 线程上。
这就把 I/O 操作本身的 CPU 开销分散到了多个 CPU 核心上,实现了真正的
I/O 并行处理。
proactor 模式
Proactor 模式,也称为“主动器模式”,是另一种处理并发 I/O
事件的设计模式。如果说 Reactor 是基于“就绪通知”(Readiness
Notification)的模式,那么 Proactor 则是基于“完成通知”(Completion
Notification)的模式。
它的核心思想是:应用程序发起一个异步 I/O
操作,然后立即继续执行其他任务;当这个 I/O
操作由操作系统(OS)在后台完成后,操作系统会通知应用程序,应用程序再对操作的结果进行处理。
Proactor 模式通常包含以下几个关键角色:
句柄 (Handle): 与 Reactor 中一样,代表一个 I/O
资源,如 socket 文件描述符。
异步操作处理器 (Asynchronous Operation
Processor): 这是 Proactor
模式的引擎,通常由操作系统内核 提供。它负责执行异步
I/O
操作 ,并在操作完成后,将结果放入一个完成事件队列 中。
异步操作 (Asynchronous Operation):
指应用程序发起的非阻塞 I/O 请求 ,例如 async_read 或
async_write。这个请求包含了执行 I/O
所需的所有信息,如句柄、数据缓冲区、操作类型等。
完成处理器 (Completion Handler):
这是一个由应用程序定义的回调函数或对象 ,用于处理已完成的异步
I/O 操作的结果 。它包含了 I/O
操作成功或失败后的业务逻辑。
主动器 (Proactor):
模式的中心组件,扮演“事件完成分发者 ”的角色。它在一个独立的线程中运行,主要负责:从“异步操作处理器”的完成事件队列中获取已完成的事件,
以及根据完成的事件,调用与之关联的“完成处理器”的相应方法。这个过程通常被称为“分发”(Dispatch)。
发起者 (Initiator):
应用程序的主体部分。它负责创建异步操作和完成处理器,并通过调用异步 I/O
接口来发起操作。发起后,它不会等待操作完成。
典型工作流程
Proactor 模式的典型工作流程如下:
发起操作:应用程序 (Initiator)调用一个异步
I/O 接口 (如
async_read)来发起一个读 操作。在调用时,它会提供:要操作的句柄 (socket_fd),
一个用于存放数据的缓冲区 (buffer)。一个完成处理器 (CompletionHandler),用于在操作完成后被回调。
操作托管给 OS:调用立即返回,应用程序线程不会被阻塞。这个 I/O
请求被交给操作系统内核(Asynchronous Operation Processor)。
OS 执行
I/O:操作系统内核在后台独立地执行这个读操作。它会等待数据到达,并将数据从内核空间直接拷贝到应用程序提供的
buffer 中。
等待完成通知:与此同时,Proactor
在自己的事件循环中阻塞等待,监听内核的完成事件队列。
I/O 完成:当数据被成功拷贝到 buffer
中后,操作系统内核会将一个“完成事件”放入完成队列。
Proactor 被唤醒:Proactor
检测到完成队列中有事件,从阻塞状态被唤醒。
分发事件:Proactor
从队列中取出完成事件,并根据事件信息(例如是哪个句柄上的什么操作完成了)找到对应的
CompletionHandler。
执行回调:Proactor 调用 CompletionHandler 的回调方法。
处理数据:CompletionHandler
在其回调方法中执行业务逻辑。此时,数据已经位于它之前提供的 buffer
中,可以直接使用,无需再进行 read 调用。
Reactor vs Proactor 对比
对比维度
Reactor 模式 (同步 I/O)
Proactor 模式 (异步 I/O)
核心思想
就绪通知 (Readiness Notification)
完成通知 (Completion Notification)
通知内容
“文件描述符 fd 已准备好进行读/写操作。”
“你发起的读/写操作已经完成。”
谁执行 I/O
应用程序线程。收到通知后,调用 read()/write()
操作系统内核。应用程序只需发起请求,内核负责所有 I/O
数据流
收到通知后,应用程序主动从内核缓冲区读取数据
内核直接将数据读到应用程序缓冲区,通知时数据已备好
编程模型
同步事件处理,应用程序主动执行 I/O 操作
异步事件处理,应用程序被动等待 I/O 完成通知
生活比喻
我告诉你菜备好了,你来炒
我把菜炒好了端给你,你直接吃
平台适用性
非常适合 Linux 的 epoll 机制
非常适合 Windows 的 IOCP 机制
Proactor 模式的优缺点
优点:
更高的并发性 :I/O
操作由内核完成,不会占用或阻塞应用线程,使得应用线程可以专注于业务逻辑,理论上能达到更高的性能和并发。
简化的应用逻辑 :应用程序的逻辑被清晰地分离到“发起操作”和“处理结果”两个部分,代码通常更简洁,避免了复杂的
I/O 就绪状态管理。
CPU
缓存友好 :在数据拷贝方面,异步模型可能提供更优化的路径,减少
CPU 缓存失效。
缺点与挑战:
平台依赖性强 :真正的 Proactor
模式严重依赖操作系统提供的底层异步 I/O(AIO)支持。Windows 通过 IOCP
提供了完美的 Proactor 模型支持。而Linux虽然有 AIO 接口,但长期以来对网络
socket 的支持不完善或效率不高。因此,在 Linux
上,高性能网络编程几乎都采用基于 epoll 的 Reactor 模式。
实现复杂 :底层的 Proactor
模式实现起来非常复杂,需要深入理解操作系统内核机制。
调试困难 :基于回调的异步编程模型可能导致“回调地狱”(Callback
Hell),内存管理和状态跟踪也比同步模型更复杂,调试难度较大。
在这里还需要指明的是, proactor 模式将IO操作交给操作系统内核,
引发了与Reactor相比执行上下文的不同:
Reactor:当 epoll
通知你的应用程序“可以读了”,你的应用程序线程需要发起一个 read()
系统调用。这个调用会导致一次上下文切换 (从用户态切换到内核态),由内核将数据从它的缓冲区拷贝到你的用户态缓冲区,然后再进行一次上下文切换 (从内核态返回用户态)。这个过程虽然很快,但上下文切换是有成本的。
Proactor:你发起异步 async_read
后,这个请求就交给了内核。内核会在后台处理这一切——等待数据、从网卡 DMA
到内核缓冲区、再拷贝到你指定的用户态缓冲区。整个过程都在内核空间完成,直到全部结束后,才通过一次完成通知回到用户态。它用一次更“重”的内核内部处理,换取了多次用户态/内核态之间切换的开销 。
因此, Proactor
模式消耗的资源总量可能并不少,但它通过减少上下文切换和更有效地利用硬件(如
DMA),使得资源消耗的方式更高效,从而将 CPU 从繁琐的 I/O
等待和数据拷贝中解放出来 。
而且, Proactor 模式在某些场景下还可以带来额外的性能提升:
更高的 CPU 缓存命中率:由于应用线程不必亲自处理
I/O,它可以持续地执行业务逻辑计算。这使得 CPU
的指令缓存和数据缓存更可能保持“热”状态,从而提高计算密集型业务 的性能。而在
Reactor 模式中,I/O 代码和业务逻辑代码交替执行,可能会污染 CPU
缓存。
更优的线程调度:在 Proactor 模式下,应用线程池里的线程只在 I/O
完成后才被唤醒去处理业务逻辑,线程的利用率非常高。而在 Reactor
模式中,I/O 线程既要处理 I/O
也要处理业务逻辑(或分发任务),职责相对不纯粹。
总结
Proactor 模式是一种理想的、性能极高的并发设计模式,它将 I/O
操作的负担完全交给了操作系统,从而解放了应用程序。它的设计哲学是“让专业的人做专业的事”,操作系统负责
I/O,应用程序负责业务。
然而,由于操作系统的支持度不同,Proactor 模式在实践中的应用远没有
Reactor 模式广泛。在 Windows 服务器开发中,基于 IOCP 的 Proactor
是主流选择。但在 Linux 世界,基于 epoll 的 Reactor 模式(特别是主从
Reactor 模型) 才是构建高性能网络服务的王者。
有趣的是,很多现代网络库(如 Boost.Asio)会在上层提供 Proactor
风格的异步 API,但在 Linux 底层,它们常常是通过 Reactor (epoll) +
线程池来模拟 Proactor 的行为。