众所周知,Redis 服务器是一个事件驱动程序。那么事件驱动对于 Redis 而言有什么含义?源码中又是如何实现事件驱动的呢?今天,咱们一块儿来认识下 Redis 服务器的事件驱动。数据库
对于 Redis 而言,服务器须要处理如下两类事件:服务器
接下来,咱们先来认识下文件事件。网络
Redis 基于 Reactor 模式开发了本身的网络事件处理器,这个处理器被称为文件事件处理器(file event handler):数据结构
虽然文件处理器以单线程方式运行,但经过 IO 多路复用程序监听多个套接字,既实现了高性能的网络通讯模型,又能够很好的与 Redis 服务器中其它一样以单线程运行的模块进行对接,保持了 Redis 内部单线程设计的简洁。并发
图 1 展现了文件事件处理器的四个组成部分:dom
文件事件是对套接字的抽象。每当一个套接字准备好执行链接应答(accept)、写入、读取、关闭等操做时,就好产生一个文件事件。由于一个服务器一般会链接多个套接字,因此多个文件事件有可能会并发的出现。socket
而 IO 多了复用程序负责监听多个套接字,并向文件事件分派器分发那些产生事件的套接字。ide
尽管多个文件事件可能会并发的出现,但 IO 多路复用程序老是会将全部产生事件的套接字都放到一个队列里面,而后经过这个队列,以有序、同步的方式,把每个套接字传输给文件事件分派器。当上一个套接字产生的事件被处理完毕以后(即,该套接字为事件所关联的事件处理器执行完毕),IO 多路复用程序才会继续向文件事件分派器传送下一个套接字。如图 2 所示:函数
文件事件分派器接收 IO 多路复用程序传来的套接字,并根据套接字产生的事件类型,调用相应的事件处理器。
服务器会为执行不一样任务的套接字关联不一样的事件处理器。这些处理器本质上就是一个个函数。它们定义了某个事件发生时,服务器应该执行的动做。
Redis 的 IO 多路复用程序的全部功能都是经过包装常见的 select、epoll、evport 和 kqueue 这些 IO 多路复用函数库来实现的。每一个 IO 多路复用函数库在 Redis 源码中都对应一个单独的文件,好比 ae_select.c、ae_poll.c、ae_kqueue.c 等。
因为 Redis 为每一个 IO 多路复用函数库都实现了相同的 API,因此 IO 多路复用程序的底层实现是能够互换的,如图 3 所示:
Redis 在 IO 多路复用程序的实现源码中用 #include
宏定义了相应的规则,**程序会在编译时自动选择系统中性能最高的 IO 多路复用函数库来做为 Redis 的 IO 多路复用程序的底层实现,这保证了 Redis 在各个平台的兼容性和高性能。对应源码以下:
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
IO 多路复用程序能够监听多个套接字的 ae.h/AE_READABLE
和 ae.h/AE_WRITABLE
事件,这两类事件和套接字操做之间有如下对应关系:
IO 多路复用程序容许服务器同时监听套接字的 AR_READABLE 事件和 AE_WRITABLE 事件。若是一个套接字同时产生了两个事件,那么文件分派器会优先处理 AE_READABLE 事件,而后再处理 AE_WRITABLE 事件。简单来讲,若是一个套接字既可读又可写,那么服务器将先读套接字,后写套接字。
Redis 为文件事件编写了多个处理器,这些事件处理器分别用于实现不一样的网络通讯需求。好比说:
在这些事件处理器中,服务器最经常使用的是与客户端进行通讯的链接应答处理器、命令请求处理器和命令回复处理器。
1)链接应答处理器
networking.c/acceptTcpHandle
函数是 Redis 的链接应答处理器,这个处理器用于对链接服务器监听套接字的客户端进行应答,具体实现为 sys/socket.h/accept
函数的包装。
当 Redis 服务器进行初始化的时候,程序会将这个链接应答处理器和服务器监听套接字的 AE_READABLE 事件关联。对应源码以下
# server.c/initServer ... /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } ...
当有客户端用 sys/scoket.h/connect
函数链接服务器监听套接字时,套接字就会产生 AE_READABLE 事件,引起链接应答处理器执行,并执行相应的套接字应答操做。如图 4 所示:
2)命令请求处理器
networking.c/readQueryFromClient
函数是 Redis 的命令请求处理器,这个处理器负责从套接字中读入客户端发送的命令请求内容,具体实现为 unistd.h/read
函数的包装。
当一个客户端经过链接应答处理器成功链接到服务器以后,服务器会将客户端套接字的 AE_READABLE 事件和命令请求处理器关联起来(networking.c/acceptCommonHandler
函数)。
当客户端向服务器发送命令请求的时候,套接字就会产生 AR_READABLE 事件,引起命令请求处理器执行,并执行相应的套接字读入操做,如图 5 所示:
在客户端链接服务器的整个过程当中,服务器都会一直为客户端套接字的 AE_READABLE 事件关联命令请求处理器。
3)命令回复处理器
networking.c/sendReplToClient
函数是 Redis 的命令回复处理器,这个处理器负责将服务器执行命令后获得的命令回复经过套接字返回给客户端。
当服务器有命令回复须要发给客户端时,服务器会将客户端套接字的 AE_WRITABLE 事件和命令回复处理器关联(networking.c/handleClientsWithPendingWrites
函数)。
当客户端准备好接收服务器传回的命令回复时,就会产生 AE_WRITABLE 事件,引起命令回复处理器执行,并执行相应的套接字写入操做。如图 6 所示:
当命令回复发送完毕以后,服务器就会解除命令回复处理器与客户端套接字的 AE_WRITABLE 事件的关联。对应源码以下:
# networking.c/writeToClient ... if (!clientHasPendingReplies(c)) { c->sentlen = 0; # buffer 缓冲区命令回复已发送,删除套接字和事件的关联 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { freeClient(c); return C_ERR; } } ...
以前咱们经过 debug 的形式大体认识了客户端与服务器的链接过程。如今,咱们站在文件事件的角度,再一次来追踪 Redis 客户端与服务器进行链接并发送命令的整个过程,看看在过程当中会产生什么事件,这些事件又是如何被处理的。
先来看客户端与服务器创建链接的过程:
server.c/initServer()
)。networking.c/acceptTcpHandler()
)。networking.c/acceptCommonHandler()
)进行关联,使得服务器能够接收该客户端发来的命令请求。此时,客户端已成功与服务器创建链接了。上述过程,咱们仍然能够用 gdb 调试,查看函数的执行过程。具体调试过程以下:
gdb ./src/redis-server (gdb) b acceptCommonHandler # 给 acceptCommonHandler 函数设置断点 (gdb) r redis-conf --port 8379 # 启动服务器
另外开一个窗口,使用 redis-cli 链接服务器:redis-cli -p 8379
回到服务器窗口,咱们会看到已进入 gdb 调试模式,输入:info stack
,能够看到如图 6 所示的堆栈信息。
如今,咱们再来认识命令的执行过程:
server.c/processCommad()
中 lookupCommand
函数调用;server.c/processCommad()
中 call
函数调用。network.c/writeToClient()
函数。图 7 展现了命令执行过程的堆栈信息。图 8 则展现了命令回复过程的堆栈信息。
上一节咱们一块儿认识了文件事件。接下来,让咱们再来认识下时间事件。
Redis 的时间时间分为如下两类:
对于时间事件,数据结构源码(ae.h/aeTimeEvent):
/* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent;
主要属性说明:
时间事件进程执行的函数为 ae.c/processTimeEvents()
。
此外,对于时间事件的类型区分,取决于时间事件处理器的返回值:
ae.h/AE_NOMORE
,为定时事件。该事件在到达一次后就会被删除;ae.h/AE_NOMORE
,为周期事件。当一个周期时间事件到达后,服务器会根据事件处理器返回的值,对时间事件的 when_sec 和 when_ms 属性进行更新,让这个事件在一段时间以后再次到达,并以这种方式一致更新运行。好比,若是一个时间事件处理器返回 30,那么服务器应该对这个时间事件进行更新,让这个事件在 30 毫秒后再次执行。持续运行的 Redis 服务器须要按期对自身的资源和状态进行检查和调整,从而确保服务能够长期、稳定的运行。这些按期操做由 server.c/serverCron()
函数负责执行。主要操做包括:
Redis 服务器以周期性事件的方式来运行 serverCron 函数,在服务器运行期间,每隔一段时间,serverCron 就会执行一次,直到服务器关闭为止。
关于执行次数,可参见 redis.conf
文件中的 hz 选项。默认为 10,表示每秒运行 10 次。
因为服务器同时存在文件事件和时间事件,因此服务器必须对这两种事件进行调度,来决定什么时候处理文件事件,什么时候处理时间事件,以及花多少时间来处理它们等等。
事件的调度和执行有 ae.c/aeProcessEvents()
函数负责。源码以下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* 首先判断是否存在须要监听的文件事件,若是存在须要监听的文件事件,那么经过IO多路复用程序获取 * 准备就绪的文件事件,至于IO多路复用程序是否等待以及等待多久的时间,依发生时间距离如今最近的时间事件肯定; * 若是eventLoop->maxfd == -1表示没有须要监听的文件事件,可是时间事件确定是存在的(serverCron()), * 若是此时没有设置 AE_DONT_WAIT 标志位,此时调用IO多路复用,其目的不是为了监听文件事件是否准备就绪, * 而是为了使线程休眠到发生时间距离如今最近的时间事件的发生时间(做用相似于unix中的sleep函数), * 这种休眠操做的目的是为了不线程一直不停的遍历时间事件造成的无序链表,形成没必要要的资源浪费 */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; /* 寻找发生时间距离如今最近的时间事件,该时间事件的发生时间与当前时间之差就是IO多路复用程序应该等待的时间 */ if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; // 建立 timeval 结构 aeGetTime(&now_sec, &now_ms); tvp = &tv; /* How many milliseconds we need to wait for the next * time event to fire? */ long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; /* 若是时间之差大于0,说明时间事件到时时间未到,则等待对应的时间; * 若是时间间隔小于0,说明时间事件已经到时,此时若是没有 * 文件事件准备就绪,那么IO多路复用程序应该当即返回,以避免 * 耽误处理时间事件*/ if (ms > 0) { tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 阻塞并等等文件事件产生,最大阻塞事件由 timeval 结构决定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 处理全部已产生的文件事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) // 处理全部已到达的时间事件 processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
将 aeProcessEvents
函数置于一个循环里面,加上初始化和清理函数,就构成了 Redis 服务器的主函数 server.c/main()
。如下是主函数的伪代码:
def main(): // 初始化服务器 init_server(); // 一直处理事件,直到服务器关闭为止 while server_is_not_shutdown(): aeProcessEvents(); // 服务器关闭,执行清理操做 clear_server()
从事件处理的角度来看,Redis 服务器的运行流程能够用流程图 1 来归纳:
如下是事件的调度和执行规则: