如何解决Redis 主从数据不一致问题

线上问题

近期咱们在对Redis作大规模迁移升级的时候,采用模拟复制协议的方式进行数据传输同步。redis

在此期间,咱们遇到以下两个问题:测试

  1. 迁移先后Redis过时时间不一致。
  2. 迁移先后Redis key 数量不一致。

迁移先后Redis过时时间不一致

针对第一个问题,Redis 过时时间不一致问题,经过测试而且查阅Redis源码中得出以下结论:
Redis社区版本在正常的主从复制也会出现过时时间不一致问题,主要是因为在主从进行全同步期间,若是主库此时有expire 命令,那么到从库中,该命令将会被延迟执行。由于全同步须要耗费时间,数据量越大,那么过时时间差距就越大。
Redis expire 命令主要实现以下:ui

expireGenericCommand(c,mstime(),UNIT_SECONDS);

void expireGenericCommand(redisClient *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
        return;
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;

expire 600 到redis中过时时间实际上是(当前timestamp+600)*1000,最终Redis会存储计算后这个值在Redis中。因此上面提到的状况,等到命令到从库的时候,当前的timestamp跟以前的timestamp不同了,特别是发生在全同步后的expire命令,延迟时间基本上等于全同步的数据,最终形成过时时间不一致。this

这个问题其实已是官方的已知问题,解决方案有两个:unix

1. 业务采用expireat timestamp 方式,这样命令传送到从库就没有影响。
2. 在Redis代码中将expire命令转换为expireat命令。

官方没有作第二个选择,反而是提供expireat命令来给用户选择。其实从另一个角度来看,从库的过时时间大于主库的过时时间,其实影响不大。由于主库会主动触发过时删除,若是该key删除以后,主库也会向从库发送删除的命令。可是若是主库的key已经到了过时时间,redis没有及时进行淘汰,这个时候访问从库该key,那么这个key是不会被触发淘汰的,这样若是对于过时时间要求很是苛刻的业务仍是会有影响的。
并且目前针对于咱们大规模迁移的时间,在进行过时时间校验的时候,发现大量key的过时时间都不一致,这样也不利于咱们进行校验。code

因此针对第一个问题,咱们将expire/pexpire/setex/psetex 命令在复制到从库的时候转换成时间戳的方式,好比expire 转成expireat命令,setex转换成set和expireat命令,具体实现以下:orm

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL) {
        if (!strcasecmp(argv[0]->ptr,"expire") ||
            !strcasecmp(argv[0]->ptr,"setex") ||
            !strcasecmp(argv[0]->ptr,"pexpire") ||
            !strcasecmp(argv[0]->ptr,"psetex") ) {
            long long when;
            robj *tmpargv[3];
            robj *tmpexpire[3];
            argv[2] = getDecodedObject(argv[2]);
            when = strtoll(argv[2]->ptr,NULL,10);
            if (!strcasecmp(argv[0]->ptr,"expire") ||
                !strcasecmp(argv[0]->ptr,"setex")) {
                    when *= 1000;
            }    
            when += mstime();
            /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
            if (!strcasecmp(argv[0]->ptr,"expire") ||
                !strcasecmp(argv[0]->ptr,"pexpire")) {
                tmpargv[0] = createStringObject("PEXPIREAT",9);
                tmpargv[1] = getDecodedObject(argv[1]);
                tmpargv[2] = createStringObjectFromLongLong(when);
                replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);
                decrRefCount(tmpargv[0]);
                decrRefCount(tmpargv[1]);
                decrRefCount(tmpargv[2]);
            }    
            /* Translate SETEX/PSETEX to SET and PEXPIREAT */
            if (!strcasecmp(argv[0]->ptr,"setex") ||
                !strcasecmp(argv[0]->ptr,"psetex")) {
                argc = 3;
                tmpargv[0] = createStringObject("SET",3);
                tmpargv[1] = getDecodedObject(argv[1]);
                tmpargv[2] = getDecodedObject(argv[3]);
                replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);
                tmpexpire[0] = createStringObject("PEXPIREAT",9);
                tmpexpire[1] = getDecodedObject(argv[1]);
                tmpexpire[2] = createStringObjectFromLongLong(when);
                replicationFeedSlaves(server.slaves,dbid,tmpexpire,argc);
                decrRefCount(tmpargv[0]);
                decrRefCount(tmpargv[1]);
                decrRefCount(tmpargv[2]);
                decrRefCount(tmpexpire[0]);
                decrRefCount(tmpexpire[1]);
                decrRefCount(tmpexpire[2]);
            }
        } else {
                replicationFeedSlaves(server.slaves,dbid,argv,argc);
        }
}
}

目前上述修改已经应用到线上迁移环境中,上线之后Redis过时时间不一致问题解决,目前迁移先后的过时时间是严格保持一致的。server

迁移先后Redis key 数量不一致

针对于第二个问题,Redis key 迁移先后数量不一致问题,其实在Redis社区版本的主从复制中,也会常常出现key数量不一致。其中一个很是关键的问题是,redis在作主从复制的时候,会对当前的存量数据作一个RDB快照(bgsave命令),而后将RDB快照传给从库,从库会解析RDB文件而且load到内存中。然儿在上述的两个步骤中Redis会忽略过时的key:ip

1. 主库在作RDB快照文件的时候,发现key已通过期了,则此时不会将过时的key写到RDB文件中。
2. 从库在load RDB文件到内存中的时候,发现key已通过期了,则此时不会将过时的key load进去。

因此针对上述两个问题会形成Redis主从key不一致问题,这个对于咱们作数据校验的时候会有些影响,因始终以为key不一致,可是不影响业务逻辑。
针对上述问题,目前咱们将以上两个步骤都改成不忽略过时key,过时key的删除统一由主库触发删除,而后将删除命令传送到从库中。这样key的数量就彻底一致了。
最终在打上以上两个patch以后,再进行迁移测试的时候,验证key过时时间以及数量都是彻底一致的。
最后贴上以上修改的代码(针对于社区版本Redis 3.0.7):内存

如下代码修改均在我标记注释的下面

一、作bgsave的时候不忽略过时的key

rdb.c

/* Save a key-value pair, with expire time, type, key, value.
 * On error -1 is returned.
 * On success if the key was actually saved 1 is returned, otherwise 0
 * is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    /* Save the expire time */
    if (expiretime != -1) {
        /* If this key is already expired skip it */
        /* 注释下面这一行 */
        /* if (expiretime < now) return 0; */                                                                                                                                                
        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }   

    /* Save type, key, value */
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    if (rdbSaveObject(rdb,val) == -1) return -1;
    return 1;
}

二、作bgrewirteaof 的时候不忽略过时的key

aof.c

int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    char byte;
    size_t processed = 0;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }   

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }   

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */  
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
 
            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);
 
            expiretime = getExpire(db,&key);
 
            /* If this key is already expired skip it */
            /* 注释下面这一行 */
            /* if (expiretime != -1 && expiretime < now) continue; */

三、在load rdb 的时候不忽略过时key

rdb.c

int rdbLoad(char *filename) {
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    long long expiretime, now = mstime();
    FILE *fp;
    rio rdb;

    if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;

    rioInitWithFile(&rdb,fp);
    rdb.update_cksum = rdbLoadProgressCallback;
    rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }

    startLoading(fp);
    while(1) {
        robj *key, *val;
        expiretime = -1;

        /* Read type. */
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliseconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        }
 
        if (type == REDIS_RDB_OPCODE_EOF)
            break;
 
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }
        /* Read key */
        if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
        /* Read value */
        if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
         /* 注释下面5行 */
        /* if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
             continue;
        } */
        /* Add the new object in the hash table */
        dbAdd(db,key,val);
 
        /* Set the expire time if needed */
        if (expiretime != -1) setExpire(db,key,expiretime);
 
        decrRefCount(key);
    }

总结

注意上述修改在内存策略为noeviction一直有效,可是其余内存策略只能在Redis 使用内存小于最大内存的时候才会有效,由于从库在使用内存超过最大内存的时候也会触发淘汰,这个时候也无法彻底保证数据一致性了。

相关文章
相关标签/搜索