太长不看版node
- 快速列表是一个元素为压缩列表的双向链表。
- 快速列表是列表对象list的底层实现之一。
- 快速列表是在Redis3.2版本中引入的。
- 快速列表节点中压缩列表的最大字节长度(配置项为负数时)或最多元素个数(配置项为正数时)由配置项 list-max-ziplist-size 决定,默认约束为最大长度8Kb。
- 快速列表提供了选项能够使用LZF压缩算法对中间的节点中的ziplist进行压缩,列表两边多少节点不被压缩由配置项 list-compress-depth 决定,默认对全部节点不进行压缩。
本篇解析基于redis 5.0.0版本,本篇涉及源码文件为quicklist.c, quicklist.h, redic.conf。git
快速列表是一个元素为压缩列表的双向链表。 github
qulicklist是列表对象(list)的底层实现之一,是在Redis3.2中为了兼顾空间效率与时间效率而引入的。 redis
双向链表修改节点效率比较高(复杂度O(1)),可是数据结构元数据占用的空间相对比较大(每一个节点有两个指针占用16个字节)。算法
快速列表在时间效率和空间效率之间作了折中,由双向链表将若干个压缩列表链接在一块儿组成,相比于双向链表更加节省空间,相比于压缩列表操做更加的高效。数据结构
typedef struct quicklistNode {
// 向前指针
struct quicklistNode *prev;
// 向后指针
struct quicklistNode *next;
// ziplist或者是被压缩以后的ziplist(quicklistLZF)
unsigned char *zl;
// ziplist的字节长度,不受压缩影响
unsigned int sz;
// ziplist中包含的元素个数
unsigned int count : 16; /* count of items in ziplist */
// ziplist编码标示,1为原始数据,2为被压缩后数据
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
// 数据存储方式(当前只有ziplist)
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
// 须要再次被压缩标记,压缩节点被解压缩读取后会置为1
unsigned int recompress : 1; /* was this node previous compressed? */
// 测试用例使用
unsigned int attempted_compress : 1; /* node can't compress; too small */
// 预备字段
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
typedef struct quicklistLZF {
// ziplist被压缩以后的字节长度
unsigned int sz; /* LZF size in bytes*/
// ziplist被压缩后的内容
char compressed[];
} quicklistLZF;
typedef struct quicklist {
// 头节点
quicklistNode *head;
// 尾节点
quicklistNode *tail;
// 快速列表中全部元素的个数
unsigned long count;
// 快速列表节点(quicklistNode)个数
unsigned long len;
// 节点中ziplist长度的大小
int fill : 16;
// 头部开始或尾部开始 分别有几个节点不被压缩
unsigned int compress : 16;
} quicklist;
复制代码
由于快速列表的常见的应用场景大可能是访问两边的节点(例如:lpush, lpop, rpush, rpop等),为了进一步的节省空间,快速列表提供了选项能够使用LZF压缩算法对中间的节点中的ziplist进行压缩,节点压缩后本来指向ziplist的sz指针元素指向quicklistLZF结构体。 快速列表两边分别有几个节点不被压缩有配置项 list-compress-depth 决定,存储在qulicklist的compress元素中,默认为0(不进行节点压缩)。post
快速列表节点中压缩列表的最大字节长度或最多元素个数由配置项 list-max-ziplist-size 决定,存储在qulicklist的fill元素中。配置项为正数时表示节点中压缩列表最多元素个数,为负数时表示节点中压缩列表最大字节长度,默认约束为快速列表节点中压缩列表最大长度为8Kb。测试
值 | 节点ziplist最大长度 |
---|---|
-5 | 64Kb |
-4 | 32Kb |
-3 | 16Kb |
-2 | 8Kb |
-1 | 4Kb |
在快速列表的结构体定义中,使用告终构体位域(即 unsigned int count : 16; 这种写法),其实就是为了节省空间某些元素只使用几个bit位长度的空间。详细的解释戳这里。优化
#ifdef __GNUC__
# define likely(x) __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
#else
# define likely(x) !!(x)
# define unlikely(x) !!(x)
#endif
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
// 快速列表节点是否容许插入 大几率容许插入
// 内部根据fill值进行ziplist字节长度或个数的约束检测
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
// 容许插入,插入元素到快速列表节点的ziplist头部
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
// 更新ziplist字节长度
quicklistNodeUpdateSz(quicklist->head);
} else {
// ziplist字节长度或个数超限,则从新建立一个节点
quicklistNode *node = quicklistCreateNode();
// 元素插入新节点ziplist头部
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
// 更新ziplist字节长度
quicklistNodeUpdateSz(node);
// 将新建节点插入头节点以前
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_tail = quicklist->tail;
if (likely(
_quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
// 容许插入,插入元素到快速列表节点的ziplist尾部
quicklist->tail->zl =
ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
// 更新ziplist字节长度
quicklistNodeUpdateSz(quicklist->tail);
} else {
quicklistNode *node = quicklistCreateNode();
// 元素插入新节点ziplist尾部
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
// 更新ziplist字节长度
quicklistNodeUpdateSz(node);
// 将新建节点插入尾节点以后
_quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
}
quicklist->count++;
quicklist->tail->count++;
return (orig_tail != quicklist->tail);
}
复制代码
push操做头插与尾插的步骤一致,都是根据fill元素信息判断是否要新增快速列表节点,而后调用ziplistPush将value经过头插或尾插方法插入到ziplist的头部或尾部。ui
likely()和unlikely()宏是用来进行编译优化引导的,lickly(a条件)就是告诉编译器a条件大几率是真的,编译器针对这一信息进行优化编译提升CPU预取指令的正确率。unlikely()是大几率为假。
likely()与unlikely()宏详细解释戳这里了解。
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *slong) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (quicklist->count == 0)
return 0;
int ret = quicklistPopCustom(quicklist, where, &vstr, &vlen, &vlong,
_quicklistSaver);
if (data)
*data = vstr;
if (slong)
*slong = vlong;
if (sz)
*sz = vlen;
return ret;
}
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *sval,
void *(*saver)(unsigned char *data, unsigned int sz)) {
unsigned char *p;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
int pos = (where == QUICKLIST_HEAD) ? 0 : -1;
if (quicklist->count == 0)
return 0;
if (data)
*data = NULL;
if (sz)
*sz = 0;
if (sval)
*sval = -123456789;
quicklistNode *node;
if (where == QUICKLIST_HEAD && quicklist->head) {
node = quicklist->head;
} else if (where == QUICKLIST_TAIL && quicklist->tail) {
node = quicklist->tail;
} else {
return 0;
}
// 根据下标从ziplist中获取元素
p = ziplistIndex(node->zl, pos);
if (ziplistGet(p, &vstr, &vlen, &vlong)) {
if (vstr) {
if (data)
*data = saver(vstr, vlen);
if (sz)
*sz = vlen;
} else {
if (data)
*data = NULL;
if (sval)
*sval = vlong;
}
删除该元素
quicklistDelIndex(quicklist, node, &p);
return 1;
}
return 0;
}
复制代码
pop操做相对比较简单,由于存储了头节点和尾节点,因此不管是头部pop仍是尾部pop均可以直接获取到对应的ziplist,而后取出ziplist头部value或者尾部value后进行元素删除。
int quicklistIndex(const quicklist *quicklist, const long long idx, quicklistEntry *entry) {
quicklistNode *n;
unsigned long long accum = 0;
unsigned long long index;
int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
initEntry(entry);
entry->quicklist = quicklist;
if (!forward) {
index = (-idx) - 1;
n = quicklist->tail;
} else {
index = idx;
n = quicklist->head;
}
if (index >= quicklist->count)
return 0;
while (likely(n)) {
// 遍历快速列表节点,找到目标下标元素所处的节点
if ((accum + n->count) > index) {
break;
} else {
D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
accum);
accum += n->count;
n = forward ? n->next : n->prev;
}
}
if (!n)
return 0;
D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
accum, index, index - accum, (-index) - 1 + accum);
entry->node = n;
// 计算目标元素 ziplist内的偏移量
if (forward) {
/* forward = normal head-to-tail offset. */
// 正向遍历
entry->offset = index - accum;
} else {
/* reverse = need negative offset for tail-to-head, so undo * the result of the original if (index < 0) above. */
// 反向遍历
entry->offset = (-index) - 1 + accum;
}
// 如有须要则进行节点解压
quicklistDecompressNodeForUse(entry->node);
// 根据元素在ziplist中的下标获取元素
entry->zi = ziplistIndex(entry->node->zl, entry->offset);
ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
/* The caller will use our result, so we don't re-compress here. * The caller can recompress or delete the node as needed. */
return 1;
}
复制代码
根据下标获取元素,经过遍历快速列表节点找到元素所属的节点,拿到对应的ziplist,若是节点被压缩则进行解压,而后计算出元素在ziplist中的下标拿到元素。假设快速列表有N个节点,每一个节点的ziplist有M个元素,则根据下标获取元素的复杂度为O(N + M)。