顺风车运营研发团队 谭淼
跳跃表(skiplist)是一种有序的数据结构,它经过在每一个节点中维持多个指向其余节点的指针,从而达到指向其余节点的目的。在Redis中,有序集合是经过跳跃表和hash实现的。html
1、跳跃表
为了更好的阅读下面的文章,建议先对跳跃表的基本概念进行学习,连接以下:https://www.cnblogs.com/a8457...node
2、数据结构
先看一下与跳跃表有关的数据结构。数组
一、zskiplistNode
zskiplistNode是跳跃表节点,用于存储跳跃表节点。数据结构
typedef struct zskiplistNode { sds ele; //zset元素 double score; //zset分值 struct zskiplistNode *backward; //前一个节点 struct zskiplistLevel { struct zskiplistNode *forward; //后一个节点 unsigned int span; //后一个节点的跨度 } level[]; //zskiplistLevel结构体的数组 } zskiplistNode;
该数据结构如图所示:dom
ele保存的是zset元素,score存储的是zset元素的分值,backward是指向该zskiplistNode的前一个节点,level是一个存储zskiplistLevel结构体的数组,其中zskiplistLevel的数据结构为:函数
其中forward是是指向该zskiplistNode的下一个节点,span是到下一个节点的步长。学习
二、zskiplistspa
zskiplist是跳跃表,用于存储跳跃表的关键信息。3d
typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist;
该数据结构如图所示:指针
header指针指向了跳跃表的头结点,tail指针指向了跳跃表的尾节点,length记录了跳跃表的长度,level记录了跳跃表的层数。
3、跳跃表的初始化
跳跃表的初始化使用的是zslCreate()函数,函数的代码以下所示:
zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; }
在zslCreate()函数中,首先使用zmalloc()函数进行内存分配,
void *zmalloc(size_t size) { void *ptr = malloc(size+PREFIX_SIZE); //申请size+PREFIX_SIZE大小的空间,PREFIX_SIZE是size_t或者long long的大小 if (!ptr) zmalloc_oom_handler(size);//异常处理 *((size_t*)ptr) = size;//在多申请的空间记录申请空间的大小 update_zmalloc_stat_alloc(size+PREFIX_SIZE);//更新总的使用空间 return (char*)ptr+PREFIX_SIZE; }
zmalloc()是malloc()的封装,是在malloc()的基础上多分配一个size_t或者long long大小的内存,用来存储申请的空间的大小。
后续代码是对zskiplist进行初始化操做,值得一提的是zslCreateNode()函数。
zskiplistNode *zslCreateNode(int level, double score, sds ele) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->ele = ele; return zn; }
根据函数名能够看出,这个函数是在建立第一个zskiplistNode节点,并对其进行初始化。通过上述初始化后,能够得到zskiplist结果以下图所示。
4、跳跃表的插入
跳跃表的插入使用的是zslInsert()函数,该函数以下:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; }
该函数有一个难点,即zslRandomLevel()函数。该函数的定义为:
int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
首先,该函数中ZSKIPLIST_P的值为0.25,所以表达式ZSKIPLIST_P 0xFFFF的值约为0.25 65535,而表达式random()&0xFFFF是对random()返回的随机数进行对0xFFFF的取余数。所以(random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 返回true的几率约为0.25。因为是while循环,所以返回的level值的几率状况以下表所示:
解决了这个函数,zslInsert()函数便容易许多,下面能够以插入score = 1的节点为例,来进行一次插入的流程。
x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; }
x = zsl->header,如今将x赋值为header;
i = zsl->level-1,因为level的值为1,因此这个for循环能够进入一次;
i =0, zsl->level-1 = 0,两个值相等. 因此rank[0] = 0;
x->level[0]->forward ,因为level[0]->forward的值为null,因此这个while进不去;
update[0] = x,因此如今update[0]的值为header指向的节点。
level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; }
如今假设由zslRandomLevel()返回的level = 1;
zsl->level = 1, 因此这个if进不去。
x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; }
新建一个node节点x, level = 1, score = 1;
for循环能够进入一次,for循环内部的代码为更新update和新节点x的值,更新后如图所示。
for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0];
for条件 i < zsl->level 不知足,所以没法进入for循环,随后修改 x->backward 的值为null。
if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x;
if条件不知足, 进入else,将x赋值给zsl->tail;
随后zsl->length自增1。