IO多路复用-多线程并发通信
文档转自:IO多路复用之select/poll
IO多路转接也称为IO多路复用,它是一种网络通信的手段(机制),通过这种方式可以同时监测多个文件描述符并且这个过程是阻塞的,一旦检测到有文件描述符就绪( 可以读数据或者可以写数据)程序的阻塞就会被解除,之后就可以基于这些(一个或多个)就绪的文件描述符进行通信了。通过这种方式在单线程/进程的场景下也可以在服务器端实现并发。常见的IO多路转接方式有:select
、poll
、epoll
下面先对多线程/多进程并发和IO多路转接的并发处理流程进行对比(服务器端):
服务端用于监听的文件描述符只有一个,而用于通信的文件描述符有多个
多线程/多进程并发:
- 主线程/主进程:调用
accept()
检测客户端请求
accept
函数检测用于监听的文件描述符的读缓冲区,若没有数据表示 没有新的客户端的连接请求,当前线程/进程会阻塞
- 有数据表示有新的客户端的连接请求解除阻塞,建立连接
- 子线程/子进程:和建立连接的客户端通信
- 调用
read() / recv()
接收客户端发送的通信数据(读缓冲区),如果没有通信数据,当前线程/进程会阻塞,数据到达之后阻塞自动解除
- 调用
write() / send()
给客户端发送数据,如果写缓冲区已满,当前线程/进程会阻塞,否则将待发送数据写入写缓冲区中
IO多路转接并发:
使用IO多路转接函数委托内核检测服务器端所有的文件描述符(通信和监听两类),这个检测过程会导致进程/线程的阻塞,如果检测到已就绪的文件描述符阻塞解除,并将这些已就绪的文件描述符传出
根据类型对传出的所有已就绪文件描述符进行判断,并做出不同的处理
监听的文件描述符:和客户端建立连接
此时调用accept()
是不会导致程序阻塞的,因为监听的文件描述符是已就绪的(有新请求)
通信的文件描述符:调用通信函数和已建立连接的客户端通信
调用 read() / recv()
不会阻塞程序,因为通信的文件描述符是就绪的,读缓冲区内已有数据
调用 write() / send()
不会阻塞程序,因为通信的文件描述符是就绪的,写缓冲区不满,可以往里面写数据
对这些文件描述符继续进行下一轮的检测(循环往复。。。)
与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销
一、select
使用select
这种IO多路转接方式需要调用一个同名函数select
,这个函数是跨平台的,Linux、Mac、Windows
都是支持的。通过调用这个函数可以委托内核帮助我们检测若干个文件描述符的状态,其实就是检测这些文件描述符对应的读写缓冲区的状态
:
- 读缓冲区:检测里边有没有数据,如果有数据该缓冲区对应的文件描述符就绪
- 写缓冲区:检测写缓冲区是否可以写(有没有容量),如果有容量可以写,缓冲区对应的文件描述符就绪
- 读写异常:检测读写缓冲区是否有异常,如果有该缓冲区对应的文件描述符就绪
委托检测的文件描述符被遍历检测完毕之后,已就绪的这些满足条件的文件描述符会通过select()
的参数分3个集合传出,设计人员可以得到这几个集合之后就可以分情况依次处理了
函数原型:
1 2 3 4 5 6 7 8
| #include <sys/select.h> struct timeval { time_t tv_sec; suseconds_t tv_usec; };
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval * timeout);
|
参数:
nfds
:委托内核检测的这三个集合中最大的文件描述符 + 1
- 内核需要线性遍历这些集合中的文件描述符,这个值是循环结束的条件
- 在Window中这个参数是无效的,指定为-1即可
readfds
:文件描述符的集合, 内核只检测这个集合中文件描述符对应的读缓冲区
- 传入传出参数,读集合一般情况下都是需要检测的,这样才知道通过哪个文件描述符接收数据
writefds
:文件描述符的集合,内核只检测这个集合中文件描述符对应的写缓冲区
- 传入传出参数,如果不需要使用这个参数可以指定为NULL
exceptfds
:文件描述符的集合, 内核检测集合中文件描述符是否有异常状态
- 传入传出参数,如果不需要使用这个参数可以指定为NULL
timeout
:超时时长,用来强制解除select()
函数的阻塞的
- NULL:函数检测不到就绪的文件描述符会一直阻塞
- 等待固定时长(秒):函数检测不到就绪的文件描述符,在指定时长之后强制解除阻塞,函数返回0
- 不等待:函数不会阻塞,直接将该参数对应的结构体初始化为0即可
返回值:
- 大于0:成功,返回集合中已就绪的文件描述符的总个数
- 等于-1:函数调用失败
- 等于0:超时,没有检测到就绪的文件描述符
另外初始化fd_set
类型的参数还需要使用相关的一些列操作函数,具体如下:
1 2 3 4 5 6 7 8
| void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
|
1.细节描述
在select()
函数中第2、3、4
个参数都是fd_set
类型,它表示一个文件描述符的集合,类似于信号集 sigset_t
,这个类型的数据有128
个字节(一个字节8位),也就是1024
个标志位,和内核中文件描述符表中的文件描述符个数(1024个)是一样的
1
| sizeof(fd_set) = 128 字节 * 8 = 1024 bit
|
这块内存中的每一个bit 和 文件描述符表中的每一个文件描述符是一一对应的关系,这样就可以使用最小的存储空间将要表达的意思描述出来了
下图中的fd_set
中存储了要委托内核检测读缓冲区的文件描述符集合
- 下图中的
fd_set
中存储了要委托内核检测读缓冲区的文件描述符集合
- 如果集合中的标志位为
1
代表检测这个文件描述符状态

内核在遍历这个读集合的过程中,如果被检测的文件描述符对应的读缓冲区中没有数据,内核将修改这个文件描述符在读集合fd_set
中对应的标志位,改为0,如果有数据那么这个标志位的值不变,还是1

当select()
函数解除阻塞之后,被内核修改过的读集合通过参数传出,此时集合中只要标志位的值为1,那么它对应的文件描述符肯定是就绪的,我们就可以基于这个文件描述符和客户端建立新连接或者通信了
2.select
并发处理
单线程/单进程,使用select
实现并发
(1)处理流程
如果在服务器基于select
实现并发,其处理流程如下:
创建监听的套接字 lfd = socket()
;
将监听的套接字和本地的IP
和端口绑定 bind()
给监听的套接字设置监听 listen()
创建一个文件描述符集合 fd_set
,用于存储需要检测读事件的所有的文件描述符
循环调用select()
,周期性的对所有的文件描述符进行检测
select()
解除阻塞返回,得到内核传出的满足条件的就绪的文件描述符集合
- 通过
FD_ISSET()
判断集合中的标志位是否为 1
- 如果这个文件描述符是监听的文件描述符,调用
accept()
和客户端建立连接
- 将得到的新的通信的文件描述符,通过
FD_SET()
放入到检测集合中
- 如果这个文件描述符是通信的文件描述符,调用通信函数和客户端通信
- 如果客户端和服务器断开了连接,使用
FD_CLR()
将这个文件描述符从检测集合中删除
- 如果没有断开连接,正常通信即可
重复第6步
流程图


(2)通信代码
服务端代码:
server_Iomultiplexing.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <ctype.h>
#define BUFSIZE 10
int main() { int lfd = socket(AF_INET, SOCK_STREAM, 0); int cfd;
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); addr.sin_addr.s_addr = INADDR_ANY; bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
int val = 1; if(setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(val)) < 0) { perror("setsockopt()"); exit(1); }
listen(lfd, 128);
int maxfd = lfd; fd_set rdset; fd_set rdtemp; FD_ZERO(&rdset); FD_SET(lfd, &rdset); while(1) { rdtemp = rdset; int num = select(maxfd+1, &rdtemp, NULL, NULL, NULL);
if(FD_ISSET(lfd, &rdtemp)) { struct sockaddr_in cliaddr; int cliLen = sizeof(cliaddr); cfd = accept(lfd, (struct sockaddr*)&cliaddr, &cliLen);
FD_SET(cfd, &rdset); maxfd = cfd > maxfd ? cfd : maxfd; }
for(int i=0; i<maxfd+1; ++i) { if(i != lfd && FD_ISSET(i, &rdtemp)) { char buf[BUFSIZE] = {0}; int len = recv(i, buf, sizeof(buf),0); if(len == 0) { printf("客户端关闭了连接...\n"); FD_CLR(i, &rdset); close(i); } else if(len > 0) { printf("read buf = %s",buf); for(int i = 0;i<len;i++) { buf[i] = toupper(buf[i]); } printf("after buf = %s",buf);
int ret = send(i,buf, strlen(buf)+1,0); if(ret < 0) { perror("send()"); break; } printf("\n"); } else { perror("read()"); break; } } } }
close(lfd);
return 0; }
|
client_Iomultiplexing.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) { perror("socket"); exit(0); }
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr)); if(ret == -1) { perror("connect"); exit(0); }
while(1) { char recvBuf[1024]; fgets(recvBuf, sizeof(recvBuf), stdin); send(fd, recvBuf, strlen(recvBuf)+1,0); recv(fd, recvBuf, sizeof(recvBuf),0); printf("recv buf: %s\n", recvBuf); }
close(fd);
return 0; }
|
编译
1 2
| $ make client_Iomultiplexing $ make server_Iomultiplexing
|
打开终端运行,服务端程序
1
| $ ./server_Iomultiplexing
|
打开多个终端运行,客户端程序,并且在客户端终端输入需要发送的信息
1
| $ ./client_Iomultiplexing
|
客户端1显示:

客户端2显示:

服务端显示:

(3)总结
这种方式,可以实现并发,但是效率比较低,如果存在多个客户端同时与服务端进行通信,服务端需要依次处理客户端请求,若客户端比较多,那么客户端等待响应的时间就会比较长
3.多线程select并发
为了单线程select并发
中低效的情况,可以在上面单线程select并发服务端的基础上,使用多线程,主线程,用于调用select
检测集合中是否有就绪的套接字文件描述符。创建子线程调用accept
接受客户端的连接,在创建其他子线程用于与已经接受连接的客户端进行通信
client.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h>
#define BUFSIZE 1024
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) { perror("socket"); exit(0); }
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr)); if(ret == -1) { perror("connect"); exit(0); }
int num = 0; while(1) { char recvBuf[BUFSIZE]; sprintf(recvBuf, "hello world, %d\n", num++); send(fd, recvBuf, strlen(recvBuf)+1,0); recv(fd, recvBuf, sizeof(recvBuf),0); printf("recv buf: %s\n", recvBuf); sleep(1); }
close(fd);
return 0; }
|
server.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <ctype.h> #include <pthread.h> #include <sys/select.h>
#define BUFSIZE 1024
pthread_mutex_t mutex;
typedef struct fdinfo{ int fd; int *maxfd; fd_set * rdset; }FDInfo;
void *acceptConn(void *arg) { printf("子线程ID:%ld\n",pthread_self()); FDInfo *fdInfo = (FDInfo *)arg; struct sockaddr_in cliaddr; int cliLen = sizeof(cliaddr); int cfd = accept(fdInfo->fd, (struct sockaddr*)&cliaddr, &cliLen);
pthread_mutex_lock(&mutex); FD_SET(cfd, fdInfo->rdset); *(fdInfo->maxfd) = cfd > *(fdInfo->maxfd) ? cfd : *(fdInfo->maxfd); pthread_mutex_unlock(&mutex);
free(fdInfo); return NULL; }
void *Communication(void *arg) { FDInfo *info = (FDInfo *)arg; char buf[BUFSIZE] = {0};
int len = recv(info->fd, buf, sizeof(buf),0); if(len == 0) { printf("[%ld]:客户端关闭了连接...\n",pthread_self()); pthread_mutex_lock(&mutex); FD_CLR(info->fd, info->rdset); pthread_mutex_unlock(&mutex);
close(info->fd); free(info); pthread_exit(NULL); } else if(len > 0) { printf("[%ld]:read buf = %s\n",pthread_self(),buf); for(int i = 0;i<len;i++) { buf[i] = toupper(buf[i]); } printf("[%ld]:after buf = %s\n",pthread_self(),buf);
int ret = send(info->fd,buf, strlen(buf)+1,0); if(ret < 0) { perror("send()"); free(info); pthread_exit(NULL); } printf("\n"); } else { perror("recv()"); free(info); pthread_exit(NULL); }
return NULL; }
int main() { pthread_mutex_init(&mutex,NULL);
int lfd = socket(AF_INET, SOCK_STREAM, 0); int cfd;
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); addr.sin_addr.s_addr = INADDR_ANY; bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
int val = 1; if(setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(val)) < 0) { perror("setsockopt()"); exit(1); }
listen(lfd, 128);
int maxfd = lfd; fd_set rdset; fd_set rdtemp; FD_ZERO(&rdset); FD_SET(lfd, &rdset);
while(1) { pthread_mutex_lock(&mutex); rdtemp = rdset; pthread_mutex_unlock(&mutex);
int num = select(maxfd+1, &rdtemp, NULL, NULL, NULL);
if(FD_ISSET(lfd, &rdtemp)) { pthread_t tid; FDInfo *info = (FDInfo *) malloc(sizeof(FDInfo)); info->fd = lfd; info->maxfd = &maxfd; info->rdset = &rdset; pthread_create(&tid,NULL,acceptConn,info); pthread_detach(tid); }
for(int i=0; i<maxfd+1; ++i) { if(i != lfd && FD_ISSET(i, &rdtemp)) { pthread_t tid; FDInfo *info = (FDInfo *) malloc(sizeof(FDInfo)); info->fd = i; info->rdset = &rdset; pthread_create(&tid,NULL,Communication,info); pthread_detach(tid); } } }
close(lfd); pthread_mutex_destroy(&mutex);
return 0; }
|
在使用多线程select的服务端中,每次只要检测到监听套接字文件描述符的读缓冲区有数据,就会创建一个子线程接受客户端的连接请求
二、poll
poll与select机制类似,但是poll无法跨平台,一般更加倾向于使用select
select的可以查询的文件描述符的数量上限为1024
而poll的可以查询的文件描述符的数量没有上限,但是poll只能在Linux系统上使用
poll博客链接,爱编程的大丙
三、epoll
1.概述
epoll
全称 eventpoll
,是linux
内核实现IO
多路转接/复用(IO multiplexing
)的一个实现。IO
多路转接的意思是在一个操作里同时监听多个输入输出源,在其中一个或多个输入输出源可用的时候返回,然后对其的进行读写操作。epoll
是select
和poll
的升级版,相较于select
与poll
,epoll
改进了工作方式,因此它更加高效,但是与poll
一样,epoll
不可以跨平台,只能在Linux
上面使用
- 对于待检测集合
select
和poll
是基于线性方式(时间复杂度为O(n)
)处理的,epoll
是基于红黑树来管理待检测集合的
select
和poll
每次都会线性扫描整个待检测集合,集合越大速度越慢,epoll
使用的是回调机制,效率高,处理效率也不会随着检测集合的变大而下降
select
和poll
工作过程中存在内核/用户空间数据的频繁拷贝问题(每次调用 select
/poll
时,都需要把整个文件描述符集从用户空间复制到内核空间(因为内核需要知道哪些文件描述符被监视以及对应的事件类型)。此外,select
/poll
每次返回时都会返回整个文件描述符集(完成扫描后,内核需要将更新后的文件描述符集,即状态改变的描述符集,整个文件描述符集,复制回用户空间,以便应用程序可以了解哪些描述符已经就绪),应用程序需要线性遍历整个集合来找出活动的文件描述符)
epoll
的机制涉及用户空间与内核空间的数据交换,但这种交换是通过系统调用进行的,而不是通过共享内存
epoll
在内核中维护一个事件表,这个事件表是一个数据结构,用于存储所有被监控的文件描述符及其对应的事件。当用户空间应用程序调用 epoll_ctl()
添加、修改或删除文件描述符时,内核会更新这个事件表
epoll
的设计减少了需要复制的数据量,因为只有发生事件的文件描述符信息会被复制回用户空间,与 select
或 poll
相比,这大大减少了不必要的数据传输和处理。此外,因为事件表是在内核中维护的,所以不需要每次调用时都传递大量数据结构,也就减少了系统调用的开销
- 当监控的文件描述符发生了注册的事件(如可读、可写等),内核将这些事件复制到一个用户空间的缓冲区中。这个过程是通过
epoll_wait()
调用完成的。在 epoll_wait()
调用中,内核检查事件表,查找发生了事件的文件描述符,然后将这些事件的信息复制到用户空间提供的内存中
- 设计人员需要对
select
和poll
返回的集合进行判断才能知道哪些文件描述符是就绪的,通过epoll
可以直接得到已就绪的文件描述符集合,无需再次检测
- 使用
epoll
没有最大文件描述符的限制,仅受系统中进程能打开的最大文件数目限制
当多路复用的文件数量庞大、IO流量频繁的时候,一般不太适合使用select()和poll(),这种情况下select()和poll()表现较差,**推荐使用epoll()
**。
epoll
维护树状结构,里面还维护了一系列事件处理机制,事件处理机制核心是回调
epoll
的核心组件:
- epoll 文件描述符:通过调用
epoll_create
创建,它本身是一个文件描述符,代表了一个 epoll
实例。这个描述符会用来管理和跟踪一组文件描述符(sockets、文件、pipes等)的事件
- 事件注册表:应用程序通过
epoll_ctl
调用将一个或多个文件描述符添加到 epoll
文件描述符指定的兴趣列表中,或从中移除文件描述符。你可以指定你感兴趣的事件类型,如读、写、关闭等
- 事件通知区:调用
epoll_wait
时,内核将所有已就绪的事件(即满足条件的文件描述符事件)复制到用户空间的事件数组中。这允许应用程序仅处理那些有事件发生的文件描述符
主要特点:
- 效率高:
epoll
使用一种基于事件的编程模式(事件通知的机制和内核中的事件表) ,并且只告诉用户哪些文件描述符是活跃的,这减少了应用程序检查文件描述符的数量,从而提高了效率。
- 可扩展性强:
epoll
的性能几乎不会随着文件描述符数量的增加而降低,这使得它在处理大量并发连接时表现出色
- 功能强大: 支持边缘触发和水平触发两种模式,让用户可以根据具体需求选择合适的工作模式
工作模式:
- 边缘触发 (ET): 只有当状态改变时,即从非活跃变为活跃时,才会通知用户。这种模式效率很高,但用户需要负责读取所有的数据直到没有更多数据可读。
- 水平触发 (LT): 只要有数据可读,
epoll
就会通知用户,不管之前是否已经被通知过。这种模式更容易使用,但可能效率稍低
2.操作函数
在epoll
中一共提供是三个API
函数,分别处理不同的操作,函数原型如下:
1 2 3 4 5 6 7
| #include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
|
select/poll
低效的原因之一是将“添加/维护待检测任务”和“阻塞进程/线程”两个步骤合二为一。每次调用select
都需要这两步操作,然而大多数应用场景中,需要监视的socket
个数相对固定,并不需要每次都修改。epoll
将这两个操作分开,先用epoll_ctl()
维护等待队列,再调用epoll_wait()
阻塞进程(解耦)。通过下图的对比显而易见,epoll的效率得到了提升

(1)epoll_create()
epoll_create()
函数的作用是创建一个红黑树模型的实例,用于管理待检测的文件描述符的集合
1
| int epoll_create(int size);
|
参数:
- size:在Linux内核2.6.8版本以后,这个参数是被忽略的,只需要指定一个大于0的数值就可以了
返回值:
- 失败:返回-1
- 成功:返回一个有效的文件描述符,通过这个文件描述符就可以访问创建的
epoll
实例了
(2)epoll_ctl()
函数的作用是管理红黑树实例上的节点,可以进行添加、删除、修改操作
1 2 3 4 5 6 7 8 9 10 11 12 13
| typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
struct epoll_event { uint32_t events; epoll_data_t data; }; int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
|
参数:
epfd
:epoll_create()
函数的返回值,通过这个参数找到epoll
实例
op
:这是一个枚举值,控制通过该函数执行什么操作
EPOLL_CTL_ADD
:往epoll
模型中添加新的节点
EPOLL_CTL_MOD
:修改epoll
模型中已经存在的节点
EPOLL_CTL_DEL
:删除epoll
模型中的指定的节点
fd
:文件描述符,即要添加/修改/删除的文件描述符
event
:epoll
事件,用来修饰第三个参数对应的文件描述符的,指定检测这个文件描述符的什么事件
events
:委托epoll
检测的事件, struct epoll_event
结构体中的字段 events
EPOLLIN
:读事件, 接收数据, 检测读缓冲区,如果有数据该文件描述符就绪
EPOLLOUT
:写事件, 发送数据, 检测写缓冲区,如果可写该文件描述符就绪
EPOLLERR
:异常事件
data
:用户数据变量,这是一个联合体类型,通常情况下使用里边的fd
成员,用于存储待检测的文件描述符的值,在调用epoll_wait()
函数的时候这个值会被传出
返回值:
(3)epoll_wait()
作用是检测创建的epoll
实例中有没有就绪的文件描述符
1
| int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
|
参数:
epfd
:epoll_create()
函数的返回值, 通过这个参数找到epoll
实例
events
:传出参数, 这是一个结构体数组的地址, 里边存储了已就绪的文件描述符的信息
maxevents
:修饰第二个参数, 结构体数组的容量(元素个数)
timeout
:如果检测的epoll
实例中没有已就绪的文件描述符,该函数阻塞的时长, 单位ms
毫秒
- 0:函数不阻塞,不管
epoll
实例中有没有就绪的文件描述符,函数被调用后都直接返回
- 大于0:如果
epoll
实例中没有已就绪的文件描述符,函数阻塞对应的毫秒数再返回
- -1:函数一直阻塞,直到
epoll
实例中有已就绪的文件描述符之后才解除阻塞
返回值:
- 成功,等于0:函数是阻塞被强制解除了, 没有检测到满足条件的文件描述符;大于0:检测到的已就绪的文件描述符的总个数
- 失败,返回-1
3.epoll
的使用
(1)操作步骤
在服务器端使用epoll
进行IO
多路转接的操作步骤如下:
1
| int lfd = socket(AF_INET, SOCK_STR EAM, 0);
|
1 2
| int opt = 1; setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
|
1
| int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
|
- 5.创建
epoll
实例对象(epoll
树),epoll_create
参数大于0即可
1
| int epfd = epoll_create(100);
|
1 2 3 4
| struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = lfd; int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
|
(2)示例代码
server_epoll.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
| #include <stdio.h> #include <ctype.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <arpa/inet.h> #include <sys/socket.h> #include <sys/epoll.h>
#define SERVERPORT "1989" #define BUFSIZE 1024
int main(int argc,char *argv[]) { int lfd = socket(AF_INET,SOCK_STREAM,0); if(lfd < 0) { perror("socket()"); exit(1); }
struct sockaddr_in server_addr; memset(&server_addr,0,sizeof server_addr); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,"0.0.0.0",&server_addr.sin_addr); int ret = bind(lfd,(void *)&server_addr,sizeof server_addr); if(ret < 0) { perror("bind()"); exit(1); }
int opt = 1; setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
ret = listen(lfd,128); if(ret < 0) { perror("bind()"); exit(1); }
int para = 100; int epfd = epoll_create(para); if(epfd < 0) { perror("epoll_create()"); exit(1); }
struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = lfd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev); if(ret < 0) { perror("epoll_ctl()"); exit(1); }
struct epoll_event evs[1024]; int size = sizeof(evs) / sizeof(struct epoll_event); while(1) { int num = epoll_wait(epfd,evs,size,-1); for(int i = 0;i < num;i++) { int curfd = evs[i].data.fd;
if(curfd == lfd) { struct sockaddr_in raddr; socklen_t raddr_len = sizeof raddr; int cfd = accept(curfd,(void *)&raddr,&raddr_len); ev.events = EPOLLIN; ev.data.fd = cfd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev); if(ret < 0) { perror("epoll_ctl():add"); exit(1); } }
else { char buf[BUFSIZE]; memset(buf,0,sizeof buf); int len = recv(curfd,buf,sizeof buf,0); if(len == 0) { printf("客户端断开连接...\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL); close(curfd); } else if(len > 0) { printf("接收成功,客户端数据: %s \n",buf); send(curfd,buf,len,0); } else { perror("recv()"); exit(1); } } } } return 0; }
|
client.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h>
#define BUFSIZE 1024
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) { perror("socket"); exit(0); }
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); inet_pton(AF_INET, "0.0.0.0", &addr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr)); if(ret == -1) { perror("connect"); exit(0); }
int num = 0; while(1) { char recvBuf[BUFSIZE]; sprintf(recvBuf, "hello world, %d\n", num++); send(fd, recvBuf, strlen(recvBuf)+1,0); recv(fd, recvBuf, sizeof(recvBuf),0); printf("recv buf: %s\n", recvBuf); sleep(1); }
close(fd);
return 0; }
|
编译运行,运行结果如下:
客户端1,显示:

客户端2,显示:

服务端,显示:

解析:
当在服务器端循环调用epoll_wait()
的时候,就会得到一个就绪列表,并通过该函数的第二个参数传出:
1 2
| struct epoll_event evs[1024]; int num = epoll_wait(epfd, evs, size, -1);
|
每当epoll_wait()
函数返回一次,在evs
中最多可以存储size
个已就绪的文件描述符信息,但是在这个数组中实际存储的有效元素个数为num
个,如果在这个epoll
实例的红黑树中已就绪的文件描述符很多,并且evs
数组无法将这些信息全部传出,那么这些信息会在下一次epoll_wait()
函数返回的时候被传出
通过evs
数组被传递出的每一个有效元素里边都包含了已就绪的文件描述符的相关信息,这些信息并不是凭空得来的,这取决于我们在往epoll
实例中添加节点的时候,往节点中初始化了哪些数据:
1 2 3 4 5 6
| struct epoll_event ev;
ev.events = EPOLLIN; ev.data.fd = lfd;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
|
在添加节点的时候,需要对这个struct epoll_event
类型的节点进行初始化,当这个节点对应的文件描述符变为已就绪状态,这些被传入的初始化信息就会被原样传出,这个对应关系必须要搞清楚
4.epoll
工作模式
(1)水平工作模式
水平模式可以简称为LT
模式,LT(level triggered)
是缺省的工作方式,并且同时支持block
和no-block socket
。在这种做法中,内核通知使用者哪些文件描述符已经就绪,之后就可以对这些已就绪的文件描述符进行IO
操作了。如果我们不作任何操作,内核还是会继续通知使用者
水平模式:
- 读事件:如果文件描述符对应的读缓冲区还有数据,读事件就会被触发,
epoll_wait()
解除阻塞
- 当读事件被触发,
epoll_wait()
解除阻塞,之后就可以接收数据了
- 如果接收数据的
buf
很小,不能全部将缓冲区数据读出,那么读事件会继续被触发,直到数据被全部读出,如果接收数据的内存相对较大,读数据的效率也会相对较高(减少了读数据的次数)
- 因为读数据是被动的,必须要通过读事件才能知道有数据到达了,因此对于读事件的检测是必须的
- 写事件:如果文件描述符对应的写缓冲区可写,写事件就会被触发,
epoll_wait()
解除阻塞
- 当写事件被触发,
epoll_wait()
解除阻塞,之后就可以将数据写入到写缓冲区了
- 写事件的触发发生在写数据之前而不是之后,被写入到写缓冲区中的数据是由内核自动发送出去的
- 如果写缓冲区没有被写满,写事件会一直被触发
- 因为写数据是主动的,并且写缓冲区一般情况下都是可写的(缓冲区不满),因此对于写事件的检测不是必须的
水平模式触发的特点:
- 重复通知:只要文件描述符仍然处于可读写状态,
epoll
会不断地通知应用程序,无论是否已经对该描述符执行过读写操作。这意味着,如果数据可读,应用程序如果没有读取,epoll_wait
将会再次返回该文件描述符
- 简化的事件处理:由于
epoll
在描述符准备好时会不断通知,应用程序的逻辑可以比较简单,无需担心错过事件。这也意味着应用程序在每次 epoll_wait
调用后都必须适当处理事件,否则会陷入重复通知的循环
- 兼容性:水平触发模式的行为类似于传统的
select
和 poll
系统调用,使得从这些系统调用迁移到 epoll
的过程中,逻辑转换较为直接
使用场景:
- 不确定是否能一次性处理完所有数据:例如,当你希望接收缓冲区非空时就读取一些数据,但不想(或不需要)一次性将其清空
- 多个线程处理同一文件描述符:在这种情况下,水平触发可以减少因竞态条件造成的复杂性,因为多个线程可能都会被告知文件描述符准备好了
实例程序:
server_epoll_LT.c
重点:在这个实例程序中,因为通信逻辑部分用于接收数据的字符数组非常小,若客户端一次发送的数据非常多,那么一次无法将缓冲区的数据全部读取出来,由于水平出发模式关于这个通信的套接字文件描述符的读事件会继续被触发,直到数据全部读取出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
| #include <stdio.h> #include <ctype.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <arpa/inet.h> #include <sys/socket.h> #include <sys/epoll.h>
#define SERVERPORT "1989" #define BUFSIZE 5
int main(int argc,char *argv[]) { int lfd = socket(AF_INET,SOCK_STREAM,0); if(lfd < 0) { perror("socket()"); exit(1); }
struct sockaddr_in server_addr; memset(&server_addr,0,sizeof server_addr); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,"0.0.0.0",&server_addr.sin_addr); int ret = bind(lfd,(void *)&server_addr,sizeof server_addr); if(ret < 0) { perror("bind()"); exit(1); }
int opt = 1; setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
ret = listen(lfd,128); if(ret < 0) { perror("bind()"); exit(1); }
int para = 100; int epfd = epoll_create(para); if(epfd < 0) { perror("epoll_create()"); exit(1); }
struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = lfd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev); if(ret < 0) { perror("epoll_ctl()"); exit(1); }
struct epoll_event evs[1024]; int size = sizeof(evs) / sizeof(struct epoll_event); while(1) { int num = epoll_wait(epfd,evs,size,-1); for(int i = 0;i < num;i++) { int curfd = evs[i].data.fd;
if(curfd == lfd) { struct sockaddr_in raddr; socklen_t raddr_len = sizeof raddr; int cfd = accept(curfd,(void *)&raddr,&raddr_len); ev.events = EPOLLIN; ev.data.fd = cfd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev); if(ret < 0) { perror("epoll_ctl():add"); exit(1); } }
else { char buf[BUFSIZE]; memset(buf,0,sizeof buf); int len = recv(curfd,buf,sizeof buf,0); if(len == 0) { printf("客户端断开连接...\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL); close(curfd); } else if(len > 0) { printf("接收成功,客户端数据: %s \n",buf); send(curfd,buf,len,0); } else { perror("recv()"); exit(1); } } } }
return 0; }
|
client_epoll_LT.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) { perror("socket"); exit(0); }
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr)); if(ret == -1) { perror("connect"); exit(0); }
while(1) { int len; size_t send_len; char recvBuf[1024]; fgets(recvBuf, sizeof(recvBuf), stdin); send_len = strlen(recvBuf)+1; send(fd, recvBuf, strlen(recvBuf)+1,0); do{ len = recv(fd, recvBuf, sizeof(recvBuf),0); send_len -= len; printf("recv buf: %s\n", recvBuf); }while(send_len > 0); }
close(fd);
return 0; }
|
编译运行如下:
客户端显示:

服务端显示:

(2)边沿工作模式
边沿模式可以简称为ET
模式,ET(edge-triggered)
是高速工作方式,只支持no-block socket
。在这种模式下,当文件描述符从未就绪变为就绪时,内核会通过epoll
通知使用者。然后它会假设使用者知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知(only once)
。如果我们对这个文件描述符做IO
操作,从而导致它再次变成未就绪,当这个未就绪的文件描述符再次变成就绪状态,内核会再次进行通知,并且还是只通知一次。ET
模式在很大程度上减少了epoll
事件被重复触发的次数,因此效率要比LT
模式高
边沿模式:
综上所述:epoll
的边沿模式下 epoll_wait()
检测到文件描述符有新事件才会通知,如果不是新的事件就不通知,通知的次数比水平模式少,效率比水平模式要高
边沿触发模式特点:
- 单次通知:当文件描述符从非就绪状态变为就绪状态时,
epoll
会通知应用程序一次。一旦通知发生,即使文件描述符仍然处于就绪状态,epoll
不会再次通知应用程序,除非状态再次发生变化
- 高效率:边沿触发模式可以减少系统调用的次数,因为它只在状态变化时发出通知。这减少了应用程序不必要的检查和轮询,特别是在高负载情况下
- 复杂的事件处理:使用边沿触发模式要求应用程序必须能够处理所有的数据,直到资源耗尽(例如读取直到遇到
EAGAIN
错误)。这需要应用程序具有更复杂的逻辑来确保数据的完全处理
使用场景:
- 高并发服务器:在高负载环境下,边沿触发可以显著减少不必要的事件处理,提高服务器效率。
- 应用程序能够一次处理所有数据:应用程序需要设计为能够处理尽可能多的数据,直到没有更多数据可读(或可写),以避免遗漏数据。
ET模式的设置
边沿模式不是默认的epoll
模式,需要额外进行设置。epoll
设置边沿模式是非常简单的,epoll
管理的红黑树示例中每个节点都是struct epoll_event
类型,只需要将EPOLLET
添加到结构体的events
成员中即可:
1 2
| struct epoll_event ev; ev.events = EPOLLIN | EPOLLET;
|
示例代码,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| int num = epoll_wait(epfd, evs, size, -1); for(int i=0; i<num; ++i) { int curfd = evs[i].data.fd; if(curfd == lfd) { int cfd = accept(curfd, NULL, NULL); struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; ev.data.fd = cfd; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev); if(ret == -1) { perror("epoll_ctl-accept"); exit(0); } } }
|
设置非阻塞
对于写事件的触发一般情况下是不需要进行检测的,因为写缓冲区大部分情况下都是有足够的空间可以进行数据的写入。对于读事件的触发就必须要检测了,因为服务器也不知道客户端什么时候发送数据,如果使用epoll
的边沿模式进行读事件的检测,有新数据达到只会通知一次,那么必须要保证得到通知后将数据全部从读缓冲区中读出。那么,应该如何读这些数据呢?
方法1:准备一块特别大的内存,用于存储从读缓冲区中读出的数据,但是这种方式有很大的弊端:
- 内存的大小没有办法界定,太大浪费内存,太小又不够用
- 系统能够分配的最大堆内存也是有上限的,栈内存就更不必多言了
方法2:循环接收数据
1 2 3 4 5
| int len = 0; while((len = recv(curfd, buf, sizeof(buf), 0)) > 0) { }
|
这样做也是有弊端的,因为套接字操作默认是阻塞的,当读缓冲区数据被读完之后,读操作就阻塞了也就是调用的read()/recv()
函数被阻塞了,当前进程/线程被阻塞之后就无法处理其他操作了
要解决阻塞问题,就需要将套接字默认的阻塞行为修改为非阻塞,需要使用fcntl()
函数进行处理:
1 2 3 4
| int flag = fcntl(cfd, F_GETFL); flag |= O_NONBLOCK; fcntl(cfd, F_SETFL, flag);
|
通过上述分析就可以得出一个结论:**epoll
在边沿模式下,必须要将套接字设置为非阻塞模式**,但是,这样就会引发另外的一个bug
,在非阻塞模式下,循环地将读缓冲区数据读到本地内存中,当缓冲区数据被读完了,调用的read()/recv()
函数还会继续从缓冲区中读数据,此时函数调用就失败了,返回-1,对应的全局变量 errno
值为 EAGAIN
或者 EWOULDBLOCK
如果打印错误信息会得到如下的信息:Resource temporarily unavailable
因此可以通过非阻塞状态下的套接字errno
的判断,什么时候缓冲区的数据全部读取完毕
检查recv
的返回值:
recv
函数的返回值可以提供关于读取状态的直接信息:
- 如果返回正数,这代表读取到的字节数。如果这个数小于你请求的字节数(第三个参数),这通常意味着缓冲区中没有更多的数据可读(或者数据正好被读完)
- 如果返回0,表示对端已经关闭了连接。在TCP协议中,这意味着对端的套接字已经执行了正常的关闭过程
- 如果返回-1,并且
errno
设置为 EAGAIN
或 EWOULDBLOCK
,表示没有数据可读,且套接字被设置为非阻塞模式。此时,可以认为之前已经读取了所有可用数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| int len = recv(curfd, buf, sizeof(buf), 0); if(len == -1) { if(errno == EAGAIN) { printf("数据读完了...\n"); } else { perror("recv"); exit(0); } }
|
实例程序:
server_epoll_ET.c
将用于通信的套接字文件描述符的事件触发方式设置为边沿触发,并且将套接字的文件属性设置为非阻塞,这样在使用recv
函数读取,通信套接字文件描述符的读缓冲区,没有数据时,不会存在阻塞的情况;但是这时,需要通过errno
来判断,读缓冲区的数据是否全部读取完毕
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
| #include <stdio.h> #include <ctype.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <arpa/inet.h> #include <sys/socket.h> #include <sys/epoll.h> #include <errno.h> #include <fcntl.h>
#define SERVERPORT "1989" #define BUFSIZE 5
int main(int argc,char *argv[]) { int lfd = socket(AF_INET,SOCK_STREAM,0); if(lfd < 0) { perror("socket()"); exit(1); }
struct sockaddr_in server_addr; memset(&server_addr,0,sizeof server_addr); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,"0.0.0.0",&server_addr.sin_addr); int ret = bind(lfd,(void *)&server_addr,sizeof server_addr); if(ret < 0) { perror("bind()"); exit(1); }
int opt = 1; setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
ret = listen(lfd,128); if(ret < 0) { perror("bind()"); exit(1); }
int para = 100; int epfd = epoll_create(para); if(epfd < 0) { perror("epoll_create()"); exit(1); }
struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = lfd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev); if(ret < 0) { perror("epoll_ctl()"); exit(1); }
struct epoll_event evs[1024]; int size = sizeof(evs) / sizeof(struct epoll_event); while(1) { int num = epoll_wait(epfd,evs,size,-1); for(int i = 0;i < num;i++) { int curfd = evs[i].data.fd;
if(curfd == lfd) { struct sockaddr_in raddr; socklen_t raddr_len = sizeof raddr; int cfd = accept(curfd,(void *)&raddr,&raddr_len); int flag = fcntl(cfd,F_GETFL); flag |= O_NONBLOCK; fcntl(cfd,F_SETFL,flag); ev.events = EPOLLIN|EPOLLET; ev.data.fd = cfd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev); if(ret < 0) { perror("epoll_ctl():add"); exit(1); } }
else { char buf[BUFSIZE]; memset(buf,0,sizeof buf); while(1) { int len = recv(curfd,buf,sizeof buf,0); if(len == 0) { printf("客户端断开连接...\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL); close(curfd); break; } else if(len > 0) { printf("接收成功,客户端数据: %s \n",buf); send(curfd,buf,len,0); } else { if(errno == EAGAIN || errno == EWOULDBLOCK) { printf("数据读完了...\n"); break; } else { perror("recv()"); exit(0); } } } } } }
return 0; }
|
client_epoll_ET.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h>
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) { perror("socket"); exit(0); }
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr)); if(ret == -1) { perror("connect"); exit(0); }
while(1) { int len; size_t send_len; char recvBuf[1024]; fgets(recvBuf, sizeof(recvBuf), stdin); send_len = strlen(recvBuf)+1; send(fd, recvBuf, strlen(recvBuf)+1,0); do{ len = recv(fd, recvBuf, sizeof(recvBuf),0); send_len -= len; printf("recv buf: %s\n", recvBuf); }while(send_len > 0); }
close(fd);
return 0; }
|
编译运行:
服务端显示:

客户端1显示:

客户端2显示:

基于多线程的边沿非阻塞服务端
server_epoll_LT_Thread.c
当调用epoll_wait
得知此时有套接字文件描述符处于就绪状态,并且进一步判断发现就绪的套接字文件描述符为用于监听的,则将接受客户端连接,并且创建用于通信的套接字文件描述符,将之添加至epoll
实例中等操作,全部放到子线程中去操作;判断发现就绪的套接字文件描述符为用于通信的,则将与客户端进行通信的逻辑操作全部放到子线程中执行
重点:epoll
函数均是线程安全的,因此对使用epoll
相关函数不需要添加线程锁,但是若存在其他与epoll
无关的共享资源,仍然需要加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
| #include <stdio.h> #include <ctype.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <arpa/inet.h> #include <sys/socket.h> #include <sys/epoll.h> #include <errno.h> #include <fcntl.h> #include <pthread.h>
#define SERVERPORT "1989" #define BUFSIZE 1024
typedef struct socketinfo{ int fd; int epfd; }SocketInfo;
void *acceptConn(void *arg) { printf("acceptConn id: %ld\n",pthread_self()); struct socketinfo *info = (struct socketinfo *)arg; struct sockaddr_in raddr; socklen_t raddr_len = sizeof raddr; int cfd = accept(info->fd,(void *)&raddr,&raddr_len); int flag = fcntl(cfd,F_GETFL); flag |= O_NONBLOCK; fcntl(cfd,F_SETFL,flag); struct epoll_event ev; ev.events = EPOLLIN|EPOLLET; ev.data.fd = cfd; int ret = epoll_ctl(info->epfd,EPOLL_CTL_ADD,cfd,&ev); if(ret < 0) { perror("epoll_ctl():add"); exit(1); } free(info); pthread_exit(NULL); }
void *Communication(void *arg) { struct socketinfo *info = (struct socketinfo *)arg; char buf[BUFSIZE]; memset(buf,0,sizeof buf); while(1) { int len = recv(info->fd,buf,sizeof buf,0); if(len == 0) { printf("[%ld]: 客户端断开连接...\n",pthread_self()); epoll_ctl(info->epfd, EPOLL_CTL_DEL, info->fd, NULL); close(info->fd); break; } else if(len > 0) { printf("[%ld]: 接收成功,客户端数据: %s \n",pthread_self(),buf); send(info->fd,buf,len,0); } else { if(errno == EAGAIN) { printf("[%ld]: 数据读完了...\n",pthread_self()); break; } else { perror("recv()"); break; } } } free(info); pthread_exit(NULL); }
int main(int argc,char *argv[]) { int lfd = socket(AF_INET,SOCK_STREAM,0); if(lfd < 0) { perror("socket()"); exit(1); }
struct sockaddr_in server_addr; memset(&server_addr,0,sizeof server_addr); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,"0.0.0.0",&server_addr.sin_addr); int ret = bind(lfd,(void *)&server_addr,sizeof server_addr); if(ret < 0) { perror("bind()"); exit(1); }
int opt = 1; setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
ret = listen(lfd,128); if(ret < 0) { perror("bind()"); exit(1); }
int para = 100; int epfd = epoll_create(para); if(epfd < 0) { perror("epoll_create()"); exit(1); }
struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = lfd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev); if(ret < 0) { perror("epoll_ctl()"); exit(1); }
struct epoll_event evs[1024]; int size = sizeof(evs) / sizeof(struct epoll_event); while(1) { int num = epoll_wait(epfd,evs,size,-1); for(int i = 0;i < num;i++) { int curfd = evs[i].data.fd; SocketInfo *info = (SocketInfo *) malloc(sizeof(SocketInfo)); info->fd = curfd; info->epfd = epfd;
if(curfd == lfd) { pthread_t acceptConn_tid; pthread_create(&acceptConn_tid,NULL,acceptConn,info); pthread_detach(acceptConn_tid); }
else { pthread_t Communicat_tid; pthread_create(&Communicat_tid,NULL,Communication,info); pthread_detach(Communicat_tid); } } }
return 0; }
|
client_epoll_LT_Thread.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h>
#define BUFSIZE 1024
int main() { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) { perror("socket"); exit(0); }
struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(1989); inet_pton(AF_INET, "0.0.0.0", &addr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr)); if(ret == -1) { perror("connect"); exit(0); }
int num = 0; while(1) { char recvBuf[BUFSIZE]; sprintf(recvBuf, "hello world, %d\n", num++); send(fd, recvBuf, strlen(recvBuf)+1,0); recv(fd, recvBuf, sizeof(recvBuf),0); printf("recv buf: %s\n", recvBuf); sleep(5); }
close(fd);
return 0; }
|
编译运行,服务端需要链接线程库,其编译指令如下:
1
| cc server_epoll_LT_Thread.c -o server_epoll_LT_Thread -pthread
|
运行服务端,以及客户端,如下:
服务端显示:

客户端1显示:

客户端2显示:

重点:
在上面的程序中,每次接受一条数据都需要创建一个子线程,子线程的频繁创建,使得服务端效率低,因此可以在上面的基础上,使用线程池,提升服务端效率**epoll
的IO
多路复用结合线程池的方式的服务端**