Redis源码系列的初衷,是帮助咱们更好地理解Redis,更懂Redis,而怎么才能懂,光看是不够的,建议跟着下面的这一篇,把环境搭建起来,后续能够本身阅读源码,或者跟着我这边一块儿阅读。因为我用c也是好几年之前了,些许错误在所不免,但愿读者能不吝指出。html
曹工说Redis源码(1)-- redis debug环境搭建,使用clion,达到和调试java同样的效果java
曹工说Redis源码(2)-- redis server 启动过程解析及简单c语言基础知识补充linux
曹工说Redis源码(3)-- redis server 启动过程完整解析(中)redis
曹工说Redis源码(4)-- 经过redis server源码来理解 listen 函数中的 backlog 参数编程
曹工说Redis源码(5)-- redis server 启动过程解析,以及EventLoop每次处理事件前的前置工做解析(下)api
先给你们复习下前面一讲的功课,你们知道,redis 基本是单线程,也就是说,假设咱们启动main方法的,是线程A,那么,最终,去处理客户端socket链接、读取客户端请求、以及向客户端socket写数据,也仍是线程A。数组
同时,你们想必也知道,redis 里仍是有一些后台任务要作的,好比:网络
字典的rehash(rehash的意思是,redis 里,字典结构,实际上是包含了两个hashtable,通常使用第一个;当须要扩充其size的时候,hashtable[1] 就会扩充内存到扩充后的size,而后,就须要把hashtable[0]里面的数据,所有迁移到 hashtable[1] 来,这个过程,即所谓的rehash),rehash的过程,仍是比较耗时的;异步
redis 里的键,若是设了过时时间,到了过时时间后,这个key,是否是就在redis里不存在了呢?不必定,可是你去访问的时候,确定是看不到了。但这个怎么作到的呢?难道每次来一个这种key,就设置一个timer,在指定过时时间后执行清除任务吗?这个想来,开销太大了;socket
因此,其实分了两种策略:
检查当前的客户端集合,看看哪些是一直空闲,且超过了必定时间的,这部分客户端,被筛选出来,直接干掉,关掉与该客户端之间的长链接。
还有其余一些任务,下边再说。
因此,从上面可知,redis 主要要干两类活,一种是客户端要它干的,好比,我执行个get/set命令,这个优先级比较高;另外一类就是例行工做,每隔多久就得干一次。
前面一讲,咱们已经讲到了下面这个代码:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 若是有须要在事件处理前执行的函数,那么运行它 if (eventLoop->beforesleep != NULL) // 1 eventLoop->beforesleep(eventLoop); // 2 开始处理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
1处,咱们已经讲完了;本讲,主要讲2处,这个主循环。ok,扯了一堆,let's go!
获取有没有周期任务要执行,若是有,则计算一下,要过多久,才到周期任务的执行时间;把过多久这个时间,算出来后,定义为 timeLeftToScheduledJobTime;若是没有周期任务,这个时间能够定义为null;
若是发现时间已经到了,则表示如今就能够执行这个周期任务了,把timeLeftToScheduledJobTime 设为0
这部分代码,以下所示:
if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; // 1 struct timeval tv, *tvp; // 获取最近的时间事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // 2 shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 若是时间事件存在的话 // 那么根据最近可执行时间事件和如今时间的时间差来决定文件事件的阻塞时间 long now_sec, now_ms; // 计算距今最近的时间事件还要多久才能达到 // 并将该时间距保存在 tv 结构中 /** * 3 获取当前时间,这里把两个long 局部变量的地址传进去了,在里面会去修改它 */ aeGetTime(&now_sec, &now_ms); tvp = &tv; // 4 tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms + 1000) - now_ms) * 1000; tvp->tv_sec--; } else { tvp->tv_usec = (shortest->when_ms - now_ms) * 1000; } // 5 时间差小于 0 ,说明事件已经能够执行了,将秒和毫秒设为 0 (不阻塞) if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // 执行到这一步,说明没有时间事件 if (flags & AE_DONT_WAIT) { // 6 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 7 tvp = NULL; /* wait forever */ } }
timeLeftToScheduledJobTime
long now_sec, now_ms
tvp->tv_sec
AE_DONT_WAIT
选项,分出2个分支,一个设为0,一个设为null。说到网络编程中的多路复用,select几乎是绕不开的话题,在没有epoll以前,基本就是使用select。固然,select有它的缺点,那就是:
下面,你们看看select的api,你们也能够自行在linux机器上执行:man select 查看。
select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
上面的第一行,是select的简单说明,其中一个词是,synchronous,同步的意思,说明select是同步的,不是异步的,只是进行了io多路复用。
下面那个是其api,简单解释三个参数:
参数fd_set *readfds
Those listed in readfds will be watched to see if characters become available for reading (more
precisely, to see if a read will not block
也就是说,这个集合中的fd,会被监测,看看哪些fd能够无阻塞地读取;怎么才能无阻塞地读取,那确定是这个fd的输入缓冲区有内容啊,好比,客户端发了数据过来
参数fd_set *writefds
those in writefds will be watched to see if a write will not block
这个集合,会被监测,看看是否能够对这个fd,进行无阻塞地写;何时,不能无阻塞地写呢?确定是缓冲区满了的时候。这种应该常见于:给对端发数据时,对方一直不ack这些数据,因此我方的缓冲区里,一直不能删这些数据,致使缓冲区满。
struct timeval *timeout
The timeout argument specifies the minimum interval that select() should block waiting for a file descriptor to become ready. If both fields of the timeval structure are zero, then select() returns immediately. (This is useful for polling.) If timeout is NULL (no timeout), select() can block
indefinitely.
这个timeout参数,指定了select()操做,等待文件描述符变成ready过程当中,须要等待多长时间。若是这个timeout的两个字段,都被设为了0,则select()会立刻返回。若是timeout是null,这个操做会无限阻塞。
因此,select我就算你们了解了,其中的timeout参数,简单来讲,就是调用select时,最大阻塞多久就要返回。
若是设为0,则立刻返回;若是为null,则无限阻塞;若是为正常的大于0的值,则阻塞对应的时长。
和前面的部分,联系起来,就是说:
有的函数,天生适合拿来说课。epoll,kqueue等,会单独拿来说。
// 1 处理文件事件,阻塞时间由 tvp 决定,tvp:timevalue pointer numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 2 从已就绪数组中获取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; // 读事件 if (fe->mask & mask & AE_READABLE) { // rfired 确保读/写事件只能执行其中一个 rfired = 1; fe->rfileProc(eventLoop, fd, fe->clientData, mask); } // 写事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop, fd, fe->clientData, mask); } processed++; }
1处,这里就会根据当前的操做系统,决定调用select或是epoll,或是其余的实现。(经过条件编译实现)。
假设这里的底层实现,就是前面讲的select函数,那么,select函数执行完后,eventLoop->fired 属性,就会存放此次select筛选出来的那些,ready的文件描述符集合。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; /** * 拷贝到带_的变量中 */ memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); // 1 retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; // 2 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
如上所示,1处,调用select;2处,赋值给fired。
2处,从fired中取出对应的文件描述符
3处,若是fired中的文件描述符,可读,则执行对应的函数指针rfileProc指向的函数
4处,若是fired中的文件描述符,可写,则执行对应的函数指针wfileProc指向的函数
/* Check time events */ // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);
这里会调用processTimeEvents,其实现以下,其中涉及到复杂的时间计算,咱们能够只看核心流程:
/* Process time events * * 处理全部已到达的时间事件 */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; time_t now = time(NULL); // 更新最后一次处理时间事件的时间 eventLoop->lastTime = now; // 遍历链表 // 执行那些已经到达的事件 te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId - 1; while (te) { long now_sec, now_ms; long long id; // 获取当前时间 aeGetTime(&now_sec, &now_ms); // 若是当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; //1 执行事件处理器,并获取返回值 retval = te->timeProc(eventLoop, id, te->clientData); processed++; // 记录是否有须要循环执行这个事件时间 if (retval != AE_NOMORE) { // 2 是的, retval 毫秒以后继续执行这个时间事件 aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms); } else { // 不,将这个事件删除 aeDeleteTimeEvent(eventLoop, id); } // 由于执行事件以后,事件列表可能已经被改变了 // 所以须要将 te 放回表头,继续开始执行事件 te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }
1处,执行timeProc这个函数指针,执行的函数,在初始化的时候,这个指针,被赋值为serverCron;
初始化时,会调用一下代码:
// 为 serverCron() 建立时间事件 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); }
这里的serverCron,是一个函数指针。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { // 更新时间计数器 long long id = eventLoop->timeEventNextId++; // 建立时间事件结构 aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; // 设置 ID te->id = id; // 设定处理事件的时间 aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms); // 1 设置事件处理器 te->timeProc = proc; te->finalizerProc = finalizerProc; // 设置私有数据 te->clientData = clientData; // 将新事件放入表头 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }
上面的1处,将传入的serverCron,赋值给了te->timeProc。
2处,注册下一次的周期任务
本讲主要讲解了主循环的最外层结构,若是有什么不清楚的,能够留言。