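Codis is a proxy-based Redis cluster solution. As shown in Figure 1, client requests first go to a proxy, which shards them and forwards them to the backend Redis instances. The sharding rules (commonly called the routing table, forwarding table, or slot table) are stored in a centralized component (such as ZooKeeper or a file system) and are then pushed to every proxy by the Dashboard. By contrast, Redis's own cluster solution, redis-cluster, is decentralized. As shown in Figure 2, it has no centralized control component and no proxy: a client can send a request to any node in the cluster and then follow the redirection (MOVED or ASK) indicated by the node's reply. The client also caches the slot table locally and updates it from each redirection. Because no central component stores or distributes the routing table, redis-cluster uses gossip to synchronize the routing table and the cluster topology among nodes; after some time, ideally every node in the cluster holds the routing information for the whole cluster.
Figure 1: Codis architecture
Figure 2: redis-cluster
For a NoSQL database, horizontal scaling (scale in/out) is a basic capability. Scale in/out means dynamically adding or removing nodes to horizontally expand or shrink the cluster's capacity and CPU power, as opposed to vertical scaling (scale up/down). Because NoSQL stores are schemaless and usually hold simple KV (or KKV) structures, scale in/out is comparatively easy. Keys are sharded by slot (a common formula is crc16(key) % slot_num, as in Figure 3), so it is enough to migrate some slots from one instance to other nodes and then update the routing table (the slot-to-node mapping). Both Codis and redis-cluster support scale in/out through slot migration, but their implementations differ in many ways, which this article describes.
Figure 3: key-slot-node mapping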
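The key-to-slot-to-node mapping in Figure 3 can be sketched in a few lines of C. This is a minimal illustration, not code from either project: it assumes the CRC16/XMODEM variant (the one the Redis Cluster specification uses), and the names crc16_xmodem, key_to_slot, and node_for_key, as well as the routing-table layout, are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* CRC16/XMODEM (polynomial 0x1021, initial value 0), computed bit by bit. */
uint16_t crc16_xmodem(const unsigned char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((uint16_t)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++) {
            if (crc & 0x8000) crc = (uint16_t)((crc << 1) ^ 0x1021);
            else crc = (uint16_t)(crc << 1);
        }
    }
    return crc;
}

/* Map a key to a slot with the formula from the text: crc16(key) % slot_num. */
unsigned key_to_slot(const char *key, unsigned slot_num) {
    assert(slot_num > 0);
    return crc16_xmodem((const unsigned char *)key, strlen(key)) % slot_num;
}

/* Hypothetical routing table: slot_to_node[slot] holds the id of the owning node. */
int node_for_key(const char *key, const int *slot_to_node, unsigned slot_num) {
    return slot_to_node[key_to_slot(key, slot_num)];
}
```

Moving a slot then amounts to copying its keys to the new node and changing one entry of slot_to_node; no key has to be rehashed.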
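Migrating all keys of a given slot from one Redis to another is not complicated. It takes two steps: first get all keys in the slot, then send a migration command for each key. Redis itself has no concept of slots and does not maintain a key-to-slot mapping, so the first step requires modifying the Redis engine to maintain one. Both redis-cluster and Codis do this (for example with a separate array of dicts as the index: the array subscript is the slot number and each element is a dict holding <key, crc> pairs). The second step, sending, is simpler: Redis natively supports a command for migrating a set of keys, MIGRATE, as follows:
MIGRATE host port "" dbid timeout [COPY] [REPLACE] [AUTH password] KEYS key1 key2 ... keyN
redis-cluster does use MIGRATE directly to migrate keys. However, the command is synchronous and blocking. Given Redis's single-threaded nature, when MIGRATE takes too long (for example on a slow network or when migrating a bigkey), the main thread cannot serve user requests, so user RT rises or requests time out. Using MIGRATE directly is therefore convenient but has many limitations. Codis modified the Redis engine itself and added both synchronous and asynchronous slot migration (synchronous migration is simple and is not elaborated on here).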
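Therefore, to achieve smooth scale in/out that users barely notice, slot migration has to solve the following difficulties:
Figure 4: redis-cluster slot migration
As shown in Figure 4, to support slot migration redis-cluster modified the engine to add a key-to-slot mapping. redis-cluster maintains this mapping with a rax tree, so creating a cluster and scaling it in or out involve operations such as assigning and deleting slots, which are mainly done through the following commands:
Once the mapping is established, key-related slot commands can be executed. redis-cluster provides the following:
The detailed procedure redis-cluster follows when migrating a slot is:
To cancel a migration midway, send cluster setslot <slot> stable to the node to clear the importing or migrating state of the slot.
Because the migrate command is synchronous and blocking (it sends and receives synchronously), the migration blocks reads and writes of all keys on that engine, and the local key is deleted only after the migration response succeeds, so the migration is atomic.
Because MIGRATE is synchronous and blocking, a key can never be read or written while it is being migrated. However, within a slot some keys may already have been migrated while others are still waiting. So when a key being read or written belongs to a slot under migration, redis-cluster handles it as follows: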
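redis-cluster turns Redis into a cluster, and its scaling capability adds the flexibility of a distributed system. It also brings some restrictions, which most other Redis cluster solutions share. For example, because Redis pursues simplicity and high performance, it does not support cross-node (distributed) transactions, so operations that may span nodes are restricted, mainly:
Unlike redis-cluster, a Codis Redis instance does not maintain slot table information. Each Redis assumes by default that it is responsible for all 1024 slots; the slot table is maintained in the Dashboard and known to the proxies. This is a notable characteristic of the Codis architecture.
Codis provides only one key-related slot command: slotshashkey [key1 key2...], which returns the hash slot of each key.
See Figure 5 for the detailed procedure.
Figure 5: Codis slot migration procedure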
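Because Codis uses the asynchronous migration command slotsmgrttagslot-async, it cannot rely on the synchronous blocking nature of MIGRATE, as redis-cluster does, to guarantee that key migration is atomic. Codis therefore takes the following measures to guarantee atomicity:
Unlike redis-cluster's synchronous migration, Codis migrates asynchronously, so a key in the migrating state (the key has been sent or partially sent but the final response has not yet arrived) may still be read or written by users. Besides handling slots under migration as redis-cluster does, Codis also has to resolve read/write conflicts on keys under migration.
For a read or write request whose key is in a slot under migration, the proxy wraps the original request with slotsmgrt-exec-wrapper $hashkey $command [$arg1 ...] before sending it to Redis. If the original command is a read, it is answered normally; if it is a write, Redis returns a TRYAGAIN error and the proxy retries. If the key has already been migrated away, the engine returns a MOVED error and the proxy has to update its routing table. Figure 6 shows the process.
Figure 6: How Codis handles reads and writes of keys under migration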
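The proxy-side handling just described can be sketched as a small retry loop. This is a rough illustration rather than the Codis proxy code (which is not shown in this article): the callback type send_fn, the enum names, the demo_backend stub, and the assumption that error replies begin with "-TRYAGAIN" or "-MOVED" are all hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef enum { ACT_DELIVER, ACT_RETRY, ACT_UPDATE_ROUTE } proxy_action;

/* Hypothetical transport: sends one wrapped command and returns the raw reply. */
typedef const char *(*send_fn)(const char *wrapped, void *ctx);

/* Classify the reply (assumed reply prefixes, see the lead-in). */
proxy_action action_for_reply(const char *reply) {
    assert(reply != NULL);
    if (strncmp(reply, "-TRYAGAIN", 9) == 0) return ACT_RETRY;        /* key is mid-migration */
    if (strncmp(reply, "-MOVED", 6) == 0) return ACT_UPDATE_ROUTE;    /* key already moved */
    return ACT_DELIVER;                                               /* normal reply */
}

/* Wrap the request and resend it while the engine answers TRYAGAIN,
 * up to max_retries extra attempts. */
proxy_action forward_with_retry(const char *hashkey, const char *command,
                                send_fn send, void *ctx, int max_retries) {
    char wrapped[256];
    int n = snprintf(wrapped, sizeof wrapped,
                     "slotsmgrt-exec-wrapper %s %s", hashkey, command);
    assert(n > 0 && (size_t)n < sizeof wrapped);
    proxy_action act = ACT_RETRY;
    for (int attempt = 0; attempt <= max_retries && act == ACT_RETRY; attempt++) {
        act = action_for_reply(send(wrapped, ctx));
    }
    return act;
}

/* Stub backend for experiments: answers TRYAGAIN *busy times, then +OK. */
const char *demo_backend(const char *wrapped, void *ctx) {
    (void)wrapped;
    int *busy = ctx;
    if (*busy > 0) {
        (*busy)--;
        return "-TRYAGAIN key is being migrated";
    }
    return "+OK";
}
```

Bounding the retries keeps a stalled migration from turning into an endless loop in the proxy; what to do once the bound is hit is a policy choice this sketch leaves to the caller.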
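The rest of this article describes in detail how synchronous and asynchronous migration are implemented.
Figure 7 shows the synchronous migration flow. The source serializes the key and sends the data to the target Redis over a socket (in effect by calling the restore command). The target Redis deserializes the key on receiving restore, stores it in the DB, and replies with an ACK; the source Redis deletes its local key once it receives the ACK. The source Redis is blocked throughout. If the key is a bigkey, serialization at the source, network transfer, deserialization at the target, and synchronous deletion at the source all become very slow. Because Redis is single-threaded, the event loop cannot process users' read and write events in time, so user RT rises or requests time out.
Figure 7: Synchronous migration flow
Redis supports composite data structures such as list, set, zset, and hash, which is where the bigkey problem comes from. Figure 8 shows how MIGRATE works: the so-called serialization encodes the value of the key in RDB format, and the target Redis loads it in RDB format. If a list, set, zset, or hash has many members (thousands or tens of thousands), RDB encoding and loading take a long time.
Figure 8: How the MIGRATE command works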
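Since synchronous migration blocks the main thread, an obvious solution is to migrate in a separate thread, as shown in Figure 9. Multithreading involves access to shared data (such as the DB), which requires synchronization primitives; for Redis's single-threaded, almost lock-free architecture this is a fairly complex change.
Figure 9: Asynchronous migration with a separate thread
Another approach to asynchronous migration keeps the single-threaded model: object serialization (at the source Redis) and deserialization (at the target Redis) still block the main thread. Unlike synchronous MIGRATE, however, asynchronous migration does not wait synchronously for restore to return. When restore completes, the target Redis sends a restore-ack command to the source Redis (similar to a callback) to report the migration status. This greatly shortens the time the source Redis is blocked by migration and lets the event loop get to the next ready event sooner.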
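The send-without-waiting idea can be modeled with a counter of in-flight messages that the ack callback decrements. The sketch below is a simplified model and not the Codis handler itself: the field name sending_msgs is borrowed from the source listing later in this article, while pending, send_batch, and on_restore_ack are hypothetical names, and real network I/O is left out.

```c
#include <assert.h>

typedef struct {
    long pending;       /* restore commands not yet sent */
    long sending_msgs;  /* commands sent but not yet acknowledged */
} async_migration;

/* Queue up to `batch` commands and return immediately, without waiting for replies. */
long send_batch(async_migration *m, long batch) {
    assert(batch > 0);
    long n = m->pending < batch ? m->pending : batch;
    m->pending -= n;
    m->sending_msgs += n;
    return n;
}

/* Called from the event loop when a restore-ack arrives.
 * Returns 1 once everything has been sent and acknowledged, which is
 * the point at which the source may delete its local copy. */
int on_restore_ack(async_migration *m, long batch) {
    assert(m->sending_msgs > 0);
    m->sending_msgs--;
    if (m->sending_msgs == 0 && m->pending > 0) {
        send_batch(m, batch);   /* refill the pipeline */
    }
    return m->pending == 0 && m->sending_msgs == 0;
}
```

Between two acks the main thread is free to serve ordinary client requests, which is the whole point of the design.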
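Because this approach still relies on the main thread for serialization and deserialization, Codis further reduces their cost by migrating bigkeys as split (chunked) commands. As shown in Figure 10, for a list with a very large number of elements, serializing it all at once is very slow, whereas splitting it into equivalent individual RPUSH commands makes each command very lightweight.
Figure 10: Command splitting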
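How many commands a bigkey is split into follows from a ceiling division, which the source listing later in this article performs in numberOfRestoreCommandsFromObject. The standalone version below mirrors that arithmetic under simplifying assumptions: it ignores the encoding checks of the original, and the enum and function names are hypothetical.

```c
#include <assert.h>

typedef enum { T_LIST, T_HASH, T_SET, T_ZSET } obj_type;

/* A hash entry is sent as field + value and a zset entry as member + score,
 * so both cost two bulks per element; list and set elements cost one. */
long bulks_per_element(obj_type t) {
    return (t == T_HASH || t == T_ZSET) ? 2 : 1;
}

/* Number of restore commands needed when one command carries at most maxbulks bulks. */
long restore_commands_needed(obj_type t, long elements, long maxbulks) {
    assert(elements >= 0 && maxbulks > 0);
    long numbulks = elements * bulks_per_element(t);
    if (numbulks <= maxbulks) {
        return 1;                                   /* small object: one command */
    }
    return (numbulks + maxbulks - 1) / maxbulks;    /* ceil(numbulks / maxbulks) */
}
```

With maxbulks = 200 (the value the listing falls back to when the caller passes 0), a list of 1000 elements needs 5 commands and a hash of 1000 fields needs 10.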
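After splitting, a key that originally needed a single restore command now needs many. To keep the migration atomic (so that it cannot happen that some elements migrate and others fail), Codis attaches a temporary TTL to every split command. Because the local key is deleted only after everything has migrated successfully, if the migration fails midway the elements already migrated expire and are deleted automatically, and the end result is as if the migration had never happened. Once all elements have migrated, Codis sends a separate command to fix the TTL and then deletes the local key.
Figure 11: Temporary TTL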
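The TTL attached to each command can be expressed as a small pure function. The rule below follows singleObjectIteratorNext in the source listing later in this article (a chunked command gets timeout * 3; otherwise the remaining lifetime, floored at 1 ms; 0 when the key has no expiry). The function name and the explicit now_ms parameter are hypothetical; the original reads the clock with mstime().

```c
#include <assert.h>

/* TTL in milliseconds to attach to a restore command.
 * chunked      - nonzero while split commands are still being sent
 * timeout_ms   - migration timeout given by the caller
 * expire_at_ms - absolute expiry of the key, or -1 if it never expires
 * now_ms       - current time */
long long restore_ttl(int chunked, long long timeout_ms,
                      long long expire_at_ms, long long now_ms) {
    assert(timeout_ms >= 0);
    if (chunked) {
        return timeout_ms * 3;          /* temporary TTL on a partial copy */
    }
    if (expire_at_ms != -1) {
        long long ttl = expire_at_ms - now_ms;
        return ttl < 1 ? 1 : ttl;       /* already due: expire almost at once */
    }
    return 0;                           /* no expiry on the original key */
}
```

The final TTL-fixing command is computed with chunked set to 0, so it replaces the temporary TTL with the key's real remaining lifetime.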
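The first step of asynchronous migration is to send a DEL command to delete the key on the target Redis, as shown in Figure 12.
Figure 12: Step one, delete the target key
As shown in Figure 13, after receiving the ACK from the target Redis, the source goes on to send the subsequent split commands. The number of split commands sent each time is controlled by a parameter.
Figure 13: Temporary TTL
After all split commands have been sent, a command to fix the TTL is sent, and finally the local key is deleted.
Figure 14: Delete the local key once migration completes
Not every key is migrated in chunked mode. String objects and small objects can still be serialized directly in RDB format; only large objects (bigkeys) trigger chunked migration.
Figure 15: Different migration methods for different objects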
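The per-key flow (delete on the target, then either one payload or a run of chunks followed by the TTL fix) is a small state machine. The stage names below match the STAGE_* constants in the source listing later in this article; everything else is a simplified model with hypothetical names, and it assumes the number of chunks is known up front, whereas the original also cuts chunks by byte size.

```c
#include <assert.h>

typedef enum {
    STAGE_PREPARE, STAGE_PAYLOAD, STAGE_CHUNKED, STAGE_FILLTTL, STAGE_DONE
} stage;

typedef struct {
    stage st;
    long restore_cmds;  /* estimated commands for the value; >= 2 means bigkey */
    long chunks_sent;
} object_iter;

int iter_has_next(const object_iter *it) {
    return it->st != STAGE_DONE;
}

/* Emit one command and advance; returns the stage that emitted it. */
stage iter_next(object_iter *it) {
    stage emitted = it->st;
    switch (it->st) {
    case STAGE_PREPARE:   /* "delete $key" on the target, then pick a path */
        it->st = it->restore_cmds >= 2 ? STAGE_CHUNKED : STAGE_PAYLOAD;
        break;
    case STAGE_PAYLOAD:   /* whole value in one command */
        it->st = STAGE_DONE;
        break;
    case STAGE_CHUNKED:   /* one chunk carrying the temporary TTL */
        if (++it->chunks_sent >= it->restore_cmds) it->st = STAGE_FILLTTL;
        break;
    case STAGE_FILLTTL:   /* restore the real TTL */
        it->st = STAGE_DONE;
        break;
    case STAGE_DONE:
        assert(!"use of finished iterator");
        break;
    }
    return emitted;
}

/* Total commands emitted for one key. */
long count_commands(long restore_cmds) {
    object_iter it = { STAGE_PREPARE, restore_cmds, 0 };
    long n = 0;
    while (iter_has_next(&it)) {
        iter_next(&it);
        n++;
    }
    return n;
}
```

A small key therefore costs two commands (delete plus payload), while a key split into three chunks costs five (delete, three chunks, TTL fix).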
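The preceding sections covered the similarities, differences, and principles of redis-cluster synchronous migration and Codis asynchronous migration. For redis-cluster synchronous migration, see migrateCommand and restoreCommand in cluster.c in the Redis source; the code is quite simple. Codis offers both synchronous and asynchronous slot migration. The synchronous code is in slots.c and is essentially the same as Redis's native migrateCommand, so reading either one is enough. The asynchronous code is in slots_async.c and is far more original. The original author left almost no comments, so to make it easier to follow I added some brief Chinese comments while reading the source, and I am pasting it here. The principles are as described above; those who want the implementation can read the code below. I will not explain it piece by piece, because there is too much of it.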
#include "server.h" /* ============================ Worker Thread for Lazy Release ============================= */ typedef struct { pthread_t thread;/* lazy工做线程 */ pthread_mutex_t mutex;/* 互斥信号量 */ pthread_cond_t cond;/* 条件变量 */ list *objs; /* 要被lazy释放的对象链表 */ } lazyReleaseWorker; /* lazy释放主线程 */ static void * lazyReleaseWorkerMain(void *args) { lazyReleaseWorker *p = args; while (1) { /* 等待在条件变量上,条件为待释放对象链表长度为0 */ pthread_mutex_lock(&p->mutex); while (listLength(p->objs) == 0) { pthread_cond_wait(&p->cond, &p->mutex); } /* 取出链表的第一个节点 */ listNode *head = listFirst(p->objs); /* 节点值为要释放的对象 */ robj *o = listNodeValue(head); /* 从链表中删除这个节点 */ listDelNode(p->objs, head); pthread_mutex_unlock(&p->mutex); /* 释放对象 */ decrRefCount(o); } return NULL; } /* lazy释放一个对象 */ static void lazyReleaseObject(robj *o) { /* 对象当前的refcount必须已经为1,即已经没有任何人引用这个对象 */ serverAssert(o->refcount == 1); /* 获取lazyReleaseWorker */ lazyReleaseWorker *p = server.slotsmgrt_lazy_release; /* 上锁 */ pthread_mutex_lock(&p->mutex); if (listLength(p->objs) == 0) { /* 若是待释放队列长度为0,则唤醒释放线程 */ pthread_cond_broadcast(&p->cond); } /* 将待释放对象加入释放链表 */ listAddNodeTail(p->objs, o); /* 解锁 */ pthread_mutex_unlock(&p->mutex); } /* 建立lazy释放工做线程 */ static lazyReleaseWorker * createLazyReleaseWorkerThread() { lazyReleaseWorker *p = zmalloc(sizeof(lazyReleaseWorker)); pthread_mutex_init(&p->mutex, NULL); pthread_cond_init(&p->cond, NULL); p->objs = listCreate(); /* 建立线程 */ if (pthread_create(&p->thread, NULL, lazyReleaseWorkerMain, p) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Worker Thread for Lazy Release Jobs."); exit(1); } return p; } /* 初始化Lazy释放工做线程 */ void slotsmgrtInitLazyReleaseWorkerThread() { server.slotsmgrt_lazy_release = createLazyReleaseWorkerThread(); } /* ============================ Iterator for Data Migration ================================ */ #define STAGE_PREPARE 0 #define STAGE_PAYLOAD 1 #define STAGE_CHUNKED 2 #define STAGE_FILLTTL 3 #define STAGE_DONE 4 /* 单对象迭代器 */ typedef struct { int 
stage; robj *key;/* 单对象对应的的key */ robj *val;/* 单对象对应的的值 */ long long expire;/* 该对象对应的过时设置 */ unsigned long cursor;/* 游标,用于dictScan */ unsigned long lindex;/* 索引,listTypeInitIterator时用到 */ unsigned long zindex;/* 索引,遍历zset时用到 */ unsigned long chunked_msgs;/* 该对象chunked消息个数 */ } singleObjectIterator; /* 建立单对象迭代 */ static singleObjectIterator * createSingleObjectIterator(robj *key) { /* 分配空间 */ singleObjectIterator *it = zmalloc(sizeof(singleObjectIterator)); /* 初始化阶段 */ it->stage = STAGE_PREPARE; /* 设置key */ it->key = key; /* 引用计数 */ incrRefCount(it->key); it->val = NULL; it->expire = 0; it->cursor = 0; it->lindex = 0; it->zindex = 0; it->chunked_msgs = 0; return it; } /* 释放SingleObjectIterator */ static void freeSingleObjectIterator(singleObjectIterator *it) { if (it->val != NULL) { /* 对val解引用 */ decrRefCount(it->val); } /* 对key解引用 */ decrRefCount(it->key); /* 释放结构 */ zfree(it); } static void freeSingleObjectIteratorVoid(void *it) { freeSingleObjectIterator(it); } /* 判断单个对象是否还有下一个阶段须要处理 */ static int singleObjectIteratorHasNext(singleObjectIterator *it) { /* 只要状态不是STAGE_DONE就还须要继续处理 */ return it->stage != STAGE_DONE; } /* 若是是sds编码的字符串对象就返回sds底层字符换的长度,不然返回默认长度len */ static size_t sdslenOrElse(robj *o, size_t len) { return sdsEncodedObject(o) ? 
sdslen(o->ptr) : len; } /* 若是val类型为dict时执行dictScan操做的回调 */ static void singleObjectIteratorScanCallback(void *data, const dictEntry *de) { /* 提取privdata {ll, val, &len}*/ void **pd = (void **)data; list *l = pd[0];/* 链表,用于存放scan出来的元素 */ robj *o = pd[1];/* 被迭代的对象值val */ long long *n = pd[2];/* 返回字节数的指针 */ robj *objs[2] = {NULL, NULL}; switch (o->type) { case OBJ_HASH: /* 若是原对象是hash,则分别将hash的key和value按顺序方式链表 */ objs[0] = dictGetKey(de); objs[1] = dictGetVal(de); break; case OBJ_SET: /* 若是原对象是set,则只将hash的key放入链表 */ objs[0] = dictGetKey(de); break; } /* 将扫出来的对象添加到链表 */ for (int i = 0; i < 2; i ++) { if (objs[i] != NULL) { /* 引用计数 */ incrRefCount(objs[i]); /* 这个对象的大小,对于string对象就是string长度,其余对象就按8字节算 */ *n += sdslenOrElse(objs[i], 8); listAddNodeTail(l, objs[i]); } } } /* 将double转为内存二进制表示 */ static uint64_t convertDoubleToRawBits(double value) { union { double d; uint64_t u; } fp; fp.d = value; return fp.u; } /* 将内存二进制表示转为double值 */ static double convertRawBitsToDouble(uint64_t value) { union { double d; uint64_t u; } fp; fp.u = value; return fp.d; } /* 从Uint64建立RawString对象 */ static robj * createRawStringObjectFromUint64(uint64_t v) { uint64_t p = intrev64ifbe(v); return createRawStringObject((char *)&p, sizeof(p)); } /* 从RawString获取Uint64 */ static int getUint64FromRawStringObject(robj *o, uint64_t *p) { if (sdsEncodedObject(o) && sdslen(o->ptr) == sizeof(uint64_t)) { *p = intrev64ifbe(*(uint64_t *)(o->ptr)); return C_OK; } return C_ERR; } /* 计算一个对象须要的restore命令的个数,单个restore上只能携带maxbulks个Bulk Bulk:$6\r\nfoobar\r\n Multi-bulk :"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n" */ static long numberOfRestoreCommandsFromObject(robj *val, long long maxbulks) { long long numbulks = 0; switch (val->type) { case OBJ_LIST: if (val->encoding == OBJ_ENCODING_QUICKLIST) { /* list的长度就是须要的Bulk的数目 */ numbulks = listTypeLength(val); } break; case OBJ_HASH: if (val->encoding == OBJ_ENCODING_HT) { /* hash表中每一个元素须要2个Bulk */ numbulks = hashTypeLength(val) * 2; } break; case OBJ_SET: if 
(val->encoding == OBJ_ENCODING_HT) { /* set中每一个元素须要1个Bulk */ numbulks = setTypeSize(val); } break; case OBJ_ZSET: if (val->encoding == OBJ_ENCODING_SKIPLIST) { /* zset中每一个元素须要2个Bulk */ numbulks = zsetLength(val) * 2; } break; } /* 若是实际的numbulks比要求的maxbulks小,则使用一条restore命令 */ if (numbulks <= maxbulks) { return 1; } /* 计算须要的restore命令个数 */ return (numbulks + maxbulks - 1) / maxbulks; } /* 估计Restore命令的个数 */ static long estimateNumberOfRestoreCommands(redisDb *db, robj *key, long long maxbulks) { /* 查找key对应的val */ robj *val = lookupKeyWrite(db, key); if (val != NULL) { return numberOfRestoreCommandsFromObject(val, maxbulks); } return 0; } extern void createDumpPayload(rio *payload, robj *o); extern zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank); static slotsmgrtAsyncClient *getSlotsmgrtAsyncClient(int db); /* 单对象迭代,返回值为命令个数(Bulks) */ static int singleObjectIteratorNext(client *c, singleObjectIterator *it, long long timeout, unsigned int maxbulks, unsigned int maxbytes) { /* * * STAGE_PREPARE ---> STAGE_PAYLOAD ---> STAGE_DONE * | A * V | * +------------> STAGE_CHUNKED ---> STAGE_FILLTTL * A | * | V * +-------+ * */ /* 本次迭代的key */ robj *key = it->key; /* 但对象迁移的准备阶段 */ if (it->stage == STAGE_PREPARE) { /* 以写的方式查找key,与lookupKeyRead区别是没有命中率更新 */ robj *val = lookupKeyWrite(c->db, key); if (val == NULL) { /* 若是key没有找到,则结束 */ it->stage = STAGE_DONE; return 0; } /* 设置值 */ it->val = val; /* 增长引用 */ incrRefCount(it->val); /* 设置过时时间 */ it->expire = getExpire(c->db, key); /* 前导消息 */ int leading_msgs = 0; /* 获取db对应的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c == c) { /* 只有slotsmgrtAsyncClient未被使用的时候 */ if (ac->used == 0) { /* 表示已经被使用 */ ac->used = 1; /* 若是须要验证 */ if (server.requirepass != NULL) { /* SLOTSRESTORE-ASYNC-AUTH $password */ addReplyMultiBulkLen(c, 2); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-AUTH"); addReplyBulkCString(c, server.requirepass); leading_msgs += 1; } /* SELECT DB操做 */ do { /* 
SLOTSRESTORE-ASYNC-SELECT $db */ addReplyMultiBulkLen(c, 2); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-SELECT"); addReplyBulkLongLong(c, c->db->id); leading_msgs += 1; } while (0); } } /* SLOTSRESTORE-ASYNC delete $key */ addReplyMultiBulkLen(c, 3); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "delete"); addReplyBulk(c, key); /* 计算须要的restore命令个数,maxbulks表示一个restore命令可承载的bulk最大数目 */ long n = numberOfRestoreCommandsFromObject(val, maxbulks); if (n >= 2) { /* 若是须要2个及以上,则进入CHUNKED阶段,即启用分块传输 */ it->stage = STAGE_CHUNKED; /* chunked消息个数 */ it->chunked_msgs = n; } else { /* 不然一个restore能够承载,则直接进入PAYLOAD阶段 */ it->stage = STAGE_PAYLOAD; it->chunked_msgs = 0; } /* 这里的1为delete命令,再加上其余的前导命令(若是有),做为命令个数返回 */ return 1 + leading_msgs; } /* 取出key对应的值 */ robj *val = it->val; long long ttl = 0; if (it->stage == STAGE_CHUNKED) { /* 若是是CHUNKED阶段,则设置一个临时ttl */ ttl = timeout * 3; } else if (it->expire != -1) { /* 不然若是val上有过时时间,则从新计算ttl */ ttl = it->expire - mstime(); if (ttl < 1) { ttl = 1; } } /* 当一个CHUNKED对象所有序列化完成以后会到这个阶段 */ if (it->stage == STAGE_FILLTTL) { /* SLOTSRESTORE-ASYNC expire $key $ttl */ addReplyMultiBulkLen(c, 4); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "expire"); addReplyBulk(c, key); /* 设置真实的ttl */ addReplyBulkLongLong(c, ttl); /* 迭代结束 */ it->stage = STAGE_DONE; /* 该阶段只有一个命令 */ return 1; } /* 若是是PAYLOAD阶段切val类型不是OBJ_STRING */ if (it->stage == STAGE_PAYLOAD && val->type != OBJ_STRING) { /* 负载缓冲区 */ rio payload; /* 将val序列化为RDB格式 */ createDumpPayload(&payload, val); /* SLOTSRESTORE-ASYNC object $key $ttl $payload */ addReplyMultiBulkLen(c, 5); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); /* 对象类型 */ addReplyBulkCString(c, "object"); addReplyBulk(c, key); addReplyBulkLongLong(c, ttl); /* 添加payload */ addReplyBulkSds(c, payload.io.buffer.ptr); /* 迭代结束 */ it->stage = STAGE_DONE; /* 该阶段只有一个命令 */ return 1; } /* 若是是PAYLOAD阶段切val类型为OBJ_STRING */ if (it->stage == STAGE_PAYLOAD && val->type == OBJ_STRING) { /* SLOTSRESTORE-ASYNC 
string $key $ttl $payload */ addReplyMultiBulkLen(c, 5); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "string"); addReplyBulk(c, key); addReplyBulkLongLong(c, ttl); addReplyBulk(c, val); /* 迭代结束 */ it->stage = STAGE_DONE; /* 该阶段只有一个命令 */ return 1; } /* 若是是CHUNKED类型 */ if (it->stage == STAGE_CHUNKED) { const char *cmd = NULL; /* 根据val的类型使用不一样的子命令 */ switch (val->type) { case OBJ_LIST: cmd = "list"; break; case OBJ_HASH: cmd = "hash"; break; case OBJ_SET: cmd = "dict"; break; case OBJ_ZSET: cmd = "zset"; break; default: serverPanic("unknown object type"); } /* 是否还有更多须要序列化 */ int more = 1; /* ll链表用于存放本次SLOTSRESTORE-ASYNC命令携带的args */ list *ll = listCreate(); /* 设置是否函数,本质就是调用decrRefCount */ listSetFreeMethod(ll, decrRefCountVoid); long long hint = 0, len = 0; if (val->type == OBJ_LIST) { /* 若是val类型为OBJ_LIST,则建立list迭代 */ listTypeIterator *li = listTypeInitIterator(val, it->lindex, LIST_TAIL); do { /* 表示list每一项 */ listTypeEntry entry; /* 遍历 */ if (listTypeNext(li, &entry)) { quicklistEntry *e = &(entry.entry); robj *obj; if (e->value) { /* */ obj = createStringObject((const char *)e->value, e->sz); } else { /* */ obj = createStringObjectFromLongLong(e->longval); } /* 累计字节数 */ len += sdslenOrElse(obj, 8); /* 添加到ll */ listAddNodeTail(ll, obj); /* 索引加1 */ it->lindex ++; } else { /* 没有更多了 */ more = 0; } /* 当还有更多要发送且ll现有元素个数小于maxbulks且字节数小于 maxbytes */ } while (more && listLength(ll) < maxbulks && len < maxbytes); /* 释放迭代器 */ listTypeReleaseIterator(li); /* 原list的总长度 */ hint = listTypeLength(val); } if (val->type == OBJ_HASH || val->type == OBJ_SET) { /* 控制循环次数 */ int loop = maxbulks * 10; /* 默认最大循环次数 */ if (loop < 100) { loop = 100; } dict *ht = val->ptr; void *pd[] = {ll, val, &len}; do { it->cursor = dictScan(ht, it->cursor, singleObjectIteratorScanCallback, pd); if (it->cursor == 0) { /* 没有更多了 */ more = 0; } /* 若是还有更多且ll现有元素个数小于maxbulks且本次发送字节数小于maxbytes且loop不为0 */ } while (more && listLength(ll) < maxbulks && len < maxbytes && (-- loop) >= 0); /* 
原hash的总大小 */ hint = dictSize(ht); } if (val->type == OBJ_ZSET) { /* 若是是ZSET类型 */ zset *zs = val->ptr; dict *ht = zs->dict; long long rank = (long long)zsetLength(val) - it->zindex; zskiplistNode *node = (rank >= 1) ? zslGetElementByRank(zs->zsl, rank) : NULL; do { if (node != NULL) { robj *field = node->obj; incrRefCount(field); len += sdslenOrElse(field, 8); listAddNodeTail(ll, field); uint64_t bits = convertDoubleToRawBits(node->score); robj *score = createRawStringObjectFromUint64(bits); len += sdslenOrElse(score, 8); listAddNodeTail(ll, score); node = node->backward; it->zindex ++; } else { /* 没有更多了 */ more = 0; } /* 若是还有更多元素且bulks没有超过maxbulks且产生的字节数没有超过maxbytes */ } while (more && listLength(ll) < maxbulks && len < maxbytes); /* 原hash总大小 */ hint = dictSize(ht); } /* SLOTSRESTORE-ASYNC list/hash/zset/dict $key $ttl $hint [$arg1 ...] */ addReplyMultiBulkLen(c, 5 + listLength(ll));/* MultiBulk总长度 */ addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, cmd);/* list?hash? 
*/ addReplyBulk(c, key); addReplyBulkLongLong(c, ttl);/* ttl */ addReplyBulkLongLong(c, hint);/* 总大小 */ /* 遍历ll,ll里面存放了本地要发送的args */ while (listLength(ll) != 0) { /* 取出头结点 */ listNode *head = listFirst(ll); /* 取出值对象 */ robj *obj = listNodeValue(head); /* 添加回复 */ addReplyBulk(c, obj); /* 删除该节点 */ listDelNode(ll, head); } /* 释放ll */ listRelease(ll); if (!more) { /* 若是对象全部元素都被序列换完毕,则进入FILLTTL阶段 */ it->stage = STAGE_FILLTTL; } /* 该阶段只有一个命令 */ return 1; } if (it->stage != STAGE_DONE) { serverPanic("invalid iterator stage"); } serverPanic("use of empty iterator"); } /* ============================ Iterator for Data Migration (batched) ====================== */ typedef struct { struct zskiplist *tags;/* 标识一个hashtag有没有被添加过 */ dict *keys;/* 批处理的Keys */ list *list; /* 每一个节点的值都是singleObjectIterator */ dict *hash_slot;/* hash数组,数组的下标为slot_num,每一个数组元素的字典为key、crc对 */ struct zskiplist *hash_tags;/* 用于保存具备hashtag的key,score为key的crc,值为key */ long long timeout;/* 进程chunked restore时会指定临时ttl,值为timeout*3 */ unsigned int maxbulks;/* 单次restore最多发送多少个bulks */ unsigned int maxbytes;/* 单次发送最多发送多少字节数 */ list *removed_keys;/* 一个key被发送完成以后会加入这个链表 */ list *chunked_vals;/* 用于存放使用chunked方式发生的val */ long estimate_msgs;/* 估算的restore命令的个数 */ } batchedObjectIterator; /* 建立batchedObjectIterator */ static batchedObjectIterator * createBatchedObjectIterator(dict *hash_slot, struct zskiplist *hash_tags, long long timeout, unsigned int maxbulks, unsigned int maxbytes) { batchedObjectIterator *it = zmalloc(sizeof(batchedObjectIterator)); it->tags = zslCreate(); it->keys = dictCreate(&setDictType, NULL); it->list = listCreate(); listSetFreeMethod(it->list, freeSingleObjectIteratorVoid); it->hash_slot = hash_slot; it->hash_tags = hash_tags; it->timeout = timeout; it->maxbulks = maxbulks; it->maxbytes = maxbytes; it->removed_keys = listCreate(); listSetFreeMethod(it->removed_keys, decrRefCountVoid); it->chunked_vals = listCreate(); listSetFreeMethod(it->chunked_vals, decrRefCountVoid); it->estimate_msgs = 0; 
return it; } /* 释放BatchedObjectIterator */ static void freeBatchedObjectIterator(batchedObjectIterator *it) { zslFree(it->tags); dictRelease(it->keys); listRelease(it->list); listRelease(it->removed_keys); listRelease(it->chunked_vals); zfree(it); } /* 批处理迭代(即一次处理多个key) */ static int batchedObjectIteratorHasNext(batchedObjectIterator *it) { /* list链表不为空,每一个节点的值都是singleObjectIterator */ while (listLength(it->list) != 0) { /* 每一个节点的值都是singleObjectIterator */ listNode *head = listFirst(it->list); /* 每一个节点的值都是singleObjectIterator */ singleObjectIterator *sp = listNodeValue(head); /* 判断单个对象是否已经处于STAGE_DONE */ if (singleObjectIteratorHasNext(sp)) { /* 不处于STAGE_DONE,即单对象迭代还没结束,则直接返回1,下次还会迭代这个对象 */ return 1; } /* 不然当前单对象已经迭代结束 */ if (sp->val != NULL) { /* 若是当前单对象的value不为空,就把单对象的key添加到removed_keys链表 */ incrRefCount(sp->key); listAddNodeTail(it->removed_keys, sp->key); if (sp->chunked_msgs != 0) { /* 若是chunked的消息个数不为0 */ incrRefCount(sp->val); /* 就把val加入到chunked_vals链表 */ listAddNodeTail(it->chunked_vals, sp->val); } } /* 删除这个节点 */ listDelNode(it->list, head); } return 0; } /* 批处理对象迭代,返回值为本地迭代产生的SLOTSRESTORE系列命令的个数 */ static int batchedObjectIteratorNext(client *c, batchedObjectIterator *it) { /* 遍历链表 */ if (listLength(it->list) != 0) { /* 取出头结点 */ listNode *head = listFirst(it->list); /* 节点值为singleObjectIterator */ singleObjectIterator *sp = listNodeValue(head); /* maxbytes减去客户端输出缓冲区当前已有的大小就是本次能发送的最大字节数 */ long long maxbytes = (long long)it->maxbytes - getClientOutputBufferMemoryUsage(c); /* 单对象迭代,迭代超时timeout,迭代单词最大maxbulks,单次最大maxbytes */ return singleObjectIteratorNext(c, sp, it->timeout, it->maxbulks, maxbytes > 0 ? 
maxbytes : 0); } serverPanic("use of empty iterator"); } /* 批处理里面是否包含key,返回1表示存在,返回0表示不存在 */ static int batchedObjectIteratorContains(batchedObjectIterator *it, robj *key, int usetag) { /* 若是在keys中找到,则存在 */ if (dictFind(it->keys, key) != NULL) { return 1; } /* 若是没有使用hashtag则结束查找 */ if (!usetag) { return 0; } uint32_t crc; int hastag; /* 计算key的crc和hashtag */ slots_num(key->ptr, &crc, &hastag); if (!hastag) { /* 若是key没有hashtag则结束查找 */ return 0; } /* 不然填充range */ zrangespec range; range.min = (double)crc; range.minex = 0; range.max = (double)crc; range.maxex = 0; /* 以crc为范围在跳表tags中查找,每个hashtag被添加都会在tags跳表中添加一个节点 */ return zslFirstInRange(it->tags, &range) != NULL; } /* 向批处理添加一个key,返回值为本次新添加的key的个数 */ static int batchedObjectIteratorAddKey(redisDb *db, batchedObjectIterator *it, robj *key) { /* 添加到keys字典 */ if (dictAdd(it->keys, key, NULL) != C_OK) { return 0; } /* 引用计数 */ incrRefCount(key); /* 建立createSingleObjectIterator */ listAddNodeTail(it->list, createSingleObjectIterator(key)); /* 对该对象须要的restore命令个数进行预估 */ it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks); /* 当前批处理的key个数 */ int size = dictSize(it->keys); uint32_t crc; int hastag; /* 该key对应的slot num */ slots_num(key->ptr, &crc, &hastag); if (!hastag) { /* 若是key不含有hashtag则跳出 */ goto out; } /* 知道score为crc */ zrangespec range; range.min = (double)crc; range.minex = 0; range.max = (double)crc; range.maxex = 0; /* 寻找第一个score知足 range范围的节点*/ if (zslFirstInRange(it->tags, &range) != NULL) { /* 找到则跳出,所以是该hashtag的key已经被添加过,无需重复添加 */ goto out; } /* 引用计数 */ incrRefCount(key); /* 没找到则插入,score为crc,节点的值为key */ zslInsert(it->tags, (double)crc, key); /* 若是hash_tags跳表指针为NULL */ if (it->hash_tags == NULL) { goto out; } /* 在hash_tags中寻找score知足range范围的第一个节点 */ zskiplistNode *node = zslFirstInRange(it->hash_tags, &range); /* 若是score不一样就跳出 */ while (node != NULL && node->score == (double)crc) { /* 结点值就是key */ robj *key = node->obj; /* score相同的节点都是连续排列的,所以直接从level[0]向后遍历就好 */ node = node->level[0].forward; /* 
添加到批处理keys */ if (dictAdd(it->keys, key, NULL) != C_OK) { continue; } /* 引用计数 */ incrRefCount(key); /* 为该key添加但对象迭代器SingleObjectIterator */ listAddNodeTail(it->list, createSingleObjectIterator(key)); /* 对该对象须要的restore命令个数进行预估 */ it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks); } out: /* 本次新加如的key的个数,注意最开始的1个key也要加上 */ return 1 + dictSize(it->keys) - size; } /* ============================ Clients ==================================================== */ /* 获取异步迁移客户端,每一个db一个 */ static slotsmgrtAsyncClient * getSlotsmgrtAsyncClient(int db) { return &server.slotsmgrt_cached_clients[db]; } /* 通知被阻塞的 SlotsmgrtAsyncClient */ static void notifySlotsmgrtAsyncClient(slotsmgrtAsyncClient *ac, const char *errmsg) { /* 获取当前迭代器 */ batchedObjectIterator *it = ac->batched_iter; /* 获取阻塞链表 */ list *ll = ac->blocked_list; /* 遍历 */ while (listLength(ll) != 0) { /* 取出头节点 */ listNode *head = listFirst(ll); /* 取出节点值,就是client */ client *c = listNodeValue(head); if (errmsg != NULL) { /* 错误信息不为空,则将错误信息返回给client */ addReplyError(c, errmsg); } else if (it == NULL) { /* 迭代器非法 */ addReplyError(c, "invalid iterator (NULL)"); } else if (it->hash_slot == NULL) { addReplyLongLong(c, listLength(it->removed_keys)); } else { /* 返回两个值,一个是本次moved一个是hash_slot如今的大小 */ addReplyMultiBulkLen(c, 2); addReplyLongLong(c, listLength(it->removed_keys)); addReplyLongLong(c, dictSize(it->hash_slot)); } /* 清除CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志,表示这个客户端不是一个正在被使用、正常服务的客户端 */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 清空客户端阻塞链表 */ c->slotsmgrt_fenceq = NULL; /* 删除当前节点 */ listDelNode(ll, head); } } /* 释放slotsmgrtAsyncClient里面的结构 */ static void unlinkSlotsmgrtAsyncCachedClient(client *c, const char *errmsg) { slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); /* 必须有CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT标志,表示这是一个已经被cached的客户端 */ serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT); serverAssert(ac->c == c); /* 通知被阻塞的客户端,消息为errmsg */ 
notifySlotsmgrtAsyncClient(ac, errmsg); batchedObjectIterator *it = ac->batched_iter; /* 空闲时间 */ long long elapsed = mstime() - ac->lastuse; serverLog(LL_WARNING, "slotsmgrt_async: unlink client %s:%d (DB=%d): " "sending_msgs = %ld, batched_iter = %ld, blocked_list = %ld, " "timeout = %lld(ms), elapsed = %lld(ms) (%s)", ac->host, ac->port, c->db->id, ac->sending_msgs, it != NULL ? (long)listLength(it->list) : -1, (long)listLength(ac->blocked_list), ac->timeout, elapsed, errmsg); sdsfree(ac->host); if (it != NULL) { /* 释放批处理迭代器 */ freeBatchedObjectIterator(it); } /* 释放阻塞链表 */ listRelease(ac->blocked_list); /* 取消CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示不是被缓存的slotsmgrtAsyncClient */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT; /* 状况结构,以备下一次使用(注意不须要free ac,由于这是每一个db私有的) */ memset(ac, 0, sizeof(*ac)); } /* 释放一个db相关的SlotsmgrtAsyncClient */ static int releaseSlotsmgrtAsyncClient(int db, const char *errmsg) { slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); if (ac->c == NULL) { /* 为NULL无需释放 */ return 0; } client *c = ac->c; /* 释放slotsmgrtAsyncClient里面的结构 */ unlinkSlotsmgrtAsyncCachedClient(c, errmsg); /* 释放client结构 */ freeClient(c); return 1; } /* 新建一个slotsmgrtAsyncClient */ static int createSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) { /* 新建链接 */ int fd = anetTcpNonBlockConnect(server.neterr, host, port); if (fd == -1) { serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, %s", host, port, db, server.neterr); return C_ERR; } /* 禁用nagel算法 */ anetEnableTcpNoDelay(server.neterr, fd); int wait = 100; if (wait > timeout) { wait = timeout; } /* 等待可写状态 */ if ((aeWait(fd, AE_WRITABLE, wait) & AE_WRITABLE) == 0) { serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, io error or timeout (%d)", host, port, db, wait); close(fd); return C_ERR; } /* 建立redis客户端,内部会将fd读事件添加到主线程eventloop */ client *c = createClient(fd); if (c == NULL) { serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d 
(DB=%d) failed, %s", host, port, db, server.neterr); return C_ERR; } /* 选择客户端绑定的db */ if (selectDb(c, db) != C_OK) { serverLog(LL_WARNING, "slotsmgrt_async: invalid DB index (DB=%d)", db); freeClient(c); return C_ERR; } /* 添加设置标志CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示这是一个已经被CACHED的客户端 */ c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT; /* 已认证 */ c->authenticated = 1; /* 释放一个db相关的SlotsmgrtAsyncClient(清空里面的成员结构) */ releaseSlotsmgrtAsyncClient(db, "interrupted: build new connection"); serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d (DB=%d) OK", host, port, db); /* 根据db获取slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); /* 设置绑定的client */ ac->c = c; /* 没有被使用 */ ac->used = 0; /* ip */ ac->host = sdsnew(host); /* port */ ac->port = port; /* 空闲时间 */ ac->timeout = timeout; /* 更新最后一次使用时间 */ ac->lastuse = mstime(); /* 飞行中的消息计数 */ ac->sending_msgs = 0; /* 批处理迭代器 */ ac->batched_iter = NULL; /* 建立阻塞链表 */ ac->blocked_list = listCreate(); return C_OK; } /* 获取或建立一个slotsmgrtAsyncClient */ static slotsmgrtAsyncClient * getOrCreateSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) { /* 根据要操做的db获取缓存的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); if (ac->c != NULL) { /* 不为NULL,在比较下host和port,只有彻底线条才返回 */ if (ac->port == port && !strcmp(ac->host, host)) { return ac; } } /* 不然新建一个slotsmgrtAsyncClient */ return createSlotsmgrtAsyncClient(db, host, port, timeout) != C_OK ? 
NULL : ac; } static void unlinkSlotsmgrtAsyncNormalClient(client *c) { /* 释放一个正在被使用的、正常的client */ serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT); /* 阻塞链表不能为NULL */ serverAssert(c->slotsmgrt_fenceq != NULL); /* 该客户端阻塞的链表 */ list *ll = c->slotsmgrt_fenceq; /* 在阻塞链表中搜索该客户端 */ listNode *node = listSearchKey(ll, c); /* 必须能搜索到 */ serverAssert(node != NULL); /* 再也不是一个正在被使用的、正常的client */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 再也不阻塞也就没有阻塞链表 */ c->slotsmgrt_fenceq = NULL; /* 从阻塞链表中删除该客户端 */ listDelNode(ll, node); } void slotsmgrtAsyncUnlinkClient(client *c) { /* 针对CACHED类型客户端 */ if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT) { unlinkSlotsmgrtAsyncCachedClient(c, "interrupted: connection closed"); } /* 针对NORMAL类型客户端 */ if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) { unlinkSlotsmgrtAsyncNormalClient(c); } } /* 会被按期执行 */ void slotsmgrtAsyncCleanup() { /* 遍历全部db */ for (int i = 0; i < server.dbnum; i ++) { /* 获取每一个db对应的 slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(i); if (ac->c == NULL) { continue; } /* 计算空闲时间 */ long long elapsed = mstime() - ac->lastuse; /* 提取客户端timeout */ long long timeout = ac->batched_iter != NULL ? ac->timeout : 1000LL * 60; if (elapsed <= timeout) { /* 若是空闲时间小于timeout则继续遍历 */ continue; } /* 不然就释放这个客户端 */ releaseSlotsmgrtAsyncClient(i, ac->batched_iter != NULL ? 
"interrupted: migration timeout" : "interrupted: idle timeout"); } } /* 获取异步迁移状态或者阻塞一个client */ static int getSlotsmgrtAsyncClientMigrationStatusOrBlock(client *c, robj *key, int block) { /* 获取当前db上的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c == NULL || ac->batched_iter == NULL) { /* 没有迁移或迁移完成 */ return 0; } /* 获取当前的batched_iter */ batchedObjectIterator *it = ac->batched_iter; if (key != NULL && !batchedObjectIteratorContains(it, key, 1)) { /* 若是key不为NULL且key不在batched中则直接返回0,表示该key没有迁移或者迁移完成 */ return 0; } if (!block) { /* 若是不容许阻塞则直接返回 */ return 1; } if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) { /* 若是这个客户端是一个CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT,便是一个 正在服务的slotsmgrtAsyncClient */ return -1; } /* 获取阻塞链表 */ list *ll = ac->blocked_list; /* 设置CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志,表示这是一个正常的被阻塞的客户端 */ c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 设置客户端阻塞在哪一个链表上 */ c->slotsmgrt_fenceq = ll; /* 添加到阻塞队列 */ listAddNodeTail(ll, c); return 1; } /* ============================ Slotsmgrt{One,TagOne}AsyncDumpCommand ====================== */ /* SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] */ /* SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] 
*/ static void slotsmgrtAsyncDumpGenericCommand(client *c, int usetag) { long long timeout; /* 获取timeout */ if (getLongLongFromObject(c->argv[1], &timeout) != C_OK || !(timeout >= 0 && timeout <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of timeout (%s)", (char *)c->argv[1]->ptr); return; } /* 若是timeout为0就修正为30s */ if (timeout == 0) { timeout = 1000 * 30; } /* 获取maxbulks */ long long maxbulks; if (getLongLongFromObject(c->argv[2], &maxbulks) != C_OK || !(maxbulks >= 0 && maxbulks <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of maxbulks (%s)", (char *)c->argv[2]->ptr); return; } /* 若是maxbulks就修正为默认值3000 */ if (maxbulks == 0) { maxbulks = 1000; } /* 建立批处理迭代器,若是使用hashtag则提供 tagged_keys */ batchedObjectIterator *it = createBatchedObjectIterator(NULL, usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, INT_MAX); /* 向批处理添加keys */ for (int i = 3; i < c->argc; i ++) { batchedObjectIteratorAddKey(c->db, it, c->argv[i]); } /* 添加一个空对象节点到复链表reply中,用于存放MultiBulk的长度 */ void *ptr = addDeferredMultiBulkLength(c); int total = 0; /* batched迭代 */ while (batchedObjectIteratorHasNext(it)) { /* batchedObjectIteratorNext返回本次迭代产生的SLOTSRESTORE系列命令的个数 */ total += batchedObjectIteratorNext(c, it); } /* 把真实的长度写进去 */ setDeferredMultiBulkLength(c, ptr, total); /* 释放批处理迭代器 */ freeBatchedObjectIterator(it); } /* * * SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] * */ void slotsmgrtOneAsyncDumpCommand(client *c) { if (c->argc <= 3) { addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC-DUMP"); return; } slotsmgrtAsyncDumpGenericCommand(c, 0); } /* * * SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] 
 * */
void slotsmgrtTagOneAsyncDumpCommand(client *c) {
    if (c->argc <= 3) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC-DUMP");
        return;
    }
    slotsmgrtAsyncDumpGenericCommand(c, 1);
}

/* ============================ Slotsmgrt{One,TagOne,Slot,TagSlot}AsyncCommand ============= */

/* Clamp maxbytes to the configured client output buffer limits */
static unsigned int slotsmgrtAsyncMaxBufferLimit(unsigned int maxbytes) {
    clientBufferLimitsConfig *config = &server.client_obuf_limits[CLIENT_TYPE_NORMAL];
    if (config->soft_limit_bytes != 0 && config->soft_limit_bytes < maxbytes) {
        /* The requested size exceeds the soft limit: use the soft limit */
        maxbytes = config->soft_limit_bytes;
    }
    if (config->hard_limit_bytes != 0 && config->hard_limit_bytes < maxbytes) {
        /* The requested size exceeds the hard limit: use the hard limit */
        maxbytes = config->hard_limit_bytes;
    }
    return maxbytes;
}

/* Produce at least `atleast` messages (one message is one SLOTSRESTORE command),
 * then keep producing until the time budget of `usecs` microseconds is used up,
 * the batch is exhausted, or the output buffer reaches maxbytes */
static long slotsmgrtAsyncNextMessagesMicroseconds(slotsmgrtAsyncClient *ac, long atleast, long long usecs) {
    /* The batched iterator */
    batchedObjectIterator *it = ac->batched_iter;
    /* Deadline of this round */
    long long deadline = ustime() + usecs;
    long msgs = 0;
    /* Continue while the batch has objects left and the client output buffer
     * uses fewer than maxbytes bytes */
    while (batchedObjectIteratorHasNext(it) &&
            getClientOutputBufferMemoryUsage(ac->c) < it->maxbytes) {
        /* Advance the batch; the return value is the number of SLOTSRESTORE-family
         * commands produced by this iteration */
        if ((msgs += batchedObjectIteratorNext(ac->c, it)) < atleast) {
            continue;
        }
        /* Return once the deadline has passed */
        if (ustime() >= deadline) {
            return msgs;
        }
    }
    /* Return the number of messages */
    return msgs;
}

/* dictScan callback for hash_slot */
static void slotsScanSdsKeyCallback(void *l, const dictEntry *de) {
    sds skey = dictGetKey(de);
    robj *key = createStringObject(skey, sdslen(skey));
    /* Append the key to the list */
    listAddNodeTail((list *)l, key);
}

/* SLOTSMGRTONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] */
/* SLOTSMGRTTAGONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
 */
/* SLOTSMGRTSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys */
/* SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys */
static void slotsmgrtAsyncGenericCommand(client *c, int usetag, int usekey) {
    /* Parse host and port */
    char *host = c->argv[1]->ptr;
    long long port;
    if (getLongLongFromObject(c->argv[2], &port) != C_OK ||
            !(port >= 1 && port < 65536)) {
        addReplyErrorFormat(c, "invalid value of port (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    /* Parse timeout; it is also used as the temporary TTL during chunked migration */
    long long timeout;
    if (getLongLongFromObject(c->argv[3], &timeout) != C_OK ||
            !(timeout >= 0 && timeout <= INT_MAX)) {
        addReplyErrorFormat(c, "invalid value of timeout (%s)",
                (char *)c->argv[3]->ptr);
        return;
    }
    /* Default: 30s */
    if (timeout == 0) {
        timeout = 1000 * 30;
    }
    /* Parse maxbulks, which decides how many bulks one chunk may carry */
    long long maxbulks;
    if (getLongLongFromObject(c->argv[4], &maxbulks) != C_OK ||
            !(maxbulks >= 0 && maxbulks <= INT_MAX)) {
        addReplyErrorFormat(c, "invalid value of maxbulks (%s)",
                (char *)c->argv[4]->ptr);
        return;
    }
    if (maxbulks == 0) {
        maxbulks = 200;
    }
    /* Cap at 512 * 1024 */
    if (maxbulks > 512 * 1024) {
        maxbulks = 512 * 1024;
    }
    /* Parse maxbytes, which decides the maximum number of bytes sent in one round */
    long long maxbytes;
    if (getLongLongFromObject(c->argv[5], &maxbytes) != C_OK ||
            !(maxbytes >= 0 && maxbytes <= INT_MAX)) {
        addReplyErrorFormat(c, "invalid value of maxbytes (%s)",
                (char *)c->argv[5]->ptr);
        return;
    }
    if (maxbytes == 0) {
        maxbytes = 512 * 1024;
    }
    if (maxbytes > INT_MAX / 2) {
        maxbytes = INT_MAX / 2;
    }
    /* Clamp maxbytes to the configured client output buffer limits */
    maxbytes = slotsmgrtAsyncMaxBufferLimit(maxbytes);

    dict *hash_slot = NULL;
    long long numkeys = 0;
    if (!usekey) {
        /* Not SLOTSMGRTTAGONE-ASYNC or SLOTSMGRTONE-ASYNC, i.e. no keys are
         * given explicitly: parse the slot number instead */
        long long slotnum;
        if (getLongLongFromObject(c->argv[6], &slotnum) != C_OK ||
                !(slotnum >= 0 && slotnum < HASH_SLOTS_SIZE)) {
            addReplyErrorFormat(c, "invalid value of slot (%s)",
                    (char *)c->argv[6]->ptr);
            return;
        }
        /* Get the dict of this hash slot */
        hash_slot = c->db->hash_slots[slotnum];
        /* Parse numkeys */
        if (getLongLongFromObject(c->argv[7], &numkeys) != C_OK ||
                !(numkeys >= 0 && numkeys <= INT_MAX)) {
            addReplyErrorFormat(c, "invalid value of numkeys (%s)",
                    (char *)c->argv[7]->ptr);
            return;
        }
        /* A numkeys of 0 means the default of 100 per round */
        if (numkeys == 0) {
            numkeys = 100;
        }
    }

    /* Is the DB already being migrated? */
    if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) {
        addReplyError(c, "the specified DB is being migrated");
        return;
    }
    /* A client carrying CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT is blocked,
     * waiting for a previous operation to finish */
    if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) {
        addReplyError(c, "previous operation has not finished");
        return;
    }

    /* Get or create the slotsmgrtAsyncClient */
    slotsmgrtAsyncClient *ac = getOrCreateSlotsmgrtAsyncClient(c->db->id, host, port, timeout);
    if (ac == NULL) {
        addReplyErrorFormat(c, "create client to %s:%d failed", host, (int)port);
        return;
    }

    /* Create the batched iterator */
    batchedObjectIterator *it = createBatchedObjectIterator(hash_slot,
            usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, maxbytes);
    if (!usekey) {
        /* Create a list ll to hold the keys scanned from hash_slot */
        list *ll = listCreate();
        listSetFreeMethod(ll, decrRefCountVoid);
        for (int i = 2; i >= 0 && it->estimate_msgs < numkeys; i --) {
            unsigned long cursor = 0;
            if (i != 0) {
                cursor = random();
            } else {
                if (htNeedsResize(hash_slot)) {
                    dictResize(hash_slot);
                }
            }
            if (dictIsRehashing(hash_slot)) {
                dictRehash(hash_slot, 50);
            }
            int loop = numkeys * 10;
            if (loop < 100) {
                loop = 100;
            }
            do {
                /* slotsScanSdsKeyCallback appends every scanned key to ll */
                cursor = dictScan(hash_slot, cursor, slotsScanSdsKeyCallback, ll);
                while (listLength(ll) != 0 && it->estimate_msgs < numkeys) {
                    listNode *head = listFirst(ll);
                    robj *key = listNodeValue(head);
                    long msgs = estimateNumberOfRestoreCommands(c->db, key, it->maxbulks);
                    if (it->estimate_msgs == 0 || it->estimate_msgs + msgs <= numkeys * 2) {
                        batchedObjectIteratorAddKey(c->db, it, key);
                    }
                    listDelNode(ll, head);
                }
            } while (cursor != 0 && it->estimate_msgs < numkeys &&
                    dictSize(it->keys) < (unsigned long)numkeys && (-- loop) >= 0);
        }
        listRelease(ll);
    } else {
        /* Otherwise the keys to migrate are given explicitly */
        for (int i = 6; i < c->argc; i ++) {
            batchedObjectIteratorAddKey(c->db, it, c->argv[i]);
        }
    }
    /* No message may be in flight at this point */
    serverAssert(ac->sending_msgs == 0);
    /* There is no active batch and the blocked-client list is empty */
    serverAssert(ac->batched_iter == NULL && listLength(ac->blocked_list) == 0);

    ac->timeout = timeout;
    /* Update the last-use time */
    ac->lastuse = mstime();
    ac->batched_iter = it;
    /* Produce at least 3 messages, with a time budget of 500 microseconds */
    ac->sending_msgs = slotsmgrtAsyncNextMessagesMicroseconds(ac, 3, 500);

    /* batched_iter is now set, so this call blocks the calling client
     * until the batch completes */
    getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1);

    if (ac->sending_msgs != 0) {
        return;
    }
    /* Nothing had to be sent: wake up the blocked clients and clean up right away */
    notifySlotsmgrtAsyncClient(ac, NULL);

    ac->batched_iter = NULL;
    freeBatchedObjectIterator(it);
}

/* *
 * SLOTSMGRTONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
 * */
void slotsmgrtOneAsyncCommand(client *c) {
    if (c->argc <= 6) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 0, 1);
}

/* *
 * SLOTSMGRTTAGONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
 * */
void slotsmgrtTagOneAsyncCommand(client *c) {
    if (c->argc <= 6) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 1, 1);
}

/* *
 * SLOTSMGRTSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys
 * */
void slotsmgrtSlotAsyncCommand(client *c) {
    if (c->argc != 8) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTSLOT-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 0, 0);
}

/* *
 * SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys
 * */
void slotsmgrtTagSlotAsyncCommand(client *c) {
    if (c->argc != 8) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTSLOT-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 1, 0);
}

/* *
 * SLOTSMGRT-ASYNC-FENCE
 * */
void slotsmgrtAsyncFenceCommand(client *c) {
    /* Query the async migration status, blocking the client if a migration is running */
    int ret = getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1);
    if (ret == 0) {
        /* Not blocked: there is no migration task at the moment */
        addReply(c, shared.ok);
    } else if (ret != 1) {
        /* A successfully blocked client gets 1; anything else means it was already blocked */
        addReplyError(c, "previous operation has not finished (call fence again)");
    }
    /* When 1 is returned the client receives no reply for now; it is notified
     * once the migration completes */
}

/* *
 * SLOTSMGRT-ASYNC-CANCEL
 * */
void slotsmgrtAsyncCancelCommand(client *c) {
    addReplyLongLong(c, releaseSlotsmgrtAsyncClient(c->db->id, "interrupted: canceled"));
}

/* ============================ SlotsmgrtAsyncStatus ======================================= */

/* Report the state of a singleObjectIterator */
static void singleObjectIteratorStatus(client *c, singleObjectIterator *it) {
    if (it == NULL) {
        addReply(c, shared.nullmultibulk);
        return;
    }
    void *ptr = addDeferredMultiBulkLength(c);
    int fields = 0;

    fields ++; addReplyBulkCString(c, "key");
    addReplyBulk(c, it->key);

    fields ++; addReplyBulkCString(c, "val.type");
    addReplyBulkLongLong(c, it->val == NULL ? -1 : it->val->type);

    fields ++; addReplyBulkCString(c, "stage");
    addReplyBulkLongLong(c, it->stage);

    fields ++; addReplyBulkCString(c, "expire");
    addReplyBulkLongLong(c, it->expire);

    fields ++; addReplyBulkCString(c, "cursor");
    addReplyBulkLongLong(c, it->cursor);

    fields ++; addReplyBulkCString(c, "lindex");
    addReplyBulkLongLong(c, it->lindex);

    fields ++; addReplyBulkCString(c, "zindex");
    addReplyBulkLongLong(c, it->zindex);

    fields ++; addReplyBulkCString(c, "chunked_msgs");
    addReplyBulkLongLong(c, it->chunked_msgs);

    setDeferredMultiBulkLength(c, ptr, fields * 2);
}

/* Report the state of a batchedObjectIterator */
static void batchedObjectIteratorStatus(client *c, batchedObjectIterator *it) {
    if (it == NULL) {
        addReply(c, shared.nullmultibulk);
        return;
    }
    void *ptr = addDeferredMultiBulkLength(c);
    int fields = 0;

    fields ++; addReplyBulkCString(c, "keys");
    addReplyMultiBulkLen(c, 2);
    addReplyBulkLongLong(c, dictSize(it->keys));
    addReplyMultiBulkLen(c, dictSize(it->keys));
    dictIterator *di = dictGetIterator(it->keys);
    dictEntry *de;
    while((de = dictNext(di)) != NULL) {
        addReplyBulk(c, dictGetKey(de));
    }
    dictReleaseIterator(di);

    fields ++; addReplyBulkCString(c, "timeout");
    addReplyBulkLongLong(c, it->timeout);

    fields ++; addReplyBulkCString(c, "maxbulks");
    addReplyBulkLongLong(c, it->maxbulks);

    fields ++; addReplyBulkCString(c, "maxbytes");
    addReplyBulkLongLong(c, it->maxbytes);

    fields ++; addReplyBulkCString(c, "estimate_msgs");
    addReplyBulkLongLong(c, it->estimate_msgs);

    fields ++; addReplyBulkCString(c, "removed_keys");
    addReplyBulkLongLong(c, listLength(it->removed_keys));

    fields ++; addReplyBulkCString(c, "chunked_vals");
    addReplyBulkLongLong(c, listLength(it->chunked_vals));

    fields ++; addReplyBulkCString(c, "iterators");
    addReplyMultiBulkLen(c, 2);
    addReplyBulkLongLong(c, listLength(it->list));
    singleObjectIterator *sp = NULL;
    if (listLength(it->list) != 0) {
        sp = listNodeValue(listFirst(it->list));
    }
    singleObjectIteratorStatus(c, sp);

    setDeferredMultiBulkLength(c, ptr, fields * 2);
}

/* *
 * SLOTSMGRT-ASYNC-STATUS
 * */
void slotsmgrtAsyncStatusCommand(client *c) {
    /* Get the slotsmgrtAsyncClient of the current db */
    slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
    if (ac->c == NULL) {
        addReply(c, shared.nullmultibulk);
        return;
    }
    /* Reserve the MultiBulk length */
    void *ptr = addDeferredMultiBulkLength(c);
    int fields = 0;

    fields ++; addReplyBulkCString(c, "host");
    addReplyBulkCString(c, ac->host);

    fields ++; addReplyBulkCString(c, "port");
    addReplyBulkLongLong(c, ac->port);

    fields ++; addReplyBulkCString(c, "used");
    addReplyBulkLongLong(c, ac->used);

    fields ++; addReplyBulkCString(c, "timeout");
    addReplyBulkLongLong(c, ac->timeout);

    fields ++; addReplyBulkCString(c, "lastuse");
    addReplyBulkLongLong(c, ac->lastuse);

    fields ++; addReplyBulkCString(c, "since_lastuse");
    addReplyBulkLongLong(c, mstime() - ac->lastuse);

    fields ++; addReplyBulkCString(c, "sending_msgs");
    addReplyBulkLongLong(c, ac->sending_msgs);

    /* Number of blocked clients */
    fields ++; addReplyBulkCString(c, "blocked_clients");
    addReplyBulkLongLong(c, listLength(ac->blocked_list));

    fields ++; addReplyBulkCString(c, "batched_iterator");
    batchedObjectIteratorStatus(c, ac->batched_iter);

    /* Fill in the MultiBulk length */
    setDeferredMultiBulkLength(c, ptr, fields * 2);
}

/* ============================ SlotsmgrtExecWrapper ======================================= */

/* *
 * SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$arg1 ...]
 * */
void slotsmgrtExecWrapperCommand(client *c) {
    /* The reply is a MultiBulk of length 2: a status code, then the result */
    addReplyMultiBulkLen(c, 2);
    if (c->argc < 3) {
        addReplyLongLong(c, -1);
        addReplyError(c, "wrong number of arguments for SLOTSMGRT-EXEC-WRAPPER");
        return;
    }
    /* Look up the wrapped command */
    struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
    if (cmd == NULL) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c,"invalid command specified (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c, "wrong number of arguments for command (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    /* Look up the key for writing; status 0 means it does not exist */
    if (lookupKeyWrite(c->db, c->argv[1]) == NULL) {
        addReplyLongLong(c, 0);
        addReplyError(c, "the specified key doesn't exist");
        return;
    }
    /* For a write command, check whether c->argv[1] is being migrated;
     * this check never blocks the client */
    if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) {
        /* Status 1: the key is being migrated */
        addReplyLongLong(c, 1);
        addReplyError(c, "the specified key is being migrated");
        return;
    } else {
        /* Status 2: the command is executed normally */
        addReplyLongLong(c, 2);
        robj **argv = zmalloc(sizeof(robj *) * (c->argc - 2));
        for (int i = 2; i < c->argc; i ++) {
            argv[i - 2] = c->argv[i];
            incrRefCount(c->argv[i]);
        }
        /* Release the old argv; the wrapped arguments were retained above */
        for (int i = 0; i < c->argc; i ++) {
            decrRefCount(c->argv[i]);
        }
        zfree(c->argv);
        c->argc = c->argc - 2;
        c->argv = argv;
        c->cmd = cmd;
        /* Call the wrapped command */
        call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE);
    }
}

/* ============================ SlotsrestoreAsync Commands ================================= */

/* Reply to a SLOTSRESTORE-ASYNC command */
static void slotsrestoreReplyAck(client *c, int err_code, const char *fmt, ...)
{
    va_list ap;
    va_start(ap, fmt);
    sds s = sdscatvprintf(sdsempty(), fmt, ap);
    va_end(ap);

    addReplyMultiBulkLen(c, 3);
    addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-ACK");
    addReplyBulkLongLong(c, err_code);
    addReplyBulkSds(c, s);

    if (err_code != 0) {
        /* On error, close the client after the reply has been written */
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    }
}

extern int verifyDumpPayload(unsigned char *p, size_t len);

/* The actual handling of the slotsrestore-async command */
static int slotsrestoreAsyncHandle(client *c) {
    /* Query the async migration status of this node; the client is never
     * blocked here, even if a migration is running */
    if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) {
        /* The current db of this node is itself being migrated, so it cannot
         * serve slotsrestore-async */
        slotsrestoreReplyAck(c, -1, "the specified DB is being migrated");
        return C_ERR;
    }

    const char *cmd = "";
    /* Argument check */
    if (c->argc < 2) {
        goto bad_arguments_number;
    }
    cmd = c->argv[1]->ptr;

    /* ==================================================== */
    /* SLOTSRESTORE-ASYNC $cmd $key [$ttl $arg1, $arg2 ...] */
    /* ==================================================== */

    if (c->argc < 3) {
        goto bad_arguments_number;
    }

    robj *key = c->argv[2];

    /* SLOTSRESTORE-ASYNC delete $key */
    if (!strcasecmp(cmd, "delete")) {
        if (c->argc != 3) {
            goto bad_arguments_number;
        }
        /* Delete synchronously */
        int deleted = dbDelete(c->db, key);
        if (deleted) {
            /* Deleted: notify every client watching this key */
            signalModifiedKey(c->db, key);
            /* Dirty counter */
            server.dirty ++;
        }
        /* Reply "1" if the key was deleted, "0" otherwise */
        slotsrestoreReplyAck(c, 0, deleted ? "1" : "0");
        return C_OK;
    }

    /* ==================================================== */
    /* SLOTSRESTORE-ASYNC $cmd $key $ttl [$arg1, $arg2 ...]
    */
    /* ==================================================== */

    if (c->argc < 4) {
        goto bad_arguments_number;
    }

    /* Parse ttl */
    long long ttl;
    if (getLongLongFromObject(c->argv[3], &ttl) != C_OK || ttl < 0) {
        slotsrestoreReplyAck(c, -1, "invalid TTL value (TTL=%s)", c->argv[3]->ptr);
        return C_ERR;
    }

    /* SLOTSRESTORE-ASYNC expire $key $ttl */
    if (!strcasecmp(cmd, "expire")) {
        /* Argument check */
        if (c->argc != 4) {
            goto bad_arguments_number;
        }
        /* The key must exist */
        if (lookupKeyWrite(c->db, key) == NULL) {
            slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
            return C_ERR;
        }
        /* Reply */
        slotsrestoreReplyAck(c, 0, "1");
        /* The expiration is applied in success_common */
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC string $key $ttl $payload */
    if (!strcasecmp(cmd, "string")) {
        /* Argument check */
        if (c->argc != 5) {
            goto bad_arguments_number;
        }
        /* The key must not exist yet */
        if (lookupKeyWrite(c->db, key) != NULL) {
            slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr);
            return C_ERR;
        }
        /* Try to encode the value */
        robj *val = c->argv[4] = tryObjectEncoding(c->argv[4]);
        /* Add it to the db */
        dbAdd(c->db, key, val);
        /* The db now holds a reference as well */
        incrRefCount(val);
        /* Reply */
        slotsrestoreReplyAck(c, 0, "1");
        /* The expiration is applied in success_common */
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC object $key $ttl $payload */
    if (!strcasecmp(cmd, "object")) {
        /* Argument check */
        if (c->argc != 5) {
            goto bad_arguments_number;
        }
        /* The key must not exist yet */
        if (lookupKeyWrite(c->db, key) != NULL) {
            slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr);
            return C_ERR;
        }
        void *bytes = c->argv[4]->ptr;
        rio payload;
        /* Verify the RDB-serialized payload */
        if (verifyDumpPayload(bytes, sdslen(bytes)) != C_OK) {
            slotsrestoreReplyAck(c, -1, "invalid payload checksum");
            return C_ERR;
        }
        /* Initialize the payload reader */
        rioInitWithBuffer(&payload, bytes);
        /* Read the object type */
        int type = rdbLoadObjectType(&payload);
        if (type == -1) {
            slotsrestoreReplyAck(c, -1, "invalid payload type");
            return C_ERR;
        }
        /* Load the value object */
        robj *val = rdbLoadObject(type, &payload);
        if (val == NULL) {
            slotsrestoreReplyAck(c, -1, "invalid payload body");
            return C_ERR;
        }
        /* Add it to the db */
        dbAdd(c->db, key, val);
        /* Reply */
        slotsrestoreReplyAck(c, 0, "1");
        /* The expiration is applied in success_common */
        goto success_common;
    }

    /* ========================================================== */
    /* SLOTSRESTORE-ASYNC $cmd $key $ttl $hint [$arg1, $arg2 ...] */
    /* ========================================================== */

    /* Argument check */
    if (c->argc < 5) {
        goto bad_arguments_number;
    }

    /* Parse hint, the total length of the value */
    long long hint;
    if (getLongLongFromObject(c->argv[4], &hint) != C_OK || hint < 0) {
        slotsrestoreReplyAck(c, -1, "invalid Hint value (Hint=%s)", c->argv[4]->ptr);
        return C_ERR;
    }

    int xargc = c->argc - 5;
    robj **xargv = &c->argv[5];

    /* SLOTSRESTORE-ASYNC list $key $ttl $hint [$elem1 ...] */
    if (!strcasecmp(cmd, "list")) {
        /* Look up the key */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: it must be an OBJ_LIST encoded as OBJ_ENCODING_QUICKLIST */
            if (val->type != OBJ_LIST || val->encoding != OBJ_ENCODING_QUICKLIST) {
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_LIST, OBJ_ENCODING_QUICKLIST, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            /* The key does not exist */
            if (xargc == 0) {
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* Create a quicklist object */
            val = createQuicklistObject();
            /* Apply the list options */
            quicklistSetOptions(val->ptr, server.list_max_ziplist_size,
                    server.list_compress_depth);
            /* Add it to the db */
            dbAdd(c->db, key, val);
        }
        /* Append all elements to the quicklist */
        for (int i = 0; i < xargc; i ++) {
            xargv[i] = tryObjectEncoding(xargv[i]);
            listTypePush(val, xargv[i], LIST_TAIL);
        }
        /* Reply with the current total length of val */
        slotsrestoreReplyAck(c, 0, "%d", listTypeLength(val));
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC hash $key $ttl $hint [$hkey1 $hval1 ...]
    */
    if (!strcasecmp(cmd, "hash")) {
        /* A hash needs an even number of arguments (field/value pairs) */
        if (xargc % 2 != 0) {
            goto bad_arguments_number;
        }
        /* Look up the key */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: it must be an OBJ_HASH encoded as OBJ_ENCODING_HT */
            if (val->type != OBJ_HASH || val->encoding != OBJ_ENCODING_HT) {
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_HASH, OBJ_ENCODING_HT, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            if (xargc == 0) {
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* The key does not exist: create a hash object */
            val = createHashObject();
            if (val->encoding != OBJ_ENCODING_HT) {
                hashTypeConvert(val, OBJ_ENCODING_HT);
            }
            /* Add it to the db */
            dbAdd(c->db, key, val);
        }
        /* If a total length was given */
        if (hint != 0) {
            dict *ht = val->ptr;
            /* Pre-size the hash table according to hint */
            dictExpand(ht, hint);
        }
        /* Add the pairs in order */
        for (int i = 0; i < xargc; i += 2) {
            /* Try to encode the field and the value */
            hashTypeTryObjectEncoding(val, &xargv[i], &xargv[i + 1]);
            /* Store the pair */
            hashTypeSet(val, xargv[i], xargv[i + 1]);
        }
        /* Reply with the current total length of val */
        slotsrestoreReplyAck(c, 0, "%d", hashTypeLength(val));
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC dict $key $ttl $hint [$elem1 ...]
    */
    if (!strcasecmp(cmd, "dict")) {
        /* Look up the key */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: it must be an OBJ_SET encoded as OBJ_ENCODING_HT */
            if (val->type != OBJ_SET || val->encoding != OBJ_ENCODING_HT) {
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_SET, OBJ_ENCODING_HT, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            if (xargc == 0) {
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* The key does not exist: create a set object */
            val = createSetObject();
            if (val->encoding != OBJ_ENCODING_HT) {
                setTypeConvert(val, OBJ_ENCODING_HT);
            }
            /* Add it to the db */
            dbAdd(c->db, key, val);
        }
        /* If a total length was given */
        if (hint != 0) {
            dict *ht = val->ptr;
            /* Pre-size the hash table according to hint */
            dictExpand(ht, hint);
        }
        /* Add the elements in order */
        for (int i = 0; i < xargc; i ++) {
            /* Try to encode the element */
            xargv[i] = tryObjectEncoding(xargv[i]);
            /* Add it to the set */
            setTypeAdd(val, xargv[i]);
        }
        /* Reply with the current total size of val */
        slotsrestoreReplyAck(c, 0, "%d", setTypeSize(val));
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC zset $key $ttl $hint [$elem1 $score1 ...]
    */
    if (!strcasecmp(cmd, "zset")) {
        /* A zset also needs an even number of arguments (element/score pairs) */
        if (xargc % 2 != 0) {
            goto bad_arguments_number;
        }
        /* Extract the scores */
        double *scores = zmalloc(sizeof(double) * xargc / 2);
        for (int i = 1, j = 0; i < xargc; i += 2, j ++) {
            uint64_t bits;
            if (getUint64FromRawStringObject(xargv[i], &bits) != C_OK) {
                zfree(scores);
                slotsrestoreReplyAck(c, -1, "invalid zset score ([%d]), bad raw bits", j);
                return C_ERR;
            }
            scores[j] = convertRawBitsToDouble(bits);
        }
        /* Look up the key */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: check its type and encoding */
            if (val->type != OBJ_ZSET || val->encoding != OBJ_ENCODING_SKIPLIST) {
                zfree(scores);
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_ZSET, OBJ_ENCODING_SKIPLIST, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            /* The key does not exist */
            if (xargc == 0) {
                zfree(scores);
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* Create a zset object */
            val = createZsetObject();
            if (val->encoding != OBJ_ENCODING_SKIPLIST) {
                zsetConvert(val, OBJ_ENCODING_SKIPLIST);
            }
            /* Add it to the db */
            dbAdd(c->db, key, val);
        }
        zset *zset = val->ptr;
        /* If a total length was given */
        if (hint != 0) {
            dict *ht = zset->dict;
            /* Pre-size the hash table according to hint */
            dictExpand(ht, hint);
        }
        /* Add the elements in order */
        for (int i = 0, j = 0; i < xargc; i += 2, j ++) {
            robj *elem = xargv[i] = tryObjectEncoding(xargv[i]);
            dictEntry *de = dictFind(zset->dict, elem);
            if (de != NULL) {
                /* The member already exists: remove it from the skiplist (by its
                 * old score) and from the dict */
                double score = *(double *)dictGetVal(de);
                zslDelete(zset->zsl, score, elem);
                dictDelete(zset->dict, elem);
            }
            /* Insert the element into the skiplist */
            zskiplistNode *znode = zslInsert(zset->zsl, scores[j], elem);
            /* The skiplist holds a reference */
            incrRefCount(elem);
            /* Record the score in the dict */
            dictAdd(zset->dict, elem, &(znode->score));
            /* The dict holds a reference */
            incrRefCount(elem);
        }
        zfree(scores);
        /* Reply with the current total length of val */
        slotsrestoreReplyAck(c, 0, "%d", zsetLength(val));
        goto success_common;
    }

    slotsrestoreReplyAck(c, -1, "unknown command (argc=%d,cmd=%s)", c->argc, cmd);
    return C_ERR;

bad_arguments_number:
    slotsrestoreReplyAck(c, -1, "wrong number of arguments (argc=%d,cmd=%s)", c->argc, cmd);
    return C_ERR;

success_common:
    /* A non-zero ttl sets the expiration; a zero ttl removes any expiration */
    if (ttl != 0) {
        setExpire(c->db, key, mstime() + ttl);
    } else {
        removeExpire(c->db, key);
    }
    /* Notify clients watching the key */
    signalModifiedKey(c->db, key);
    /* Dirty counter */
    server.dirty ++;
    return C_OK;
}

/* *
 * SLOTSRESTORE-ASYNC delete $key
 *                    expire $key $ttl
 *                    object $key $ttl $payload
 *                    string $key $ttl $payload
 *                    list   $key $ttl $hint [$elem1 ...]
 *                    hash   $key $ttl $hint [$hkey1 $hval1 ...]
 *                    dict   $key $ttl $hint [$elem1 ...]
 *                    zset   $key $ttl $hint [$elem1 $score1 ...]
 * */
void slotsrestoreAsyncCommand(client *c) {
    /* Handle the slotsrestore-async command */
    if (slotsrestoreAsyncHandle(c) != C_OK) {
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    }
}

/* Handle a SLOTSRESTORE-ASYNC-ACK sent back by the target instance */
static int slotsrestoreAsyncAckHandle(client *c) {
    /* Get the slotsmgrtAsyncClient of this db */
    slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
    if (ac->c != c) {
        /* The ACK is only valid if it arrives on the migration connection itself */
        addReplyErrorFormat(c, "invalid client, permission denied");
        return C_ERR;
    }
    /* Argument check; format: SLOTSRESTORE-ASYNC-ACK $errno $message */
    if (c->argc != 3) {
        addReplyError(c, "wrong number of arguments for SLOTSRESTORE-ASYNC-ACK");
        return C_ERR;
    }
    long long errcode;
    if (getLongLongFromObject(c->argv[1], &errcode) != C_OK) {
        addReplyErrorFormat(c, "invalid errcode (%s)",
                (char *)c->argv[1]->ptr);
        return C_ERR;
    }
    /* On error, this is the error description */
    const char *errmsg = c->argv[2]->ptr;
    if (errcode != 0) {
        /* A non-zero error code is logged */
        serverLog(LL_WARNING, "slotsmgrt_async: ack[%d] %s",
                (int)errcode, errmsg != NULL ?
                errmsg : "(null)");
        return C_ERR;
    }
    /* Check batched_iter; it should never be NULL while a migration is running */
    if (ac->batched_iter == NULL) {
        serverLog(LL_WARNING, "slotsmgrt_async: null batched iterator");
        addReplyError(c, "invalid iterator (NULL)");
        return C_ERR;
    }
    /* Number of messages currently being sent (in flight) */
    if (ac->sending_msgs == 0) {
        serverLog(LL_WARNING, "slotsmgrt_async: invalid message counter");
        addReplyError(c, "invalid pending messages");
        return C_ERR;
    }

    /* Update the time the slotsmgrtAsyncClient was last used */
    ac->lastuse = mstime();
    /* One fewer message in flight (one restore command has been acknowledged) */
    ac->sending_msgs -= 1;
    /* Keep producing restore commands (at least 2 messages, with a time budget
     * of 10 microseconds) */
    ac->sending_msgs += slotsmgrtAsyncNextMessagesMicroseconds(ac, 2, 10);

    /* Some messages are still in flight (sent but not yet acknowledged) */
    if (ac->sending_msgs != 0) {
        return C_OK;
    }
    /* Everything has been acknowledged: notify the blocked clients */
    notifySlotsmgrtAsyncClient(ac, NULL);

    /* Get the batched iterator */
    batchedObjectIterator *it = ac->batched_iter;
    if (listLength(it->removed_keys) != 0) {
        /* Some keys have been moved away and must now be deleted locally */
        list *ll = it->removed_keys;
        for (int i = 0; i < c->argc; i ++) {
            /* Drop the references held by the client's current argv */
            decrRefCount(c->argv[i]);
        }
        /* Free the client's current argv array */
        zfree(c->argv);
        /* DEL key1 key2 key2 ...
        */
        c->argc = 1 + listLength(ll);
        /* Allocate the new argv array */
        c->argv = zmalloc(sizeof(robj *) * c->argc);
        for (int i = 1; i < c->argc; i ++) {
            /* Walk the list and fill argv */
            listNode *head = listFirst(ll);
            /* Get the migrated key */
            robj *key = listNodeValue(head);
            /* Delete it from the db */
            if (dbDelete(c->db, key)) {
                /* Notify clients watching the key */
                signalModifiedKey(c->db, key);
                /* Dirty counter */
                server.dirty ++;
            }
            /* Fill argv */
            c->argv[i] = key;
            /* argv holds a reference */
            incrRefCount(key);
            /* Remove the current node */
            listDelNode(ll, head);
        }
        /* Fill argv[0] */
        c->argv[0] = createStringObject("DEL", 3);
        /* Note: although the client sent SLOTSRESTORE-ASYNC-ACK, the command has
         * now been rewritten into a DEL command. After this function returns it
         * is propagated to the AOF file and to all slaves */
    }

    /* Release the values that were sent in chunked mode */
    if (listLength(it->chunked_vals) != 0) {
        list *ll = it->chunked_vals;
        /* Walk the chunked_vals list */
        while (listLength(ll) != 0) {
            /* Head node */
            listNode *head = listFirst(ll);
            /* Get the node value */
            robj *o = listNodeValue(head);
            /* Keep the object alive while its node is removed */
            incrRefCount(o);
            /* Remove the current node */
            listDelNode(ll, head);
            /* If someone else still references the object, just drop our reference */
            if (o->refcount != 1) {
                decrRefCount(o);
            } else {
                /* Otherwise we hold the last reference: release it lazily */
                lazyReleaseObject(o);
            }
        }
    }

    ac->batched_iter = NULL;
    freeBatchedObjectIterator(it);
    return C_OK;
}

/* *
 * SLOTSRESTORE-ASYNC-ACK $errno $message
 * */
void slotsrestoreAsyncAckCommand(client *c) {
    /* Delegate to slotsrestoreAsyncAckHandle */
    if (slotsrestoreAsyncAckHandle(c) != C_OK) {
        /* Close after writing entire reply.
        */
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    }
}

extern int time_independent_strcmp(const char *a, const char *b);

/* *
 * SLOTSRESTORE-ASYNC-AUTH $passwd
 * */
void slotsrestoreAsyncAuthCommand(client *c) {
    if (!server.requirepass) {
        /* The server has no password configured: reply with an error */
        slotsrestoreReplyAck(c, -1, "Client sent AUTH, but no password is set");
        return;
    }
    if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
        /* The password matches: mark the client as authenticated and reply OK */
        c->authenticated = 1;
        slotsrestoreReplyAck(c, 0, "OK");
    } else {
        /* The password does not match */
        c->authenticated = 0;
        slotsrestoreReplyAck(c, -1, "invalid password");
    }
}

/* *
 * SLOTSRESTORE-ASYNC-SELECT $db
 * */
void slotsrestoreAsyncSelectCommand(client *c) {
    long long db;
    if (getLongLongFromObject(c->argv[1], &db) != C_OK ||
            !(db >= 0 && db <= INT_MAX) || selectDb(c, db) != C_OK) {
        slotsrestoreReplyAck(c, -1, "invalid DB index (%s)", c->argv[1]->ptr);
    } else {
        slotsrestoreReplyAck(c, 0, "OK");
    }
}
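
The ACK-driven pacing shared by slotsmgrtAsyncGenericCommand and slotsrestoreAsyncAckHandle is easy to lose among the bookkeeping, so here is a simplified model of it. This is only a sketch, not Codis code: `migration_sketch`, `next_messages`, `start_batch`, `on_ack` and `run_batch` are hypothetical names. It also assumes that each round produces exactly the minimum number of messages, ignoring the microsecond time budget and the maxbytes output-buffer limit that slotsmgrtAsyncNextMessagesMicroseconds honours as well.

```c
#include <assert.h>

/* Hypothetical, simplified stand-in for slotsmgrtAsyncClient: only the
 * counters needed to show the ACK-driven window are kept. */
typedef struct {
    long pending_msgs;  /* messages not yet generated (stands in for batched_iter) */
    long sending_msgs;  /* messages sent but not yet acknowledged */
    int  finished;      /* set once the whole batch has been acknowledged */
} migration_sketch;

/* Generate up to `atleast` messages. Assumption: the time budget and the
 * output-buffer limit of the real function are ignored here. */
static long next_messages(migration_sketch *m, long atleast) {
    long n = m->pending_msgs < atleast ? m->pending_msgs : atleast;
    m->pending_msgs -= n;
    return n;
}

/* Start a batch: mirrors the "at least 3 messages" call made when the
 * migration command is received. */
static void start_batch(migration_sketch *m, long total_msgs) {
    m->pending_msgs = total_msgs;
    m->finished = 0;
    m->sending_msgs = next_messages(m, 3);
    if (m->sending_msgs == 0) {
        m->finished = 1;  /* nothing to send: the batch completes at once */
    }
}

/* Handle one ACK: one in-flight message is confirmed, then up to 2 more
 * are generated. Returns 1 once the whole batch has been acknowledged. */
static int on_ack(migration_sketch *m) {
    assert(m->sending_msgs > 0);  /* an ACK without an in-flight message is invalid */
    m->sending_msgs -= 1;
    m->sending_msgs += next_messages(m, 2);
    if (m->sending_msgs == 0) {
        m->finished = 1;
    }
    return m->finished;
}

/* Drive a batch to completion and return the number of ACKs consumed. */
static long run_batch(long total_msgs) {
    migration_sketch m;
    long acks = 0;
    start_batch(&m, total_msgs);
    while (!m.finished) {
        on_ack(&m);
        acks++;
    }
    return acks;
}
```

Under these assumptions `run_batch(10)` consumes exactly one ACK per message. The point of the model is that the batch is considered complete only when sending_msgs drops back to zero, which is the moment the real code notifies the blocked clients and rewrites the ACK into a DEL of the migrated keys.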