icon-cookie
The website uses cookies to optimize your user experience. Using this website grants us the permission to collect certain information essential to the provision of our services to you, but you may change the cookie settings within your browser any time you wish. Learn more
I agree
blank_error__heading
blank_error__body
Text direction?

linux select/epoll

0.091字数 1,563阅读 3,026

一、Unix/Linux网络IO模型
在经典的Unix网络编程中,总结出了5种不同的网络IO模型,分别是阻塞式IO,非阻塞IO, IO多路复用,信号驱动IO,以及异步IO模型。

二、为什么IO多路复用应用最广泛
理论上来说异步IO模型性能更好,但是目前阶段在linux平台下,操作系统底层并没有真正实现完全异步IO,当然有可能在未来版本中会支持
而对于信号驱动IO,因为信号没有附加信息,如果一个信号源有多种产生信号的原因,信号接受者就无法区分,但是TCP协议里的事件类型有多种(read, write, accept),另外一个原因是如果基于java语言开发的话,似乎还不支持信号驱动处理。
然后是非阻塞IO, 这个虽然避免了IO阻塞,但是需要不断的主动轮询,浪费CPU资源,效率不高
阻塞式IO的场景一般对每个连接分配一个线程,但是当连接数太大的情况下(比如c10k,c100k),系统不可能创建这么多的线程。(或许协程在某种程度能改善这个问题)
所以最终一比较,至少在linux平台下,目前主流的方案大多基于IO多路复用技术。

linux平台提供的主要的IO多路复用技术有select, poll, epoll,主要目的是为了能让一个或少量的select线程(或reactor线程),来管理多个连接,本质上是基于一个真实生产环境中的特性,比如虽然存在几万个连接,但是在某一时间范围内,有数据可读或者可写的socket并不会很多,既active的连接不会很多。当然如果在一个极端环境下面,比如是一个高速的局域网,并且每个client连接都会一直不断的发送数据,既每个连接都可以看成active的连接,那么基于IO多路复用技术未必是最佳方案。

三、select
linux系统提供select函数来实现多路复用输入/输出模型,select系统调用是用来让我们的应用程序监视多个文件句柄(包括socket文件句柄)的状态变化,程序会在select这里等待,直到被监视的文件句柄有一个或多个发生了状态改变如可读或可写

API原型为:

 int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

参数说明:
n 一般为最大文件描述符 + 1, 既STDIN_FILENO + 1 该值会有limit限制,一般为1024,所以生产环境基本用epoll,kqueue等代替
readfds 监视是否有可读的文件描述符集合
writefds 监视是否有可写的文件描述符集合
exceptfds 监视是否有异常情况发生的文件描述符集合
timeout 超时时间,如果在某一段时间内依然没有相应事件触发,则会阻塞直到timeout时间过期 timeout.tv_sec单位为秒, timeout.tv_usec单位为微秒

四、poll
poll算select的加强版,但基本原理跟select类似,暂不赘述,后续在补充

五、epoll
由于select和poll系统调用存在以下几个问题,Linux内核2.6环境新增Event Poll的方式。

  1. select/poll每次检查的时候是通过遍历所有的文件描述符(fd), 尤其是对于网络scoket而言,大部分存在这么一个特性,既某一个时间点里,只有很少一部分的socket是“活跃”状态,如果每次都是遍历所有的网络文件描述符的话,当文件描述符变大之后,性能就会随着连接数变大之后线型下降

  2. select存在最大文件描述符的限制,具体取决于常量FD_SETSIZE,默认大小为1024, 不能满足大量的客户端连接.

反观epoll,则改进了以上不足的地方

  1. 在内核实现中epoll是根据每个fd上面的callback函数实现的。可以做到只有“活跃”的socket才会主动的去调用 callback函数,其他idle状态socket则不会

  2. epoll没有对fd描述符有限制,理论上取决于系统内存大小, 可以通过命令 cat /proc/sys/fs/file-max查看,大概1G内存可以创建10w个连接

  3. epoll的具体实现使用mmap加速内核与用户空间的消息传递,进一步提高性能

epoll实际包含3个系统调用组成,分别为epoll_create(), epoll_ctl(), epoll_wait()

  1. epoll_create用于创建epoll的实例,其中参数size只要大于0即可,内核会动态获取大小,函数返回epoll本身的描述符
int epoll_create(int size);   
  1. epoll_ctl用于添加,修改,删除要监听的event事件
    参数op为EPOLL_CTL_ADD代表添加,EPOLL_CTL_MOD代表修改,EPOLL_CTL_DEL代表删除
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);    
  1. epoll_wait用于监视等待是否有IO事件发生,直到timeout过期
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);   

epoll中存在两种工作模式 LT 和 ET
二者的差异在于 level-trigger (LT) 模式下只要某个 socket 处于 readable/writable 状态,无论什么时候进行 epoll_wait 都会返回该 socket;而 edge-trigger (ET) 模式下只有某个 socket 从 unreadable 变为 readable 或从unwritable 变为 writable 时,epoll_wait 才会返回该 socket。

如下两个示意图:

所以, 在epoll的ET模式下, 正确的读写方式为:
读: 只要可读, 就一直读, 直到返回0, 或者 errno = EAGAIN
写: 只要可写, 就一直写, 直到数据发送完, 或者 errno = EAGAIN

六、epoll demo server端

//epoll_server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
 
#define BUF_SIZE 100
#define EPOLL_SIZE 50
void error_handling(char *buf);
 
int main(int argc, char *argv[])
{
   int serv_sock, clnt_sock;
   struct sockaddr_in serv_adr, clnt_adr;
   socklen_t adr_sz;
   int str_len, i;
   char buf[BUF_SIZE];
 
   struct epoll_event *ep_events;
   struct epoll_event event;
   int epfd, event_cnt;
 
   if(argc!=2) {
      printf("Usage : %s <port>\n", argv[0]);
      exit(1);
   }
 
   serv_sock=socket(PF_INET, SOCK_STREAM, 0);
   memset(&serv_adr, 0, sizeof(serv_adr));
   serv_adr.sin_family=AF_INET;
   serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
   serv_adr.sin_port=htons(atoi(argv[1]));
    
   if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
      error_handling("bind() error");
   if(listen(serv_sock, 5)==-1)
      error_handling("listen() error");
 
   epfd=epoll_create(EPOLL_SIZE);
   ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
 
   event.events=EPOLLIN;
   event.data.fd=serv_sock;  
   epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
 
   while(1)
   {
      event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
      if(event_cnt==-1)
      {
         puts("epoll_wait() error");
         break;
      }
 
      for(i=0; i<event_cnt; i++)
      {
         if(ep_events[i].data.fd==serv_sock)
         {
            adr_sz=sizeof(clnt_adr);
            clnt_sock=
               accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
            event.events=EPOLLIN;
            event.data.fd=clnt_sock;
            epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
            printf("connected client: %d \n", clnt_sock);
         }
         else
         {
               str_len=read(ep_events[i].data.fd, buf, BUF_SIZE);
               if(str_len==0)    // close request!
               {
                  epoll_ctl(
                     epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
                  close(ep_events[i].data.fd);
                  printf("closed client: %d \n", ep_events[i].data.fd);
               }
               else
               {
                  write(ep_events[i].data.fd, buf, str_len);    // echo!
               }
    
         }
      }
   }
   close(serv_sock);
   close(epfd);
   return 0;
}
 
void error_handling(char *buf)
{
   fputs(buf, stderr);
   fputc('\n', stderr);
   exit(1);
}

七、epoll demo client端

//epoll_client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
 
#define BUF_SIZE 1024
void error_handling(char *message);
 
int main(int argc, char *argv[])
{
   int sock;
   char message[BUF_SIZE];
   int str_len;
   struct sockaddr_in serv_adr;
 
   if(argc!=3) {
      printf("Usage : %s <IP> <port>\n", argv[0]);
      exit(1);
   }
    
   sock=socket(PF_INET, SOCK_STREAM, 0);  
   if(sock==-1)
      error_handling("socket() error");
    
   memset(&serv_adr, 0, sizeof(serv_adr));
   serv_adr.sin_family=AF_INET;
   serv_adr.sin_addr.s_addr=inet_addr(argv[1]);
   serv_adr.sin_port=htons(atoi(argv[2]));
    
   if(connect(sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr))==-1)
      error_handling("connect() error!");
   else
      puts("Connected...........");
    
   while(1)
   {
      fputs("Input message(Q to quit): ", stdout);
      fgets(message, BUF_SIZE, stdin);
       
      if(!strcmp(message,"q\n") || !strcmp(message,"Q\n"))
         break;
 
      write(sock, message, strlen(message));
      str_len=read(sock, message, BUF_SIZE-1);
      message[str_len]=0;
      printf("Message from server: %s", message);
   }
    
   close(sock);
   return 0;
}
 
void error_handling(char *message)
{
   fputs(message, stderr);
   fputc('\n', stderr);
   exit(1);
}
Measure
Measure
Related Notes
Get a free MyMarkup account to save this article and view it later on any device.
Create account

End User License Agreement

Summary | 3 Annotations
遍历所有的文件描述符(fd)
2020/09/04 06:03
fd上面的callback函数实现的
2020/09/04 06:03
mmap加速内核与用户空间的消息传递
2020/09/04 06:03