前面的两篇文章中介绍了微信的libco库如何hook系统函数和协程的建立和管理,本篇文章将介绍libco库是进行事件管理的。java
libco库使用一种相似时间片的技术进行轮询,使得每一个注册在其上的事件都有机会执行。segmentfault
在上一篇文章中介绍stCoRoutineEnv_t
时,咱们将stCoEpoll_t
这个结构跳过了,如今咱们来仔细分析下这个数据结构。数组
struct stCoEpoll_t { int iEpollFd; static const int _EPOLL_SIZE = 1024 * 10; struct stTimeout_t *pTimeout; //用于保存timeout item struct stTimeoutItemLink_t *pstTimeoutList; // 在后续的event_ loop中介绍 struct stTimeoutItemLink_t *pstActiveList; co_epoll_res *result; };
stCoEpoll_t
中主要保存了epoll监听的fd,以及注册在其中的超时事件。stTimeoutItem_t
实际上是libco库实现的双向链表,有prev和next指针,同时保存了链表指针。后面在使用过程当中再介绍stTimeout_t
。服务器
struct stTimeoutItem_t { enum { eMaxTimeout = 40 * 1000 //40s }; stTimeoutItem_t *pPrev; stTimeoutItem_t *pNext; stTimeoutItemLink_t *pLink; unsigned long long ullExpireTime; OnPreparePfn_t pfnPrepare; OnProcessPfn_t pfnProcess; void *pArg; // routine bool bTimeout; }; struct stTimeoutItemLink_t { stTimeoutItem_t *head; stTimeoutItem_t *tail; }; struct stTimeout_t { stTimeoutItemLink_t *pItems; int iItemSize; unsigned long long ullStart; long long llStartIdx; };
在上篇文章中,在初始化本线程的stCoRoutineEnv_t
时,在co_init_curr_thread_env
的最后,会调用AllocEpoll() => AllocTimeout()
方法,咱们看一下AllocTimeout中具体作了哪些事情。微信
stTimeout_t *AllocTimeout( int iSize ) { stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) ); lp->iItemSize = iSize; lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize ); lp->ullStart = GetTickMS(); lp->llStartIdx = 0; return lp; }
下面以一个简单的客户端连接服务器的例子在说明在libco中是如何添加监听事件的。网络
fd = socket(PF_INET, SOCK_STREAM, 0); struct sockaddr_in addr; SetAddr(endpoint->ip, endpoint->port, addr); ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));
因为在libco库中hook了socket和connect的函数,所以,这个逻辑会调用poll
函数,最终将调用co_poll_inner
。下面介绍co_poll_inner
的具体逻辑。
第一步,先将epoll结构转换成poll结构(不清楚为何必定要转换成poll类型,难道是为了兼容性吗?)数据结构
//1.struct change stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t))); memset( &arg,0,sizeof(arg) ); arg.iEpollFd = epfd; arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd)); arg.nfds = nfds; stPollItem_t arr[2]; if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack) { arg.pPollItems = arr; } else { arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) ); } memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) ); arg.pfnProcess = OnPollProcessEvent; //记住这个函数,后续有用 arg.pArg = GetCurrCo( co_get_curr_thread_env() );//参数为当前Env指针
第二步,将poll结构加入到epoll的监听事件中
第三步,添加timeout事件框架
//3.add timeout unsigned long long now = GetTickMS(); arg.ullExpireTime = now + timeout; int ret = AddTimeout( ctx->pTimeout,&arg,now ); // 将本事件加入到timeout的指定链表中 int iRaiseCnt = 0; if( ret != 0 ) { co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld", ret,now,timeout,arg.ullExpireTime); errno = EINVAL; iRaiseCnt = -1; } else { co_yield_env( co_get_curr_thread_env() ); iRaiseCnt = arg.iRaiseCnt; }
在AllocTimeout
只初始化了60*1000(即60s)的链表数组,此时在AddTimeout
中,将根据本监听事件的超时时间添加到对应的数组index中的链表中,是否是比较相似于java
中的HashMap
的实现方式?
这里有个问题,若是超时时间超过了60s,那么超时事件都会添加到当前index的前一个游标处,至关于有可能61s,65s的超时事件都会在同一个timeout链表中,那么会不会出现,因为时间还没到,而超时事件被处理呢?异步
AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );
添加完超时事件后,本协程调用co_yield_env
放弃执行,stRoutineEnv_t将会调用其余的协程进行处理。socket
将事件都加入到timeout链表,以及注册到epoll fd后,main 协程将调用co_eventloop
进行轮询。
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg ) { if( !ctx->result ) { ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE ); } co_epoll_res *result = ctx->result; for(;;) { // 1. 调用epoll_wait int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 ); stTimeoutItemLink_t *active = (ctx->pstActiveList); stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList); // 将timeout链表清空 memset( timeout,0,sizeof(stTimeoutItemLink_t) ); // 处理poll事件 for(int i=0;i<ret;i++) { stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr; if( item->pfnPrepare ) { // 这个函数基本是 OnPollPreparePfn // 在pollPreaprePfn中,将poll_inner中添加的timeout事件删除,并添加到active list中 item->pfnPrepare( item,result->events[i],active ); } else { AddTail( active,item ); } } // 2. 将stTimeout_t中的timeout事件所有添加到timeout链表中 unsigned long long now = GetTickMS(); TakeAllTimeout( ctx->pTimeout,now,timeout ); // 设置其为timeout事件 stTimeoutItem_t *lp = timeout->head; while( lp ) { //printf("raise timeout %p\n",lp); lp->bTimeout = true; lp = lp->pNext; } // 3. 添加timeoutList 到 active list Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout ); // 4. 对active list进行遍历执行 lp = active->head; while( lp ) { PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active ); // 这里会对timeout事件进行判断,若时间不超时,仍然会将其加入到stTimeout_t的timeout数组队列中 if (lp->bTimeout && now < lp->ullExpireTime) { int ret = AddTimeout(ctx->pTimeout, lp, now); if (!ret) { lp->bTimeout = false; lp = active->head; continue; } } if( lp->pfnProcess ) { lp->pfnProcess( lp ); } lp = active->head; } if( pfn ) { if( -1 == pfn( arg ) ) { break; } } } }
具体步骤以下:
至此,整个libco库的事件监听的分析已经完成。
每一个网络框架都会有一个相似eventloop的函数,用于轮询注册的io事件,libco库也不例外,轮询就是比较简单粗暴,可是又是颇有效果。libco库将socket相关的函数都进行了hook,使得调用者可使用同步的方法进行编码,却可以异步的执行。