有序集SortedSet算是redis中一个颇有特点的数据结构,经过这篇文章来总结一下这块知识点。redis
原文地址:http://www.jianshu.com/p/75ca...数组
redis中的有序集,容许用户使用指定值对放进去的元素进行排序,而且基于该已排序的集合提供了一系列丰富的操做集合的API。
举例以下:数据结构
//添加元素,table1为有序集的名字,100为用于排序字段(redis把它叫作score),a为咱们要存储的元素 127.0.0.1:6379> zadd table1 100 a (integer) 1 127.0.0.1:6379> zadd table1 200 b (integer) 1 127.0.0.1:6379> zadd table1 300 c (integer) 1 //按照元素索引返回有序集中的元素,索引从0开始 127.0.0.1:6379> zrange table1 0 1 1) "a" 2) "b" //按照元素排序范围返回有序集中的元素,这里用于排序的字段在redis中叫作score 127.0.0.1:6379> zrangebyscore table1 150 400 1) "b" 2) "c" //删除元素 127.0.0.1:6379> zrem table1 b (integer) 1
在有序集中,用于排序的值叫作score,实际存储的值叫作member。函数
因为有序集中提供的API较多,这里只举了几个常见的,具体能够参考redis文档。源码分析
关于有序集,咱们有一个十分常见的使用场景就是用户评论。在APP或者网站上发布一条消息,下面会有不少评论,一般展现是按照发布时间倒序排列,这个需求就可使用有序集,以发布评论的时间戳做为score,而后按照展现评论的数量倒序查找有序集。网站
老规矩,咱们仍是从server.c文件中的命令表中找到相关命令的处理函数,而后一一分析。
依旧从添加元素开始,zaddCommand函数:spa
void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_NONE); }
这里能够看到流程转向了zaddGenericCommand,而且传入了一个模式标记。
关于SortedSet的操做模式这里简单说明一下,先来看一条完整的zadd命令:code
zadd key [NX|XX] [CH] [INCR] score member [score member ...]
其中的可选项咱们依次看下:server
NX表示若是元素存在,则不执行替换操做直接返回。blog
XX表示只操做已存在的元素。
CH表示返回修改(包括添加,更新)元素的数量,只能被ZADD命令使用。
INCR表示在原来的score基础上加上新的score,而不是替换。
上面代码片断中的ZADD_NONE表示普通操做。
接下来看下zaddGenericCommand函数的源码,很长,耐心一点点看:
void zaddGenericCommand(client *c, int flags) { //一条错误提示信息 static char *nanerr = "resulting score is not a number (NaN)"; //有序集名字 robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL; int j, elements; int scoreidx = 0; //记录元素操做个数 int added = 0; int updated = 0; int processed = 0; //查找score的位置,默认score在位置2上,但因为有各类模式,因此须要判断 scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; //判断命令中是否设置了各类模式 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; } //设置模式 int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; //经过上面的解析,scoreidx为真实的初始score的索引位置 //这里客户端参数数量减去scoreidx就是剩余全部元素的数量 elements = c->argc - scoreidx; //因为有序集中score,member成对出现,因此加一层判断 if (elements % 2 || !elements) { addReply(c,shared.syntaxerr); return; } //这里计算score,member有多少对 elements /= 2; //参数合法性校验 if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } //参数合法性校验 if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } //这里开始解析score,先初始化scores数组 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { //填充数组,这里注意元素是成对出现,因此各个score之间要隔一个member if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } //这里首先在client对应的db中查找该key,即有序集 zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { //没有指定有序集且模式为XX(只操做已存在的元素),直接返回 if (xx) goto reply_to_client; //根据元素数量选择不一样的存储结构初始化有序集 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { //哈希表 + 跳表的组合模式 zobj = createZsetObject(); } else { //ziplist(压缩链表)模式 zobj = createZsetZiplistObject(); } //加入db中 dbAdd(c->db,key,zobj); } else { //若是ZADD操做的集合类型不对,则返回 if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } //这里开始往有序集中添加元素 for (j = 0; j < elements; j++) { double newscore; //取出client传过来的score score = scores[j]; int retflags = flags; //取出与之对应的member ele = c->argv[scoreidx+1+j*2]->ptr; //向有序集中添加元素,参数依次是有序集,要添加的元素的score,要添加的元素,操做模式,新的score int retval = zsetAdd(zobj, score, ele, &retflags, &newscore); //添加失败则返回 if (retval == 0) { addReplyError(c,nanerr); goto cleanup; } //记录操做 if (retflags & ZADD_ADDED) added++; if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; //设置新score值 score = newscore; } //操做记录 server.dirty += (added+updated); //返回逻辑 reply_to_client: if (incr) { if (processed) addReplyDouble(c,score); else addReply(c,shared.nullbulk); } else { addReplyLongLong(c,ch ? added+updated : added); } //清理逻辑 cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } }
代码有点长,来张图看一下存储结构:
注:每一个entry都是由score+member组成
有了上面的结构图之后,能够想到删除操做应该就是根据不一样的存储结构进行,若是是ziplist就执行链表删除,若是是哈希表+跳表结构,那就要把两个集合都进行删除。真实逻辑是什么呢?
咱们来看下删除函数zremCommand的源码,相对短一点:
void zremCommand(client *c) { //获取有序集名 robj *key = c->argv[1]; robj *zobj; int deleted = 0, keyremoved = 0, j; //作校验 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; for (j = 2; j < c->argc; j++) { //一次删除指定元素 if (zsetDel(zobj,c->argv[j]->ptr)) deleted++; //若是有序集中所有元素都被删除,则回收有序表 if (zsetLength(zobj) == 0) { dbDelete(c->db,key); keyremoved = 1; break; } } //同步操做 if (deleted) { notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); signalModifiedKey(c->db,key); server.dirty += deleted; } //返回 addReplyLongLong(c,deleted); }
看下具体的删除操做源码:
//参数zobj为有序集,ele为要删除的元素 int zsetDel(robj *zobj, sds ele) { //与添加元素相同,根据不一样的存储结构执行不一样的删除逻辑 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; //ziplist是一个简单的链表删除节点操做 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { zobj->ptr = zzlDelete(zobj->ptr,eptr); return 1; } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; de = dictUnlink(zs->dict,ele); if (de != NULL) { //查询该元素的score score = *(double*)dictGetVal(de); //从哈希表中删除元素 dictFreeUnlinkedEntry(zs->dict,de); //从跳表中删除元素 int retval = zslDelete(zs->zsl,score,ele,NULL); serverAssert(retval); //若是有须要则对哈希表进行resize操做 if (htNeedsResize(zs->dict)) dictResize(zs->dict); return 1; } } else { serverPanic("Unknown sorted set encoding"); } //没有找到指定元素返回0 return 0; }
最后看一个查询函数zrangeCommand源码,也是很长,汗~~~,不过放心,有了上面的基础,大体也能猜到查询逻辑应该是什么样子的:
void zrangeCommand(client *c) { //第二个参数,0表示顺序,1表示倒序 zrangeGenericCommand(c,0); } void zrangeGenericCommand(client *c, int reverse) { //有序集名 robj *key = c->argv[1]; robj *zobj; int withscores = 0; long start; long end; int llen; int rangelen; //参数校验 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; //根据参数附加信息判断是否须要返回score if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) { withscores = 1; } else if (c->argc >= 5) { addReply(c,shared.syntaxerr); return; } //有序集校验 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; //索引值重置 llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; //返回空集 if (start > end || start >= llen) { addReply(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; rangelen = (end-start)+1; //返回给客户端结果长度 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen); //一样是根据有序集的不一样结构执行不一样的查询逻辑 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; unsigned char *vstr; unsigned int vlen; long long vlong; //根据正序仍是倒序计算起始索引 if (reverse) eptr = ziplistIndex(zl,-2-(2*start)); else eptr = ziplistIndex(zl,2*start); serverAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); while (rangelen--) { serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); //注意嵌套的ziplistGet方法就是把eptr索引的值读出来保存在后面三个参数中 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); //返回value if (vstr == NULL) addReplyBulkLongLong(c,vlong); else addReplyBulkCBuffer(c,vstr,vlen); //若是须要则返回score if (withscores) addReplyDouble(c,zzlGetScore(sptr)); //倒序从后往前,正序从前日后 if (reverse) zzlPrev(zl,&eptr,&sptr); else zzlNext(zl,&eptr,&sptr); } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; sds ele; //找到起始节点 if (reverse) { ln = zsl->tail; if (start > 0) ln = zslGetElementByRank(zsl,llen-start); } else { ln = zsl->header->level[0].forward; if (start > 0) ln = zslGetElementByRank(zsl,start+1); } //遍历并返回给客户端 while(rangelen--) { serverAssertWithInfo(c,zobj,ln != NULL); ele = ln->ele; addReplyBulkCBuffer(c,ele,sdslen(ele)); if (withscores) addReplyDouble(c,ln->score); ln = reverse ? ln->backward : ln->level[0].forward; } } else { serverPanic("Unknown sorted set encoding"); } }
上面就是关于有序集SortedSet的添加,删除,查找的源码。能够看出SortedSet会根据存放元素的数量选择ziplist或者哈希表+跳表两种数据结构进行实现,之因此源码看上去很长,主要缘由也就是要根据不一样的数据结构进行不一样的代码实现。只要掌握了这个核心思路,再看源码就不会太难。
有序集的逻辑不难,就是代码有点长,涉及到ziplist,skiplist,dict三套数据结构,其中除了常规的dict以外,另外两个数据结构内容都很多,准备专门写文章进行总结,就不在这里赘述了。本文主要目的是总结一下有序集SortedSet的实现原理。