#Redis本身的事件模型 aec++
##1.Redis的事件模型库vim
你们到网上Google“Redis libevent”就能够搜到Redis为何没有选择libevent以及libev为其事件模型库,而是本身写了一个事件模型。 从代码中能够看到它主要支持了epoll、select、kqueue、以及基于Solaris的event ports。主要提供了对两种类型的事件驱动:api
##2.使用示例数组
这里写了一个由标准输入的读事件驱动的echo服务例子,同时用一个5秒的循环定时器每一个5秒打印一次服务器状态。这里用了epoll为底层 事件接口。具体的代码抽取能够从Redis的源码中抽取"ae.c"、“ae.h”、"ae_select.c"、“ae_epoll.c”、"ae_evport.c"这几个文件,经过 ae.c中的宏::服务器
#define HAVE_EPOLL 1 // illustrate to use epoll #ifdef HAVE_EVPORT # include "ae_evport.c" #else #ifdef HAVE_EPOLL # include "ae_epoll.c" #else #ifdef HAVE_KQUEUE # include "ae_kqueue.c" #else # include "ae_select.c" #endif #endif #endif
这里主要是分析Redis的事件模型的封装,所以对于其对socket的包装以及内存管理都不作分析。故采用标准输入,同时须要将这些文件中 的内存管理接口"zmalloc()"以及"zfree()"替换成C库中的“malloc()”还有"free()"。可使用sed或者vim的%s作替换操做。数据结构
将主程序贴在这里::app
#include "ae.h" #include <stdio.h> #include <assert.h> #include <unistd.h> #include <sys/time.h> #define MAXFD 5 void loop_init(struct aeEventLoop *l) { puts("I'm loop_init!!! \n"); } void file_cb(struct aeEventLoop *l,int fd,void *data,int mask) { char buf[51] ={0}; read(fd,buf,51); printf("I'm file_cb ,here [EventLoop: %p],[fd : %d],[data: %p],[mask: %d] \n",l,fd,data,mask); printf("get %s",buf); } int time_cb(struct aeEventLoop *l,long long id,void *data) { printf("now is %ld\n",time(NULL)); printf("I'm time_cb,here [EventLoop: %p],[id : %lld],[data: %p] \n",l,id,data); return 5*1000; } void fin_cb(struct aeEventLoop *l,void *data) { puts("call the unknow final function \n"); } int main(int argc,char *argv[]) { aeEventLoop *l; char *msg = "Here std say:"; char *user_data = malloc(50*sizeof(char)); if(! user_data) assert( ("user_data malloc error",user_data) ); memset(user_data,'\0',50); memcpy(user_data,msg,sizeof(msg)); l = aeCreateEventLoop(MAXFD); aeSetBeforeSleepProc(l,loop_init); int res; res = aeCreateFileEvent(l,STDIN_FILENO,AE_READABLE,file_cb,user_data); printf("create file event is ok? [%d]\n",res); res = aeCreateTimeEvent(l,5*1000,time_cb,NULL,fin_cb); printf("create time event is ok? [%d]\n",!res); aeMain(l); puts("Everything is ok !!!\n"); return 0; }
没有什么逻辑,就是注册一个标准输入的读事件,和一个定时器事件。这里要说明的就是在ae.h中定义了读、写、定时器等回调函数的类型::less
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
按这个类型定义回调函数就能够。其中asFileProc和aeTimeProc比较容易理解,就是IO读写事件和定时器事件的回调函数。这里要注意了,若是 定义的定时器回调函数返回值为正数,那么表示该定时器是一个循环定时器,即在第一次执行完后添加定时器事件时给定的延迟后不删除定时器, 在延迟该返回值时间(单位是毫秒)后再次执行该定时器。因此就要注意,好比要每5秒执行一个操做,那么在添加定时器时要给定其定时时间为 5000毫秒,同时在该定时器的回调函数中也要返回5000.dom
而后aeBeforeSleepProc回调函数比较的扑朔迷离,从Sleep上不容易理解,其实想到select和epoll这些机制的做用就能够想到了,这个函数是在 poll以前执行,从源码中看到就是在每一个处理事件的循环开始出执行的。而aeEventFinalizerProc单从名字就更难理解,从源码中看到它是在删除 定时器事件时候执行的。socket
clientData比较好理解,就和在epoll中的ptr指针的做用同样。主要能够存放用户对每一个事件上附加的数据。
事件循环的入口函数是aeMain(),将建立好的aeEventLoop传入就能够了。
使用起来很简单,对于不是很复杂或者对接入层要求不高的应用能够一试。
##3.ae.c分析
Redis的ae(姑且这么称呼Redis用的事件模型库的名字)主要逻辑在文件“ae.c”中,其中根据使用的系统事件接口分别选择包含"ae_epoll.c"或其余 文件。用到的主要数据结构在文件“ae.h”中定义。下面用一个不规范的UML类图表示了几个主要数据结构之间的关系,其中连在一块儿的表示一个数组或者 箭头表示的链表。这么画主要是帮助理解。
下面根据上面的示例程序一一作说明。
###3.1 主要数据结构的建立
####3.1.1 aeCreateEventLoop
首先要建立一个aeCreateEventLoop对象。该对象须要一个最大文件描述符做为参数setSize,这个参数的意义须要了解ae的数据存放结构。从上面的图能够看到 在aeEventLoop结构中有两个数组(其实就是服务器程序惯用提早分配好内存而后用index映射到相应位置的作法),这两个数组的大小就是这里的参数值。 ae会建立一个 setSize*sizeof(aeFileEvent)
以及一个 setSize*siezeof(aeFiredEvent)
大小的内存,用文件描述符做为其索引。这一能够达到0(1)的速度找到事件数据所在位置。那么这个大小定位多少合适呢?在Linux个中,文件描述符是个有限的资源,当打开一个文件时就会消耗一个文件描述符,当关闭该文件描述符或者程序结束时会释放该文件描述符资源,从而供其余文件打开操做使用。当文件描述符超过最大值后,打开文件就会出错。那么这个最大值是多少呢?能够经过/proc/sys/fs/file-max看到系统支持的最大的文件描述符数。经过 ulimit -n
能够看到当前用户能打开的最大的文件描述符。在我这里的一台8g内存的机器上,系统支持最大的文件描述是365146。而在这台64bit的机器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent)
大小为40byte。按系统最大支持的文件描述符来算,固定消耗内存为14.6M。这样以文件描述符做为数组的下标来索引,虽然这样的哈希在接入量不大的状况下会有大量的浪费。可是最多也就浪费14M 的内存,所以这样的设计是可取的。
在作好这些准备后还要准备系统提供的事件模型接口。这里以epoll为例,其余的能够自行查阅源码了解。ae首先提供了一个统一的结构名aeApiState,能够想象成c++中接口。在包装epoll的aeApiState中有一个epfd表示epoll占用的fd,一个epoll_event *events,其实也是一个aeApiState数组::
其和aeFiredEvent相对应,当epoll_wait()返回时,会将pending的文件描述符的信息放在aeFiredEvent数组中,包括有fd,以及mask事件类型,此时的aeFiredEvent不是以fd做为下标的,而是把这个数组当成一个缓冲区,存放一次epoll_wait()返回的全部fd,同时用epoll_event数组存放了epoll_wait()返回中的epoll_data数据,用其数据能够填充aeFiredEvent数组的内容供ae使用找到pending d的aeFileEvent对象。并在下一次进入epoll_wait()前处理完。这样完成了对epoll数据封装。
typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState;
建立IO事件时须要指定要要注册的文件的文件描述符fd,以及要监听的事件类型mask。ae会先经过fd找到其对应的aeCreateFileEvent对象所在内存位置::
typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;
而后添加其要监听的事件类型mask fe->mask |= mask;
,接着回根据要监听的类型添加其读事件或者写事件的回调函数,即aeFileProc。同时更新maxfd以备后用,如在select中的最大fd的指定。
在建立文件事件的过程当中还要经过宏判断后include进来的底层事件模型接口来注册IO事件。这里和上面同样以epoll为例,其余的事件模型也相似。经过aeApiAddEvent将文件描述符fd和事件类型mask传给epoll操做。首先经过fd为下标找到aeCreateFileEvent对应的位置,而后取得epoll的epfd.经过EPOLL_CTL_ADD和EPOLL_CTL_MOD来加入或者修改epoll在该fd上事件的类型。
####3.1.3 aeCreateTimeEvent
ae的定时器是用一个单链表来管理的,将定时器依次从head插入到单链表中。插入的过程当中会取得将来的墙上时间做为其超时的时刻。即将当前时间加上添加定时器时给定的延迟时间。定时器结构以下。并设置超时以及注销定时器时的回调函数还用clientData::
typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent;
###3.2 事件循环
####3.2.1 aeMain入口函数
ae事件循环的基本机构就是用一个无限循环,而后再循环中去检测各个事件的发生。固然这里不是彻底意义上的轮询,由于循环里面封装了epoll,select等事件驱动机制::
while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); }
这里的beforesleep就是上文中叙述过的,进入一次循环以前作的操做。后面会说到定时的过程其实也就是一个epoll或者select模拟的sleep过程,而等待事件到来也是“sleep”在epoll或者select上。因此这个叫名字感受也算贴切。固然这里是YY一下。不过能够帮助理解。
####3.2.2 aeProcessEvents
ae中最主要的逻辑应该也就是事件的处理了。从上面知道aeProcessEvents是处理事件的入口。在进入事件处理函数时,首先若没有任何事件则当即返回::
/* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
这里注释中说的ASAP我不太理解表示的啥意思,望高人指点。
而后判断是否有定时器事件,若是有那么就去取得最近的一个将超时定时器的时间减去当前时间做为epoll或者select等事件接口的超时时间。该寻找过程就是经过遍历单链表得来的。这样指定超时时间,在有IO事件pending时能够处理IO事件,同时若没有则能够保证从epoll或者select中返回去处理定时器事件。不过这里也能够不注册定时器事件而后将事件的flags与上AE_DONT_WAIT,那么就会在poll中一直等待IO时间的到来。
在得到事件接口的超时时间后,用其调用封装事件接口的函数aeApiPoll。这里依旧以epoll做为示例。其将首先得到apidata,而后从中得到epoll的文件描述符epfd,并用events指针指向的数组内存以及超时时间调用epoll的epoll_wait().在上面已经描述了,epoll()返回时会将结果放在epoll_event数组中同时返回新的文件描述符。经过对返回数据的事件类型作判断能够填充到aeFiredEvent中fd和事件类型信息。
而后返回到ae的逻辑中,经过遍历aeFiredEvent数组取得fd能够找到pending事件的aeFileEvent,而后根据事件的类型去调用用户定义的IO回调函数。
当epoll或者select超时返回时并注册了定时器事件时,经过processTimeEvents进入去处理超时事件::
/* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } }
这里的注释说明了这么作的意义,其实就是若是系统事件变动了,就将全部的定时器时间设为0,让他在本次循环中超时并被执行。
当一个定时器被处理的时候,此时可能会加入新的定时,好比在定时器处理函数中加入新的定时器。而此时仅应该处理上一个时间段的状态,不该该在本次循环中去处理新的定时器。所以ae在EventLoop中加入了一个timeEventNextId的成员表示这次循环中最大的定时器id+1,这样在遍历定时器列表时,先保存最大的定时器id,而后遍历过程过滤掉定时器列表可能加入新的定时器便可::
if (te->id > maxId) { te = te->next; continue; }
这里定时器的逻辑是若单链表中的定时器时间比当前时间晚就执行定时器注册的回调函数。若是该回调函数返回正值,那么就更新定时器时间为该值以后,从而能够循环执行定时器。若是该回调函数返回AE_NOMORE,那么在执行完回调函数后注销该定时器。
###3.3 清理工做
####3.3.1 注销IO事件
注销IO事件不是以aeFileEvent为单位而是该IO事件加上其监听的事件类型为对象,所以其接口为aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)。其首先经过fd找到去掉aeFileEvent对象,而后得到已有的mask,对其进行减操做后,构成fd上新的mask事件类型。经过修改epoll或者select中注册的IO事件来完成。这里以epoll为例,会根据该文件描述符上是否还有待等待的事件类型分别调用epoll_ctr的EPOLL_CTL_MOD或者EPOLL_CTL_DEL命令。
####3.3.2注销Timer时间
注销定时器事件的操做比较暴力,直接遍历链表,找到定时器id匹配的项,使用单链表删除操做进行删除。这里再删除以前会调用定时器上的finalizerProc。
####3.3.3注销aeEventLooop
最后注销aeEventLooop就是对相关内存的释放。
##4.总结
分析到这就结束了。感受ae比较的直观。主要提供了一个IO事件和定时器事件的事件驱动模型。定时器的单链表逻辑能够再改进,好比用最小堆或者Timing-Wheel等著名的定时器解决方法。这样的一个模型用select能够跨到Windows上。所以用这套东西写的server再客户端测试的时候,也能够复用接入层。