baiyannode
命令含义:将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出如今目标实例上,而当前实例上的 key 会被删除
命令格式:redis
MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [KEYS key [key ...]]
命令实战:将键key一、key二、key3批量迁移到本机6380端口的redis实例上,并存储到目标实例的第0号数据库,超时时间为1000毫秒。可选项COPY若是表示不移除源实例上的 key ,REPLACE选项表示替换目标实例上已存在的 key 。KEYS选项表示能够同时批量传送多个keys(但前面的key参数的位置必须设置为空)数据库
127.0.0.1:6379> migrate 127.0.0.1 6380 "" 0 5000 KEYS key1 key2 key3 OK
返回值:迁移成功时返回 OK ,不然返回错误数组
migrate命令的执行过程可分为参数校验、链接创建、组装数据、发送数据、处理返回五个阶段。一样的,migrate命令的处理函数为migrateCommand():缓存
void migrateCommand(client *c) { migrateCachedSocket *cs; // 链接另外一个实例的socket int copy = 0, replace = 0, j; // 是否开启copy及replace选项标记 char *password = NULL; // 密码 long timeout; // 超时时间 long dbid; // 数据库id robj **ov = NULL; /* 要迁移的对象 */ robj **kv = NULL; /* 键名 */ robj **newargv = NULL; rio cmd, payload; // 重要,存储目标实例执行的命令及DUMP的payload int may_retry = 1; int write_error = 0; int argv_rewritten = 0; /* 支持同时传输多个key. */ int first_key = 3; /* 第一个键参数的位置. */ int num_keys = 1; /* 默认只传送一个key. */ /* 校验其余选项,从COPY选项开始校验 */ for (j = 6; j < c->argc; j++) { int moreargs = j < c->argc-1; if (!strcasecmp(c->argv[j]->ptr,"copy")) { // 若是命令参数等于copy,开启copy选项 copy = 1; } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 若是命令参数等于replace,开启replace选项 replace = 1; } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { // 若是命令参数等于auth,开启auth选项 if (!moreargs) { // 参数数量超出规定数量,报错 addReply(c,shared.syntaxerr); return; } j++; password = c->argv[j]->ptr; } else if (!strcasecmp(c->argv[j]->ptr,"keys")) { // 若是设置了keys参数,代表要同时传输多个keys值过去 if (sdslen(c->argv[3]->ptr) != 0) { // 若是开启了keys选项,前面key参数的位置必须设置为空 addReplyError(c, "When using MIGRATE KEYS option, the key argument" " must be set to the empty string"); return; } first_key = j+1; num_keys = c->argc - j - 1; break; /*如今first_key值指向keys的第一个值.,并将num_keys设置为keys的数量 */ } else { addReply(c,shared.syntaxerr); return; } } /* 选择的db和超时时间数据校验,看是不是合法的数字格式 */ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK || getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK) { return; } if (timeout <= 0) timeout = 1000; /* 接下来会检查是否有能够迁移的键 */ ov = zrealloc(ov,sizeof(robj*)*num_keys); kv = zrealloc(kv,sizeof(robj*)*num_keys); int oi = 0; /* 检查全部的键,判断输入的键中,是否存在合法的键来进行迁移 */ for (j = 0; j < num_keys; j++) { if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { // 去键空间字典中查找该键,若是该键没有超时 kv[oi] = c->argv[first_key+j]; // 将未超时的键存到kv数组中,说明当前key是能够migrate的;不然若是超时就没法进行migrate oi++; } } num_keys = oi; // 更新当前可migrate的key总量 if (num_keys == 0) { // 若是没有能够迁移的key,那么给客户端返回“NOKEY"字符串 zfree(ov); zfree(kv); addReplySds(c,sdsnew("+NOKEY\r\n")); return; }
刚开始执行migrate命令的时候,因为migrate参数不少,须要对其逐个作校验。尤为是在启用keys参数同时迁移多个keys的时候,须要进行参数的动态判断。同时须要判断是否有合法的键来进行迁移。只有没有过时的键才可以迁移,不然不进行迁移,最大化节省系统资源。dom
假如咱们要从当前6379端口上的redis实例迁移到6380端口上的redis实例,咱们必然要创建一个socket链接:socket
try_again: write_error = 0; /* 链接创建 */ cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); if (cs == NULL) { zfree(ov); zfree(kv); return; }
咱们看到,在主流程中调用了migrateGetSocket()函数建立了一个socket,这里是一个带缓存的socket。咱们暂时不跟进这个函数,后面我会以扩展的形式来跟进。函数
基于这个socket,咱们能够将数据以TCP协议中规定的字节流形式传输到目标实例上。这就须要一个序列化的过程了。6379实例须要将keys序列化,6380须要将数据反序列化。这就须要借助咱们以前讲过的DUMP命令和RESTORE命令,分别来进行序列化和反序列化了。
redis并无当即进行DUMP将key序列化,而是首先组装要在目标redis实例上所要执行的命令,好比AUTH/SELECT/RESTORE等命令。要想在目标实例上执行命令,那么必须一样基于以前创建的socket链接,以当前的redis实例做为客户端,往与目标redis实例创建的TCP链接中,写入按照redis协议封装的命令集合(如*2 \r\n SELECT \r\n $1 \r\n 1 \r\n)。redis使用了本身封装的I/O抽象层rio,它实现了一个I/O缓冲区。经过读取其缓冲区中的数据,就能够往咱们在创建socket的时候生成的fd中写入数据啦。首先redis会创建一个rio缓冲区,并按照redis数据传输协议所要求的格式,组装要在目标实例上执行的redis命令:源码分析
// 初始化一个rio缓冲区 rioInitWithBuffer(&cmd,sdsempty()); /* 组装AUTH命令 */ if (password) { serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 按照redis协议写入一条命令开始的标识\*2。表示命令一共有2个参数 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); // 写入$4\r\n AUTH \r\n serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, sdslen(password))); // 同上,按照协议格式写入密码 } /* 在目标实例上选择数据库 */ int select = cs->last_dbid != dbid; /* 判断是否已经选择过数据库,若是选择过就不用再次执行SELECT命令 */ if (select) { // 若是没有选择过,须要执行SELECT命令选择数据库 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 同上,写入开始表示\*2 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); // 同上,写入$6\r\n SELECT \r\n serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); // 写入$1\r\n 1 \r\n }
那么接下来须要进行DUMP的序列化操做了。因为序列化操做耗时较久,因此可能出现这种状况:在以前第一次检测是否超时的时候没有超时,可是因为此次序列化操做时间较久,执行期间,这个键超时了,那么redis简单粗暴地丢弃该超时键,直接放弃迁移这个键:.net
int non_expired = 0; // 暂存新的未过时的键的数量 /* 若是在DUMP的过程当中过时了,直接continue. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; long long expireat = getExpire(c->db,kv[j]); if (expireat != -1) { ttl = expireat-mstime(); if (ttl < 0) { continue; } if (ttl < 1) ttl = 1; } /* 通过上面的筛选以后,都是最新的、没有过时的键,这些键能够最终被迁移了. */ kv[non_expired++] = kv[j];
而后,在目标实例上最终咱们须要执行RESTORE命令,将以前通过DUMP序列化的字节流反序列化,过程和上面同理:
serverAssertWithInfo(c,NULL, rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 同上,写入开始表示\*5或4 if (server.cluster_enabled) // 若是集群模式开启 serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); else serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); // 同上,写入$7 RESTORE \r\n serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr, sdslen(kv[j]->ptr))); // 将全部须要反序列化的key写入 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 写入过时时间
接下来,咱们就须要最终执行DUMP命令,将咱们须要传输的全部键等数据序列化了,这里redis调用了createDumpPayload()来建立一个DUMP载荷,这就是最终序列化好的数据:
createDumpPayload(&payload,ov[j],kv[j]); // 序列化数据 serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); // 将序列化数据存到rio cmd中等待发送 sdsfree(payload.io.buffer.ptr); if (replace) serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); // replace选项开启
目前,咱们须要发送的、按照redis协议组装好的全部序列化好的命令及数据都存放在了cmd这个rio结构体变量缓存中。咱们当前的6379redis实例仿佛就是一个客户端,而要传输的目标实例6380就是一个服务端。接下来就须要读取缓存而且往直前创建好的socket中写入数据,将数据最终传输至目标实例:
errno = 0; { sds buf = cmd.io.buffer.ptr; size_t pos = 0, towrite; int nwritten = 0; while ((towrite = sdslen(buf)-pos) > 0) { towrite = (towrite > (64*1024) ? (64*1024) : towrite); //按照64K的块大小来发送 nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout); // 往socket fd中写入数据(数据来源于rio的缓存) if (nwritten != (signed)towrite) { write_error = 1; goto socket_err; } pos += nwritten; } }
在目标redis上分别执行AUTH、SELECT、RESTORE命令,RESTORE命令会反序列化并将key写入目标实例。那么这几个命令执行完毕以后,咱们如何知道它们是否执行成功呢?一样的,目标redis 6380实例在执行完命令以后,也会有相应的返回值,咱们须要根据返回值来判断命令是否执行成功、是否将key成功迁移完成:
char buf0[1024]; /* 存储AUTH命令返回值. */ char buf1[1024]; /* 存储SELECT命令返回值 */ char buf2[1024]; /* 存储RESTORE命令返回值. */ /* 从socket fd中读取AUTH命令返回值. */ if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0) goto socket_err; /* 从socket fd中读取SELECT命令返回值. */ if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0) goto socket_err; int error_from_target = 0; int socket_error = 0; int del_idx = 1; /* 迁移完成以后须要将原有实例上的key删除 */ if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1)); for (j = 0; j < num_keys; j++) { /* 从socket fd中读取RESTORE命令返回值 */ if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) { socket_error = 1; break; } if ((password && buf0[0] == '-') || (select && buf1[0] == '-') || buf2[0] == '-') { if (!error_from_target) { ... } else { if (!copy) { // 没有开启copy选项,须要删除原有实例的键 ... /* 删除原有实例上的键 */ dbDelete(c->db,kv[j]); ... } } } ... /* 若是发生socket错误,关闭链接 */ if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]); ... sdsfree(cmd.io.buffer.ptr); // 释放cmd的rio缓冲区 zfree(ov); zfree(kv); zfree(newargv); // 释放存储key的robj结构体 return;
综上,migrate命令就执行完成了。咱们总结一下它的执行过程:
- 命令参数校验
- 按照redis协议组装目标实例上须要执行的命令
- 将要传输的key序列化
- 建立socket链接
- 经过socket链接将命令及数据传输至目标实例
- 目标实例执行命令并存储相应的key
- 处理目标实例的返回值
- 若是失败执行重试逻辑,若是成功则执行完毕
在migrate命令执行过程当中,调用了migrateGetSocket()建立socket。redis借助字典结构,实现了缓存socket,避免了屡次建立socket所带来的开销:
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) { int fd; sds name = sdsempty(); migrateCachedSocket *cs; /* 查找字典中是否有相应 ip:port 的缓存socket. */ name = sdscatlen(name,host->ptr,sdslen(host->ptr)); name = sdscatlen(name,":",1); name = sdscatlen(name,port->ptr,sdslen(port->ptr)); // 查找字典 cs = dictFetchValue(server.migrate_cached_sockets,name); if (cs) { // 若是找到了,说明以前建立过ip:port的socket sdsfree(name); cs->last_use_time = server.unixtime; return cs; // 直接返回缓存socket } /* 若是在字典中没有找到,说明没有缓存,须要从新建立. */ /* 判断是否缓存的socket过多,最大为64个 */ if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { /* 若是字典中缓存的socket过多,须要随机删除一些 */ dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); cs = dictGetVal(de); close(cs->fd); zfree(cs); dictDelete(server.migrate_cached_sockets,dictGetKey(de)); } /* 建立socket */ fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, atoi(c->argv[2]->ptr)); if (fd == -1) { sdsfree(name); addReplyErrorFormat(c,"Can't connect to target node: %s", server.neterr); return NULL; } anetEnableTcpNoDelay(server.neterr,fd); /* 检查是否在超时时间内建立完成 */ if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) { sdsfree(name); addReplySds(c, sdsnew("-IOERR error or timeout connecting to the client\r\n")); close(fd); return NULL; } /* 将新建立的socket加入缓存并返回给调用者 */ cs = zmalloc(sizeof(*cs)); cs->fd = fd; cs->last_dbid = -1; cs->last_use_time = server.unixtime; // 将新建立的socket加入字典,缓存起来等待下次使用 dictAdd(server.migrate_cached_sockets,name,cs); return cs; }