Redis Source Code Analysis: Master-Slave Replication (4)

The previous article covered the slave-side state machine of the replication flow; this article analyzes the corresponding master-side logic.

Connection Setup and Handshake Phase

While the slave establishes the TCP connection to the master and during the replication handshake, the master treats the slave as just another ordinary client. That is, it keeps no state for the slave; it simply processes whatever commands the slave sends and replies to them.

Handling the PING Command

During the handshake the slave first sends a PING command, which the master handles with pingCommand. Depending on the situation, the reply is the string +PONG or a permission error.

Handling the AUTH Command

There may be an authentication step: when the master receives an AUTH command from the slave, it handles it with authCommand, roughly as follows:

void authCommand(client *c) {
    if (!server.requirepass) { // no auth password configured
        addReplyError(c,"Client sent AUTH, but no password is set");
    } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
      c->authenticated = 1;
      addReply(c,shared.ok);
    } else {
      c->authenticated = 0;
      addReplyError(c,"invalid password");
    }
}

The client's authenticated field records whether this client has successfully authenticated.
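The time_independent_strcmp used above exists so that the time spent comparing passwords leaks nothing about how many leading characters matched. A minimal sketch of the idea (ct_strcmp is an illustrative name; the real Redis function works on fixed-size buffers instead):

```c
#include <string.h>

/* Constant-time string comparison sketch: the loop always walks the
 * full length of `a` and never breaks early on the first mismatch,
 * so the running time does not reveal the matching prefix length.
 * Returns 0 when the strings are equal. */
int ct_strcmp(const char *a, const char *b) {
    size_t alen = strlen(a), blen = strlen(b);
    unsigned char diff = (alen != blen);
    for (size_t i = 0; i < alen; i++)
        diff |= (unsigned char)a[i] ^ (unsigned char)b[i % (blen ? blen : 1)];
    return diff != 0;
}
```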

Handling the REPLCONF Command

Next comes the REPLCONF command, handled by replconfCommand, which stores the listening port, address, capabilities and other information reported by the slave. Its logic is roughly as follows.

First, some argument validation. The command format is REPLCONF &lt;option&gt; &lt;value&gt; &lt;option&gt; &lt;value&gt; ..., so the option/value arguments come in pairs; counting REPLCONF itself, the total argument count must be odd, and an even count is certainly malformed.

if ((c->argc % 2) == 0) {
    /* Number of arguments must be odd to make sure that every
     * option has a corresponding value. */
    addReply(c,shared.syntaxerr);
    return;
}

Next, each option is matched and handled. The currently supported options are listening-port, ip-address, capa, ack and getack; an unrecognized option causes an error reply and an early return:

for (j = 1; j < c->argc; j+=2) {
    if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
        long port;

        if ((getLongFromObjectOrReply(c,c->argv[j+1],
                &port,NULL) != C_OK))
            return;
        c->slave_listening_port = port;
    } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
        sds ip = c->argv[j+1]->ptr;
        if (sdslen(ip) < sizeof(c->slave_ip)) {
            memcpy(c->slave_ip,ip,sdslen(ip)+1);
        } else {
            addReplyErrorFormat(c,"REPLCONF ip-address provided by "
                "slave instance is too long: %zd bytes", sdslen(ip));
            return;
        }
    } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
        /* Ignore capabilities not understood by this master. */
        if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
            c->slave_capa |= SLAVE_CAPA_EOF;
    } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
        ... ...
    } else {
        addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
            (char*)c->argv[j]->ptr);
        return;
    }
}

Replication Phase

Next, the slave sends the master a SYNC/PSYNC command requesting a full resynchronization or a partial resynchronization. The state the master keeps for each slave is recorded in the client's replstate field.

From the master's point of view, a slave goes through the following states:
SLAVE_STATE_WAIT_BGSAVE_START → SLAVE_STATE_WAIT_BGSAVE_END → SLAVE_STATE_SEND_BULK → SLAVE_STATE_ONLINE
The state transition diagram was drawn at the beginning of the previous article; refer to it if needed.

Handling the SYNC/PSYNC Commands

Both SYNC and PSYNC are handled by syncCommand.

Preliminary checks

First, some necessary checks are performed.

/* ignore SYNC if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;

// If this node is itself a slave of another node but has not yet
// finished syncing the data, it cannot serve a sync to its own
// slaves (its data set is incomplete).
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
    addReplyError(c,"Can't SYNC while not connected with my master");
    return;
}

/* The master is about to start a background RDB dump for this slave,
 * while buffering every command received from other clients into this
 * slave client's output buffer. That buffer must therefore be
 * completely empty, so that it holds exactly the command stream
 * produced since the BGSAVE started.
 *
 * Before the master receives SYNC (or PSYNC) from the slave, the
 * traffic exchanged between them is small, so under normal network
 * conditions the slave client's output buffer should easily have been
 * flushed to the slave and emptied.
 * So if it is not empty, something is probably wrong. */
if (clientHasPendingReplies(c)) {
    addReplyError(c,"SYNC and PSYNC are invalid with pending output");
    return;
}

Full resynchronization or partial resynchronization

Now to the main part: SYNC and PSYNC are treated differently.

// the slave sent PSYNC
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
    if (masterTryPartialResynchronization(c) == C_OK) {
        server.stat_sync_partial_ok++;
        return; /* No full resync needed, return. */
    } else {
        char *master_runid = c->argv[1]->ptr;

        /* Increment stats for failed PSYNCs, but only if the
         * runid is not "?", as this is used by slaves to force a full
         * resync on purpose when they are not able to partially
         * resync. */
        if (master_runid[0] != '?') server.stat_sync_partial_err++;
    }
// the slave sent SYNC
} else {
    /* If a slave uses SYNC, we are dealing with an old implementation
     * of the replication protocol (like redis-cli --slave). Flag the client
     * so that we don't expect to receive REPLCONF ACK feedbacks. */
    c->flags |= CLIENT_PRE_PSYNC; // old-version instance
}

As the code shows, when a partial resync is possible the function returns directly; otherwise the master starts handling the full resync case, which requires it to produce an RDB dump.

The PSYNC command is handled by masterTryPartialResynchronization, whose return value distinguishes the two cases: C_OK means partial resync, C_ERR means full resync. The details follow.

First, the master's own runid is matched against the master_runid sent by the slave; a mismatch means this is a new slave, which requires a full resync:

char *master_runid = c->argv[1]->ptr;
... ...

if (strcasecmp(master_runid, server.runid)) {

    // A slave forces a full resync by sending a runid of `?`.
    if (master_runid[0] != '?') {
        serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
            "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
            master_runid, server.runid);
    } else {
        serverLog(LL_NOTICE,"Full resync requested by slave %s",
            replicationGetSlaveName(c));
    }
    goto need_full_resync;
}

Then the slave's replication offset psync_offset is extracted; the master uses it to decide whether a partial resync is possible. Replication offsets were discussed in earlier articles.

if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
    C_OK) goto need_full_resync;
if (!server.repl_backlog ||
    psync_offset < server.repl_backlog_off ||
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    serverLog(LL_NOTICE,
        "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
    if (psync_offset > server.master_repl_offset) {
        serverLog(LL_WARNING,
            "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
    }
    goto need_full_resync;
}

Both full-resync cases above fall into the need_full_resync logic, which finally returns C_ERR:

need_full_resync:
    /* We need a full resync for some reason... Note that we can't
     * reply to PSYNC right now if a full SYNC is needed. The reply
     * must include the master offset at the time the RDB file we transfer
     * is generated, so we need to delay the reply to that moment. */
    return C_ERR;

Otherwise a partial resync will be performed: the relevant fields are initialized and C_OK is returned:

c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;

listAddNodeTail(server.slaves,c);

// The output buffer cannot be used here: it is reserved for
// accumulating the command stream.
// The master has sent very little to this slave so far, so the kernel
// socket buffer should have free space, and this direct write
// normally does not fail.

// reply +CONTINUE to the slave
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
    freeClientAsync(c);
    return C_OK;
}
// copy the backlog data after psync_offset into the client's output buffer
psync_len = addReplyReplicationBacklog(c,psync_offset);

/* Note that we don't need to set the selected DB at server.slaveseldb
 * to -1 to force the master to emit SELECT, since the slave already
 * has this state from the previous connection with the master. */

// update the count of currently healthy slaves
refreshGoodSlavesCount();
return C_OK; /* The caller can return, no full resync needed. */

The addReplyReplicationBacklog function was also covered in an earlier article.
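Since addReplyReplicationBacklog keeps coming up, here is a minimal sketch of the wrap-around read it has to perform on the circular backlog (backlog_read and its parameter names are illustrative, not the real Redis API):

```c
#include <stddef.h>
#include <string.h>

/* Sketch of reading "everything from psync_offset onwards" out of a
 * circular backlog buffer; the read may wrap around the end of the
 * allocation.
 *
 * buf/size     : circular buffer and its capacity
 * idx          : position where the NEXT byte will be written
 * histlen      : how many valid bytes the buffer currently holds
 * backlog_off  : replication offset of the oldest valid byte
 * psync_offset : offset requested by the slave (assumed in range)
 * out          : destination, assumed large enough
 * Returns the number of bytes copied. */
long backlog_read(const char *buf, long size, long idx, long histlen,
                  long backlog_off, long psync_offset, char *out) {
    long skip = psync_offset - backlog_off;      /* bytes the slave already has */
    long start = (idx - histlen + size) % size;  /* position of the oldest byte */
    long j = (start + skip) % size;              /* first byte to send */
    long len = histlen - skip, copied = 0;
    while (len) {
        long chunk = (size - j < len) ? size - j : len; /* up to the wrap point */
        memcpy(out + copied, buf + j, chunk);
        copied += chunk;
        len -= chunk;
        j = 0;                                   /* continue from the start */
    }
    return copied;
}
```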

Full Resynchronization Process

First, some fields are updated, setting replstate to the SLAVE_STATE_WAIT_BGSAVE_START state.

server.stat_sync_full++;

/* Setup the slave as one waiting for BGSAVE to start. The following code
    * paths will change the state if we handle the slave differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
    anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);

For a full resync the master needs an RDB dump. Generating the RDB data in the background requires a fork, which costs performance, so the master first checks whether existing RDB data can be reused. There are three cases:

[1] A background RDB task is already running with a disk target (the RDB data is written to a local temporary file and then sent to the slave).

/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
    server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
    /* Ok a background save is in progress. Let's check if it is a good
     * one for replication, i.e. if there is another slave that is
     * registering differences since the server forked to save. */
    client *slave;
    listNode *ln;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
    }

    /* To attach this slave, we check that it has at least all the
     * capabilities of the slave that triggered the current BGSAVE. */
    if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {

        /* Perfect, the server is already registering differences for
         * another slave. Set the right state, and copy the buffer. */
        copyClientOutputBuffer(c,slave);
        replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
        serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
    } else {

        /* No way, we need to wait for the next BGSAVE in order to
         * register differences. */
        serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
    }
}

Here the code searches all of the master's slaves for one (call it slaveX) in the SLAVE_STATE_WAIT_BGSAVE_END state.
slaveX's output buffer is copied to the current client, and replicationSetupSlaveForFullResync is then called to set the client's state to SLAVE_STATE_WAIT_BGSAVE_END and send a +FULLRESYNC reply:

int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    char buf[128];
    int buflen;

    slave->psync_initial_offset = offset;
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;

    /* We are going to accumulate the incremental changes for this
     * slave as well. Set slaveseldb to -1 in order to force to re-emit
     * a SELECT statement in the replication stream. */
    server.slaveseldb = -1;

    /* Don't send this reply to slaves that approached us with
     * the old SYNC command. */
    if (!(slave->flags & CLIENT_PRE_PSYNC)) {
        buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                          server.runid,offset);
        if (write(slave->fd,buf,buflen) != buflen) {
            return C_ERR;
        }
    }
    return C_OK;
}

This function does four main things:

  • Set the slave's psync_initial_offset field, so that slaves arriving later can reuse this BGSAVE as much as possible.
  • Set the slave's current state to WAIT_BGSAVE_END, meaning the slave starts accumulating the incoming command stream from this point while waiting for the RDB dump to finish.
  • Set slaveseldb to -1, which forces a SELECT command to be added to the client output buffer when the command stream starts accumulating, so the first command does not run without a database selected.
  • Send the slave a +FULLRESYNC reply.

This function should be called immediately in either of these two situations:

  • immediately after a successful BGSAVE initiated for replication;
  • immediately after finding a reusable slave.

If no reusable slave can be found, the master must run another BGSAVE after the current one completes.

[2] A background RDB task is already running with a diskless (socket) target.

In this case the current slave cannot reuse the RDB data, and another BGSAVE must be run after the current one completes:

/* CASE 2: BGSAVE is in progress, with socket target. */
else if (server.rdb_child_pid != -1 &&
            server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
    /* There is an RDB child process but it is writing directly to
     * children sockets. We need to wait for the next BGSAVE
     * in order to synchronize. */
    serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
}

[3] No background RDB task is running.

If the current slave uses diskless replication, the BGSAVE is postponed to the replicationCron function. This waits for more slaves to arrive and reduces the number of BGSAVE runs, because with diskless replication a later slave cannot attach to an existing transfer and would force a fresh BGSAVE.

If the current slave uses disk-backed replication, startBgsaveForReplication is called to start a new BGSAVE; note that it must not run concurrently with a background AOF rewrite:

/* CASE 3: There is no BGSAVE is progress. */
else {
    if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {

        /* Diskless replication RDB child is created inside
         * replicationCron() since we want to delay its start a
         * few seconds to wait for more slaves to arrive. */
        if (server.repl_diskless_sync_delay)
            serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
    } else {

        /* Target is disk (or the slave is not capable of supporting
         * diskless replication) and we don't have a BGSAVE in progress,
         * let's start one. */
        if (server.aof_child_pid == -1) {
            startBgsaveForReplication(c->slave_capa); // start a BGSAVE right away
        } else {
            serverLog(LL_NOTICE,
                "No BGSAVE in progress, but an AOF rewrite is active. "
                "BGSAVE for replication delayed");
        }
    }
}

Finally, the backlog is created if necessary.

if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
    createReplicationBacklog();

void createReplicationBacklog(void) {
    serverAssert(server.repl_backlog == NULL);
    server.repl_backlog = zmalloc(server.repl_backlog_size);
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;

    // Avoid slaves that used an earlier backlog issuing a mistaken
    // PSYNC against this new one.
    server.master_repl_offset++;

    // Even though there is no data yet, logically the position of the
    // first byte is the byte after master_repl_offset.
    server.repl_backlog_off = server.master_repl_offset+1;
}
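The write side can be sketched the same way. This toy version (the struct and names are illustrative; the real logic lives in feedReplicationBacklog in replication.c) shows how the global offset, histlen, idx and the backlog's first-byte offset stay consistent as data wraps around the circular buffer:

```c
#include <string.h>

/* Illustrative backlog state, mirroring the server fields used in
 * createReplicationBacklog() above. */
typedef struct {
    char *buf;
    long long size, idx, histlen, off, master_offset;
} Backlog;

/* Append `len` bytes at idx, wrapping at the end of the buffer. */
void backlog_feed(Backlog *b, const char *p, size_t len) {
    b->master_offset += (long long)len;      /* global replication offset */
    b->histlen += (long long)len;
    if (b->histlen > b->size) b->histlen = b->size; /* buffer holds at most size */
    while (len) {
        size_t thislen = (size_t)(b->size - b->idx); /* room before the wrap */
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, thislen);
        b->idx = (b->idx + thislen) % b->size;       /* wrap to the start */
        p += thislen;
        len -= thislen;
    }
    /* The oldest valid byte sits at offset master_offset - histlen + 1. */
    b->off = b->master_offset - b->histlen + 1;
}
```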

Performing the BGSAVE

Continuing from the previous section: the BGSAVE is performed by startBgsaveForReplication.
It first dispatches, based on its argument, to different functions for diskless and disk-backed replication:

int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;

serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
    socket_target ? "slaves sockets" : "disk");

if (socket_target)
    retval = rdbSaveToSlavesSockets();
else
    retval = rdbSaveBackground(server.rdb_filename);

The mincapa parameter describes the slaves' "capabilities", i.e. whether they can accept a diskless RDB transfer.
If server.repl_diskless_sync is enabled and mincapa includes SLAVE_CAPA_EOF, a diskless transfer is possible: rdbSaveToSlavesSockets is called to send the RDB data over sockets, in the background, to every slave in the SLAVE_STATE_WAIT_BGSAVE_START state.
Otherwise rdbSaveBackground is called to dump the RDB data to a local file in the background.

If the RDB function fails, every slave in the SLAVE_STATE_WAIT_BGSAVE_START state is removed from the slave list and given the CLIENT_CLOSE_AFTER_REPLY flag, so its connection is closed after the error reply is sent:

if (retval == C_ERR) {
    serverLog(LL_WARNING,"BGSAVE for replication failed");
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            slave->flags &= ~CLIENT_SLAVE;
            listDelNode(server.slaves,ln);
            addReplyError(slave,
                "BGSAVE failed, replication can't continue");
            slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
        }
    }
    return retval;
}

With disk-backed replication, the slave list is then scanned for slaves in SLAVE_STATE_WAIT_BGSAVE_START, and replicationSetupSlaveForFullResync (described above) sets their state to SLAVE_STATE_WAIT_BGSAVE_END and replies +FULLRESYNC:

/* If the target is socket, rdbSaveToSlavesSockets() already setup
 * the slaves for a full resync. Otherwise for disk target do it now.*/
if (!socket_target) {
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                replicationSetupSlaveForFullResync(slave,
                        getPsyncInitialOffset());
        }
    }
}

Finally, replicationScriptCacheFlush is called to flush the Lua script cache.

Accumulating the Command Stream

When the master receives a command from a client, it calls the call function to run the corresponding command handler. The PROPAGATE_REPL flag marks commands that must be propagated to the slaves:

void call(client *c, int flags) {
    ......
   /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        ......
        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        ......

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;

        ......

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. */
        if (propagate_flags != PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
}


void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

Now for the key function, replicationFeedSlaves, analyzed below.

First, the necessary check:

// If the backlog does not exist and this node has no slaves, there is nothing to do
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

If necessary, a SELECT command is first added to the backlog and to the output buffer of every slave whose state is not SLAVE_STATE_WAIT_BGSAVE_START; the command itself is then handled the same way, roughly as follows:

/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
    char aux[LONG_STR_SIZE+3];

    /* Add the multi bulk reply length. */
    // *..CRLF
    aux[0] = '*';
    len = ll2string(aux+1,sizeof(aux)-1,argc);
    aux[len+1] = '\r';
    aux[len+2] = '\n';
    feedReplicationBacklog(aux,len+3); // length of argc as a string, + 3 for '*' and CRLF

    for (j = 0; j < argc; j++) {
        long objlen = stringObjectLen(argv[j]);

        /* We need to feed the buffer with the object as a bulk reply
         * not just as a plain string, so create the $..CRLF payload len
         * and add the final CRLF */
        aux[0] = '$';
        len = ll2string(aux+1,sizeof(aux)-1,objlen);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
        feedReplicationBacklogWithObject(argv[j]);
        feedReplicationBacklog(aux+len+1,2); // CRLF
    }
}

/* Write the command to every slave. */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
    client *slave = ln->value;

    /* Don't feed slaves that are still waiting for BGSAVE to start */
    if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

    /* Feed slaves that are waiting for the initial SYNC (so these commands
        * are queued in the output buffer until the initial SYNC completes),
        * or are already in sync with the master. */

    /* Add the multi bulk length. */
    addReplyMultiBulkLen(slave,argc);

    /* Finally any additional argument that was not stored inside the
        * static buffer if any (from j to argc). */
    for (j = 0; j < argc; j++)
        addReplyBulk(slave,argv[j]);
}

When appending the command stream to a slave's output buffer, the addReply family of functions is used.
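For reference, the byte stream that the aux-buffer code above produces piece by piece is plain RESP. A tiny illustrative encoder of the same format (resp_encode is not a Redis function; it assumes dst is large enough and the arguments are NUL-terminated strings):

```c
#include <stdio.h>
#include <string.h>

/* Encode a command as a RESP multi-bulk: "*<argc>\r\n" followed by
 * "$<len>\r\n<arg>\r\n" for each argument. Returns bytes written. */
size_t resp_encode(char *dst, size_t cap, int argc, const char **argv) {
    size_t n = (size_t)snprintf(dst, cap, "*%d\r\n", argc);
    for (int i = 0; i < argc; i++)
        n += (size_t)snprintf(dst + n, cap - n, "$%zu\r\n%s\r\n",
                              strlen(argv[i]), argv[i]);
    return n;
}
```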

BGSAVE Completion Phase

After the BGSAVE completes, whether disk-backed or diskless, updateSlavesWaitingBgsave performs the final processing, mainly to handle the postponed BGSAVE mentioned earlier.

void updateSlavesWaitingBgsave(int bgsaveerr, int type) {.....}

The slave list is traversed; if a slave's replication state is SLAVE_STATE_WAIT_BGSAVE_START, startBgsaveForReplication is called to start a new BGSAVE.

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
    startbgsave = 1;
    mincapa = (mincapa == -1) ? slave->slave_capa :
                                (mincapa & slave->slave_capa);

If a slave's replication state is SLAVE_STATE_WAIT_BGSAVE_END, it is waiting for the RDB data to be ready, and the handling differs between diskless and disk-backed replication.

Diskless replication

if (type == RDB_CHILD_TYPE_SOCKET) {
    serverLog(LL_NOTICE,
        "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
            replicationGetSlaveName(slave));

    /* Note: we wait for a REPLCONF ACK message from slave in
     * order to really put it online (install the write handler
     * so that the accumulated data can be transfered). However
     * we change the replication state ASAP, since our slave
     * is technically online now. */
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 1;
    slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
}

The slave's replication state is set to SLAVE_STATE_ONLINE and its repl_put_online_on_ack field to 1.
Note that only after receiving the slave's first replconf ack <offset> does the master call putSlaveOnline to really put the slave online and begin sending the buffered command stream.

void replconfCommand(client *c) {
    .....
    else if (!strcasecmp(c->argv[j]->ptr,"ack")) {

        /* REPLCONF ACK is used by slave to inform the master the amount
         * of replication stream that it processed so far. It is an
         * internal only command that normal clients should never use. */
        long long offset;

        if (!(c->flags & CLIENT_SLAVE)) return;
        if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
            return;
        if (offset > c->repl_ack_off)
            c->repl_ack_off = offset;
        c->repl_ack_time = server.unixtime;

        /* If this was a diskless replication, we need to really put
         * the slave online when the first ACK is received (which
         * confirms slave is online and ready to get more data). */
        if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
            putSlaveOnline(c);

        /* Note: this command does not reply anything! */
        return;
    }
    ......
}

The reason for this design lies in the two transfer modes.

With disk-backed replication, the master first sends the RDB length to the slave as $<len>\r\n; after parsing len, the slave reads exactly that many bytes of RDB data from the socket.
With diskless replication, the master cannot know the RDB length in advance, so how does the slave tell when the RDB data ends? Before sending the data, the master sends a 40-byte delimiter in the form $EOF:<40 bytes delimiter>; after all the RDB data has been sent it sends the same delimiter again, which lets the slave detect the end of the transfer.

With disk-backed replication, once the master finishes sending the RDB data it can directly set the slave's state to SLAVE_STATE_ONLINE and start sending the buffered command stream.
With diskless replication, the data the slave reads last may well already contain command-stream bytes, so the master waits until the slave's first replconf ack <offset> before treating the slave as SLAVE_STATE_ONLINE.
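The diskless end-of-transfer check described above can be sketched as follows (rdb_transfer_done is an illustrative helper, not the real slave-side code, which also has to strip the delimiter from the payload):

```c
#include <string.h>

/* The master first sends "$EOF:<40-byte delimiter>\r\n"; the transfer
 * is over when the same 40 bytes appear again at the tail of the
 * received data. Buffering details are simplified here. */
#define EOF_MARK_LEN 40

/* Returns 1 if the last EOF_MARK_LEN bytes of buf[0..len) equal mark. */
int rdb_transfer_done(const char *buf, size_t len, const char *mark) {
    if (len < EOF_MARK_LEN) return 0;
    return memcmp(buf + len - EOF_MARK_LEN, mark, EOF_MARK_LEN) == 0;
}
```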

Disk-backed replication

if (bgsaveerr != C_OK) {
    freeClient(slave);
    serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
    continue;
}

if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
    redis_fstat(slave->repldbfd,&buf) == -1) {
    freeClient(slave);
    serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
    continue;
}

slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
    (unsigned long long) slave->repldbsize);

aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
    freeClient(slave);
    continue;
}

If the earlier BGSAVE failed, the client is freed here.
Otherwise the generated RDB file is opened, its fd is saved in the repldbfd field, the state is set to SLAVE_STATE_SEND_BULK (meaning the RDB data is about to be sent to the slave), and the RDB size is written into the replpreamble field.
The writable event on the slave's fd is then re-registered with sendBulkToSlave as the callback, which is analyzed below:

/* Before sending the RDB file, we send the preamble as configured by the
 * replication process. Currently the preamble is just the bulk count of
 * the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
    nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
    if (nwritten == -1) {
        serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    server.stat_net_output_bytes += nwritten;
    sdsrange(slave->replpreamble,nwritten,-1);
    if (sdslen(slave->replpreamble) == 0) {
        sdsfree(slave->replpreamble);
        slave->replpreamble = NULL;
        /* fall through sending data. */
    } else {
        return;
    }
}

If the replpreamble field is non-empty, this is the first time the callback fires, so the RDB length information is sent to the slave first.
Otherwise we are in the phase of sending the actual RDB data: it is read from the RDB file and written to the slave, with the repldboff field tracking how many bytes have been sent so far.
By default at most PROTO_IOBUF_LEN bytes, i.e. 16K, are sent per invocation.

/* If the preamble was already transfered, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); // read up to 16K of data
if (buflen <= 0) {
    serverLog(LL_WARNING,"Read error sending DB to slave: %s",
        (buflen == 0) ? "premature EOF" : strerror(errno));
    freeClient(slave);
    return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
    if (errno != EAGAIN) {
        serverLog(LL_WARNING,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
    }
    return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;

Once the RDB data has been fully sent, the RDB file fd is closed, the writable event on the fd is deleted, and repldbfd is reset.

if (slave->repldboff == slave->repldbsize) { // RDB fully sent: remove the writable event
    close(slave->repldbfd);
    slave->repldbfd = -1;
    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
    putSlaveOnline(slave);
}

Finally putSlaveOnline is called: it sets the slave's replication state to SLAVE_STATE_ONLINE and re-registers the writable event on the fd with sendReplyToClient as the callback, which sends the accumulated command stream to the slave.

void putSlaveOnline(client *slave) {
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 0;
    slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave) == AE_ERR) {
        serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
        freeClient(slave);
        return;
    }
    refreshGoodSlavesCount();
    serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
        replicationGetSlaveName(slave));
}

Setting the slave's repl_put_online_on_ack field to 0 indicates that the slave has completed the initial synchronization and now enters the command propagation phase.
Finally, refreshGoodSlavesCount is called to update the count of currently healthy slaves.


This concludes the master-side logic of the master-slave replication process.
