UNIX epoll 与 Node.js 事件循环多路分解器

关于I / O复用,能够先查阅《UNIX网络编程》第六章selectpoll相关内容。html

selectpollepoll都是I / O复用的机制,在《UNIX网络编程》里重点讲了selectpoll的机制,但selectpoll并非现代高性能服务器的最佳选择。包括如今的Node.js中的事件循环机制(event loop)也是基于epoll实现的。linux

select和poll的缺点

按照《UNIX网络编程》中所述,pollselect相似,没有解决如下的问题:git

  • 每次调用select,都须要把fd集合从用户态拷贝到内核态,这个开销在fd不少时会很大
  • 同时每次调用select都须要在内核遍历传递进来的全部fd,这个开销在fd不少时也很大
  • select支持的文件描述符数量过小了,默认是1024

epoll对于上述缺点的改进

epoll既然是对selectpoll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此以前,咱们先看一下epollselectpoll的调用接口上的不一样,selectpoll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctlepoll_waitepoll_create是建立一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。   对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把全部的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每一个fd在整个过程当中只会拷贝一次。   对于第二个缺点,epoll的解决方案不像selectpoll同样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每一个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工做实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是相似的)。   对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大能够打开文件的数目,这个数字通常远大于2048,举个例子,在1GB内存的机器上大约是10万左右,通常来讲这个数目和系统内存关系很大。github

epoll接口

epoll操做过程须要三个接口,分别以下:编程

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
复制代码

epoll_create方法

#include <sys/epoll.h>
int epoll_create(int size);
复制代码

建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不一样于select()中的第一个参数,给出最大监听的fd+1的值,参数size并非限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。 当建立好epoll句柄后,它就会占用一个fd值,在linux下若是查看/proc/进程id/fd/,是可以看到这个fd的,因此在使用完epoll后,必须调用close()关闭,不然可能致使fd被耗尽。segmentfault

#include <sys/epoll.h>
#define FDSIZE 1024
// ...
int main(int argc,char *argv[]) {
  int epollfd = epoll_create(FDSIZE); // 这里并非指最大文件描述符数量为1024,而是给内核初始化数据结构的一个建议。
  return 0;
}
复制代码

epoll_ctl方法

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
复制代码

epoll_ctl方法是epoll的事件注册函数,它不一样与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。设计模式

  • epfd:是epoll_create()的返回值。
  • op:表示对对应的fd文件描述符的操做,通常状况下表示想要监听事件、删除事件和修改事件处理函数,用三个宏来表示:
    • EPOLL_CTL_ADD,表示对于对应的fd文件描述符添加一组事件监听;
    • EPOLL_CTL_DEL,表示对于对应的fd文件描述符删除该组事件监听;
    • EPOLL_CTL_MOD,表示对于对应的fd文件描述符修改该组事件监听为新的events
  • fd:表示须要监听的fd(文件描述符)
  • event:是告诉内核须要监听的事件集合,传入一个指针,指向事件集合的第一项,struct epoll_event的结构以下:
struct epoll_event {
  __uint32_t events;  // 表示一类epoll事件
  epoll_data_t data;  // 用户传递的数据
}

复制代码

由于events表示一类epoll事件,它能够是如下几个宏的集合:服务器

  • EPOLLIN:表示对应的文件描述符能够读(包括对端SOCKET正常关闭);
  • EPOLLOUT:表示对应的文件描述符能够写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来讲的;
  • EPOLLONESHOT:只监听一次事件,当监听完此次事件以后,若是还须要继续监听这个socket的话,须要再次把这个socket加入到EPOLL队列里;

例如,若是想让epoll对于对应的文件描述符fd添加一组事件,监听对应的文件描述符可读的状况:网络

static void add_event_epoll_in(int epollfd, int fd) {
  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = fd;
  epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
复制代码

epoll_wait方法

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
复制代码

epoll_wait方法等待事件的产生,相似于select()调用,返回须要处理的事件数目。数据结构

  • epfd:是epoll_create()的返回值;
  • events:用来从内核获得事件的集合,咱们通常把须要处理的事件对应的文件描述符fd放到events结构体下data参数内,这样咱们能够在事件处理函数中取到对应的文件描述符fd,执行对应操做(例如对于TCP套接字,咱们调用readwrite等);
  • maxevents:告以内核这个events的数量,这个maxevents的值不能大于建立epoll_create()时的size,不然会形成溢出的风险;
  • timeout:超时时间,以毫秒为单位,若是设置为-1,表示一直等待,设置为0表示不等待;

举例:应用程序通常阻塞与epoll_wait调用,一旦events中任意一事件触发,epoll_wait执行,等待指定的timeout超时时间,若是I / O完成则当即返回须要处理的事件数目。

static void do_epoll(int listenfd) {
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    //建立一个描述符
    epollfd = epoll_create(FDSIZE);
    //添加监听描述符事件
    add_event_epoll_in(epollfd, listenfd);
    for ( ; ; )
    {
        // 获取已经准备好的描述符事件数目
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd, events, ret, listenfd, buf);
    }
    close(epollfd);
}

static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf) {
    int i;
    int fd;
    //进行选好遍历
    for (i = 0;i < num;i++)
    {
        // 在这里取到须要处理的文件描述符
        fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
    }
}
复制代码

使用epoll重构服务器回射程序

服务端

// server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>

#define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100

//函数声明
//建立套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用epoll
static void do_epoll(int listenfd);
//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);
//处理接收到的链接
static void handle_accpet(int epollfd,int listenfd);
//读处理
static void do_read(int epollfd,int fd,char *buf);
//写处理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//删除事件
static void delete_event(int epollfd,int fd,int state);

int main(int argc,char *argv[]) {
    int  listenfd;
    listenfd = socket_bind(IPADDRESS,PORT);
    listen(listenfd,LISTENQ);
    do_epoll(listenfd);
    return 0;
}

static int socket_bind(const char* ip,int port) {
    int  listenfd;
    struct sockaddr_in servaddr;
    listenfd = socket(AF_INET,SOCK_STREAM,0);
    if (listenfd == -1)
    {
        perror("socket error:");
        exit(1);
    }
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET,ip,&servaddr.sin_addr);
    servaddr.sin_port = htons(port);
    if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
    {
        perror("bind error: ");
        exit(1);
    }
    return listenfd;
}

static void do_epoll(int listenfd) {
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    //建立一个描述符
    epollfd = epoll_create(FDSIZE);
    //添加监听描述符事件
    add_event(epollfd,listenfd,EPOLLIN);
    for ( ; ; )
    {
        //获取已经准备好的描述符事件
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd,events,ret,listenfd,buf);
    }
    close(epollfd);
}

static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf) {
    int i;
    int fd;
    //进行选好遍历
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
    }
}
static void handle_accpet(int epollfd,int listenfd) {
    int clifd;
    struct sockaddr_in cliaddr;
    socklen_t  cliaddrlen;
    clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
    if (clifd == -1)
        perror("accpet error:");
    else
    {
        printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
        //添加一个客户描述符和事件
        add_event(epollfd,clifd,EPOLLIN);
    }
}

static void do_read(int epollfd,int fd,char *buf) {
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)
    {
        perror("read error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else if (nread == 0)
    {
        fprintf(stderr,"client close.\n");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else
    {
        printf("read message is : %s",buf);
        //修改描述符对应的事件,由读改成写
        modify_event(epollfd,fd,EPOLLOUT);
    }
}

static void do_write(int epollfd,int fd,char *buf) {
    int nwrite;
    nwrite = write(fd,buf,strlen(buf));
    if (nwrite == -1)
    {
        perror("write error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLOUT);
    }
    else
        modify_event(epollfd,fd,EPOLLIN);
    memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
复制代码

客户端

// client.c
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>

#define MAXSIZE 1024
#define IPADDRESS "127.0.0.1"
#define SERV_PORT 8787
#define FDSIZE 1024
#define EPOLLEVENTS 20

static void handle_connection(int sockfd);
static void handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_write(int epollfd,int fd,int sockfd,char *buf);
static void add_event(int epollfd,int fd,int state);
static void delete_event(int epollfd,int fd,int state);
static void modify_event(int epollfd,int fd,int state);

int main(int argc,char *argv[]) {
    int                 sockfd;
    struct sockaddr_in servaddr;
    sockfd = socket(AF_INET,SOCK_STREAM,0);
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
    connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
    //处理链接
    handle_connection(sockfd);
    close(sockfd);
    return 0;
}


static void handle_connection(int sockfd) {
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    char buf[MAXSIZE];
    int ret;
    epollfd = epoll_create(FDSIZE);
    add_event(epollfd,STDIN_FILENO,EPOLLIN);
    for ( ; ; )
    {
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd,events,ret,sockfd,buf);
    }
    close(epollfd);
}

static void handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf) {
    int fd;
    int i;
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,sockfd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,sockfd,buf);
    }
}

static void do_read(int epollfd,int fd,int sockfd,char *buf) {
    int nread;
    nread = read(fd,buf,MAXSIZE);
        if (nread == -1)
    {
        perror("read error:");
        close(fd);
    }
    else if (nread == 0)
    {
        fprintf(stderr,"server close.\n");
        close(fd);
    }
    else
    {
        if (fd == STDIN_FILENO)
            add_event(epollfd,sockfd,EPOLLOUT);
        else
        {
            delete_event(epollfd,sockfd,EPOLLIN);
            add_event(epollfd,STDOUT_FILENO,EPOLLOUT);
        }
    }
}

static void do_write(int epollfd,int fd,int sockfd,char *buf) {
    int nwrite;
    nwrite = write(fd,buf,strlen(buf));
    if (nwrite == -1)
    {
        perror("write error:");
        close(fd);
    }
    else
    {
        if (fd == STDOUT_FILENO)
            delete_event(epollfd,fd,EPOLLOUT);
        else
            modify_event(epollfd,fd,EPOLLIN);
    }
    memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
复制代码

运行结果:

Node.js的Event Loop

众所周知,Node.js是单线程的,可用做高性能服务器。显然,若仅仅限定为1024个描述符,对于高并发的请求显然是不支持的。Node.js内部的Event Demultiplexer(事件多路分解器)借助epoll来实现事件循环机制。其基本步骤以下:

  1. 应用程序经过向Event Demultiplexer(事件多路分解器)提交请求来生成新的I / O操做。应用程序还指定一个处理程序,当操做完成时将调用该处理程序。向Event Demultiplexer(事件多路分解器)提交新请求是一种非阻塞调用,它当即将控制权返回给该应用程序。
  2. 当一组I / O操做完成时,事件多路分解器将新的事件推入Event Queue(事件队列)
  3. 此时Event Loop遍历Event Queue的项目。
  4. 对于每一个事件,调用关联的处理程序。
  5. 处理程序是应用程序代码的一部分,当它执行完成时将控制权返回给Event Loop。可是,在处理程序执行过程当中可能会请求新的异步操做,从而致使新的操做被插入Event Demultiplexer(事件多路分解器)
  6. Event Loop中的全部项目被处理完时,循环将再次阻塞Event Demultiplexer(事件多路分解器),当有新事件可用时,Event Demultiplexer(事件多路分解器)将触发另外一个周期。

经过epoll解析上述步骤操做:

  • 事件多路分解器即为经过epoll_create()建立的epoll句柄的抽象,在Node.js启动时,事件多路分解器会阻塞于epoll_wait调用。
  • 对于第一步,注册事件处理程序时,调用epoll_ctl()方法,设定op参数为EPOLL_CTL_ADD,向事件多路分解器添加一组事件。
  • 对于第二步,一旦I / O完成,又调用epoll_ctl()方法,设定op参数为EPOLL_CTL_DEL,删除对应事件,此时把控制权返还给应用程序。
  • 事件循环遍历事件队列,只要没有事件,就阻塞于epoll_wait()
  • 不断重复上述步骤,实现Node.js`的事件循环机制。

参考资料:

相关文章
相关标签/搜索