Redis 源码分析之 cluster meet

Redis cluster 是 redis 官方提出的分布式集群解决方案,在此以前,有一些第三方的可选方案,如 codis、Twemproxy等。cluster 内部使用了 gossip 协议进行通讯,以达到数据的最终一致性。详细介绍可参考官网 Redis cluster tutorial
本文试图借着cluster meet 命令的实现来对其中的一些通讯细节一探究竟。
咱们都知道,当 redis server 以 cluster mode 启动时,节点 A 想加入节点 B 所在的集群,只须要执行 CLUSTER MEET ip port 这个命令便可,经过 gossip 通讯,最终 B 所在集群的其余节点也都会认识到 A。大概流程图以下:html

cluster 初始化

当 redis server 以 cluster mode 启动时,即配置文件中的 cluster-enabled 选项设置为 true,此时在服务启动时,会有一个 cluster 初始化的流程,这个在以前的文章 《Redis 启动流程》中有提到过,即执行函数 clusterInit。在 cluster 中有三个数据结构很重要, clusterStateclusterNodeclusterLink
每一个节点都保存着一个 clusterState 结构,这个结构记录了在当前节点的视角下,集群目前所处的状态,即“我看到的世界是什么样子”。
每一个节点都会使用一个 clusterNode 结构来记录本身的状态, 并为集群中的全部其余节点(包括主节点和从节点)都建立一个相应的 clusterNode 结构, 以此来记录其余节点的状态。
clusterNode 结构的 link 属性是一个 clusterLink 结构, 该结构保存了链接节点所需的有关信息, 好比套接字描述符, 输入缓冲区和输出缓冲区。
更多的细节能够经过网页 《redis 设计与实现 - 节点》进行了解。
该初始化很简单,首先是建立一个 clusterState 结构,并初始化一些成员,以下:node

server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;     // 新节点的 currentEpoch = 0
server.cluster->state = CLUSTER_FAIL; // 初始状态置为 FAIL
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots(); // Clear the migrating/importing state for all the slots

而后给 node.conf 文件加锁,确保每一个节点使用本身的 cluster 配置文件。redis

if (clusterLockConfig(server.cluster_configfile) == C_ERR)
    exit(1);

借着这个机会学习下 redis 如何使用的文件锁。数据库

int fd = open(filename,O_WRONLY|O_CREAT,0644);
if (fd == -1) {
    serverLog(LL_WARNING,
              "Can't open %s in order to acquire a lock: %s",
              filename, strerror(errno));
    return C_ERR;
}

if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
    if (errno == EWOULDBLOCK) {
        serverLog(LL_WARNING,
                  "Sorry, the cluster configuration file %s is already used "
                  "by a different Redis Cluster node. Please make sure that "
                  "different nodes use different cluster configuration "
                  "files.", filename);
    } else {
        serverLog(LL_WARNING,
                  "Impossible to lock %s: %s", filename, strerror(errno));
    }
    close(fd);
    return C_ERR;
}

而后加载 node.conf 文件,这个过程还会检查这个文件是否合理。服务器

若是加载失败(或者配置文件不存在),则以 REDIS_NODE_MYSELF|REDIS_NODE_MASTER 为标记,建立一个clusterNode 结构表示本身自己,置为主节点,并设置本身的名字为一个40字节的随机串;而后将该节点添加到server.cluster->nodes中,这说明这是个新启动的节点,生成的配置文件进行刷盘。数据结构

if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
    myself = server.cluster->myself =
        createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
    serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
              myself->name);
    clusterAddNode(myself);
    saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1); // 新节点,将配置刷到配置文件中,fsync

接下来,调用 listenToPort 函数,在集群 gossip 通讯端口上建立 socket fd 进行监听。集群内 gossip 通讯端口是在 Redis 监听端口基础上加 10000,好比若是Redis监听客户端的端口为 6379,则集群监听端口就是16379,该监听端口用于接收其余集群节点发送过来的 gossip 消息。app

而后注册监听端口上的可读事件,事件回调函数为 clusterAcceptHandlerdom

#define CLUSTER_PORT_INCR 10000

if (listenToPort(server.port+CLUSTER_PORT_INCR,
                 server.cfd,&server.cfd_count) == C_ERR)
{
    exit(1);
} else {
    int j;
    for (j = 0; j < server.cfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, 
                              clusterAcceptHandler, NULL) == AE_ERR)
            serverPanic("Unrecoverable error creating Redis Cluster "
                        "file event.");
    }
}

当前节点收到其余集群节点发来的TCP建链请求以后,就会调用 clusterAcceptHandler 函数 accept 链接。在 clusterAcceptHandler函数中,对于每一个已经 accept 的连接,都会建立一个clusterLink 结构表示该连接,并注册 socket fd上的可读事件,事件回调函数为 clusterReadHandlersocket

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    ... ...
    // 若是服务器正在启动,不要接受其余节点的链接, 由于 UPDATE 消息可能会干扰数据库内容
    if (server.masterhost == NULL && server.loading) return;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        ... ...
        // 建立一个 link 结构来处理链接
        // 刚开始的时候, link->node 被设置成 null,由于如今咱们不知道是哪一个节点
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

最后是 reset mf 相关的参数。tcp

CLUSTER MEET

A 节点接收 CLUSTER MEET 命令

A 节点在cluster.c -> clusterCommand 函数中,接收到 CLUSTER MEET 命令,即

if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
    long long port;

    // CLUSTER MEET <ip> <port>
    if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
        addReplyErrorFormat(c,"Invalid TCP port specified: %s", (char*)c->argv[3]->ptr);
        return;
    }
    if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && errno == EINVAL)
    {
        addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
    } else {
        addReply(c,shared.ok);
    }
}

能够看到重点在 clusterStartHandshake 这个函数。

int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;
    /* IP and Port sanity check */
    ... ...
        
    // 检查节点(flag) norm_ip:port 是否正在握手
    if (clusterHandshakeInProgress(norm_ip,port)) { 
        errno = EAGAIN;
        return 0;
    }
    // 建立一个含随机名字的 node,type 为 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
    // 相关信息会在 handshake 过程当中被修复
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));
    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        // 在本地新建一个 nodename 节点,节点名字随机,跟它通讯时它会告诉我真实名字
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime(); // mstime
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->slaveof = NULL;
    ... ...
    node->link = NULL; // link 为空, 在 clusterCron 中能检查的到
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->fail_reports = listCreate();
    ... ...
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

这个函数会首先进行一些 ip 和 port 的合理性检查,而后去遍历所看到的 nodes,这个 ip:port 对应的 node 是否是正处于 CLUSTER_NODE_HANDSHAKE 状态,是的话,就说明这是重复 meet,不必往下走。以后,经过 createClusterNode 函数建立一个带有 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET 标记的节点,名字为一个随机的 40 字节字符串(由于此时对 A 来讲,B 是一个陌生的节点,信息除了 ip 和 port,其余都不知道),经过 clusterAddNode 函数加到本身的 nodes 中。
这个过程成功后,就返回给客户端 OK 了,其余事情须要经过 gossip 通讯去作。

A 节点发送 MEET gossip 消息给 B 节点

A 节点在定时任务 clusterCron 中,会作一些事情。

handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

// 检查是否有 disconnected nodes 而且从新创建链接
di = dictGetSafeIterator(server.cluster->nodes); // 遍历全部节点
while((de = dictNext(di)) != NULL) {
    clusterNode *node = dictGetVal(de);
    
     // 忽略掉 myself 和 noaddr 状态的节点
    if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; 
    
    // 节点处于 handshake 状态,且状态维持时间超过 handshake_timeout,那么从 nodes中删掉它
    if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
        clusterDelNode(node);
        continue;
    }

    // 刚刚收到 cluster meet 命令建立的新 node ,或是 server 刚启动,或是因为某种缘由断开了
    if (node->link == NULL) { 
        int fd;
        mstime_t old_ping_sent;
        clusterLink *link;

        // 对端 gossip 通讯端口为 node 端口 + 10000,建立 tcp 链接, 本节点至关于 client
        fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
        ... ...
        link = createClusterLink(node);
        link->fd = fd;
        node->link = link;

        // 注册 link->fd 上的可读事件,事件回调函数为 clusterReadHandler
        aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link);
        ... ...

        // 若是 node 带有 MEET flag,咱们发送一个 MEET 包而不是 PING,
        // 这是为了强制让接收者把咱们加到它的 nodes 中
        clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
        ... ...
        node->flags &= ~CLUSTER_NODE_MEET;
        ... ...
    }
}
dictReleaseIterator(di);

能够看到,遍历本身看到的 nodes,当遍历到 B 节点时,因为 node->link == NULL,所以会监听 B 的启动端口号+10000,即 gossip 通讯端口,而后注册可读事件,处理函数为 clusterReadHandler。接着会发送 CLUSTER_NODE_MEET 消息给 B 节点,消除掉 B 节点的 meet 状态。

B 节点处理 A 发来的 MEET gossip 消息

当 B 节点接收到 A 节点发送 gossip 时,回调函数 clusterAcceptHandler 进行处理,而后会 accept 对端的 connect(B 做为 server,对端做为 client),注册可读事件,回调函数为 clusterReadHandler,基本逻辑以下,

void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // 若是服务器正在启动,不要接受其余节点的连接,由于 UPDATE 消息可能会干扰数据库内容
    if (server.masterhost == NULL && server.loading) return;
    while(max--) { // 1000 个请求
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
   
        // 建立一个 link 结构来处理链接
        // 刚开始的时候, link->node 被设置成 null,由于如今咱们不知道是哪一个节点
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

能够看到每次 accept 对端connect时,都会建立一个 clusterLink 结构用来接收数据,

typedef struct clusterLink {
    mstime_t ctime;             /* Link creation time */
    int fd;                     /* TCP socket file descriptor */
    sds sndbuf;                 /* Packet send buffer */
    sds rcvbuf;                 /* Packet reception buffer */
    struct clusterNode *node;   /* Node related to this link if any, or NULL */
} clusterLink;

clusterLink 有一个指针是指向 node 自身的。
B 节点接收到 A 节点发送过来的信息,放到 clusterLinkrcvbuf 字段,而后使用 clusterProcessPacket 函数来处理(接收数据过程很简单,不作分析)。
因此 clusterProcessPacket 函数的做用是处理别人发过来的 gossip 包。

if (!sender && type == CLUSTERMSG_TYPE_MEET) {
    clusterNode *node;

    // 建立一个带有 CLUSTER_NODE_HANDSHAKE 标记的 cluster node,名字随机
    node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
    nodeIp2String(node->ip,link); // ip 和 port 信息均从 link 中得到
    node->port = ntohs(hdr->port);

    clusterAddNode(node);
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
.....
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);

因为这时 B 节点还不认识 A 节点,所以 B 节点从本身的 nodes 中找 A 节点是找不到的,因此 sender 是空,所以会走进如上的这段逻辑。一样以随机的名字,CLUSTER_NODE_HANDSHAKE 为 flag 建立一个 node,加入本身的 nodes 中。
在这个逻辑末尾会给 A 节点回复一个 PONG 消息。

A 节点处理 B 节点回复的 PONG gossip 消息

一样是在 clusterProcessPacket 中处理 gossip 消息。

if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
    ... ...
    if (link->node) {
        if (nodeInHandshake(link->node)) { // node 处于握手状态
            ... ...
            clusterRenameNode(link->node, hdr->sender); // 修正节点名
            link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; // 消除 handshake 状态
            link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
}

这个时候 A 节点会根据 B 节点发来的消息,更正 A 节点 nodes 中关于 B 节点的名字,以及消除 handshake 状态。

B 节点发送 PING gossip 消息给 A 节点

当 B 节点在作 clusterCron 时,发现本身看到的 A 节点中的 link 为空,即 node->link == NULL,这与上面讲的 A 节点给 B 节点发 MEET 消息相似,不过在 B 节点看了 A 节点没有 meet flag,所以发送的是 PING 消息。

A 节点处理 B 节点发来的 PING 消息

作一些逻辑,不过跟此次要讨论的事情无关,后面会详写。

对于 PING 和 MEET 消息,不管如何都是会回复一个 PONG 消息的

B 节点处理 A 节点回复的 PONG 消息

逻辑同上,将 B 节点的 nodes 中 A 节点的名字进行更正,而后去掉 A 节点的 handshake flag。

小结

至此,一个 cluster meet 命令执行的完整过程就解释清楚了,画了一个流程图能够帮助更好的理解这个流程。

cluster meet

相关文章
相关标签/搜索