多进程并发和多线程并发

ZaynPei Lv6

并发是指在单个CPU核心上,通过快速切换任务,使得多个任务看起来像同时进行。 并行是指在多个CPU核心上,多个任务真正在物理上同时进行

这两种都是经典的并发服务器设计模式,它们的核心目标相同:同时处理多个客户端请求, 实现并发处理, 提高服务器的吞吐量和响应速度。

特性维度 多进程模型 (Fork) 多线程模型 (Pthread)
基本单元 进程 (Process) 线程 (Thread)
资源开销 高。创建进程是重型操作,内存和CPU开销大,切换慢。 低。创建线程是轻型操作,资源占用少,切换快。
数据共享与通信 困难。进程地址空间独立,通信需借助IPC(管道、共享内存等)。 简单。所有线程共享同一地址空间(全局变量、堆、静态变量等)。
稳定性与隔离性 高。一个子进程崩溃不会影响父进程或其他子进程。 低。任何一个线程的非法操作都可能导致整个进程崩溃。
文件描述符 独立。fork后父子进程各有独立的文件描述符表。 共享。所有线程共享同一张文件描述符表,一个线程关闭会影响所有线程。
编程模型与挑战 编程相对简单,主要挑战在于进程间通信(IPC)的实现。 编程更复杂,主要挑战在于处理线程安全和数据同步(如互斥锁)。
并发能力 受限于系统进程数上限,通常只能支持几百个并发连接。 理论上可支持成千上万个并发连接,是高并发服务器的主流选择。

多进程并发

多进程并发服务器模型通过创建多个子进程来处理客户端请求。每当有新的客户端连接时,服务器会调用 fork() 系统调用创建一个新的子进程,这个子进程专门负责与该客户端进行通信和处理请求。父进程继续监听新的连接请求。

基本流程如下:

  1. 服务器启动,创建一个监听套接字 (listening socket),绑定到指定端口,并开始监听连接请求。
  2. 当有新的客户端连接时,服务器调用 fork() 创建一个新的子进程
  3. 子进程继承父进程的资源(如文件描述符),并专门处理该客户端的请求。
  4. 父进程继续监听新的连接请求,重复上述过程。
  5. 子进程处理完客户端请求后,关闭连接并退出。
  6. 父进程通过 wait() 或 waitpid() 回收子进程资源,防止僵尸进程(这里可以使用信号处理 SIGCHLD 来自动回收)。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <errno.h>

#define PORT 8888
#define BUFFER_SIZE 1024

/*
* 信号捕捉函数,用于回收子进程
* 这是处理 SIGCHLD 信号的回调函数
*/
void recycle_child(int signum) {
// 使用 while 循环和 waitpid 是为了处理多个子进程在短时间内同时结束的情况
// WNOHANG 选项表示非阻塞,如果没有已退出的子进程,则立即返回,不会卡住
while (waitpid(-1, NULL, WNOHANG) > 0) {
printf("A child process has been recycled.\n");
}
}

int main() {
int lfd, cfd; // lfd: 监听文件描述符; cfd: 连接文件描述符
struct sockaddr_in serv_addr, cli_addr;
socklen_t cli_addr_len;
pid_t pid;
char buf[BUFFER_SIZE];
int n;

// 1. 创建监听套接字
lfd = socket(AF_INET, SOCK_STREAM, 0);
if (lfd == -1) {
perror("socket error");
exit(1);
}

// 设置端口复用,以便服务器快速重启
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 2. 绑定IP地址和端口
// bzero(&serv_addr, sizeof(serv_addr)); // bzero 已不推荐使用,改用 memset
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有IP地址

if (bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1) {
perror("bind error");
exit(1);
}

// 3. 设置监听上限
if (listen(lfd, 128) == -1) {
perror("listen error");
exit(1);
}

// 4. 注册 SIGCHLD 信号捕捉函数,用于回收子进程
struct sigaction sa;
sa.sa_handler = recycle_child;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART; // 自动重启被信号中断的系统调用
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
perror("sigaction error");
exit(1);
}

printf("Server is running on port %d, waiting for connections...\n", PORT);

// 5. 主循环,接收客户端连接
while (1) {
cli_addr_len = sizeof(cli_addr);
cfd = accept(lfd, (struct sockaddr *)&cli_addr, &cli_addr_len);
if (cfd == -1) {
// 如果是被信号中断,则继续 accept,否则报错退出
if (errno == EINTR) {
continue;
} else {
perror("accept error");
exit(1);
}
}

// 打印客户端连接信息
char client_ip[16];
inet_ntop(AF_INET, &cli_addr.sin_addr.s_addr, client_ip, sizeof(client_ip));
printf("Received connection from %s at port %d\n", client_ip, ntohs(cli_addr.sin_port));

// 6. 创建子进程
pid = fork();
if (pid < 0) {
perror("fork error");
exit(1);
}

// 7. 子进程的工作
else if (pid == 0) {
// 子进程不需要监听,关闭监听文件描述符
close(lfd);

while ((n = read(cfd, buf, sizeof(buf))) > 0) {
// 将接收到的数据转换为大写
for (int i = 0; i < n; i++) {
buf[i] = toupper(buf[i]);
}
// 将处理后的数据写回客户端
write(cfd, buf, n);
}
if (n == 0) {
printf("Client %s closed the connection.\n", client_ip);
} else if (n < 0) {
perror("read error");
}

// 关闭连接描述符,并退出子进程
close(cfd);
exit(0);
}

// 8. 父进程的工作
else {
// 父进程不需要与客户端通信,关闭连接文件描述符
close(cfd);
// 继续循环,等待下一个客户端连接
// 子进程的回收由信号处理函数完成, 无需在这里调用 wait()
}
}

// 关闭监听描述符(实际上主循环是死循环,代码不会执行到这里)
close(lfd);
return 0;
}

多线程并发

多线程并发服务器模型通过创建多个线程来处理客户端请求。每当有新的客户端连接时,服务器会创建一个新的线程,这个线程专门负责与该客户端进行通信和处理请求。主线程继续监听新的连接请求。

基本流程如下: 1. 服务器启动,创建一个监听套接字 (listening socket),绑定到指定端口,并开始监听连接请求。 2. 当有新的客户端连接时,服务器创建一个新的线程,这个线程专门处理该客户端的请求 3. 主线程继续监听新的连接请求,重复上述过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include <stdio.h>  
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <ctype.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#define PORT 8888
#define BUFFER_SIZE 1024

// 线程处理函数
void* handle_client(void* arg) {
int cfd = *(int*)arg; // 获取连接文件描述符
free(arg); // 释放动态分配的内存
char buf[BUFFER_SIZE];
int n;

while ((n = read(cfd, buf, sizeof(buf))) > 0) {
// 将接收到的数据转换为大写
for (int i = 0; i < n; i++) {
buf[i] = toupper(buf[i]);
}
// 将处理后的数据写回客户端
write(cfd, buf, n);
}
if (n == 0) {
printf("Client closed the connection.\n");
} else if (n < 0) {
perror("read error");
}

close(cfd); // 关闭连接描述符
return NULL; // 线程退出
}

int main() {
int lfd, cfd; // lfd: 监听文件描述符; cfd: 连接文件描述符
struct sockaddr_in serv_addr, cli_addr;
socklen_t cli_addr_len;
pthread_t tid;

// 1. 创建监听套接字
lfd = socket(AF_INET, SOCK_STREAM, 0);
if (lfd == -1) {
perror("socket error");
exit(1);
}

// 设置端口复用,以便服务器快速重启
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 2. 绑定IP地址和端口
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有IP地址

if (bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1) {
perror("bind error");
exit(1);
}

// 3. 设置监听上限
if (listen(lfd, 128) == -1) {
perror("listen error");
exit(1);
}

printf("Server is running on port %d, waiting for connections...\n", PORT);

// 4. 主循环,接收客户端连接
while (1) {
cli_addr_len = sizeof(cli_addr);
cfd = accept(lfd, (struct sockaddr *)&cli_addr, &cli_addr_len);
if (cfd == -1) {
// 如果是被信号中断,则继续 accept,否则报错退出
if (errno == EINTR) {
continue;
} else {
perror("accept error");
exit(1);
}
}

// 打印客户端连接信息
char client_ip[16];
inet_ntop(AF_INET, &cli_addr.sin_addr.s_addr, client_ip, sizeof(client_ip));
printf("Received connection from %s at port %d\n", client_ip, ntohs(cli_addr.sin_port));

// 动态分配内存保存连接文件描述符,传递给线程函数
int* p_cfd = malloc(sizeof(int));
*p_cfd = cfd;
// 创建线程处理客户端请求
if (pthread_create(&tid, NULL, handle_client, p_cfd) !=
0) {
perror("pthread_create error");
close(cfd);
free(p_cfd);
continue;
}
// 分离线程,避免僵尸线程
pthread_detach(tid);
// 主线程继续监听新的连接请求
}
// 关闭监听描述符(实际上主循环是死循环,代码不会执行到这里)
close(lfd);
return 0;
}

线程池和任务队列

线程池是一种预先创建一定数量线程的技术,这些线程在需要时可以被重复使用来处理多个任务,从而避免了频繁创建和销毁线程的开销。任务队列则是一个线程安全的数据结构,用于存储待处理的任务,工作线程从中取出任务并执行。

On this page
多进程并发和多线程并发