读懂才会用 : 带你见识 Redis 的 zset

快餐车

本文从代码角度分析Redis 的 zset 结构,但愿经过本文掌握以下内容:java

  1. Redis 中 zset 不是单一结构完成,是跳表和哈希表共同完成node

  2. 跳表的实现原理,跳表升维全靠随机算法

  3. 跳表中查找、插入、删除的三个口诀数据库

  4. 使用场景(简单延时队列、排行榜、简单限流)编程

若是您还不能了然于胸,请继续阅读本文。数组

场景案例

假设咱们有某个班级全部学生的语文成绩,想统计、查询区间范围、查询单个学生成绩、知足高性能读取这些需求,Redis 的 zset 结构无疑是最好的选择。Redis 提供了丰富的 API。示例:bash

ZADD yuwen 90 s01 89 s03 99 s02 74 s04 97 s05微信

以 yuwen 为 key 分别存储了 s01 到 s06 共计 6 名学生的分数,咱们能够查询任一学生的成绩数据结构

ZSCORE yuwen s03app

能够按照排序返回指定区间内的全部元素

ZRANGE yuwen 1 2 withscores

能够访问指定分数区间内的全部元素

ZRANGEBYSCORE yuwen 90 100 withscores

能够统计指定区间内的个数

ZCOUNT yuwen 80 90

实现

zset 结构中,既支持按单个元素查询,又支持范围查询,是如何实现的呢?咱们深刻代码分析,在 Redis 的 t_zset.c 的注释中,提到:

/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view").复制代码

翻译过来是 Redis 中有两种数据结构来支持 zset 的功能,一个是 hash table ,一个是 skip list。先来看一下 zset 在代码中的定义:

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;复制代码

dict 是一个hash table ,各类编程语言中都有实现。能够保证 O(1) 的时间复杂度,不作过多解释。咱们继续看 zskiplist 的定义:

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;复制代码

zskiplist 是 Redis 对 skiplist 作了变种,skiplist 就是咱们常说的跳表。

跳表

跳表由 William Pugh 于1990年发表的论文 Skip lists: a probabilistic alternative to balanced trees 中被首次提出,查找时间复杂度为平均 O(logN),最差 O(N),在大部分状况下效率可与平衡树相媲美,但实现比平衡树简单的多,跳表是一种典型的以空间换时间的数据结构。

跳表具备如下几个特色:

  • 由许多层结构组成。

  • 每一层都是一个有序的链表。

  • 最底层 (Level 1) 的链表包含全部元素。

  • 若是一个元素出如今 Level i 的链表中,则它在 Level i 之下的链表也都会出现。

  • 每一个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。

跳表的查找会从顶层链表的头部元素开始,而后遍历该链表,直到找到元素大于或等于目标元素的节点,若是当前元素正好等于目标,那么就直接返回它。若是当前元素小于目标元素,那么就垂直降低到下一层继续搜索,若是当前元素大于目标或到达链表尾部,则移动到前一个节点的位置,而后垂直降低到下一层。正由于 Skiplist 的搜索过程会不断地从一层跳跃到下一层的,因此被称为跳跃表。

仍是举个例子,假设连接包含1-10,共10个元素。咱们要找到第9个,须要从 header 遍历,共 9 次才能找到



一次只能比较一个数,最坏的状况下时间复杂度是O(n),若是咱们一次能够比较2个元素就行了:



一次查找2个的话,咱们只找了5次就找到了。因此就有了相似下面的结构,在链表上增长一层减小了元素个数的“链表”:



若是增长两层“链表”,只查找3次就能够找到:


即使是咱们找元素8,也只须要遍历 1->4->7->8,共4次查询。

跳表就是这样的一种数据结构,结点是跳过一部分的,从而加快了查询的速度。以前讲HashMap 中咱们提到,java 8 中当哈希冲突个数大于 7 个的时候,转换为红黑树。跳表跟红黑树二者的算法复杂度差很少,为何Redis要使用跳表而不使用红黑树呢?跳表相对于红黑树,代码简单。若是咱们要查询一个区间里面的值,用平衡树实现可能会麻烦。删除一段区间时,若是是平衡树,就会至关困难,毕竟涉及到树的平衡问题,而跳表则没有这种烦恼。

整个查询过程,能够简化理解为 if (下一个是否大于结果) 下一个 else 下一层


增强版跳表

Redis 中的对 skiplist 作了些改造:

  • 增长了后驱指针(*backward

  • 同时记录value 和 score,且 score 能够重复

  • 第一层维护了双向链表

zset 结构整个类图以下:


zskiplist 中保存的 zskiplistNode 节点定义:

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward; //  指向上一个节点
    struct zskiplistLevel {
        struct zskiplistNode *forward;  // 指向下一个节点
        unsigned long span; // 节点以前的跨度
    } level[];  // 该节点的各层信息
} zskiplistNode;复制代码

zskiplistNode 中定义了 zskiplistLevel 的数组,用来保存该 node 在每一层的指针。查询跟咱们模拟的例子相似,不在详细描述。重点看一下插入操做:

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        // 计算 span 信息,表示从该节点到下一个节点,须要跳跃多少次
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    // 随机函数
    level = zslRandomLevel();
    // 若是须要上升层次记录好位置
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}复制代码

插入的时候,首先要进行查询,而后从最底层开始,插入待插入的元素。而后看看从下而上,是否须要逐层插入。但是到底要不要插入上一层呢?咱们要想每层的跳跃都很是高效,那就越是平衡越好(第一层1级跳,第二层2级跳,第3层4级跳,第4层8级跳)。可是用算法实现起来,确实很是地复杂的,而且要严格地按照2地指数次幂,咱们还要对原有地结构进行调整。因此跳表的思路是抛硬币,听天由命,产生一个随机数。Redis 中 25%几率再向上扩展。这样子,每个元素可以有X层的几率为0.25^(X-1)次方。在 Redis 中level初始化时就定义好了,为 32 层。那么,第32层有多少个元素的几率你们能够算一下。

整个插入过程,能够简化理解为:先插入最底层 if (随机几率) 扩展上一层

随机函数:

/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. */
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}复制代码


对于 zskiplist 的删除操做,能够分为3个步骤:

  • 根据member(obj)和score找到节点的位置(代码里变量x即为该节点,update记录每层x的上一个节点)

  • 调动zslDeleteNode把x节点从skiplist逻辑上删除

  • 释放x节点内存

/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}复制代码

整个删除过程,能够简化理解为:先找到,断关联,删内存

在 zset 的建立中(zaddGenericCommand 方法)隐藏这一个逻辑分支:

if (server.zset_max_ziplist_entries == 0 ||
    server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
    zobj = createZsetObject();
} else {
    zobj = createZsetZiplistObject();
}复制代码

Redis 初始化的时候,只判断存储的元素长度是否大于64个字节(server.zset_max_ziplist_entries默认128)。大于64个字节选择zkiplist,不然ziplist。当执行增删改查的方法,根据是ziplist 仍是 zkiplist 选择不一样的实现。关于zkiplist 本文不详细叙述。只须要记住zset 在两种状况下使用 ziplist :

  1. 保存的元素个数不足 128 个

  2. 单个元素的大小超过 64 bytes

执行增删改查的方法,根据是ziplist 仍是 zkiplist 选择不一样的实现。zkiplist 本文不详细叙述。

结论

至此,咱们介绍了Redis 中 zset 最复杂的跳表部分,结合代码和理解,请思考这4个命令背后都是依赖于什么数据结构的支撑。

ZSCORE yuwen s03 基于哈希表 O(1)复杂度

ZRANGE yuwen 1 2 withscores 基于skiplist和span查找

ZRANGEBYSCORE yuwen 90 100 withscores 基于skiplist和score查找

ZREVRANGE yuwen 1 2 withscores 基于skiplist和score 和 * backward 查找

使用场景

1. 延时队列

zset 会按 score 进行排序,若是 score 表明想要执行时间的时间戳。在某个时间将它插入zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间先后进行排序。

起一个死循环线程不断地进行取第一个key值,若是当前时间戳大于等于该key值的socre就将它取出来进行消费删除,能够达到延时执行的目的。

2. 排行榜

常常浏览技术社区的话,应该对 “1小时最热门” 这类榜单不陌生。如何实现呢?若是记录在数据库中,不太容易对实时统计数据作区分。咱们以当前小时的时间戳做为 zset 的 key,把贴子ID做为 member ,点击数评论数等做为 score,当 score 发生变化时更新 score。利用 ZREVRANGE 或者 ZRANGE 查到对应数量的记录。

3. 限流

滑动窗口是限流常见的一直策略。若是咱们把一个用户的 ID 做为key 来定义一个 zset ,member 或者 score 都为访问时的时间戳。咱们只需统计某个 key 下在指定时间戳区间内的个数,就能获得这个用户滑动窗口内访问频次,与最大经过次数比较,来决定是否容许经过。

以上三种场景的示例代码,在下一篇给出。也欢迎你们思考是否还有其余应用场景。


关注我

若是您在微信阅读,请您点击连接 关注我 ,若是您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误

相关文章
相关标签/搜索