baiyanc++
首先看一张咱们很是熟悉的redis命令执行图:
那么思考这样一个问题,当咱们链接了redis服务端以后,而后输入并执行某条redis命令:如set key1 value1。这条命令到底是如何被发送到redis服务端的,redis服务端又是如何解析,并做出相应处理,并返回执行成功的呢?redis
redis在TCP协议基础之上,封装了本身的一套协议规范,方便服务端与客户端去接收与解析数据,划清命令参数之间的边界,方便最终对以TCP字节流传输的数据进行处理。下面咱们使用tcpdump来捕获redis-cli发送命令时的数据包:算法
tcpdump port 6379 -i lo -X
这时,咱们在客户端中输入set key1 value1命令。在tcpdump中捕获的数据包以下:
第一个是客户端发送命令到redis服务端时的数据包,而第二个是redis服务端响应给客户端的数据包。咱们首先只看第一个数据包,它从客户端43856端口发送到redis服务端的6379端口。首先前20个字节是IP头部,后32字节是TCP头部(因为TCP头部后面存在可选项)。
咱们主要关注从“2a33”开始的数据信息,从这里开始就是redis具体的数据格式了。从右边对数据的一个ASCII码翻译也能够看到set、key一、value1的字样,中间还有一些用.表示的字符,那么这里,咱们根据抓包结果分析一下redis数据传输的协议格式。segmentfault
看到这里,咱们是否可以发现如下规律:数组
- redis以"*"做为标志,表示命令的开始。在*后面紧跟的数字表明参数的个数(set key1 value1有3个参数因此为3)
- redis以"$"做为命令参数的开始,后面紧跟的数字表明参数的长度(如key1的长度为4因此为$4)
- redis以"rn"做为参数之间的分隔符,方便解析TCP字节流数据时定位边界位置
综合来看,客户端向服务端发送的redis数据包格式以下:缓存
*3 \r\n set \r\n $4 \r\n key1 \r\n $6 \r\n value1 \r\n
相比FastCGI协议,redis仅仅使用几个分隔符和特殊字符,就完成了对命令的传输语法及数据格式的规范化,同时服务端经过其中定义好的分隔符,也可以方便高效地从字节流数据中解析并读取出正确的数据。这种通讯协议简单高效,可以知足redis对高性能的要求。安全
既然命令已经经过redis数据传输协议安全地送达到了服务端,那么,服务端就要开始对传输过来的字节流数据进行处理啦。因为咱们在协议中清晰地定义了每一个参数的边界(\r\n),因此,redis服务端解析起来也很是轻松。网络
redis是典型事件驱动程序。为了提升单进程的redis的性能,redis采用IO多路复用技术来处理客户端的命令请求。redis会在建立客户端实例的时,指定服务端接收到客户端命令请求的事件时,所要执行的事件处理函数:app
client *createClient(int fd) { client *c = zmalloc(sizeof(client)); if (fd != -1) { anetNonBlock(NULL,fd); //设置非阻塞 anetEnableTcpNoDelay(NULL,fd); //设置不采用Nagle算法,避免半包与粘包现象 if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); //设置keep-alive //注意这里建立了一个文件事件。当客户端读事件就绪的时候,回调readQueryFromClient()函数 if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ... }
为了暂存客户端请求到服务端的字节流数据,redis封装了一个接收缓冲区,来缓存从套接字中读取的数据。后续的命令处理流程从缓冲区中读取命令数据并处理便可。缓冲区的好处在于不用一直维持读写套接字。在后续的流程中,咱们只须要从缓冲区中读取数据,而不是仍从套接字中读取。这样就能够提早释放套接字,节省资源。缓冲区的创建与使用就是在以前讲过的客户端回调函数readQueryFromClient()中完成的:tcp
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { ... qblen = sdslen(c->querybuf); //获取缓冲区长度 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //建立一个sds结构做为缓冲区 nread = read(fd, c->querybuf+qblen, readlen); //从套接字中读取数据到缓冲区中暂存 ... //真正地处理命令 processInputBufferAndReplicate(c); }
这段代码建立并往缓冲区中写入了字节流数据,而后调用processInputBufferAndReplicate()去真正地处理命令。processInputBufferAndReplicate()函数中只是简单的调用了==processInputBuffer()函数。因为咱们以前的缓冲区中已经有了客户端发给服务端的字节流数据,因此咱们须要在这一层进行数据初步的筛选与处理:
void processInputBuffer(client *c) { // 若是缓冲区尚未处理完,继续循环处理 while(c->qb_pos < sdslen(c->querybuf)) { ... // 对字节流数据进行定制化分发处理 if (c->reqtype == PROTO_REQ_INLINE) { //若是是INLINE类型的请求 if (processInlineBuffer(c) != C_OK) break; //调用processInlineBuffer解析缓冲区数据 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {//若是是MULTIBULK类型的请求 if (processMultibulkBuffer(c) != C_OK) break; //调用processMultibulkBuffer解析缓冲区数据 } else { serverPanic("Unknown request type"); } // 开始处理具体的命令 if (c->argc == 0) { //命令参数为0个,非法 resetClient(c); } else { //命令参数不为0,合法 // 调用processCommand()真正处理命令 if (processCommand(c) == C_OK) { // ... } } } }
看到这里,读者可能会有些疑惑。什么是INLINE、什么是MULTIBULK?在redis中,有两种请求命令类型:
- INLINE类型:简单字符串格式,如ping命令
- MULTIBULK类型:字符串数组格式。如set、get等等大部分命令都是这种类型
这个函数其实就是一个分发器。因为底层的字节流数据是无规则的,因此咱们须要根据客户端的reqtype字段,去区分请求字节流数据属于那种请求类型,进而分发到对应的函数中进行处理。因为咱们常常执行的命令都是MULTIBULK类型,咱们也以MULTIBULK类型为例。对于set、get这种MULTIBULK请求类型,会被分发到processMultibulkBuffer()函数中进行处理。
在开启TCP的Nagle算法时,TCP会将多个redis命令请求的数据包合并或者拆分发送。这样就会出如今一个数据包中命令不完整、或者一个数据包中包含多个命令的状况。为了解决这个问题,processMultibulkBuffer()函数保证,当只有在缓冲区中包含一个完整请求时,这个函数才会成功解析完字节流中的命令参数,并返回成功状态码。不然,会break出外部的while循环,等待下一次事件循环再从套接字中读取剩余的数据,再进行对命令的解析。这样就保证了redis协议中的数据的完整性,也保证了实际命令参数的完整性。
int processMultibulkBuffer(client *c) { while(c->multibulklen) { ... /* 读取命令参数字节流 */ if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { //若是$后面表明参数长度的数字与实际命令长度不匹配(+2的位置是\r\n),说明数据不完整,直接跳出循环,等待下一次读取剩余数据 break; } else { //命令完整,进行一些执行命令以前的初始化工做 if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); sdsIncrLen(c->querybuf,-2); c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); sdsclear(c->querybuf); } else { c->argv[c->argc++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); c->qb_pos += c->bulklen+2; } c->bulklen = -1; c->multibulklen--; //处理下一个命令参数 } } }
咱们回到外层。当咱们成功执行processMultibulkBuffer()函数以后,说明当前命令已经完整,能够对命令进行处理了。咱们想一下,加入要咱们去设计根据不一样的命令,调用不一样的处理函数,从而完成不一样的功能,咱们应该怎么作呢?想了想,咱们能够简单写出如下代码:
if (command == "get") { doGetCommand(); //get命令处理函数 } else if (command == "set") { doSetCommand(); //set命令处理函数 } else { printf("非法命令") }
以上代码很是简单,只是根据咱们获得的不一样命令请求,分发到不一样的命令处理函数中进行定制化处理。那么redis其实也是一样的道理,那究竟redis是怎么作的呢:
int processCommand(client *c) { //若是是退出命令直接返回 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } //去字典里查找命令,并把要执行的命令处理函数赋值到c结构体中的cmd字段 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 返回值校验 if (!c->cmd) { //没有找到该命令 flagTransaction(c); sds args = sdsempty(); int i; for (i=1; i < c->argc && sdslen(args) < 128; i++) args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s", (char*)c->argv[0]->ptr, args); sdsfree(args); return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || //命令参数不匹配 (c->argc < -c->cmd->arity)) { flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return C_OK; } // 真正执行命令 if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { //真正执行命令 call(c,CMD_CALL_FULL); //核心函数 c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); } return C_OK; }
在这个函数中,最重要的就是lookupCommand()函数和call()函数的调用了。在redis中,全部命令都存储在一个字典中,这个字典长这样:
struct redisCommand redisCommandTable[] = { {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0}, {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0}, {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}, {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0}, {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}, {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}, {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0}, {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0}, {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0}, {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0}, {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0}, {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0}, {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0}, {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0}, {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0}, {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0}, {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0}, {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0}, {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0}, {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0}, {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0}, {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0}, {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0}, ... };
咱们能够看到,这个字典是全部命令的集合,咱们调用lookupCommand就是从这里获取命令及命令的相关信息的。它是一个结构体数组,包含全部命令名称、命令处理函数、参数个数、以及种种标记。其实这里就至关于一个配置信息的维护,以及命令道处理函数名称的映射关系,从而很好的解决了咱们一开始使用if-else来分发命令处理函数的难以维护、可扩展性差的问题。
在咱们成功在字典中找到一个命令的处理函数以后,咱们只须要去调用相应的命令处理函数就好啦。上面最后的call()函数中就对相应的命令处理函数进行了调用,并返回调用结果给客户端。好比,setCommand()就是set命令的实际处理函数:
void setCommand(client *c) { int j; robj *expire = NULL; int unit = UNIT_SECONDS; int flags = OBJ_SET_NO_FLAGS; for (j = 3; j < c->argc; j++) { char *a = c->argv[j]->ptr; robj *next = (j == c->argc-1) ? NULL : c->argv[j+1]; if ((a[0] == 'n' || a[0] == 'N') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_XX)) { flags |= OBJ_SET_NX; } else if ((a[0] == 'x' || a[0] == 'X') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_NX)) { flags |= OBJ_SET_XX; } else if ((a[0] == 'e' || a[0] == 'E') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_PX) && next) { flags |= OBJ_SET_EX; unit = UNIT_SECONDS; expire = next; j++; } else if ((a[0] == 'p' || a[0] == 'P') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_EX) && next) { flags |= OBJ_SET_PX; unit = UNIT_MILLISECONDS; expire = next; j++; } else { addReply(c,shared.syntaxerr); return; } } c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); }
这个函数首先对NX、EX参数进行了判断及处理,最终调用了setGenericCommand(),来执行set命令的通用逻辑部分:
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmness warning */ if (expire) { if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0) { addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } setKey(c->db,key,val); server.dirty++; if (expire) setExpire(c,c->db,key,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); }
最终会调用addReply()通用返回函数,应该是要把执行结果返回给客户端了。咱们看看该函数里面作了些什么:
void addReply(client *c, robj *obj) { if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr)); } else if (obj->encoding == OBJ_ENCODING_INT) { char buf[32]; size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); if (_addReplyToBuffer(c,buf,len) != C_OK) _addReplyStringToList(c,buf,len); } else { serverPanic("Wrong obj->encoding in addReply()"); } }
咱们仔细阅读这段代码,好像并无找到执行结果是何时返回给客户端的。在这个函数中,只是将返回结果添加到了输出缓冲区中,一个命令就执行完了。那么到底是何时返回的呢?是否还记得在介绍开启事件循环时,提到函数beforesleep()会在每次事件循环阻塞等待文件事件以前执行,主要执行一些不是很费时的操做,好比过时键删除操做,向客户端返回命令回复等。这样,就能够减节省返回执行结果时的网络通讯开销,将同一个客户端上的多个命令的屡次返回,对多个命令作一个缓存,最终一次性统一返回,减小了返回的次数,提升了性能。
执行完set key1 value1命令以后,咱们获得了一个"OK"的返回,表明命令执行成功。其实咱们仔细观察上面返回的第二个数据包,其实底层是一个"+OK"的返回值。那么为何要有一个+号呢?由于除了咱们上面讲过的set命令,还有get命令、lpush命令等等,他们的返回值都是不同的。get会返回数据集合、lpush会返回一个整数,表明列表的长度等等。一个字符串的表示是远远不能知足须要的。因此在redis通讯协议中,一共定义了五种返回值结构。客户端经过每种返回结构的第一个字符,来判断是哪一种类型的返回值:
- 状态回复:第一个字符是“+”;例如,SET命令执行完毕会向客户端返回“+OK\r\n”。
- 错误回复:第一个字符是“-”;例如,当客户端请求命令不存在时,会向客户端返回“-ERR unknown command 'testcmd'”。
- 整数回复:第一个字符是“:”;例如,INCR命令执行完毕向客户端返回“:100\r\n”。
- 批量回复:第一个字符是“$”;例如,GET命令查找键向客户端返回结果“$5\r\nhello\r\n”,其中$5表示返回字符串长度。
- 多条批量回复:第一个字符是“”;例如,LRANGE命令可能会返回多个多个值,格式为“3\r\n$6\r\nvalue1\r\n$6rnvalue2rn$6\r\nvalue3\r\n”,与命令请求协议格式相同,“\*3”表示返回值数目,“$6”表示当前返回值字符串长度,多个返回值用“\r\n”分隔开。
咱们执行set命令就是第一种类型,即状态回复。客户端经过+号,就可以知道这是状态回复,从而就知道该如何读取后面的字节流内容了。
至此,咱们就走完了一个redis命令的完整生命周期,同时也了解了redis通讯协议的格式与规范。接下来,我将会深刻每个命令的实现,你们加油。