上篇咱们简单介绍了 redis 客户端的一些基本概念,包括其 client 数据结构中对应的相关字段的含义,本篇咱们结合这些,来分析分析 redis 服务端程序是如何运行的。一条命令请求的完成,客户端服务端都经历了什么?服务端程序中定时函数 serverCron 都有哪些逻辑?java
咱们日常最简单的一个 redis 客户端命令,redis-cli,这个命令会致使咱们的客户端向服务端发起一个 connect 链接操做,具体就是如下几个步骤。git
一、网络链接程序员
第一步是网络链接,也就是咱们的客户端会与服务端进行 TCP 三次握手,并指明使用 socket 通讯协议。github
接着服务端 redis 使用 epoll 事件机制监听端口的读事件,一旦事件可读则断定是有客户端尝试创建链接,服务端会检查最大容许链接数是否到达,若是达到则拒绝创建链接,不然服务端会建立一个 fd 文件描述符并返回给客户端,表明链接成功创建。redis
二、更新客户端链接信息数据库
以前介绍 redis 客户端的时候,咱们说过 redisServer 中有这么一个字段:api
struct redisServer {
........
list *clients; /* List of active clients */
........
}
复制代码
clients 字段是一个双端链表结构,保存了全部成功创建链接的客户端 client 信息,那么咱们第二步就是建立一个 client 结构的客户端抽象实例并添加到 redisServer 结构 clients 链表中。数组
三、为新客户端注册读事件bash
每个客户端链接都对应一个 fd 文件描述符,咱们只须要监听这个文件描述符的读事件,便可判断该套接字上是否有信息发送过来。服务器
这里也同样,咱们经过注册该 fd 的读事件,当该客户端发送信息给服务端时,咱们无需去轮询便可发现该客户端在请求服务端的动做,继而服务端程序解析命令。
redis 服务端程序启动后,会初始化一些字段变量,为 redisServer 中的一些字段赋默认值,还会读取用户指定的配置文件内容并加载配置,反应到具体数据结构内,最后会调用 asMain 函数进行事件循环监听。
每当客户端发起链接请求,或者发送命令过来,这里的事件分发器就会监听到套接字的可读事件,因而找到可读事件所绑定的事件处理器 readQueryFromClient,并调用它。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; ........ //读取客户端输入缓冲区大小 qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //从 fd 文件描述符对应的 socket 中读取命令数据 //保存进 querybuf 输入缓冲区 nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == EAGAIN) { //异常返回 return; } else { //异常释放客户端链接 serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } } else if (nread == 0) { //客户端已经关闭、释放客户端 serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; } else if (c->flags & CLIENT_MASTER) { c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); } sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; server.stat_net_input_bytes += nread; //若是输入缓冲区长度超过系统设置最大长度,释放客户端 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c); } else { size_t prev_offset = c->reploff; //这里会读取缓冲区写入的命令 processInputBuffer(c); size_t applied = c->reploff - prev_offset; if (applied) { replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied); sdsrange(c->pending_querybuf,applied,-1); } } } 复制代码
总的来讲,readQueryFromClient 主要完成的就是将 socket 中发来的命令读取到客户端输入缓冲区,而后调用 processInputBuffer 处理缓冲区中的命令。
void processInputBuffer(client *c) { server.current_client = c; while(sdslen(c->querybuf)) { if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; if (c->flags & CLIENT_BLOCKED) break; if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; //判断请求类型 if (!c->reqtype) { if (c->querybuf[0] == '*') { c->reqtype = PROTO_REQ_MULTIBULK; } else { c->reqtype = PROTO_REQ_INLINE; } } //根据不一样的请求类型,执行命令解析 //实际上就是把命令的名称、参数解析存入 argc 数组中 if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknown request type"); } if (c->argc == 0) { resetClient(c); } else { //查找执行命令 if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { c->reploff = c->read_reploff - sdslen(c->querybuf); } if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) resetClient(c); } if (server.current_client == NULL) break; } } server.current_client = NULL; } 复制代码
processCommand 函数会从客户端实例命令参数字段中拿到命令的名称、参数类型、参数值等等信息。redisServer 在成功启动后,会调用 populateCommandTable 方法初始化 redisCommandTable,存入一个字典集合。
每个 redisCommand 是这么一个数据结构:
struct redisCommand {
//命令名称
char *name;
//函数指针,指向一个具体实现
redisCommandProc *proc;
//参数个数
int arity;
//命令的类型,写命令?读命令?等
char *sflags;
int flags;
redisGetKeysProc *getkeys_proc;
int firstkey;
int lastkey;
int keystep;
//服务器启动后共调用该命令次数
//服务器启动后执行该命令耗时总
long long microseconds, calls;
};
复制代码
processCommand 最后会找到命令,进而执行命令,并将命令执行的结果写入客户端输出缓冲区,并将响应写回客户端。以上就是 redis 对于一条命令请求的执行过程,随着咱们的不断学习,以上内容会不断深刻,如今你能够理解的大概就好。
redis 能够说是事件驱动中间件,它主要有两种事件,文件事件和时间事件,文件事件咱们就很少说,时间事件主要分为两种,一种是定时事件,另外一种周期事件。
定时事件指的是,预约的程序将会在某个具体的时间节点执行。周期事件是指,预约程序每隔某个时间间隔就会被调用执行。
而咱们的 serverCron 显然是一个周期时间事件,在正式分析其源码实现以前,咱们先来看看它的前世今身,在哪里被注册,又是如何被调用的。
void initServer(void) { 。。。。。 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } 。。。。。 } 复制代码
咱们 redis 服务器启动初始化时,会调用 aeCreateTimeEvent 绑定一个 serverCron 的时间事件。
这是 redis 中事件循环结构
typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ aeTimeEvent *timeEventHead; int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; } aeEventLoop; 复制代码
其中指针 timeEventHead 是一个双端链表,全部的时间事件都会以链表的形式存储在这里,具体指向的结构是 aeTimeEvent。
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
//下一次何时被执行(单位秒)
long when_sec; /* seconds */
//下一次何时被执行(单位毫秒)
long when_ms; /* milliseconds */
//时间事件处理函数
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
//先后链表指针
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
复制代码
serverCron 在这里会被建立并添加到时间事件链表中,并设置它下一次执行时间为当前时间,具体你能够自行深刻查看调用栈,那么下一次时间事件检查的时候,serverCron 就必定会被执行。
好了,至此 serverCron 已经注册进 redis 的时间事件结构中,那么何时检查并调用呢?
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } 复制代码
还记的咱们 redis 成功启动后,会进入主事件循环中吗?aeProcessEvents 里面具体不一行行带你们分析了,咱们挑相关的进行分析。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { 。。。。。 //遍历整个时间事件链表,找到最快要被执行的任务 //计算与当前时间的差值 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; aeGetTime(&now_sec, &now_ms); tvp = &tv; long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; //记录差值保存进变量 tvp if (ms > 0) { tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; } else { //已经错过执行该时间事件,tvp 赋零 tvp->tv_sec = 0; tvp->tv_usec = 0; } } else { if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { tvp = NULL; /* wait forever */ } } //aeApiPoll 会处理文件事件,最长 tvp 时间就要返回 numevents = aeApiPoll(eventLoop, tvp); 。。。。。 //检查处理时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); } 复制代码
你看,实际上尽管咱们对周期时间事件指定了严格的执行间隔,但实际上大多数状况下,时间事件会晚于咱们既定时间节点。
processTimeEvents 函数检查全部时间事件函数,若是有符合条件应该获得执行的,会当即执行该事件处理器,并根据事件处理器返回的状态,删除时间事件或设置下一次执行时间。
static int processTimeEvents(aeEventLoop *eventLoop) { 。。。。。。 //获取当前时间 aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; if (retval != AE_NOMORE) { //这是一个周期执行的时间事件,设置下次执行时间 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { //删除事件 te->id = AE_DELETED_EVENT_ID; } } te = te->next; } 复制代码
以上,你应该了解到 serverCron 什么时候注册的,什么时候被执行,通过了哪些过程。下面咱们具体看 serverCron 的内容。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j; UNUSED(eventLoop); UNUSED(id); UNUSED(clientData); if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); //更新 server.unixtime 和 server.mxtime updateCachedTime(); //每间隔 100 毫秒,统计一次这段时间内命令的执行状况 run_with_period(100) { trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands); trackInstantaneousMetric(STATS_METRIC_NET_INPUT, server.stat_net_input_bytes); trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, server.stat_net_output_bytes); } 。。。。。。 } 复制代码
其中 run_with_period 为何能作到显式控制 100 毫秒内只执行一次呢?
其实 run_with_period 的宏定义以下:
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz)))) 复制代码
server.hz 是 redisServer 结构中的一个字段,能够容许咱们经过配置文件进行调节,它是一个整数,描述服务 serverCron 在一秒内执行 N 次。server.cronloops 描述服务器自启动以来,共执行 serverCron 次数。
那么,1000/server.hz 描述的就是 serverCron 每间隔多少毫秒就须要被执行,若是咱们传入的 ms 小于这个间隔,返回 1 并立马执行后续函数体。或者根据 serverCron 已经执行的次数,计算间隔时间是否达到,返回 0 或 1。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 。。。。。 //更新全局 lru 时钟,这个用于每一个 redis 对象最长未访问淘汰策略 unsigned long lruclock = getLRUClock(); atomicSet(server.lruclock,lruclock); //不断比较当前内存使用量,存储最高峰值内存使用量 if (zmalloc_used_memory() > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used_memory(); server.resident_set_size = zmalloc_get_rss(); // 若是收到了SIGTERM信号,尝试退出 if (server.shutdown_asap) { if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); server.shutdown_asap = 0; } 。。。。。。 } 复制代码
lru 后面咱们会继续说的,redis 维护一个全局 lru 时钟参照,每一个 redisObject 结构中也会有一个本身的 lru 时钟,它记录的是上一次访问该对象时的时钟,这些信息会用于键值淘汰策略。因此,服务器会定时的更新这个全局 lru 时钟,保证它准确。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 。。。。。 //每间隔五秒,输出非空数据库中的相关属性信息 run_with_period(5000) { for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; size = dictSlots(server.db[j].dict); used = dictSize(server.db[j].dict); vkeys = dictSize(server.db[j].expires); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(server.dict); */ } } } //若是不是sentinel模式,则每5秒输出一个connected的client的信息到log if (!server.sentinel_mode) { run_with_period(5000) { serverLog(LL_VERBOSE, "%lu clients connected (%lu slaves), %zu bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), zmalloc_used_memory()); } } 。。。。。。 } 复制代码
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
。。。。。
clientsCron();
databasesCron();
。。。。。。
}
复制代码
clientsCron 会检查有哪些客户端链接超时并将他们释放,还会检查客户端的输入缓冲区 querybuff 是否太大,或者该客户端不是很活跃,那么会释放掉该客户端的输入缓冲区并从新建立一个默认大小的。
databasesCron 会首先随机遍历全部的数据库并抽取 expired 集合中部分键,判断是否过时并执行相应的删除操做。除此以外,该函数还会随机访问部分数据库,并根据其状态触发 rehash。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 。。。。 //若是服务没有在执行 rdb 备份生成,也没有在 aof 备份生成 //而且有被延迟的 aof rewrite,那么这里会执行 //当服务器正在进行 BGSAVE 备份的期间,全部的 rewrite 请求都会被延迟 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } //若是有 rdb 子进程或 aof 子进程 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren()) { int statloc; pid_t pid; if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); //子进程 id 等于负一,说明子进程退出或异常,记录日志 if (pid == -1) { serverLog(LL_WARNING,"wait3() returned an error: %s. " "rdb_child_pid = %d, aof_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, (int) server.aof_child_pid); } else if (pid == server.rdb_child_pid) { //pid 指向 rdb 子进程 id //判断若是子进程退出了,进行一些后续的 rdb 操做 //更新 dirty,lastsave 时间等等 backgroundSaveDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); } else if (pid == server.aof_child_pid) { //pid 指向 aof 子进程 id //aof 子进程退出,处理其后续的一些收尾 backgroundRewriteDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); } else { if (!ldbRemoveChild(pid)) { serverLog(LL_WARNING, "Warning, detected child with unmatched pid: %ld", (long)pid); } } updateDictResizePolicy(); closeChildInfoPipe(); } } else { //这部分咱们前面的文章介绍过 //saveparams 保存了 save 全部的配置项,是一个数组 //这里校验是否达到条件 for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; if (server.dirty >= sp->changes && server.unixtime-server.lastsave > sp->seconds && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); rdbSaveBackground(server.rdb_filename,rsiptr); break; } } 。。。。 } 复制代码
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 。。。。 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); //每一秒检查一次上一轮aof的写入是否发生了错误,若是有错误则尝试从新写一次 run_with_period(1000) { if (server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); } freeClientsInAsyncFreeQueue(); clientsArePaused(); run_with_period(1000) replicationCron(); run_with_period(100) { if (server.cluster_enabled) clusterCron(); } run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); } run_with_period(1000) { migrateCloseTimedoutSockets(); } if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.rdb_bgsave_scheduled && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) server.rdb_bgsave_scheduled = 0; } //增长 serverCron 执行次数 server.cronloops++; return 1000/server.hz; 。。。。 } 复制代码
以上,咱们分析了 serverCron 的内部逻辑,虽说咱们配置上能够指定它执行间隔,可是实际上取决于具体的执行时间,内部逻辑也很多,但愿你能了解了个大概。
好了,这是咱们对于 redis 服务端程序的一点点了解,若是以为我有说不对的地方或者你有更深的理解,也欢迎你加我微信一块儿探讨。
接下来,咱们的 redis 之旅从单击开始步入多机模式,下一篇多机数据库的理~
关注公众不迷路,一个爱分享的程序员。 公众号回复「1024」加做者微信一块儿探讨学习! 每篇文章用到的全部案例代码素材都会上传我我的 github github.com/SingleYam/o… 欢迎来踩!