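Redis handles client requests with a single thread plus I/O multiplexing. It supports four multiplexing mechanisms, listed in order of preference: evport, epoll, kqueue, and select.

evport is the Solaris mechanism, epoll the Linux one, and kqueue the one on FreeBSD (and other BSD-derived systems such as macOS). select is the portable fallback that almost every system supports.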
```c
// ae.c
/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
```
Redis wraps all four behind the same internal interface and drives them through `aeEventLoop`.
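The compile-time choice can be sketched as follows. This is a simplified illustration, not Redis's own detection. Redis sets `HAVE_EVPORT`, `HAVE_EPOLL` and `HAVE_KQUEUE` in its config.h using more precise checks. The sketch assumes the common platform macros `__sun`, `__linux__`, `__APPLE__`, `__FreeBSD__`, `__OpenBSD__` and `__NetBSD__` are enough. The `SKETCH_*` macros and `sketch_*` functions are hypothetical names that only mirror the priority order above and the kind of name `aeGetApiName()` reports.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical feature macros derived from common platform macros.
 * Redis's own checks (in config.h) are more precise. */
#if defined(__sun)
#define SKETCH_HAVE_EVPORT 1
#endif
#if defined(__linux__)
#define SKETCH_HAVE_EPOLL 1
#endif
#if defined(__APPLE__) || defined(__FreeBSD__) || \
    defined(__OpenBSD__) || defined(__NetBSD__)
#define SKETCH_HAVE_KQUEUE 1
#endif

/* Backends in the same order of preference as ae.c. */
static const char *const sketch_backends[] = {"evport", "epoll", "kqueue", "select"};

/* The backend this build would use, similar to what aeGetApiName() reports. */
static const char *sketch_api_name(void) {
#if defined(SKETCH_HAVE_EVPORT)
    return "evport";
#elif defined(SKETCH_HAVE_EPOLL)
    return "epoll";
#elif defined(SKETCH_HAVE_KQUEUE)
    return "kqueue";
#else
    return "select"; /* portable fallback */
#endif
}

/* Position in the preference list (0 = most preferred), or -1 if unknown. */
static int sketch_backend_rank(const char *name) {
    assert(name != NULL);
    for (size_t i = 0; i < sizeof sketch_backends / sizeof sketch_backends[0]; i++)
        if (strcmp(sketch_backends[i], name) == 0) return (int)i;
    return -1;
}
```

On a Linux build, for example, `sketch_api_name()` returns `"epoll"`. Nesting `#if/#elif` in preference order is what makes the first available backend win.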
At the end of Redis's `main()` function, `aeMain()` is called to enter the event loop, which runs until the server shuts down:
```c
// server.c
int main(int argc, char **argv) {
    ......
    // Handler run before each event-loop iteration blocks
    aeSetBeforeSleepProc(server.el,beforeSleep);
    // Handler run after each event-loop iteration wakes up
    aeSetAfterSleepProc(server.el,afterSleep);
    // Enter the event loop
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
```
aeEventLoop

The aeEventLoop struct

```c
// ae.h
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
```
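- `maxfd`: the highest file descriptor currently registered in the event loop.
- `setsize`: the maximum number of file descriptors that can be tracked.
- `timeEventNextId`: the id to give the next time event.
  - It is incremented by one each time a time event is added (see `aeCreateTimeEvent()` in ae.c) and is never decremented.
  - `processTimeEvents()` only reads `timeEventNextId - 1` as the largest id it will handle in the current pass.
- `lastTime`: used to detect system clock skew.
- `events`: an array of `setsize` `aeFileEvent` entries indexed by file descriptor, i.e. `events[fd]` (see `aeCreateFileEvent()`). Each `aeFileEvent` records which events are registered on that fd and the matching read/write handlers (structure below).
- `fired`: an array of `setsize` `aeFiredEvent` entries. It records which fds fired during the current loop iteration and which events fired on them (structure below).
- `timeEventHead`: the head of the linked list that stores the time events.
- `stop`: whether the event loop has been stopped; 1 means stopped.
- `apidata`: private data used by the wrappers around the four multiplexing APIs (evport, epoll, kqueue, select).
- `beforesleep`: the function called before the event loop blocks (see `aeSetBeforeSleepProc()`).
- `aftersleep`: the function called after the event loop returns from blocking (see `aeSetAfterSleepProc()`).

The aeFileEvent struct

```c
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
```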
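- `mask`: which events are registered on this fd:
  - `AE_READABLE`: the "readable" event.
  - `AE_WRITABLE`: the "writable" event.
  - `AE_BARRIER`: if both events fire in the same iteration, handle "writable" before "readable". By default, readable is handled first.
- `rfileProc`: pointer to the read handler.
- `wfileProc`: pointer to the write handler.
- `clientData`: private data passed to the handlers.

The aeFiredEvent struct

```c
/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
```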
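- `fd`: the file descriptor on which an event fired.
- `mask`: which events fired on that descriptor.

The flow of aeMain() and aeProcessEvents()

`aeMain()` is, as the name says, the main function of `aeEventLoop`. Once called, it keeps calling `aeProcessEvents()` until `stop` becomes 1.

`aeProcessEvents()` does the real work of each iteration:

1. It uses the nearest time event to work out how long to block.
2. It calls the multiplexing API.
3. It dispatches the file events that fired.
4. It runs any time events that are due.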
```c
// ae.c
// Main routine of the event loop
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // Run the before-sleep hook, i.e. right before epoll_wait() blocks
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // This is where epoll is actually called. AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP
        // means: process every kind of event, and run the after-sleep hook
        // once epoll_wait() returns
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
```
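Stripped of I/O, `aeMain()` is just a flag-driven loop. The sketch below uses hypothetical names (`toyLoop`, `toyMain`, `toyStop`, `stopAfterThree`) and replaces `aeProcessEvents()` with a counter.

It also shows a detail of the loop above. If the before-sleep hook calls the stop function, the current iteration still runs its event-processing step, because `stop` is only rechecked at the top of the `while`.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, stripped-down stand-in for aeEventLoop: no I/O, only the
 * fields aeMain() touches plus a pass counter for the demo. */
typedef struct toyLoop toyLoop;
typedef void toySleepProc(toyLoop *loop);

struct toyLoop {
    int stop;                  /* like aeEventLoop.stop */
    int iterations;            /* sketch-only: number of "process events" passes */
    toySleepProc *beforesleep; /* like aeEventLoop.beforesleep */
};

/* Like aeStop(): only sets the flag. */
static void toyStop(toyLoop *loop) { loop->stop = 1; }

/* Placeholder for aeProcessEvents(): just counts passes. */
static void toyProcessEvents(toyLoop *loop) { loop->iterations++; }

/* Same control flow as aeMain(). */
static void toyMain(toyLoop *loop) {
    assert(loop != NULL);
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL)
            loop->beforesleep(loop);
        toyProcessEvents(loop);
    }
}

/* Example hook: asks the loop to stop once three passes have run. */
static void stopAfterThree(toyLoop *loop) {
    if (loop->iterations >= 3) toyStop(loop);
}
```

With `stopAfterThree()` installed, the loop ends with `iterations == 4`. The hook sets `stop` at the start of the fourth pass, and that pass still runs the process step before the `while` condition is checked again.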
```c
// ae.c
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 *
 * The function returns the number of events processed. */
// Processes time events, calls the read/write handlers of every ready file
// descriptor, and runs the AfterSleepProc.
// flags selects what the function does:
//   AE_ALL_EVENTS:       process every kind of event
//   AE_FILE_EVENTS:      process file-descriptor events
//   AE_TIME_EVENTS:      process time events
//   AE_DONT_WAIT:        do not block (aeApiPoll() returns immediately)
//   AE_CALL_AFTER_SLEEP: run the AfterSleepProc
// In the current code, only ae.c and networking.c call this function.
// Both essentially pass AE_ALL_EVENTS; ae.c adds AE_CALL_AFTER_SLEEP,
// networking.c adds AE_DONT_WAIT.
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    // If neither time events nor file (descriptor) events are requested,
    // return immediately. No caller currently does this.
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // Enter if any file descriptor is registered,
    // or if flags ask for time events and AE_DONT_WAIT is not set
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // Find the time event that will fire soonest.
            // Time events are kept in an unsorted linked list, so
            // aeSearchNearestTimer() walks the whole list: O(N) per call.
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // There is a pending time event
            long now_sec, now_ms;

            // Get the current time
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            // Compute the epoll_wait() timeout so that it returns
            // when the next time event is due
            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // With AE_DONT_WAIT the epoll_wait() timeout is 0, so it never blocks
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // As the comment says, epoll_wait() may block here
                // until some event fires
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // Blocks for up to the time in tvp (returns at once with AE_DONT_WAIT,
        // waits indefinitely if tvp is NULL).
        // numevents is the number of ready descriptors; the fd and the event
        // mask of each one are stored in eventLoop->fired
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        // Run the after-sleep hook
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Handle every entry in eventLoop->fired
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            // AE_BARRIER means "handle the writable event first";
            // by default Redis handles the readable event first
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            // The fd is readable
            if (!invert && fe->mask & mask & AE_READABLE) {
                // rfileProc is the read handler registered for this fd.
                // For example, the handler of the listening server socket
                // is registered via aeCreateFileEvent() in initServer() in server.c
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            // The fd is writable
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // wfileProc is the write handler registered for this fd,
                    // also through aeCreateFileEvent()
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    // Check and run time events (if any are due)
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
```
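Two pieces of `aeProcessEvents()` can be pulled out and checked on their own:

- the timeout arithmetic for the nearest time event;
- the read/write dispatch order controlled by `AE_BARRIER`.

The sketch below reproduces both using hypothetical names: `sketchTimeout`, `sketchDispatchOrder`, and `SK_*` flags standing in for `AE_READABLE`, `AE_WRITABLE` and `AE_BARRIER`. Instead of calling handlers, it records the call order as a string. The `same_proc` argument stands for `fe->wfileProc == fe->rfileProc`.

```c
#include <assert.h>
#include <string.h>
#include <sys/time.h>

/* Hypothetical stand-ins for AE_READABLE / AE_WRITABLE / AE_BARRIER. */
#define SK_READABLE 1
#define SK_WRITABLE 2
#define SK_BARRIER  4

/* Same arithmetic as aeProcessEvents(): milliseconds until the nearest
 * time event, converted to a struct timeval and clamped at zero. */
static void sketchTimeout(long when_sec, long when_ms,
                          long now_sec, long now_ms, struct timeval *tv) {
    long long ms = (long long)(when_sec - now_sec) * 1000 + when_ms - now_ms;
    if (ms > 0) {
        tv->tv_sec = ms / 1000;
        tv->tv_usec = (ms % 1000) * 1000;
    } else {
        tv->tv_sec = 0;            /* already due: poll without blocking */
        tv->tv_usec = 0;
    }
}

/* Records the handler order for one fired fd instead of calling handlers:
 * 'R' = rfileProc, 'W' = wfileProc. same_proc models
 * fe->wfileProc == fe->rfileProc. out needs room for 3 chars. */
static void sketchDispatchOrder(int fe_mask, int fired_mask, int same_proc,
                                char out[3]) {
    int n = 0, fired = 0;
    int invert = fe_mask & SK_BARRIER;

    assert(out != NULL);
    if (!invert && (fe_mask & fired_mask & SK_READABLE)) {
        out[n++] = 'R';
        fired++;
    }
    if (fe_mask & fired_mask & SK_WRITABLE) {
        if (!fired || !same_proc) {
            out[n++] = 'W';
            fired++;
        }
    }
    if (invert && (fe_mask & fired_mask & SK_READABLE)) {
        if (!fired || !same_proc) {
            out[n++] = 'R';
            fired++;
        }
    }
    out[n] = '\0';
}
```

Suppose an event due at 10 s 500 ms is checked at 9 s 700 ms. The timeout comes out as 0 s and 800000 µs.

For dispatch, assume readable and writable both fired:

- a plain registration yields `"RW"`;
- adding the barrier yields `"WR"`;
- if one function handles both directions, it is called only once.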
The main functions declared in ae.h:

| Function | Purpose |
|---|---|
| `aeEventLoop *aeCreateEventLoop(int setsize)` | Creates and initializes an `aeEventLoop` |
| `void aeDeleteEventLoop(aeEventLoop *eventLoop)` | Deletes and frees an `aeEventLoop` |
| `void aeStop(aeEventLoop *eventLoop)` | Stops the event loop |
| `int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)` | Registers events for `fd` and adds them to `eventLoop->events` |
| `void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)` | Unregisters events for `fd` and removes them from `eventLoop->events` |
| `int aeGetFileEvents(aeEventLoop *eventLoop, int fd)` | Returns the mask registered for `fd` in `eventLoop->events` |
| `long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)` | Registers a time event |
| `int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)` | Deletes a time event |
| `int aeProcessEvents(aeEventLoop *eventLoop, int flags)` | Does the main work of each event-loop iteration |
| `int aeWait(int fd, int mask, long long milliseconds)` | Blocks until `fd` is readable, writable, or in error, waiting at most `milliseconds` |
| `void aeMain(aeEventLoop *eventLoop)` | The event loop's main function |
| `char *aeGetApiName(void)` | Returns the name of the multiplexing API in use (evport, epoll, kqueue, select) |
| `void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep)` | Sets the before-blocking handler |
| `void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep)` | Sets the after-blocking handler |
| `int aeGetSetSize(aeEventLoop *eventLoop)` | Returns the event loop's `setsize` |
| `int aeResizeSetSize(aeEventLoop *eventLoop, int setsize)` | Resizes the event loop's `setsize` |
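The sketch below makes the `events[fd]` indexing and the `maxfd` bookkeeping concrete. It models what `aeCreateFileEvent()`, `aeDeleteFileEvent()` and `aeGetFileEvents()` do to the loop's state. It is a simplification:

- The names (`skLoop`, `skCreateFileEvent` and so on) are hypothetical.
- The calls into the multiplexing backend (`aeApiAddEvent()`/`aeApiDelEvent()` in ae.c) are left out.
- `AE_BARRIER` is not modelled.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins for AE_NONE / AE_READABLE / AE_WRITABLE. */
#define SK_NONE     0
#define SK_READABLE 1
#define SK_WRITABLE 2

typedef void skFileProc(int fd, void *clientData);

typedef struct {            /* like aeFileEvent */
    int mask;
    skFileProc *rfileProc;
    skFileProc *wfileProc;
    void *clientData;
} skFileEvent;

typedef struct {            /* only the aeEventLoop fields involved */
    int maxfd;
    int setsize;
    skFileEvent *events;    /* indexed directly by fd */
} skLoop;

static skLoop *skCreateLoop(int setsize) {
    assert(setsize > 0);
    skLoop *l = malloc(sizeof *l);
    if (l == NULL) return NULL;
    l->events = calloc((size_t)setsize, sizeof *l->events); /* every mask starts as SK_NONE */
    if (l->events == NULL) { free(l); return NULL; }
    l->maxfd = -1;          /* nothing registered yet */
    l->setsize = setsize;
    return l;
}

static void skDeleteLoop(skLoop *l) {
    free(l->events);
    free(l);
}

/* Like aeCreateFileEvent() minus the backend call: OR in the mask,
 * store the handler(s), and raise maxfd if needed. */
static int skCreateFileEvent(skLoop *l, int fd, int mask, skFileProc *proc, void *clientData) {
    if (fd < 0 || fd >= l->setsize) return -1;   /* out of range */
    skFileEvent *fe = &l->events[fd];
    fe->mask |= mask;
    if (mask & SK_READABLE) fe->rfileProc = proc;
    if (mask & SK_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

/* Like aeDeleteFileEvent() minus the backend call: clear the bits and, if
 * the highest fd has nothing left, scan downwards for the new maxfd. */
static void skDeleteFileEvent(skLoop *l, int fd, int mask) {
    if (fd < 0 || fd >= l->setsize) return;
    skFileEvent *fe = &l->events[fd];
    if (fe->mask == SK_NONE) return;
    fe->mask &= ~mask;
    if (fd == l->maxfd && fe->mask == SK_NONE) {
        int j;
        for (j = l->maxfd - 1; j >= 0; j--)
            if (l->events[j].mask != SK_NONE) break;
        l->maxfd = j;
    }
}

/* Like aeGetFileEvents(): the mask registered for fd. */
static int skGetFileEvents(skLoop *l, int fd) {
    if (fd < 0 || fd >= l->setsize) return 0;
    return l->events[fd].mask;
}
```

Indexing by fd makes lookups O(1) at the cost of a `setsize`-sized array. `maxfd` lets the loop tell cheaply whether anything is registered (`maxfd != -1` in `aeProcessEvents()`).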
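As the code above shows, the event loop handles three kinds of events:

- the before/after-blocking hooks, BeforeSleepProc and AfterSleepProc
- file events
- time events

Before and after blocking: BeforeSleepProc and AfterSleepProc

Let's look at what these two hooks do.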
BeforeSleepProc

In server.c, `beforeSleep()` is set as the before-blocking handler:
```c
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    // Redis Cluster's before-sleep work
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // Reclaim expired keys
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    // Ask the replicas for an acknowledgement (REPLCONF GETACK)
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    // Release the GIL
    if (moduleCount()) moduleReleaseGIL();
}
```
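Its main work includes:

- running Redis Cluster's before-sleep work (`clusterBeforeSleep()`)
- a fast active-expire cycle that reclaims expired keys
- sending `REPLCONF GETACK` to the replicas when needed
- unblocking clients waiting in `WAIT`, clients unblocked by modules, and other unblocked clients
- writing the AOF buffer to disk
- sending pending replies to clients
- releasing the module GIL

and so on.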
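So while the event loop is blocked, other threads (such as module threads) may in fact operate on Redis's dataset.

While client requests are being handled, however, only the main thread touches the dataset, so requests are processed serially.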
AfterSleepProc

In server.c, `afterSleep()` is set as the after-blocking handler:
```c
/* This function is called immediately after the event loop multiplexing
 * API returned, and the control is going to soon return to Redis by invoking
 * the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    if (moduleCount()) moduleAcquireGIL();
}
```
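Right after the event loop returns from blocking, the first thing it does (when modules are loaded) is re-acquire the GIL. From then on, only the main thread operates on the dataset.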
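The GIL hand-off in `beforeSleep()`/`afterSleep()` can be modelled with a single pthread mutex. This is a sketch, not Redis's module code:

- `sketchGIL`, `sketchInitGIL()` and the other names are hypothetical.
- It assumes, as the two functions above suggest, that the main thread holds the lock whenever it is not blocked in the multiplexing call.
- The probe uses `pthread_mutex_trylock()`. That call returns `EBUSY` whenever the mutex is locked by any thread, so the state can be checked from a single thread.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical stand-in for the module GIL. */
static pthread_mutex_t sketchGIL = PTHREAD_MUTEX_INITIALIZER;

/* Assumed startup state: the main thread owns the GIL. */
static void sketchInitGIL(void) { pthread_mutex_lock(&sketchGIL); }

/* Tail of beforeSleep(): let other threads at the dataset while blocked. */
static void sketchBeforeSleep(int moduleCount) {
    if (moduleCount) pthread_mutex_unlock(&sketchGIL);
}

/* afterSleep(): take the dataset back before any handler runs. */
static void sketchAfterSleep(int moduleCount) {
    if (moduleCount) pthread_mutex_lock(&sketchGIL);
}

/* What another thread would attempt: 1 if it could enter (the lock is
 * released again immediately), 0 if the GIL is currently held.
 * trylock reports EBUSY for a locked mutex whichever thread owns it. */
static int sketchGILIsFree(void) {
    int rc = pthread_mutex_trylock(&sketchGIL);
    if (rc == 0) {
        pthread_mutex_unlock(&sketchGIL);
        return 1;
    }
    assert(rc == EBUSY);
    return 0;
}
```

The GIL is held while requests are handled and free only between `sketchBeforeSleep(1)` and `sketchAfterSleep(1)`. When no modules are loaded, both hooks leave the lock untouched.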
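File events

File events are registered in several places, including:

- server.c, in `initServer()` (for example, the listening sockets)
- networking.c
- aof.c
- cluster.c
- replication.c
- sentinel.c

Time events

In server.c, `serverCron()` is registered as a time event.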
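The time-event bookkeeping can be sketched in the same spirit:

- Events sit in an unsorted singly linked list.
- New events are pushed at the head, with an id taken from an ever-increasing counter (like `timeEventNextId`).
- Finding the nearest event is an O(N) scan.
- A processing pass only runs events whose id is at most the counter minus one.
- A callback returning -1 (like `AE_NOMORE`) is removed; any other return value is the delay in milliseconds until its next run.

Everything here is a hypothetical simplification. The real `aeTimeEvent` stores `when_sec`/`when_ms` and also carries a finalizer callback.

```c
#include <assert.h>
#include <stdlib.h>

#define SK_NOMORE  (-1)   /* like AE_NOMORE: "do not run me again" */
#define SK_DELETED (-1)   /* id marking an event for removal */

typedef int skTimeProc(long long id, void *clientData);

typedef struct skTimeEvent {
    long long id;
    long long when_ms;            /* absolute fire time in ms (aeTimeEvent splits sec/ms) */
    skTimeProc *proc;             /* returns ms until next run, or SK_NOMORE */
    void *clientData;
    struct skTimeEvent *next;
} skTimeEvent;

typedef struct {
    long long timeEventNextId;    /* only ever incremented */
    skTimeEvent *head;            /* unsorted list, newest first */
} skTimers;

static long long skCreateTimeEvent(skTimers *t, long long now_ms, long long delay_ms,
                                   skTimeProc *proc, void *clientData) {
    assert(proc != NULL);
    skTimeEvent *te = malloc(sizeof *te);
    if (te == NULL) return -1;
    te->id = t->timeEventNextId++;
    te->when_ms = now_ms + delay_ms;
    te->proc = proc;
    te->clientData = clientData;
    te->next = t->head;           /* push at the head */
    t->head = te;
    return te->id;
}

/* O(N) because the list is unsorted (compare aeSearchNearestTimer()). */
static skTimeEvent *skSearchNearest(skTimers *t) {
    skTimeEvent *te, *nearest = NULL;
    for (te = t->head; te != NULL; te = te->next)
        if (nearest == NULL || te->when_ms < nearest->when_ms) nearest = te;
    return nearest;
}

static int skProcessTimeEvents(skTimers *t, long long now_ms) {
    int processed = 0;
    long long maxId = t->timeEventNextId - 1;   /* skip events created during this pass */
    skTimeEvent *te, **link;

    for (te = t->head; te != NULL; te = te->next) {
        if (te->id == SK_DELETED || te->id > maxId) continue;
        if (te->when_ms > now_ms) continue;     /* not due yet */
        int ret = te->proc(te->id, te->clientData);
        processed++;
        if (ret == SK_NOMORE) te->id = SK_DELETED;
        else te->when_ms = now_ms + ret;        /* reschedule */
    }
    /* Unlink and free the events marked for deletion. */
    link = &t->head;
    while (*link != NULL) {
        if ((*link)->id == SK_DELETED) {
            skTimeEvent *dead = *link;
            *link = dead->next;
            free(dead);
        } else {
            link = &(*link)->next;
        }
    }
    return processed;
}

/* Example callbacks for the demo. */
static int cron_calls = 0;
static int everyHundredMs(long long id, void *clientData) {
    (void)id; (void)clientData;
    cron_calls++;
    return 100;
}
static int runOnce(long long id, void *clientData) {
    (void)id; (void)clientData;
    return SK_NOMORE;
}
```

The id counter only grows here, so ids are never reused. This matches how `aeCreateTimeEvent()` hands out ids.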
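According to its comments, `serverCron()` does quite a lot, including:

- active collection of expired keys (expiry also happens lazily on lookup)
- the software watchdog
- updating some statistics
- incremental rehashing of the databases' hash tables
- triggering BGSAVE / AOF rewrite, and handling terminated child processes
- client timeouts of various kinds
- replication reconnection
- and more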
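Inside `serverCron()`, `run_with_period()` is used neatly so that each task runs only once per given interval rather than on every call. This keeps the tasks from running too often.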
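Redis implements `run_with_period()` as a macro over `server.hz` (cron calls per second) and `server.cronloops` (calls so far). The sketch below assumes a definition of that shape. `sketch_hz`, `sketch_cronloops` and `sketchCron()` are hypothetical stand-ins, and the exact macro may differ between Redis versions.

```c
#include <assert.h>

/* Hypothetical stand-ins for server.hz and server.cronloops. */
static int sketch_hz = 10;              /* cron ticks per second */
static long long sketch_cronloops = 0;  /* ticks so far */

/* Same shape as Redis's run_with_period(): the body runs on every tick
 * if the period is no longer than one tick (1000/hz ms), otherwise on
 * every (period / tick)-th tick. */
#define sketch_run_with_period(_ms_) \
    if (((_ms_) <= 1000 / sketch_hz) || \
        !(sketch_cronloops % ((_ms_) / (1000 / sketch_hz))))

static int fast_runs = 0, slow_runs = 0;

/* Stand-in for one serverCron() call. */
static void sketchCron(void) {
    assert(sketch_hz > 0 && sketch_hz <= 1000);
    sketch_run_with_period(100) { fast_runs++; }   /* every tick at hz = 10 */
    sketch_run_with_period(1000) { slow_runs++; }  /* once every 10 ticks */
    sketch_cronloops++;                            /* counter advances after the tasks */
}
```

Calling `sketchCron()` 20 times runs the 100 ms task 20 times and the 1000 ms task twice (at ticks 0 and 10). Because the macro expands to a bare `if`, it should always be followed by a braced block and never by an `else`.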
I won't paste the rest of `serverCron()` here. It touches many other features and goes beyond the event loop itself.