本文从代码角度分析Redis 的 zset 结构,但愿经过本文掌握以下内容:java
Redis 中 zset 不是单一结构完成,是跳表和哈希表共同完成node
跳表的实现原理,跳表升维全靠随机算法
跳表中查找、插入、删除的三个口诀数据库
使用场景(简单延时队列、排行榜、简单限流)编程
若是您还不能了然于胸,请继续阅读本文。数组
假设咱们有某个班级全部学生的语文成绩,想统计、查询区间范围、查询单个学生成绩、知足高性能读取这些需求,Redis 的 zset 结构无疑是最好的选择。Redis 提供了丰富的 API。示例:bash
ZADD yuwen 90 s01 89 s03 99 s02 74 s04 97 s05
微信
以 yuwen 为 key 分别存储了 s01 到 s06 共计 6 名学生的分数,咱们能够查询任一学生的成绩数据结构
ZSCORE yuwen s03
app
能够按照排序返回指定区间内的全部元素
ZRANGE yuwen 1 2 withscores
能够访问指定分数区间内的全部元素
ZRANGEBYSCORE yuwen 90 100 withscores
能够统计指定区间内的个数
ZCOUNT yuwen 80 90
zset 结构中,既支持按单个元素查询,又支持范围查询,是如何实现的呢?咱们深刻代码分析,在 Redis 的 t_zset.c 的注释中,提到:
/* ZSETs are ordered sets using two data structures to hold the same elements
* in order to get O(log(N)) INSERT and REMOVE operations into a sorted
* data structure.
*
* The elements are added to a hash table mapping Redis objects to scores.
* At the same time the elements are added to a skip list mapping scores
* to Redis objects (so objects are sorted by scores in this "view").复制代码
翻译过来是 Redis 中有两种数据结构来支持 zset 的功能,一个是 hash table ,一个是 skip list。先来看一下 zset 在代码中的定义:
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;复制代码
dict 是一个hash table ,各类编程语言中都有实现。能够保证 O(1) 的时间复杂度,不作过多解释。咱们继续看 zskiplist 的定义:
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;复制代码
zskiplist 是 Redis 对 skiplist 作了变种,skiplist 就是咱们常说的跳表。
跳表由 William Pugh 于1990年发表的论文 Skip lists: a probabilistic alternative to balanced trees 中被首次提出,查找时间复杂度为平均 O(logN),最差 O(N),在大部分状况下效率可与平衡树相媲美,但实现比平衡树简单的多,跳表是一种典型的以空间换时间的数据结构。
跳表具备如下几个特色:
由许多层结构组成。
每一层都是一个有序的链表。
最底层 (Level 1) 的链表包含全部元素。
若是一个元素出如今 Level i 的链表中,则它在 Level i 之下的链表也都会出现。
每一个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。
跳表的查找会从顶层链表的头部元素开始,而后遍历该链表,直到找到元素大于或等于目标元素的节点,若是当前元素正好等于目标,那么就直接返回它。若是当前元素小于目标元素,那么就垂直降低到下一层继续搜索,若是当前元素大于目标或到达链表尾部,则移动到前一个节点的位置,而后垂直降低到下一层。正由于 Skiplist 的搜索过程会不断地从一层跳跃到下一层的,因此被称为跳跃表。
仍是举个例子,假设连接包含1-10,共10个元素。咱们要找到第9个,须要从 header 遍历,共 9 次才能找到
一次只能比较一个数,最坏的状况下时间复杂度是O(n),若是咱们一次能够比较2个元素就行了:
一次查找2个的话,咱们只找了5次就找到了。因此就有了相似下面的结构,在链表上增长一层减小了元素个数的“链表”:
若是增长两层“链表”,只查找3次就能够找到:
即使是咱们找元素8,也只须要遍历 1->4->7->8,共4次查询。
跳表就是这样的一种数据结构,结点是跳过一部分的,从而加快了查询的速度。以前讲HashMap 中咱们提到,java 8 中当哈希冲突个数大于 7 个的时候,转换为红黑树。跳表跟红黑树二者的算法复杂度差很少,为何Redis要使用跳表而不使用红黑树呢?跳表相对于红黑树,代码简单。若是咱们要查询一个区间里面的值,用平衡树实现可能会麻烦。删除一段区间时,若是是平衡树,就会至关困难,毕竟涉及到树的平衡问题,而跳表则没有这种烦恼。
整个查询过程,能够简化理解为 if (下一个是否大于结果) 下一个 else 下一层
Redis 中的对 skiplist 作了些改造:
增长了后驱指针(*backward
)
同时记录value 和 score,且 score 能够重复
第一层维护了双向链表
zset 结构整个类图以下:
zskiplist 中保存的 zskiplistNode 节点定义:
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward; // 指向上一个节点
struct zskiplistLevel {
struct zskiplistNode *forward; // 指向下一个节点
unsigned long span; // 节点以前的跨度
} level[]; // 该节点的各层信息
} zskiplistNode;复制代码
zskiplistNode 中定义了 zskiplistLevel 的数组,用来保存该 node 在每一层的指针。查询跟咱们模拟的例子相似,不在详细描述。重点看一下插入操做:
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
// 计算 span 信息,表示从该节点到下一个节点,须要跳跃多少次
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
// 随机函数
level = zslRandomLevel();
// 若是须要上升层次记录好位置
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}复制代码
插入的时候,首先要进行查询,而后从最底层开始,插入待插入的元素。而后看看从下而上,是否须要逐层插入。但是到底要不要插入上一层呢?咱们要想每层的跳跃都很是高效,那就越是平衡越好(第一层1级跳,第二层2级跳,第3层4级跳,第4层8级跳)。可是用算法实现起来,确实很是地复杂的,而且要严格地按照2地指数次幂,咱们还要对原有地结构进行调整。因此跳表的思路是抛硬币,听天由命,产生一个随机数。Redis 中 25%几率再向上扩展。这样子,每个元素可以有X层的几率为0.25^(X-1)次方。在 Redis 中level初始化时就定义好了,为 32 层。那么,第32层有多少个元素的几率你们能够算一下。
整个插入过程,能够简化理解为:先插入最底层 if (随机几率) 扩展上一层
随机函数:
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}复制代码
对于 zskiplist 的删除操做,能够分为3个步骤:
根据member(obj)和score找到节点的位置(代码里变量x即为该节点,update记录每层x的上一个节点)
调动zslDeleteNode把x节点从skiplist逻辑上删除
释放x节点内存
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}复制代码
整个删除过程,能够简化理解为:先找到,断关联,删内存
在 zset 的建立中(zaddGenericCommand 方法)隐藏这一个逻辑分支:
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}复制代码
Redis 初始化的时候,只判断存储的元素长度是否大于64个字节(server.zset_max_ziplist_entries默认128)。大于64个字节选择zkiplist,不然ziplist。当执行增删改查的方法,根据是ziplist 仍是 zkiplist 选择不一样的实现。关于zkiplist 本文不详细叙述。只须要记住zset 在两种状况下使用 ziplist :
保存的元素个数不足 128 个
单个元素的大小超过 64 bytes
执行增删改查的方法,根据是ziplist 仍是 zkiplist 选择不一样的实现。zkiplist 本文不详细叙述。
至此,咱们介绍了Redis 中 zset 最复杂的跳表部分,结合代码和理解,请思考这4个命令背后都是依赖于什么数据结构的支撑。
ZSCORE yuwen s03
基于哈希表 O(1)复杂度
ZRANGE yuwen 1 2 withscores
基于skiplist和span查找
ZRANGEBYSCORE yuwen 90 100 withscores
基于skiplist和score查找
ZREVRANGE yuwen 1 2 withscores
基于skiplist和score 和 * backward 查找
zset 会按 score 进行排序,若是 score 表明想要执行时间的时间戳。在某个时间将它插入zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间先后进行排序。
起一个死循环线程不断地进行取第一个key值,若是当前时间戳大于等于该key值的socre就将它取出来进行消费删除,能够达到延时执行的目的。
常常浏览技术社区的话,应该对 “1小时最热门” 这类榜单不陌生。如何实现呢?若是记录在数据库中,不太容易对实时统计数据作区分。咱们以当前小时的时间戳做为 zset 的 key,把贴子ID做为 member ,点击数评论数等做为 score,当 score 发生变化时更新 score。利用 ZREVRANGE
或者 ZRANGE
查到对应数量的记录。
滑动窗口是限流常见的一直策略。若是咱们把一个用户的 ID 做为key 来定义一个 zset ,member 或者 score 都为访问时的时间戳。咱们只需统计某个 key 下在指定时间戳区间内的个数,就能获得这个用户滑动窗口内访问频次,与最大经过次数比较,来决定是否容许经过。
以上三种场景的示例代码,在下一篇给出。也欢迎你们思考是否还有其余应用场景。
若是您在微信阅读,请您点击连接 关注我 ,若是您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误