Recently, while carrying out a large-scale Redis migration and upgrade, we transferred and synchronized data by emulating the replication protocol.
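Along the way we ran into the following two problems:
1. Redis expiration times were inconsistent.
2. The number of Redis keys differed before and after migration.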
For the first problem, inconsistent Redis expiration times, testing and reading the Redis source code led us to the following conclusions:
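The community version of Redis shows the same expiration-time inconsistency under normal master-replica replication. The main cause is that if the master receives an EXPIRE command while a full sync is in progress, the replica executes that command late. A full sync takes time, so the larger the dataset, the larger the gap between the expiration times.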
The core of the Redis EXPIRE command implementation is as follows:
    expireGenericCommand(c,mstime(),UNIT_SECONDS);

    void expireGenericCommand(redisClient *c, long long basetime, int unit) {
        robj *key = c->argv[1], *param = c->argv[2];
        long long when; /* unix time in milliseconds when the key will expire. */

        if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
            return;

        if (unit == UNIT_SECONDS) when *= 1000;
        when += basetime;
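For EXPIRE with a TTL of 600, the expiration time stored in Redis is effectively (current timestamp + 600) * 1000, that is, the current time in milliseconds plus 600 * 1000. Redis stores this computed absolute value. So in the situation described above, by the time the command reaches the replica, the current timestamp is no longer the one the master used. This is especially true for EXPIRE commands that are applied only after the full sync completes: the delay is roughly the duration of the full sync, and the expiration times end up inconsistent.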
This is in fact a known issue upstream, and there are two ways to address it:
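1. Have the application use EXPIREAT with a timestamp, so the command is unaffected when it is propagated to the replica.
2. Convert EXPIRE commands into EXPIREAT commands inside the Redis code.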
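Upstream did not take the second option and instead offers the EXPIREAT command for users to choose. Seen from another angle, a replica expiration time later than the master's has little impact: the master actively triggers expiration, and once it deletes a key it also sends a delete command to the replica. However, if a key on the master has reached its expiration time but Redis has not yet removed it, reading that key on the replica does not trigger its removal, so applications with very strict expiration requirements are still affected.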
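The difference between reading an expired key on the master and on the replica can be sketched as a toy model in C. This is only an illustration of the behavior described above, not Redis code: model_key and model_read are hypothetical names, and the model ignores the master's periodic active expiration.

```c
#include <assert.h>

/* Hypothetical, simplified model of one key; not a Redis structure. */
typedef struct {
    int present;          /* 1 while the key still exists in the keyspace */
    long long expire_ms;  /* absolute expiry in ms, or -1 for no expiry */
} model_key;

/* Model of a read at time now_ms. On a master, reading a key whose expiry
 * has passed deletes it (the master would then propagate a delete to its
 * replicas). On a replica the read does not delete the key.
 * Returns 1 if the key is still present after the read. */
int model_read(model_key *k, int is_master, long long now_ms) {
    int expired = k->expire_ms != -1 && now_ms > k->expire_ms;
    if (k->present && expired && is_master)
        k->present = 0;
    return k->present;
}
```

In this model a key that expired at 1000 ms is gone after a read at 2000 ms on the master, but is still present after the same read on the replica until the master's delete arrives.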
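Moreover, during our large-scale migration, expiration-time verification found a large number of keys with inconsistent expiration times, which made the verification itself harder.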
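So for the first problem, we convert EXPIRE/PEXPIRE/SETEX/PSETEX into their absolute-timestamp form when they are propagated to replicas: EXPIRE becomes PEXPIREAT (the millisecond form of EXPIREAT), and SETEX becomes SET followed by PEXPIREAT. The implementation is as follows: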
    void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                   int flags)
    {
        if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
            feedAppendOnlyFile(cmd,dbid,argv,argc);
        if (flags & REDIS_PROPAGATE_REPL) {
            if (!strcasecmp(argv[0]->ptr,"expire") ||
                !strcasecmp(argv[0]->ptr,"setex") ||
                !strcasecmp(argv[0]->ptr,"pexpire") ||
                !strcasecmp(argv[0]->ptr,"psetex"))
            {
                long long when;
                robj *tmpargv[3];
                robj *tmpexpire[3];
                robj *ttl;

                /* Decode the TTL into a local object instead of overwriting
                 * argv[2], and release it once parsed, so the reference
                 * taken by getDecodedObject() is not leaked. */
                ttl = getDecodedObject(argv[2]);
                when = strtoll(ttl->ptr,NULL,10);
                decrRefCount(ttl);

                /* EXPIRE and SETEX take seconds; convert to milliseconds. */
                if (!strcasecmp(argv[0]->ptr,"expire") ||
                    !strcasecmp(argv[0]->ptr,"setex"))
                {
                    when *= 1000;
                }
                when += mstime();

                /* Translate EXPIRE/PEXPIRE into PEXPIREAT */
                if (!strcasecmp(argv[0]->ptr,"expire") ||
                    !strcasecmp(argv[0]->ptr,"pexpire"))
                {
                    tmpargv[0] = createStringObject("PEXPIREAT",9);
                    tmpargv[1] = getDecodedObject(argv[1]);
                    tmpargv[2] = createStringObjectFromLongLong(when);
                    replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);
                    decrRefCount(tmpargv[0]);
                    decrRefCount(tmpargv[1]);
                    decrRefCount(tmpargv[2]);
                }

                /* Translate SETEX/PSETEX to SET and PEXPIREAT */
                if (!strcasecmp(argv[0]->ptr,"setex") ||
                    !strcasecmp(argv[0]->ptr,"psetex"))
                {
                    argc = 3;
                    /* SET key value: the value is argv[3] of SETEX/PSETEX. */
                    tmpargv[0] = createStringObject("SET",3);
                    tmpargv[1] = getDecodedObject(argv[1]);
                    tmpargv[2] = getDecodedObject(argv[3]);
                    replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);

                    tmpexpire[0] = createStringObject("PEXPIREAT",9);
                    tmpexpire[1] = getDecodedObject(argv[1]);
                    tmpexpire[2] = createStringObjectFromLongLong(when);
                    replicationFeedSlaves(server.slaves,dbid,tmpexpire,argc);

                    decrRefCount(tmpargv[0]);
                    decrRefCount(tmpargv[1]);
                    decrRefCount(tmpargv[2]);
                    decrRefCount(tmpexpire[0]);
                    decrRefCount(tmpexpire[1]);
                    decrRefCount(tmpexpire[2]);
                }
            } else {
                replicationFeedSlaves(server.slaves,dbid,argv,argc);
            }
        }
    }
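The conversion rule used in propagate can also be looked at in isolation. The following is a minimal standalone sketch in C, assuming a POSIX system for strcasecmp; relative_ttl_to_absolute and format_pexpireat are hypothetical helpers written for illustration and do not exist in Redis.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>

/* Hypothetical standalone model of the rewrite; not part of Redis.
 * Given a command name, its TTL argument and the current time in ms,
 * store the absolute expiry in *when_ms.
 * Returns 1 if the command carries a relative TTL, 0 otherwise. */
int relative_ttl_to_absolute(const char *cmd, long long ttl,
                             long long now_ms, long long *when_ms) {
    int seconds = !strcasecmp(cmd, "expire") || !strcasecmp(cmd, "setex");
    int millis  = !strcasecmp(cmd, "pexpire") || !strcasecmp(cmd, "psetex");
    if (!seconds && !millis) return 0;
    /* EXPIRE/SETEX use seconds, PEXPIRE/PSETEX already use milliseconds. */
    *when_ms = now_ms + (seconds ? ttl * 1000 : ttl);
    return 1;
}

/* Format the PEXPIREAT command that would be sent to the replica.
 * Returns the number of characters written, as snprintf does. */
int format_pexpireat(char *buf, size_t len, const char *key,
                     long long when_ms) {
    return snprintf(buf, len, "PEXPIREAT %s %lld", key, when_ms);
}
```

Because the timestamp is computed once on the master and sent as an absolute value, the replica stores the same expiry no matter how late it applies the command.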
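These changes are now deployed in our production migration environment. Since the rollout, the expiration-time inconsistency is resolved, and expiration times before and after migration match exactly.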
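As for the second problem, the number of Redis keys differing before and after migration: key counts also frequently differ under master-replica replication in the community version of Redis. A key reason is that when Redis performs master-replica replication, it takes an RDB snapshot of the existing data (the BGSAVE command) and sends the snapshot to the replica, which parses the RDB file and loads it into memory. In both of these steps, however, Redis ignores expired keys: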
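1. When the master writes the RDB snapshot file, a key that has already expired is not written to the RDB file.
2. When the RDB file is loaded into memory on the receiving side, a key that has already expired is not loaded (in 3.0.7 this check in rdbLoad is guarded by server.masterhost == NULL).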
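The effect of skipping expired keys on the key count can be sketched with a small model in C. This is an illustration only: snapshot_key_count is a hypothetical function, and the array of expiry times stands in for a keyspace that still holds keys whose expiry has passed but which have not been removed yet.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model, not Redis code: expire_ms[i] is the absolute expiry
 * of key i in milliseconds, or -1 if the key has no expiry. Returns how
 * many keys a snapshot taken at now_ms contains. With skip_expired set,
 * keys whose expiry is already in the past are left out. */
size_t snapshot_key_count(const long long *expire_ms, size_t n,
                          long long now_ms, int skip_expired) {
    size_t count = 0;
    for (size_t i = 0; i < n; i++) {
        if (skip_expired && expire_ms[i] != -1 && expire_ms[i] < now_ms)
            continue; /* same comparison as `expiretime < now` in rdb.c */
        count++;
    }
    return count;
}
```

With skip_expired set, the snapshot holds fewer keys than the source still has in memory, which is the kind of mismatch a key-count check reports.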
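Together, these two behaviors cause key counts to differ between master and replica. This somewhat affects our data verification, because the key counts never appear to match, but it does not affect business logic.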
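To address this, we changed both steps so that they no longer ignore expired keys. Deletion of expired keys is triggered solely by the master, which then propagates the delete command to the replica. This way the key counts match exactly. After applying both patches and rerunning the migration tests, we verified that key expiration times and key counts were fully consistent.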
Finally, here is the code for the changes above (against community Redis 3.0.7):
Every change below sits directly beneath the comment I added to mark it.
1. Do not skip expired keys during BGSAVE
rdb.c
    /* Save a key-value pair, with expire time, type, key, value.
     * On error -1 is returned.
     * On success if the key was actually saved 1 is returned, otherwise 0
     * is returned (the key was already expired). */
    int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                            long long expiretime, long long now)
    {
        /* Save the expire time */
        if (expiretime != -1) {
            /* If this key is already expired skip it */
            /* Comment out the line below */
            /* if (expiretime < now) return 0; */
            if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
            if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
        }

        /* Save type, key, value */
        if (rdbSaveObjectType(rdb,val) == -1) return -1;
        if (rdbSaveStringObject(rdb,key) == -1) return -1;
        if (rdbSaveObject(rdb,val) == -1) return -1;
        return 1;
    }
2. Do not skip expired keys during BGREWRITEAOF
aof.c
    int rewriteAppendOnlyFile(char *filename) {
        dictIterator *di = NULL;
        dictEntry *de;
        rio aof;
        FILE *fp;
        char tmpfile[256];
        int j;
        long long now = mstime();
        char byte;
        size_t processed = 0;

        /* Note that we have to use a different temp name here compared to the
         * one used by rewriteAppendOnlyFileBackground() function. */
        snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
        fp = fopen(tmpfile,"w");
        if (!fp) {
            redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
            return REDIS_ERR;
        }

        server.aof_child_diff = sdsempty();
        rioInitWithFile(&aof,fp);
        if (server.aof_rewrite_incremental_fsync)
            rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
        for (j = 0; j < server.dbnum; j++) {
            char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
            redisDb *db = server.db+j;
            dict *d = db->dict;
            if (dictSize(d) == 0) continue;
            di = dictGetSafeIterator(d);
            if (!di) {
                fclose(fp);
                return REDIS_ERR;
            }

            /* SELECT the new DB */
            if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
            if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

            /* Iterate this DB writing every entry */
            while((de = dictNext(di)) != NULL) {
                sds keystr;
                robj key, *o;
                long long expiretime;

                keystr = dictGetKey(de);
                o = dictGetVal(de);
                initStaticStringObject(key,keystr);

                expiretime = getExpire(db,&key);

                /* If this key is already expired skip it */
                /* Comment out the line below */
                /* if (expiretime != -1 && expiretime < now) continue; */
3. Do not skip expired keys when loading the RDB file
rdb.c
    int rdbLoad(char *filename) {
        uint32_t dbid;
        int type, rdbver;
        redisDb *db = server.db+0;
        char buf[1024];
        long long expiretime, now = mstime();
        FILE *fp;
        rio rdb;

        if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;

        rioInitWithFile(&rdb,fp);
        rdb.update_cksum = rdbLoadProgressCallback;
        rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
        if (rioRead(&rdb,buf,9) == 0) goto eoferr;
        buf[9] = '\0';
        if (memcmp(buf,"REDIS",5) != 0) {
            fclose(fp);
            redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
            errno = EINVAL;
            return REDIS_ERR;
        }
        rdbver = atoi(buf+5);
        if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
            fclose(fp);
            redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
            errno = EINVAL;
            return REDIS_ERR;
        }

        startLoading(fp);
        while(1) {
            robj *key, *val;
            expiretime = -1;

            /* Read type. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
                if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
                /* We read the time so we need to read the object type again. */
                if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
                /* the EXPIRETIME opcode specifies time in seconds, so convert
                 * into milliseconds. */
                expiretime *= 1000;
            } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
                /* Milliseconds precision expire times introduced with RDB
                 * version 3. */
                if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
                /* We read the time so we need to read the object type again. */
                if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            }

            if (type == REDIS_RDB_OPCODE_EOF)
                break;

            /* Handle SELECT DB opcode as a special case */
            if (type == REDIS_RDB_OPCODE_SELECTDB) {
                if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                    goto eoferr;
                if (dbid >= (unsigned)server.dbnum) {
                    redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                    exit(1);
                }
                db = server.db+dbid;
                continue;
            }
            /* Read key */
            if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
            /* Read value */
            if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
            /* Check if the key already expired. This function is used when loading
             * an RDB file from disk, either at startup, or when an RDB was
             * received from the master. In the latter case, the master is
             * responsible for key expiry. If we would expire keys here, the
             * snapshot taken by the master may not be reflected on the slave. */
            /* Comment out the 5 lines below */
            /*
            if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
                decrRefCount(key);
                decrRefCount(val);
                continue;
            }
            */
            /* Add the new object in the hash table */
            dbAdd(db,key,val);

            /* Set the expire time if needed */
            if (expiretime != -1) setExpire(db,key,expiretime);

            decrRefCount(key);
        }
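Note that these changes always hold under the noeviction memory policy. Under other memory policies they hold only while Redis memory usage is below the configured maximum, because a replica that exceeds its maximum memory also triggers eviction, at which point full data consistency can no longer be guaranteed.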