惊群和SO_REUSEPORT

Published On September 07, 2017

category linux | tags socket epoll


惊群(thundering herd)问题通俗的说就是多个进程等待同一个事件(比如同一个socket的可读事件),当事件发生时,内核唤醒所有的进程,但该事件只需要被一个进程处理。这显然不是我们希望的,因为白白浪费了CPU资源。本文试图复现这个问题,然后结合nginx的代码给出它的解决办法。最后给出一个使用epoll和reuseport的多进程echo server。

惊群现象的由来

nginx的master-worker模式

nginx采用master-worker进程的模式,master负责解析配置,启动worker进程和处理信号,比如restart重启worker进程,worker负责真正处理请求。当有多个worker进程时,一个请求将被哪个worker进程处理呢?更具体一点,发送请求的客户端会与哪个worker进程建立TCP连接呢?

我们知道服务端socket的典型流程如下:

/* Typical server-side flow: create a TCP socket, bind it to port 8000 on
   all local addresses, start listening, then accept connections forever. */
int listen_socket = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(8000);
int ret = bind(listen_socket, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
/* The second argument (5) is the backlog passed to listen(). */
listen(listen_socket, 5);
while (1)
{
    /* NOTE(review): cli_addr and addr_length are not declared in this
       excerpt; presumably a struct sockaddr_in and an integer holding its
       size -- confirm against the full program. */
    int cli_sock = accept(listen_socket, (struct sockaddr *) &cli_addr, (socklen_t *) &addr_length);
    // TODO: handle the accepted connection (cli_sock)
}
并且,同一ip端口对只能被一个程序(进程)监听,否则会冲突。 nginx的做法是master进程创建、绑定、监听服务端socket,worker进程从master进程“继承”该socket,然后循环调用accept接收并处理请求。因此,通过netstat命令可以看到只有master进程监听80端口:
$ ps -ef|grep nginx
root      48664      1  0 19:57 ?        00:00:00 nginx: master process /opt/nginx/sbin/nginx
nobody    48665  48664  0 19:57 ?        00:00:00 nginx: worker process
nobody    48666  48664  0 19:57 ?        00:00:00 nginx: worker process

$ netstat -tnlp|grep 80
tcp        0      0 0.0.0.0:80              0.0.0.0:*               LISTEN      48664/nginx: master

简单示例

使用这种方式的示例如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>

#include <netinet/in.h>
#include <arpa/inet.h>

#include <sys/wait.h>

/*
 * Report a fatal error and terminate the process.
 *
 * Prints `msg` followed by the description of the current errno value to
 * stderr (via perror), then exits with status 1. Never returns.
 * `msg` is only read, so it is const-qualified; string literals can be
 * passed without discarding qualifiers.
 */
void error(const char *msg)
{
    perror(msg);
    exit(1);
}

/*
 * Pre-fork blocking echo server used to observe how accept() distributes
 * connections between processes.
 *
 * The parent creates, binds and listens on one TCP socket (port taken from
 * argv[1]) and forks two children. Each child inherits the listening socket
 * and loops on a blocking accept(), printing its pid for every connection it
 * receives, then echoes back one message and closes the connection.
 * The parent only waits until the first child exits.
 *
 * Returns 0 from the parent after a child terminated; exits with status 1
 * on a usage error or on any failing system call.
 */
int main( int argc, char *argv[] )
{
    int sockfd; /* server socket */
    int newsockfd; /* client socket */
    int portno; /* port to listen on */
    socklen_t clilen; /* byte size of client's address */
    ssize_t n; /* message byte size */
    struct sockaddr_in serv_addr; /* server's addr */
    struct sockaddr_in cli_addr; /* client addr */
    char buffer[256]; /* message buffer */

    if (argc < 2) {
        fprintf(stderr,"usage %s port\n", argv[0]);
        /* A missing argument is a usage error, so report failure. */
        exit(1);
    }

    portno = atoi(argv[1]);

    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    if (sockfd < 0)
        error("ERROR opening socket");

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(portno);

    if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
        error("ERROR on binding");

    if (listen(sockfd,5) < 0)
        error("ERROR on listening");

    int pid = getpid();
    printf("parent process %d\n", pid);

    /* Fork two workers; a child leaves the loop immediately so that only
       the parent keeps forking. */
    for (int i = 0; i < 2; i++)
    {
        pid = fork();
        if (pid < 0)
        {
            error("failed to fork");
        }
        else if (pid==0) // child
        {
            break;
        }
        else // parent
        {
            printf("start child process %d\n", pid);
        }
    }

    if (pid > 0 ) // parent
    {
        int wstatus;
        // just suspend until one child process exits
        if ((pid = wait(&wstatus)) == -1)
            error("Error on wait");
        printf("Process %d terminated\n", pid);
        return 0;
    }

    /* Only children reach this point. */
    while(1)
    {
        /* addrlen is a value-result argument of accept(): it must hold the
           buffer size on entry, so reset it before every call. */
        clilen = sizeof(cli_addr);
        newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen);
        if (newsockfd < 0)
            error("ERROR on accept");

        printf("process %d get connection %d\n", getpid(), newsockfd);

        n = recv(newsockfd, buffer, sizeof(buffer) - 1, 0);
        if (n < 0)
            error("ERROR reading from socket");
        if (n > 0) // n==0 means client close connection
        {
            /* Echo exactly the bytes received; strlen() would stop at an
               embedded NUL byte and truncate the reply. */
            n = write(newsockfd, buffer, (size_t) n);
            if (n < 0)
                error("ERROR writing to socket");
        }
        close(newsockfd);
    }

    return 0;
}
上面的代码创建了两个子进程,它们从父进程获得了listening socket的副本,然后调用accept等待新的连接。因为没有使用异步IO,同时只能处理2个连接。那么问题来了,当有新的请求时,两个进程的accept是否都会返回?如果多个accept都返回,但只有一个返回成功,这种现象就是惊群。好在linux上似乎已经不存在这个问题了,当有多个子进程对共享的listening socket调用accept处于阻塞状态时,一个连接请求只会唤醒一个进程的accept调用

测试一下:

在一个shell窗口里编译并运行(如果linux版本比较低,可能需要使用-std=gnu99才能编译通过,下面的例子也一样):

gcc example1.c
./a.out 8000

在另一个shell窗口里使用nc连续发送请求:

for i in {1..5}; do echo "hello" | nc localhost 8000; done
可以看到下面的输出,说明accept能自动在不同进程中进行负载均衡。
$ ./a.out 8000
parent process 122761
parent process 3327
start child process 3328
start child process 3329
process 3328 get connection 4
process 3329 get connection 4
process 3328 get connection 4
process 3329 get connection 4
process 3328 get connection 4
执行pkill a.out结束程序。

上面的测试在linux内核2.6.32和4.11.7上的结果一致。查看linux内核版本的命令:cat /proc/version

使用epoll的示例

nginx在linux上是使用epoll进行异步IO的,将上面的例子使用epoll重写成异步的echo server:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>

#define MAXEVENTS 64

/*
 * Report a fatal error and terminate the process.
 *
 * Prints `msg` followed by the description of the current errno value to
 * stderr (via perror), then exits with status 1. Never returns.
 * `msg` is only read, so it is const-qualified; string literals can be
 * passed without discarding qualifiers.
 */
void
error(const char *msg)
{
    perror(msg);
    exit(1);
}

/*
 * Create a TCP/IPv4 socket and bind it to `port` on all local addresses.
 *
 * Returns the socket descriptor on success. On failure a message is printed
 * with perror(), any socket that was created is closed, and -1 is returned.
 * The caller still has to call listen() on the returned descriptor.
 */
static int
create_and_bind(int port)
{
    struct sockaddr_in serv_addr;
    int sfd;

    sfd = socket(AF_INET, SOCK_STREAM, 0);

    if (sfd < 0)
    {
        perror("socket failed");
        return -1;
    }

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(port);

    if (bind(sfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
    {
        /* perror() first: close() may overwrite errno. */
        perror("bind failed");
        /* Do not leak the descriptor when the address cannot be bound. */
        close(sfd);
        return -1;
    }
    return sfd;
}

/*
 * Put descriptor `sfd` into non-blocking mode by adding O_NONBLOCK to its
 * file status flags.
 *
 * Returns 0 on success. If either fcntl() step fails, the failing step is
 * reported with perror() and -1 is returned.
 */
static int
make_socket_non_blocking(int sfd)
{
    int current = fcntl(sfd, F_GETFL, 0);

    if (current == -1)
    {
        perror("fcntl get");
        return -1;
    }

    if (fcntl(sfd, F_SETFL, current | O_NONBLOCK) == -1)
    {
        perror("fcntl set");
        return -1;
    }

    return 0;
}

/*
 * Pre-fork epoll echo server used to observe thundering-herd wakeups on a
 * listening socket shared by several processes.
 *
 * The parent creates one non-blocking listening socket on the port given in
 * argv[1] and forks two children; the parent then just waits for the first
 * child to exit. Each child creates its own epoll instance, registers the
 * inherited listening socket edge-triggered, and runs the event loop:
 * accept one connection per wakeup (printing the accept() result together
 * with its pid), and echo back whatever clients send until they close the
 * connection or send "quit\n".
 *
 * Exits with a non-zero status on a usage error or a failing system call.
 */
int
main(int argc, char *argv[])
{
    int sfd, s;
    int efd;
    struct epoll_event ep;
    struct epoll_event *eps;

    if (argc != 2)
    {
        fprintf(stderr, "Usage: %s port\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    sfd = create_and_bind(atoi(argv[1]));
    if (sfd == -1)
        exit(1);

    s = make_socket_non_blocking(sfd);
    if (s == -1)
        exit(1);

    s = listen(sfd, SOMAXCONN);
    if (s == -1)
    {
        error("listen");
    }


    int pid = getpid();
    printf("parent process %d\n", pid);

    /* Fork two workers; a child leaves the loop immediately so that only
       the parent keeps forking. */
    for(int i=0; i < 2; i++)
    {
        pid = fork();
        if (pid<0)
        {
            error("fork");
        }
        else if (pid == 0) // child
        {
            break;
        }
        else // parent
        {
            printf("start child process %d\n", pid);
        }
    }

    if (pid > 0 ) // parent
    {
        int wstatus;
        // just suspend until one child process exits
        if ((pid = wait(&wstatus)) == -1)
            error("Error on wait");
        printf("Process %d terminated\n", pid);
        return 0;
    }

    /* Only children reach this point: every worker has a private epoll
       instance watching the shared listening socket. */
    efd = epoll_create1(0);
    if (efd == -1)
    {
        error("epoll_create");
    }

    ep.data.fd = sfd;
    ep.events = EPOLLIN | EPOLLET;
    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ep);
    if (s == -1)
    {
        error("epoll_ctl");
    }

    /* Buffer where epoll_wait() returns ready events. */
    eps = calloc(MAXEVENTS, sizeof ep);
    if (eps == NULL)
    {
        error("calloc");
    }

    /* The event loop */
    while (1)
    {
        int n, i;

        n = epoll_wait(efd, eps, MAXEVENTS, -1);
        if (n == -1)
        {
            /* Interrupted by a signal: nothing is ready, wait again. */
            if (errno == EINTR)
                continue;
            error("epoll_wait");
        }

        for (i = 0; i < n; i++)
        {
            if ((eps[i].events & EPOLLERR) ||
                (eps[i].events & EPOLLHUP) ||
                (!(eps[i].events & EPOLLIN)))
            {
                /* Error or hang-up on this descriptor, or it is not
                   readable: report and drop it. */
                if (eps[i].events & EPOLLERR)
                    fprintf(stderr, "epoll error: EPOLLERR\n");
                if (eps[i].events & EPOLLHUP)
                    fprintf(stderr, "epoll error: EPOLLHUP\n");
                close(eps[i].data.fd);
                continue;
            }
            else if (sfd == eps[i].data.fd)
            {
                /* Readable listening socket: a connection is pending.
                   Exactly one accept() per wakeup, so a -1 result shows
                   that another process already took the connection. */
                struct sockaddr in_addr;
                socklen_t in_len;
                int infd;
                int accept_errno;

                in_len = sizeof in_addr;
                infd = accept(sfd, &in_addr, &in_len);
                /* Save errno now; printf() below may overwrite it. */
                accept_errno = errno;
                printf("Process %d accept return %d\n", getpid(), infd);
                if (infd == -1)
                {
                    /* EAGAIN/EWOULDBLOCK only mean "no pending connection";
                       anything else is fatal. The two tests must be joined
                       with &&: with || the condition is always true when
                       the two constants differ. */
                    if (accept_errno != EAGAIN && accept_errno != EWOULDBLOCK)
                    {
                        errno = accept_errno;
                        error("accept");
                    }
                    continue;
                }

                /* Make the new connection non-blocking and watch it. */
                s = make_socket_non_blocking(infd);
                if (s == -1)
                    exit(1);

                ep.data.fd = infd;
                ep.events = EPOLLIN | EPOLLET;
                s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &ep);
                if (s == -1)
                {
                    error("epoll_ctl");
                }
            }
            else
            {
                /* Data on a client connection. Edge-triggered mode gives
                   no second notification for the same data, so read until
                   EAGAIN. */
                int done = 0;

                while (1)
                {
                    ssize_t count;
                    char buf[512];

                    count = read(eps[i].data.fd, buf, sizeof buf);
                    if (count == -1)
                    {
                        /* EAGAIN: everything available has been read. */
                        if (errno != EAGAIN) {
                            perror("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        /* End of file. The remote has closed the
                           connection. */
                        done = 1;
                        break;
                    }

                    /* Compare only when at least 5 bytes were read, so no
                       uninitialized bytes of buf are inspected. */
                    if (count >= 5 && !strncmp(buf, "quit\n", 5))
                    {
                        done = 1;
                        break;
                    }

                    s = write(eps[i].data.fd, buf, count);
                    if (s == -1)
                    {
                        error("write");
                    }
                }

                if (done)
                {
                    /* close() also removes the descriptor from the epoll
                       set. */
                    close(eps[i].data.fd);
                }
            }
        }
    }

    /* Not reached: the event loop above never terminates normally. */
    free(eps);

    close(sfd);

    return EXIT_SUCCESS;
}
使用上述同样的方法测试,在linux4.11.7上仍未发现惊群现象,一个连接请求只会唤醒一个进程的epoll_wait。但是在linux2.6.32上,偶尔会出现两个进程同时被唤醒,其中一个进程accept返回成功,另一个返回-1,errno被置为EAGAIN,表示 there is no data available right now, try again later

$ ./a.out 8000
parent process 11648
start child process 11649
start child process 11650
Process 11649 accept return 5
Process 11650 accept return -1
Process 11650 accept return 5
Process 11649 accept return -1
Process 11650 accept return 5
Process 11649 accept return 5
Process 11650 accept return 5
Process 11649 accept return -1
Process 11650 accept return 5
Process 11649 accept return -1
Process 11650 accept return 5
Process 11649 accept return 5
Process 11649 accept return 5
Process 11649 accept return 5
说明惊群确实存在于epoll中,而且只在老的linux内核中才会出现。

惊群的解决办法

nginx的accept mutex

以下是nginx启动流程中的关键步骤,使用缩进表示函数的调用关系。

main
  ngx_init_cycle
    ngx_open_listening_sockets
      socket
      bind
      listen
  ngx_master_process_cycle
    ngx_start_worker_processes
      for (i = 0; i < worker_processes; i++) {
        ngx_spawn_process
          pid = fork();
          switch (pid) {
          // ...
          case 0:
            ngx_worker_process_cycle
              ngx_worker_process_init
                ngx_event_process_init(=module.init_process)
                  if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
                    ngx_use_accept_mutex = 1;
                    // ...
                  } else {
                    ngx_use_accept_mutex = 0;
                  }
                  ngx_epoll_init (=event_module.actions.init)
                    epoll_create
              for ( ;; ) {
                ngx_process_events_and_timers
                  if (ngx_use_accept_mutex) {
                    ngx_trylock_accept_mutex
                      if (ngx_shmtx_trylock)
                        ngx_enable_accept_events
                          epoll_ctl
                  ngx_epoll_process_events(=ngx_process_events)
                    epoll_wait
                    handler(ev)
多个worker进程抢占一个互斥的accept锁,获得该锁的进程将listening socket加入到自己的epfd中监听可读事件,新的连接将由这个worker进程处理。

将nginx的配置指令accept_mutex设置为on才会启用这种锁机制,它的默认值是off,nginx的文档是这样解释的:

There is no need to enable accept_mutex on systems that support the EPOLLEXCLUSIVE flag (1.11.3) or when using reuseport.

EPOLLEXCLUSIVE是linux 4.5引入的epoll参数:

When a wakeup event occurs and multiple epoll file descriptors are attached to the same target file using EPOLLEXCLUSIVE, one or more of the epoll file descriptors will receive an event with epoll_wait(2). The default in this scenario (when EPOLLEXCLUSIVE is not set) is for all epoll file descriptors to receive an event. EPOLLEXCLUSIVE is thus useful for avoiding thundering herd problems in certain scenarios.

然而在linux 4.11上的实验似乎与文档的解释相悖——并非所有的进程都会被唤醒。

SO_REUSEPORT

SO_REUSEPORT是linux 3.9引入的socket参数,该功能的引入是为了解决两个问题:

  1. allow multiple servers (processes or threads) to bind to the same port
  2. when multiple threads are waiting in the accept() call on a single listening socket, traditional wake-ups are not fair, so that, under high load, incoming connections may be distributed across threads in a very unbalanced fashion.

简单的说就是它允许不同进程或线程的listening socket绑定到同一个ip端口对上,内核会将请求在所有进程(线程)中均衡,每次唤醒一个进程(线程)。

nginx启用reuseport

将nginx的listen指令改成

listen 80 reuseport;
使用reuseport前后的对比图如下: listen_accept.png 当SO_REUSEPORT选项没有开启时,master进程创建listening socket,worker进程对这个共享的socket调用accept接收请求(可能会有惊群问题,需要使用accept mutex)。

listen_accept_reuseport.png 当SO_REUSEPORT选项启用时,每个worker进程都创建独立的listening socket,监听相同的ip端口,accept的时候只有一个进程会获得连接。这样就可以避免加锁的开销,提高CPU利用率。

使用epoll和reuseport的示例

将上面的echo server稍作调整,通过使用SO_REUSEPORT参数,每个worker进程创建自己的listening socket,并监听同一个ip端口对

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>

#define MAXEVENTS 64

/*
 * Report a fatal error and terminate the process.
 *
 * Prints `msg` followed by the description of the current errno value to
 * stderr (via perror), then exits with status 1. Never returns.
 * `msg` is only read, so it is const-qualified; string literals can be
 * passed without discarding qualifiers.
 */
void
error(const char *msg)
{
    perror(msg);
    exit(1);
}

/*
 * Create a TCP/IPv4 socket with SO_REUSEPORT enabled and bind it to `port`
 * on all local addresses.
 *
 * SO_REUSEPORT is what lets every worker process bind its own listening
 * socket to the same address, so failing to set it is treated as an error
 * instead of being silently ignored.
 *
 * Returns the socket descriptor on success. On failure a message is printed
 * with perror(), any socket that was created is closed, and -1 is returned.
 * The caller still has to call listen() on the returned descriptor.
 */
static int
create_and_bind(int port)
{
    struct sockaddr_in serv_addr;
    int sfd;
    int optval = 1;

    sfd = socket(AF_INET, SOCK_STREAM, 0);

    if (sfd < 0)
    {
        perror("socket failed");
        return -1;
    }

    /* Must be set before bind() for the option to affect the binding. */
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) < 0)
    {
        perror("setsockopt SO_REUSEPORT failed");
        close(sfd);
        return -1;
    }

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(port);

    if (bind(sfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
    {
        /* perror() first: close() may overwrite errno. */
        perror("bind failed");
        /* Do not leak the descriptor when the address cannot be bound. */
        close(sfd);
        return -1;
    }
    return sfd;
}

/*
 * Enable non-blocking I/O on descriptor `sfd`: read its file status flags,
 * add O_NONBLOCK, and write them back.
 *
 * Returns 0 when both fcntl() calls succeed; otherwise reports the failing
 * step with perror() and returns -1.
 */
static int
make_socket_non_blocking(int sfd)
{
    int status_flags;

    status_flags = fcntl(sfd, F_GETFL, 0);
    if (status_flags == -1)
    {
        perror("fcntl get");
        return -1;
    }

    status_flags = status_flags | O_NONBLOCK;
    if (fcntl(sfd, F_SETFL, status_flags) != -1)
    {
        return 0;
    }

    perror("fcntl set");
    return -1;
}

/*
 * Pre-fork epoll echo server in which every worker owns its own listening
 * socket, all bound to the same port through SO_REUSEPORT.
 *
 * The parent forks two children and then just waits for the first one to
 * exit. Each child creates and binds a listening socket on the port given in
 * argv[1], makes it non-blocking, registers it edge-triggered in a private
 * epoll instance, and runs the event loop: accept one connection per wakeup
 * (printing the accept() result together with its pid), and echo back
 * whatever clients send until they close the connection or send "quit\n".
 *
 * Exits with a non-zero status on a usage error or a failing system call.
 */
int
main(int argc, char *argv[])
{
    int sfd, s;
    int efd;

   //  typedef union epoll_data {
   //     void        *ptr;
   //     int          fd;
   //     uint32_t     u32;
   //     uint64_t     u64;
   // } epoll_data_t;

   // struct epoll_event {
   //     uint32_t     events;      /* Epoll events */
   //     epoll_data_t data;        /* User data variable */
   // };
    struct epoll_event ep;
    struct epoll_event *eps;

    if (argc != 2)
    {
        fprintf(stderr, "Usage: %s port\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    int pid = getpid();
    printf("parent process %d\n", pid);

    /* Fork two workers; a child leaves the loop immediately so that only
       the parent keeps forking. */
    for(int i=0; i < 2; i++)
    {
        pid = fork();
        if (pid<0)
        {
            error("fork");
        }
        else if (pid == 0) // child
        {
            break;
        }
        else // parent
        {
            printf("start child process %d\n", pid);
        }
    }

    if (pid > 0 ) // parent
    {
        int wstatus;
        // just suspend until one child process exits
        if ((pid = wait(&wstatus)) == -1)
            error("Error on wait");
        printf("Process %d terminated\n", pid);
        return 0;
    }

    /* Only children reach this point: the listening socket is created
       after fork(), so each worker has its own. */
    sfd = create_and_bind(atoi(argv[1]));
    if (sfd == -1)
        exit(1);

    s = make_socket_non_blocking(sfd);
    if (s == -1)
        exit(1);

    s = listen(sfd, SOMAXCONN);
    if (s == -1)
    {
        error("listen");
    }

    efd = epoll_create1(0);
    if (efd == -1)
    {
        error("epoll_create");
    }

    ep.data.fd = sfd;
    ep.events = EPOLLIN | EPOLLET;
    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ep);
    if (s == -1)
    {
        error("epoll_ctl");
    }

    /* Buffer where events are returned */
    eps = calloc(MAXEVENTS, sizeof ep);
    if (eps == NULL)
    {
        error("calloc");
    }

    /* The event loop */
    while (1)
    {
        int n, i;

        n = epoll_wait(efd, eps, MAXEVENTS, -1);
        if (n == -1)
        {
            /* Interrupted by a signal: nothing is ready, wait again. */
            if (errno == EINTR)
                continue;
            error("epoll_wait");
        }

        for (i = 0; i < n; i++)
        {
            if ((eps[i].events & EPOLLERR) ||
                (eps[i].events & EPOLLHUP) ||
                (!(eps[i].events & EPOLLIN)))
            {
                /* An error has occurred on this fd, or the socket is not
                   ready for reading (why were we notified then?) */
                if (eps[i].events & EPOLLERR)
                    fprintf(stderr, "epoll error: EPOLLERR\n");
                if (eps[i].events & EPOLLHUP)
                    fprintf(stderr, "epoll error: EPOLLHUP\n");
                close(eps[i].data.fd);
                continue;
            }
            else if (sfd == eps[i].data.fd)
            {
                /* Is it possible that more than one connection produces
                   only a single wakeup?
                   https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering
                   Exactly one accept() is issued per wakeup here, so the
                   printed result shows whether this wakeup was useful. */
                struct sockaddr in_addr;
                socklen_t in_len;
                int infd;
                int accept_errno;

                in_len = sizeof in_addr;
                infd = accept(sfd, &in_addr, &in_len);
                /* Save errno now; printf() below may overwrite it. */
                accept_errno = errno;
                printf("Process %d accept return %d\n", getpid(), infd);
                if (infd == -1)
                {
                    /* EAGAIN/EWOULDBLOCK only mean "no pending connection";
                       anything else is fatal. The two tests must be joined
                       with &&: with || the condition is always true when
                       the two constants differ. */
                    if (accept_errno != EAGAIN && accept_errno != EWOULDBLOCK)
                    {
                        errno = accept_errno;
                        error("accept");
                    }
                    continue;
                }

                /* Make the incoming socket non-blocking and add it to the
                   list of fds to monitor. */
                s = make_socket_non_blocking(infd);
                if (s == -1)
                    exit(1);

                ep.data.fd = infd;
                ep.events = EPOLLIN | EPOLLET;
                s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &ep);
                if (s == -1)
                {
                    error("epoll_ctl");
                }
            }
            else
            {
                /* We have data on the fd waiting to be read. Read and
                   echo it. We must read whatever data is available
                   completely, as we are running in edge-triggered mode
                   and won't get a notification again for the same
                   data. */
                int done = 0;

                while (1)
                {
                    ssize_t count;
                    char buf[512];

                    count = read(eps[i].data.fd, buf, sizeof buf);
                    if (count == -1)
                    {
                        /* If errno == EAGAIN, that means we have read all
                           data. So go back to the main loop. */
                        if (errno != EAGAIN) {
                            perror("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        /* End of file. The remote has closed the
                           connection. */
                        done = 1;
                        break;
                    }

                    /* Compare only when at least 5 bytes were read, so no
                       uninitialized bytes of buf are inspected. */
                    if (count >= 5 && !strncmp(buf, "quit\n", 5))
                    {
                        done = 1;
                        break;
                    }

                    s = write(eps[i].data.fd, buf, count);
                    if (s == -1)
                    {
                        error("write");
                    }
                }

                if (done)
                {
                    /* Closing the descriptor will make epoll remove it
                       from the set of descriptors which are monitored. */
                    close(eps[i].data.fd);
                }
            }
        }
    }

    /* Not reached: the event loop above never terminates normally. */
    free(eps);

    close(sfd);

    return EXIT_SUCCESS;
}
在linux 2.6.32上成功编译,说明我用的内核已经打了SO_REUSEPORT的补丁包。进行上面的测试后未发现惊群现象

通过netstat命令可以看到两个子进程同时监听8000端口:

$ ps -ef|grep a.out
yanxurui  69376  61940  0 15:22 pts/3    00:00:00 ./a.out 8000
yanxurui  69377  69376  0 15:22 pts/3    00:00:00 ./a.out 8000
yanxurui  69378  69376  0 15:22 pts/3    00:00:00 ./a.out 8000
$ netstat -tunlp|grep 8000
tcp        0      0 0.0.0.0:8000            0.0.0.0:*               LISTEN      69377/./a.out
tcp        0      0 0.0.0.0:8000            0.0.0.0:*               LISTEN      69378/./a.out

参考


qq email facebook github
© 2018 - Xurui Yan. All rights reserved
Built using pelican