High-concurrency server programming has evolved from synchronous to asynchronous I/O, and from multi-process or multi-threaded models to event-driven designs; event-based concurrent programming relies on the I/O multiplexing facilities provided by the operating system. This article starts with what I/O multiplexing is, lists event-based high-concurrency servers, compares the three event notification mechanisms select, poll and epoll and the three event frameworks libevent, libev and libuv, and ends with echo server examples implemented with select and epoll.
What is I/O Multiplexing
The term comes from the field of communications, where it means carrying multiple signals over a single channel; in computing it means using a single thread to monitor the readiness of multiple descriptors. I/O multiplexing is a facility provided by the operating system, for example select from the POSIX standard, the Linux-specific epoll, or the BSD-specific kqueue. You first register interest in readable or writable events on a set of descriptors with the OS; when one or more of those descriptors become ready, the OS notifies you. In this way many descriptors can communicate concurrently within a single thread.
The descriptors here are usually sockets, so I/O multiplexing amounts to many network connections (the multiple channels) sharing (multiplexed onto) a single thread.
What counts as an event
Taking select with TCP sockets as an example, a readable event means one of the following:
- the number of bytes available in the socket's kernel receive buffer is greater than or equal to its low-water mark SO_RCVLOWAT;
- the peer has closed the connection, so the buffer now holds an end-of-file marker and a read will return 0;
- the backlog queue of a listening socket contains connections that have completed the three-way handshake, so accept can be called;
- there is an unhandled error on the socket, which can be fetched and cleared with getsockopt.
A writable event means one of the following:
- the number of bytes available in the socket's kernel send buffer is greater than or equal to its low-water mark SO_SNDLOWAT;
- the write end of the socket has been closed, so further writes will raise SIGPIPE;
- on a non-blocking socket, connect has returned and the connection attempt has either succeeded or failed;
- there is an unhandled error on the socket, which can be fetched and cleared with getsockopt, as shown in the sketch after this list.
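As a minimal illustration of the last item on both lists: reading the SO_ERROR option returns the pending error and clears it atomically. The helper name clear_socket_error and its fd parameter are purely illustrative.

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

/* Fetch and clear the pending error on a socket; returns the error code.
   `fd` is assumed to be a TCP socket being monitored for events. */
static int clear_socket_error(int fd)
{
    int err = 0;
    socklen_t len = sizeof(err);

    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == -1)
        return -1;
    if (err != 0)
        fprintf(stderr, "socket error: %s\n", strerror(err));
    return err;
}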
select vs poll vs epoll
select
select is an old and stable I/O multiplexing mechanism; its greatest strength is portability, since virtually every platform supports it. Its drawbacks are:
- when a readable or writable event fires, you must manually iterate over all descriptors and test FD_ISSET to find out which ones are ready;
- the descriptor set has an upper bound, defined by FD_SETSIZE, which is 1024 on Linux;
- select modifies the fd_sets passed to it, so they cannot be reused: the descriptor sets must be rebuilt before every call, as the sketch after this list shows;
- a monitored descriptor must not be modified from another thread, e.g. by close.
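A minimal sketch of the rebuild pattern that the third drawback forces on every select loop. The names select_loop, listen_fd, client_fds and nclients are hypothetical; the descriptors are assumed to be set up elsewhere.

#include <sys/select.h>

static void select_loop(int listen_fd, int *client_fds, int nclients)
{
    fd_set readfds;
    int maxfd, i;

    for (;;)
    {
        /* select() overwrites readfds in place, so the whole set has to
           be rebuilt from scratch on every iteration. */
        FD_ZERO(&readfds);
        FD_SET(listen_fd, &readfds);
        maxfd = listen_fd;
        for (i = 0; i < nclients; i++)
        {
            FD_SET(client_fds[i], &readfds);
            if (client_fds[i] > maxfd)
                maxfd = client_fds[i];
        }

        if (select(maxfd + 1, &readfds, NULL, NULL, NULL) == -1)
            break;

        /* ... FD_ISSET() checks on listen_fd and client_fds go here ... */
    }
}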
poll
poll was invented to remedy select's defects. Its advantages:
- no limit on the number of descriptors;
- it does not modify the pollfd array, so the descriptor set can be reused directly across calls, as the sketch after this list shows.
It still shares these drawbacks:
- you must iterate to find the descriptors whose events fired;
- a monitored descriptor must not be closed.
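A minimal poll sketch for contrast: the pollfd array is built once and reused on every call. The names poll_loop, listen_fd and client_fd are hypothetical.

#include <poll.h>

static void poll_loop(int listen_fd, int client_fd)
{
    struct pollfd fds[2];
    int i, n;

    /* Build the array once; poll() reports results in .revents and leaves
       .fd and .events untouched, so the same array is reused every call. */
    fds[0].fd = listen_fd;
    fds[0].events = POLLIN;
    fds[1].fd = client_fd;
    fds[1].events = POLLIN;

    for (;;)
    {
        n = poll(fds, 2, -1); /* timeout -1: block indefinitely */
        if (n == -1)
            break;

        /* Still O(n): scan the array to find which events fired. */
        for (i = 0; i < 2; i++)
        {
            if (fds[i].revents & POLLIN)
            {
                /* ... accept() on listen_fd or read() from client_fd ... */
            }
        }
    }
}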
epoll
epoll is a Linux-specific I/O multiplexing mechanism. Its internals are very different from select's and poll's, and it offers several performance and feature advantages over them:
- it returns only the set of descriptors whose events fired, so no full scan is needed;
- extra user data can be attached to a monitored event;
- sockets can be added or removed at any time;
- it supports edge-triggered mode.
It is not simply an improved poll, though; compared with poll it has a drawback of its own:
- changing an event's flags requires a call to epoll_ctl, which costs a user/kernel mode switch (see the sketch after this list).
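A minimal sketch of the registration API, showing how user data is attached through the epoll_data union. The conn struct and the watch_conn helper are hypothetical names.

#include <stdio.h>
#include <sys/epoll.h>

struct conn
{
    int fd;
    /* ... per-connection state ... */
};

/* Register a connection with epoll, attaching a pointer to it as user
   data so the event loop can recover the full state without a lookup. */
static int watch_conn(int efd, struct conn *c)
{
    struct epoll_event ev;

    ev.events = EPOLLIN | EPOLLET; /* readable, edge-triggered */
    ev.data.ptr = c;               /* arbitrary user data rides along */

    /* Every add or modify is a separate epoll_ctl() syscall, which is
       the user/kernel switch mentioned in the drawback above. */
    if (epoll_ctl(efd, EPOLL_CTL_ADD, c->fd, &ev) == -1)
    {
        perror("epoll_ctl");
        return -1;
    }
    return 0;
}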
Efficient as epoll is, it is not the right fit for every scenario. Prefer poll over epoll when:
- the program must run on platforms other than Linux;
- the socket count stays under about 1000;
- the socket count exceeds 1000, but all of the connections are short-lived.
Which raises a question: how is epoll implemented? That makes a great interview question. (In short, the kernel keeps the registered descriptors in a red-black tree and appends ready ones to a ready list via callbacks, so epoll_wait only has to drain that list.)
Event-driven vs multi-threaded
Server programs are I/O-bound: most of their time is spent waiting on I/O. Multithreading not only consumes more memory, but thread switches also carry overhead. Moreover, multiple threads mutating shared data create race conditions that require locking, which in turn invites another serious problem: deadlock.
An event-driven design uses only one thread, so there is no thread-switch overhead and no race conditions to worry about. Redis uses the single-process, single-thread model, which is why all of its commands are naturally atomic.
An interesting side question: if most threads are blocked on socket I/O, will the operating system keep scheduling threads that remain blocked? (It will not: a blocked thread leaves the run queue and becomes runnable again only when the I/O it is waiting for is ready.)
Examples of event-based applications
The most successful example is nginx. Internally, nginx wraps poll, epoll and other event models, and automatically selects the most efficient one the current platform supports, which on Linux is epoll; the model can also be forced with the use directive, as in the configuration snippet below. What nginx does best is acting as a reverse proxy, i.e. forwarding user requests to backend servers. A worker process is not blocked while it waits for a backend to respond; it keeps handling other requests, and when the backend returns data, that event triggers the callback that processes it.
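For instance, the event model can be pinned in the events block of nginx.conf; the worker_connections figure here is only illustrative:

events {
    use epoll;                  # force the epoll event model on Linux
    worker_connections 10240;   # per-worker connection limit
}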
Likewise, redis implements its own event framework. Beyond those two, the C version of the proxy tool shadowsocks is built on libev, memcached uses libevent, and node.js uses libuv.
All of the above are server-side programs; on the client side, the crawler framework scrapy sits on top of twisted, which is also event-driven.
Today, event-driven network applications are hugely successful. That said, single-process, single-thread event-driven concurrency cannot fully exploit multi-core CPUs, so hybrid designs are common in practice: nginx runs multiple worker processes, each internally event-based, while memcached runs multiple threads, each internally event-based.
libev vs libevent vs libuv
These libraries wrap the event models of different platforms behind a unified programming interface.
libevent
libevent wraps the existing polling mechanisms, so you write your code once and can compile and run it on many systems.
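A minimal sketch against the libevent 2.x API: register a read callback on a descriptor and run the loop. run_loop and on_readable are illustrative names; fd is an assumed, already-open descriptor.

#include <event2/event.h>

/* Called whenever `fd` becomes readable. */
static void on_readable(evutil_socket_t fd, short what, void *arg)
{
    /* ... read from fd ... */
}

static int run_loop(int fd)
{
    struct event_base *base = event_base_new();

    /* EV_PERSIST keeps the event registered after it fires. */
    struct event *ev = event_new(base, fd, EV_READ | EV_PERSIST,
                                 on_readable, NULL);
    event_add(ev, NULL);       /* NULL timeout: wait indefinitely */
    event_base_dispatch(base); /* run the event loop */

    event_free(ev);
    event_base_free(base);
    return 0;
}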
libev
libev was originally written to fix design problems in libevent; its philosophy is "do one thing only". Compared with libevent, libev uses no global variables, so it can be used safely in multithreaded programs; it uses a distinct watcher type for each kind of event, such as I/O, timers and signals; and it drops the extra components such as the HTTP server and the DNS client. As a result, libev is a lightweight library.
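The same idea in libev's idiom, using its dedicated ev_io watcher type; run_loop and on_readable are again illustrative names.

#include <ev.h>

static ev_io watcher;

/* Called whenever the watched descriptor becomes readable. */
static void on_readable(struct ev_loop *loop, ev_io *w, int revents)
{
    /* ... read from w->fd ... */
}

static int run_loop(int fd)
{
    struct ev_loop *loop = ev_default_loop(0);

    ev_io_init(&watcher, on_readable, fd, EV_READ); /* watch fd for reads */
    ev_io_start(loop, &watcher);
    ev_run(loop, 0); /* run until ev_break() is called */
    return 0;
}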
libuv
libuv was developed specifically for node.js; it started from libev and added Windows support, which gives it excellent cross-platform compatibility.
Whichever library you choose, underneath they all rely on the system calls provided by the operating system, such as select or epoll.
echo server examples
Everything so far has been theory; time for something concrete. Below, echo servers implemented with select and with epoll demonstrate how I/O multiplexing is used. An echo server simply sends back whatever text a client sends to it. The CMU course site has a simple example: http://www.cs.cmu.edu/afs/cs/academic/class/15213-f00/www/class24code/echoserver.c, which can serve only one client at a time.
echo server using epoll
This example comes from How to use epoll? A complete example in C; the original program printed what the client sent, and I changed it to echo the data back to the client.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>

#define MAXEVENTS 64

static int
make_socket_non_blocking(int sfd)
{
    int flags, s;

    flags = fcntl(sfd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl");
        return -1;
    }

    flags |= O_NONBLOCK;
    s = fcntl(sfd, F_SETFL, flags);
    if (s == -1)
    {
        perror("fcntl");
        return -1;
    }

    return 0;
}

static int
create_and_bind(char *port)
{
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int s, sfd;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 choices */
    hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
    hints.ai_flags = AI_PASSIVE;     /* All interfaces */

    s = getaddrinfo(NULL, port, &hints, &result);
    if (s != 0)
    {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
        return -1;
    }

    for (rp = result; rp != NULL; rp = rp->ai_next)
    {
        sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (sfd == -1)
            continue;

        s = bind(sfd, rp->ai_addr, rp->ai_addrlen);
        if (s == 0)
        {
            /* We managed to bind successfully! */
            break;
        }

        close(sfd);
    }

    freeaddrinfo(result);

    if (rp == NULL)
    {
        fprintf(stderr, "Could not bind\n");
        return -1;
    }

    return sfd;
}

int
main(int argc, char *argv[])
{
    int sfd, s;
    int efd;
    // typedef union epoll_data
    // {
    //     void        *ptr;
    //     int          fd;
    //     __uint32_t   u32;
    //     __uint64_t   u64;
    // } epoll_data_t;
    //
    // struct epoll_event
    // {
    //     __uint32_t   events; /* Epoll events */
    //     epoll_data_t data;   /* User data variable */
    // };
    struct epoll_event event;
    struct epoll_event *events;

    if (argc != 2)
    {
        fprintf(stderr, "Usage: %s [port]\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    sfd = create_and_bind(argv[1]);
    if (sfd == -1)
        abort();

    s = make_socket_non_blocking(sfd);
    if (s == -1)
        abort();

    s = listen(sfd, SOMAXCONN);
    if (s == -1)
    {
        perror("listen");
        abort();
    }

    efd = epoll_create1(0);
    if (efd == -1)
    {
        perror("epoll_create");
        abort();
    }

    event.data.fd = sfd;
    event.events = EPOLLIN | EPOLLET;
    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
    if (s == -1)
    {
        perror("epoll_ctl");
        abort();
    }

    /* Buffer where events are returned */
    events = calloc(MAXEVENTS, sizeof event);

    /* The event loop */
    while (1)
    {
        int n, i;

        n = epoll_wait(efd, events, MAXEVENTS, -1);
        for (i = 0; i < n; i++)
        {
            if ((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!(events[i].events & EPOLLIN)))
            {
                /* An error has occurred on this fd, or the socket is not
                   ready for reading (why were we notified then?) */
                if (events[i].events & EPOLLERR)
                    fprintf(stderr, "epoll error: EPOLLERR\n");
                if (events[i].events & EPOLLHUP)
                    fprintf(stderr, "epoll error: EPOLLHUP\n");
                close(events[i].data.fd);
                continue;
            }
            else if (sfd == events[i].data.fd)
            {
                /* We have a notification on the listening socket, which
                   means one or more incoming connections. */
                while (1)
                {
                    struct sockaddr in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;
                    infd = accept(sfd, &in_addr, &in_len);
                    if (infd == -1)
                    {
                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                        {
                            /* We have processed all incoming
                               connections. */
                            break;
                        }
                        else
                        {
                            perror("accept");
                            break;
                        }
                    }

                    s = getnameinfo(&in_addr, in_len,
                                    hbuf, sizeof hbuf,
                                    sbuf, sizeof sbuf,
                                    NI_NUMERICHOST | NI_NUMERICSERV);
                    if (s == 0)
                    {
                        printf("Accepted connection on descriptor %d "
                               "(host=%s, port=%s)\n", infd, hbuf, sbuf);
                    }

                    /* Make the incoming socket non-blocking and add it to the
                       list of fds to monitor. */
                    s = make_socket_non_blocking(infd);
                    if (s == -1)
                        abort();

                    event.data.fd = infd;
                    event.events = EPOLLIN | EPOLLET;
                    s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
                    if (s == -1)
                    {
                        perror("epoll_ctl");
                        abort();
                    }
                }
                continue;
            }
            else
            {
                /* We have data on the fd waiting to be read. Read and
                   echo it back. We must read whatever data is available
                   completely, as we are running in edge-triggered mode
                   and won't get a notification again for the same
                   data. */
                int done = 0;

                while (1)
                {
                    ssize_t count;
                    char buf[512];

                    count = read(events[i].data.fd, buf, sizeof buf);
                    if (count == -1)
                    {
                        /* If errno == EAGAIN, that means we have read all
                           data. So go back to the main loop. */
                        if (errno != EAGAIN)
                        {
                            perror("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        /* End of file. The remote has closed the
                           connection. */
                        done = 1;
                        break;
                    }

                    /* Echo the buffer back to the client. A production
                       server would also handle partial writes and EAGAIN
                       here by buffering the unsent bytes. */
                    s = write(events[i].data.fd, buf, count);
                    if (s == -1)
                    {
                        perror("write");
                        abort();
                    }
                }

                if (done)
                {
                    printf("Closed connection on descriptor %d\n",
                           events[i].data.fd);

                    /* Closing the descriptor will make epoll remove it
                       from the set of descriptors which are monitored. */
                    close(events[i].data.fd);
                }
            }
        }
    }

    free(events);
    close(sfd);

    return EXIT_SUCCESS;
}
echo server using select
This example comes from Socket Programming in C/C++: Handling multiple clients on server without multi threading; I fixed a bug and added support for specifying the port number via a command-line argument.
//Example code: A simple server side code, which echos back the received message.
//Handle multiple socket connections with select and fd_set on Linux
#include <stdio.h>
#include <string.h>     //strlen
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>     //close
#include <arpa/inet.h>  //inet_ntoa
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/time.h>   //FD_SET, FD_ISSET, FD_ZERO macros

#define TRUE 1
#define FALSE 0

int main(int argc, char *argv[])
{
    int opt = TRUE;
    int master_socket, addrlen, new_socket, client_socket[30],
        max_clients = 30, activity, i, valread, sd;
    int max_sd;
    struct sockaddr_in address;

    char buffer[1025]; //data buffer of 1K

    if (argc != 2)
    {
        fprintf(stderr, "Usage: %s [port]\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    //set of socket descriptors
    fd_set readfds;

    //a message
    const char *message = "ECHO Daemon v1.0 \r\n";

    //initialise all client_socket[] to 0 so not checked
    for (i = 0; i < max_clients; i++)
    {
        client_socket[i] = 0;
    }

    //create a master socket
    if ((master_socket = socket(AF_INET, SOCK_STREAM, 0)) == 0)
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    //set master socket to allow multiple connections,
    //this is just a good habit, it will work without this
    if (setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
                   sizeof(opt)) < 0)
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    //type of socket created
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(atoi(argv[1]));

    //bind the socket to the port given on the command line
    if (bind(master_socket, (struct sockaddr *)&address, sizeof(address)) < 0)
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    //try to specify maximum of 3 pending connections for the master socket
    if (listen(master_socket, 3) < 0)
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    //accept the incoming connection
    addrlen = sizeof(address);
    puts("Waiting for connections ...");

    while (TRUE)
    {
        //clear the socket set
        FD_ZERO(&readfds);

        //add master socket to set
        FD_SET(master_socket, &readfds);
        max_sd = master_socket;

        //add child sockets to set
        for (i = 0; i < max_clients; i++)
        {
            //socket descriptor
            sd = client_socket[i];

            //if valid socket descriptor then add to read list
            if (sd > 0)
                FD_SET(sd, &readfds);

            //highest file descriptor number, need it for the select function
            if (sd > max_sd)
                max_sd = sd;
        }

        //wait for an activity on one of the sockets, timeout is NULL,
        //so wait indefinitely
        activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);

        if ((activity < 0) && (errno != EINTR))
        {
            printf("select error");
        }

        //If something happened on the master socket,
        //then its an incoming connection
        if (FD_ISSET(master_socket, &readfds))
        {
            if ((new_socket = accept(master_socket,
                                     (struct sockaddr *)&address,
                                     (socklen_t *)&addrlen)) < 0)
            {
                perror("accept");
                exit(EXIT_FAILURE);
            }

            //inform user of socket number - used in send and receive commands
            printf("New connection , socket fd is %d , ip : %s , port: %d\n",
                   new_socket, inet_ntoa(address.sin_addr),
                   ntohs(address.sin_port));

            //send new connection greeting message
            if (send(new_socket, message, strlen(message), 0) != strlen(message))
            {
                perror("send");
            }

            puts("Welcome message sent successfully");

            //add new socket to array of sockets
            for (i = 0; i < max_clients; i++)
            {
                //if position is empty
                if (client_socket[i] == 0)
                {
                    client_socket[i] = new_socket;
                    printf("Adding to list of sockets as %d\n", i);
                    break;
                }
            }
        }

        //else its some IO operation on some other socket
        for (i = 0; i < max_clients; i++)
        {
            sd = client_socket[i];

            if (FD_ISSET(sd, &readfds))
            {
                //Somebody disconnected (a read error counts as a disconnect too)
                if ((valread = read(sd, buffer, 1024)) <= 0)
                {
                    printf("Socket %d closed\n", sd);

                    //Close the socket and mark as 0 in list for reuse
                    close(sd);
                    client_socket[i] = 0;
                }
                //Echo back the message that came in
                else
                {
                    //set the string terminating NULL byte on the end
                    //of the data read
                    buffer[valread] = '\0';
                    send(sd, buffer, strlen(buffer), 0);
                }
            }
        }
    }

    return 0;
}