IO多路复用漫谈(附epoll示例)

Published On 六月 03, 2017

category linux | tags socket epoll libev echo server


高并发服务器编程经历了从同步IO到异步IO,从多进程或多线程模型到事件驱动的演变,基于事件的并发编程依赖于操作系统提供的IO多路复用技术。这篇文章从什么是IO多路复用谈起,列举基于事件的高并发服务器,并且对比了select,poll和epoll三种事件通知机制,libevent,libev和libuv三个事件框架,最后给出了分别使用select和epoll实现的echo server示例。

什么是IO多路复用(I/O Multiplexing)

这个概念来自通信领域,指一个信道上传输多路信号的技术,在计算机里表示使用一个线程监视多个描述符的就绪状态。IO多路复用的技术是由操作系统提供的功能,比如POSIX标准下的select或linux特有的的epoll以及BSD特有的kqueue。首先向操作系统注册一个描述符集合的可读或可写事件,如果某个(或某些)描述符就绪时,操作系统会通知。这样,多个描述符就能在一个线程内并发通信。

这里的描述符通常是socket,I/O多路复用也就是很多网络连接(多路),共(复)用一个线程。

有哪些事件

以select和tcp socket为例,所谓的可读事件是指:

  1. socket内核接收缓冲区中的可用字节数大于或等于其低水位SO_RCVLOWAT;
  2. socket通信的对方关闭了连接,这个时候在缓冲区里有个文件结束符EOF,此时读操作将返回0
  3. 监听socket的backlog队列有已经完成三次握手的连接请求,可以调用accept
  4. socket上有未处理的错误,此时可以用getsockopt来读取和清除该错误。

所谓可写事件,则是指:

  1. socket的内核发送缓冲区的可用字节数大于或等于其低水位SO_SNDLOWAIT;
  2. socket的写端被关闭,继续写会收到SIGPIPE信号;
  3. 非阻塞模式下,connect返回之后,发起连接成功或失败;
  4. socket上有未处理的错误,此时可以用getsockopt来读取和清除该错误。

select vs poll vs epoll

select

select是一种古老稳定的IO多路复用技术,最大的优点是兼容性好,几乎所有的平台都支持,它的缺点是:

  • 当可读或可写事件发生时,需要通过手动遍历所有的描述符,判断FD_ISSET位是否被设置的方式来找出是哪个描述符就绪
  • 描述符集合数有上限,由FD_SETSIZE定义,linux上是1024
  • select会修改fd_sets,导致它们不能被复用。每次调用select都需要重新创建描述符集合
  • 不能在另一个线程修改描述符,比如close

poll

poll是为了解决select的缺陷而发明的,它有以下优点:

  • 没有描述符数的限制
  • 不会修改pollfd,多次poll可以直接复用描述符集合

它同样包括以下缺点:

  • 需遍历找出触发事件的描述符
  • 处于监听的描述符不能被close

epoll

epoll是linux上特有的IO多路复用技术,它的内部机制与select或poll很不同,相比之下,它具有很多性能和功能方面的优势:

  • 返回触发事件的描述符集合,不需要遍历
  • 可以在被监视的事件上绑定额外的数据
  • 任何时候都可以移除或加入socket
  • 支持edge triggering 模式

但它并不是poll的改进版,相比poll它也有缺点:

  • 修改事件的flag需要调用epoll_ctl,会带来用户态和内核态切换的开销

尽管epoll很高效,但并不是任何场景都适合,在以下场景应该使用poll而不是epoll:

  • 不只在linux上使用
  • socket数不超过1000
  • socket数超过1000,这些连接的生命都很短暂

那么问题来了,epoll是怎么实现的?这是一个很好的面试题。

事件驱动与多线程对比

服务端程序的特点是IO操作频繁,大部分时间都在等待IO上。 多线程不仅占用更多的内存,而且线程切换也会带来一定的开销。另一方面,多个线程修改共享的数据会产生竞争条件,需要加锁,这就容易导致另一个严重的问题:死锁。

使用事件驱动则只有一个线程,没有线程切换的开销,效率高,并且不用考虑竞争条件。redis就是使用单进程单线程模型,所有的命令自然就是原子的。

这里有一个有趣的问题,如果大部分进程阻塞在socket I/O上,操作系统会继续调度一直处于阻塞状态的线程吗?

基于事件的应用举例

最成功的例子莫过于nginx。nginx内部封装了poll,epoll等各种事件模型,它会自动选择当前平台支持的最高效的方式,在linux上是epoll,也可以通过use指令手动指定。nginx最擅长的是做反向代理服务器,即转发用户的请求到后端服务器。一个work进程在等待后端返回时并不会被阻塞,而是继续处理其他请求,当后端返回数据时,该事件自动触发回调函数进行处理。

同样的,redis也实现了自己的事件框架。除此之外,翻墙软件shadowsocks的C语言版就是基于libev,memcached使用了libevent,nodejs使用libuv。

以上都是服务器端程序,作为客户端的爬虫框架scrapy,底层使用了twisted,同样是由事件驱动的。

如今,事件驱动的网络应用已经取得了巨大的成功。事实上,单进程单线程的事件驱动并发编程并不能充分利用多核CPU,因此很多情况下都是混合使用,比如nginx使用多个worker进程,每个进程内部是基于事件的,而memcached使用多线程,每个线程内部又是基于事件的。

libev vs libevent vs libuv

这些库为不同平台的事件模型编程封装了统一的接口。

libevent

libevent封装了现有的polling方法,使你只需要写一遍代码就可以在很多系统上编译运行。

libev

libev最初是为了解决libevent中的设计问题而开发的,它的设计哲学是”do one thing only”。与libevent相比,libev不使用全局变量,可以安全的在多线程环境使用;不同的事件类型使用不同的数据结构,比如有I/O,时间和信号等类型;移除了额外的组件,比如http服务器和DNS客户端。因此,libev是一个轻量的库。

libuv

libuv则是专门为node.js开发,在libev的基础上加入了对windows的支持,具有很好的跨平台兼容性。

无论是什么库,底层都是用了由操作系统提供的系统调用,比如select或epoll。

echo server示例

前面讲的都是理论,是时候来点干货了。下面分别使用select和epoll实现echo server,展示IO多路复用的用法。echo server就是服务端发回客户端发送的任何文本。CMU的课程网站上有一个简单的示例:http://www.cs.cmu.edu/afs/cs/academic/class/15213-f00/www/class24code/echoserver.c,它一次只能连接一个客户端。

echo server using epoll

这个示例来自How to use epoll? A complete example in C,原始的程序将客户端发送的内容打印出来,我将它改成发回给客户端。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#define MAXEVENTS 64
static int
make_socket_non_blocking(int sfd)
{
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (s == -1)
{
perror("fcntl");
return -1;
}
return 0;
}
static int
create_and_bind(char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int s, sfd;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */
hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
hints.ai_flags = AI_PASSIVE; /* All interfaces */
s = getaddrinfo(NULL, port, &hints, &result);
if (s != 0)
{
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
return -1;
}
for (rp = result; rp != NULL; rp = rp->ai_next)
{
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1)
continue;
s = bind(sfd, rp->ai_addr, rp->ai_addrlen);
if (s == 0)
{
/* We managed to bind successfully! */
break;
}
close(sfd);
}
if (rp == NULL)
{
fprintf(stderr, "Could not bind\n");
return -1;
}
freeaddrinfo(result);
return sfd;
}
int
main(int argc, char *argv[])
{
int sfd, s;
int efd;
// typedef union epoll_data
// {
// void *ptr;
// int fd;
// __uint32_t u32;
// __uint64_t u64;
// } epoll_data_t;
// struct epoll_event
// {
// __uint32_t events; /* Epoll events */
// epoll_data_t data; /* User data variable */
// };
struct epoll_event event;
struct epoll_event *events;
if (argc != 2)
{
fprintf(stderr, "Usage: %s [port]\n", argv[0]);
exit(EXIT_FAILURE);
}
sfd = create_and_bind(argv[1]);
if (sfd == -1)
abort();
s = make_socket_non_blocking(sfd);
if (s == -1)
abort();
s = listen(sfd, SOMAXCONN);
if (s == -1)
{
perror("listen");
abort();
}
efd = epoll_create1(0);
if (efd == -1)
{
perror("epoll_create");
abort();
}
event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1)
{
perror("epoll_ctl");
abort();
}
/* Buffer where events are returned */
events = calloc(MAXEVENTS, sizeof event);
/* The event loop */
while (1)
{
int n, i;
n = epoll_wait(efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
if (events[i].events & EPOLLERR)
fprintf(stderr, "epoll error: EPOLLERR\n");
if (events[i].events & EPOLLHUP)
fprintf(stderr, "epoll error: EPOLLHUP\n");
close(events[i].data.fd);
continue;
}
else if (sfd == events[i].data.fd)
{
/* We have a notification on the listening socket, which
means one or more incoming connections. */
while (1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
infd = accept(sfd, &in_addr, &in_len);
if (infd == -1)
{
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
/* We have processed all incoming
connections. */
break;
}
else
{
perror("accept");
break;
}
}
s = getnameinfo(&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0)
{
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}
/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking(infd);
if (s == -1)
abort();
event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1)
{
perror("epoll_ctl");
abort();
}
}
continue;
}
else
{
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;
while (1)
{
ssize_t count;
char buf[512];
count = read(events[i].data.fd, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN) {
perror("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}
/* Write the buffer to standard output */
s = write(events[i].data.fd, buf, count);
if (s == -1)
{
perror("write");
abort();
}
}
if (done)
{
printf("Closed connection on descriptor %d\n", events[i].data.fd);
/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close(events[i].data.fd);
}
}
}
}
free(events);
close(sfd);
return EXIT_SUCCESS;
}

echo server using select

这个示例来自Socket Programming in C/C++: Handling multiple clients on server without multi threading,我修复了一个bug,并且支持通过参数指定端口号。

//Example code: A simple server side code, which echos back the received message.
//Handle multiple socket connections with select and fd_set on Linux
#include <stdio.h>
#include <string.h> //strlen
#include <stdlib.h>
#include <errno.h>
#include <unistd.h> //close
#include <arpa/inet.h> //close
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/time.h> //FD_SET, FD_ISSET, FD_ZERO macros
#define TRUE 1
#define FALSE 0
int main(int argc , char *argv[])
{
int opt = TRUE;
int master_socket , addrlen , new_socket , client_socket[30] ,
max_clients = 30 , activity, i , valread , sd;
int max_sd;
struct sockaddr_in address;
char buffer[1025]; //data buffer of 1K
if (argc != 2)
{
fprintf(stderr, "Usage: %s [port]\n", argv[0]);
exit(EXIT_FAILURE);
}
//set of socket descriptors
fd_set readfds;
//a message
char *message = "ECHO Daemon v1.0 \r\n";
//initialise all client_socket[] to 0 so not checked
for (i = 0; i < max_clients; i++)
{
client_socket[i] = 0;
}
//create a master socket
if( (master_socket = socket(AF_INET , SOCK_STREAM , 0)) == 0)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
//set master socket to allow multiple connections ,
//this is just a good habit, it will work without this
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
sizeof(opt)) < 0 )
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
//type of socket created
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(atoi(argv[1]));
//bind the socket to localhost port 8888
if (bind(master_socket, (struct sockaddr *)&address, sizeof(address))<0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
//try to specify maximum of 3 pending connections for the master socket
if (listen(master_socket, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
//accept the incoming connection
addrlen = sizeof(address);
puts("Waiting for connections ...");
while(TRUE)
{
//clear the socket set
FD_ZERO(&readfds);
//add master socket to set
FD_SET(master_socket, &readfds);
max_sd = master_socket;
//add child sockets to set
for ( i = 0 ; i < max_clients ; i++)
{
//socket descriptor
sd = client_socket[i];
//if valid socket descriptor then add to read list
if(sd > 0)
FD_SET( sd , &readfds);
//highest file descriptor number, need it for the select function
if(sd > max_sd)
max_sd = sd;
}
//wait for an activity on one of the sockets , timeout is NULL ,
//so wait indefinitely
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
if ((activity < 0) && (errno!=EINTR))
{
printf("select error");
}
//If something happened on the master socket ,
//then its an incoming connection
if (FD_ISSET(master_socket, &readfds))
{
if ((new_socket = accept(master_socket,
(struct sockaddr *)&address, (socklen_t*)&addrlen))<0)
{
perror("accept");
exit(EXIT_FAILURE);
}
//inform user of socket number - used in send and receive commands
printf("New connection , socket fd is %d , ip : %s , port: %d\n" ,
new_socket , inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
//send new connection greeting message
if( send(new_socket, message, strlen(message), 0) != strlen(message) )
{
perror("send");
}
puts("Welcome message sent successfully");
//add new socket to array of sockets
for (i = 0; i < max_clients; i++)
{
//if position is empty
if( client_socket[i] == 0 )
{
client_socket[i] = new_socket;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
//else its some IO operation on some other socket
for (i = 0; i < max_clients; i++)
{
sd = client_socket[i];
if (FD_ISSET( sd , &readfds))
{
//Somebody disconnected
if ((valread = read( sd , buffer, 1024)) == 0)
{
printf("Socket %d closed\n", sd);
//Close the socket and mark as 0 in list for reuse
close(sd);
client_socket[i] = 0;
}
//Echo back the message that came in
else
{
//set the string terminating NULL byte on the end
//of the data read
buffer[valread] = '\0';
send(sd , buffer , strlen(buffer) , 0);
}
}
}
}
return 0;
}

参考


qq email facebook github
© 2024 - Xurui Yan. All rights reserved
Built using pelican