Redis5.0版是Redis产品的重大版本发布,它的最新特色:
1 新的流数据类型(Stream data type) https://redis.io/topics/streams-intro 2 新的 Redis 模块 API:定时器、集群和字典 API(Timers, Cluster and Dictionary APIs) 3 RDB 增长 LFU 和 LRU 信息 4 集群管理器从 Ruby (redis-trib.rb) 移植到了redis-cli 中的 C 语言代码 5 新的有序集合(sorted set)命令:ZPOPMIN/MAX 和阻塞变体(blocking variants) 6 升级 Active defragmentation 至 v2 版本 7 加强 HyperLogLog 的实现 8 更好的内存统计报告 9 许多包含子命令的命令如今都有一个 HELP 子命令 10 客户端频繁链接和断开链接时,性能表现更好 11 许多错误修复和其余方面的改进 12 升级 Jemalloc 至 5.1 版本 13 引入 CLIENT UNBLOCK 和 CLIENT ID 14 新增 LOLWUT 命令 http://antirez.com/news/123 15 在不存在须要保持向后兼容性的地方,弃用 "slave" 术语 16 网络层中的差别优化 17 Lua 相关的改进 18 引入动态的 HZ(Dynamic HZ) 以平衡空闲 CPU 使用率和响应性 19 对 Redis 核心代码进行了重构并在许多方面进行了改进
Redis Cluster总览
1、简介
在官方文档Cluster Spec中,做者详细介绍了Redis集群为何要设计成如今的样子。最核心的目标有三个:
1 性能:增长集群功能后不能对性能产生太大影响,因此Redis采起了P2P而非Proxy方式、异步复制、客户端重定向等设计。 2 水平扩展:文档中称能够线性扩展到1000结点。 3 可用性:在Cluster推出以前,可用性要靠Sentinel保证。有了集群以后也自动具备了Sentinel的监控和自动Failover能力。
若是须要全面的了解,那必定要看官方文档Cluster Tutorial。
Redis Cluster是一个高性能高可用的分布式系统。由多个Redis实例组成的总体,数据按照Slot存储分布在多个Redis实例上,经过Gossip协议来进行节点之间通讯。功能特色以下:
1 全部的节点相互链接 2 集群消息通讯经过集群总线通讯,集群总线端口大小为客户端服务端口+10000(固定值) 3 节点与节点之间经过二进制协议进行通讯 4 客户端和集群节点之间通讯和一般同样,经过文本协议进行 5 集群节点不会代理查询 6 数据按照Slot存储分布在多个Redis实例上 7 集群节点挂掉会自动故障转移 8 能够相对平滑扩/缩容节点
关于Cluster相关的源码能够见:src/cluster.c 和 src/cluster.h
2、通讯
2.1 CLUSTER MEET
须要组建一个真正的可工做的集群,咱们必须将各个独立的节点链接起来,构成一个包含多个节点的集群。链接各个节点的工做使用CLUSTER MEET命令来完成。
CLUSTER MEET <ip> <port>
CLUSTER MEET命令实现:
1 节点A会为节点B建立一个 clusterNode 结构,并将该结构添加到本身的 clusterState.nodes 字典里面。 2 节点A根据CLUSTER MEET命令给定的IP地址和端口号,向节点B发送一条MEET消息。 3 节点B接收到节点A发送的MEET消息,节点B会为节点A建立一个clusterNode结构,并将该结构添加到本身的clusterState.nodes字典里面。 4 节点B向节点A返回一条PONG消息。 5 节点A将受到节点B返回的PONG消息,经过这条PONG消息节点A能够知道节点B已经成功的接收了本身发送的MEET消息。 6 节点A将向节点B返回一条PING消息。 7 节点B将接收到的节点A返回的PING消息,经过这条PING消息节点B能够知道节点A已经成功的接收到了本身返回的PONG消息,握手完成。 8 节点A会将节点B的信息经过Gossip协议传播给集群中的其余节点,让其余节点也与节点B进行握手,最终,通过一段时间后,节点B会被集群中的全部节点认识。
2.2 消息处理 clusterProcessPacket
1 更新接收消息计数器 2 查找发送者节点而且不是handshake节点 3 更新本身的epoch和slave的offset信息 4 处理MEET消息,使加入集群 5 从goosip中发现未知节点,发起handshake 6 对PING,MEET回复PONG 7 根据收到的心跳信息更新本身clusterState中的master-slave,slots信息 8 对FAILOVER_AUTH_REQUEST消息,检查并投票 9 处理FAIL,FAILOVER_AUTH_ACK,UPDATE信息
2.3 定时任务clusterCron
1 对handshake节点创建Link,发送Ping或Meet 2 向随机节点发送Ping 3 若是是从查看是否须要作Failover 4 统计并决定是否进行slave的迁移,来平衡不一样master的slave数 5 判断全部pfail报告数是否过半数
2.4 心跳数据
集群中的节点会不停(每几秒)的互相交换ping、pong包,ping和pong包具备相同的结构,只是类型不一样,ping、pong包合在一块儿叫作心跳包。一般节点会发送ping包并接收接收者返回的pong包,不过这也不是绝对,节点也有可能只发送pong包,而不须要让接收者发送返回包。
节点间经过ping保持心跳以及进行gossip集群状态同步,每次心跳时,节点会带上多个clusterMsgDataGossip消息体,通过屡次心跳,该节点包含的其余节点信息将同步到其余节点。
ping和pong包的内容能够分为header和gossip消息两部分:
- 发送消息头信息Header
- 所负责slots的信息
- 主从信息
- ip, port信息
- 状态信息
包含的信息:
1 NODE ID是一个160bit的伪随机字符串,它是节点在集群中的惟一标识 2 currentEpoch和configEpoch字段 3 node flag,标识节点是master仍是slave,另外还有一些其余的标识位,如PFAIL和FAIL。 4 节点提供服务的hash slot的bitmap 5 发送者的TCP端口 6 发送者认为的集群状态(down or ok) 7 若是是slave,则包含master的NODE ID
- 发送其余节点Gossip信息。包含了该节点认为的其余节点的状态,不过不是集群的所有节点(随机)
- ping_sent, pong_received
- ip, port信息
- 状态信息,好比发送者认为该节点已经不可达,会在状态信息中标记其为PFAIL或FAIL
包含的信息:
1 NODE ID 2 节点的IP和端口 3 NODE flags
clusterMsg结构的currentEpoch、sender、myslots等属性记录了发送者自身的节点信息,接收者会根据这些信息,在本身的clusterState.nodes字典里找到发送者对应的结构,并对结构进行更新。
Redis集群中的各个节点经过ping来心跳,经过Gossip协议来交换各自关于不一样节点的状态信息,其中Gossip协议由MEET、PING、PONG三种消息实现,这三种消息的正文都由两个clusterMsgDataGossip结构组成。
每次发送MEET、PING、PONG消息时,发送者都从本身的已知节点列表中随机选出两个节点(能够是主节点或者从节点),并将这两个被选中节点的信息分别保存到两个结构中。当接收者收到消息时,接收者会访问消息正文中的两个结构,并根据本身是否定识clusterMsgDataGossip结构中记录的被选中节点进行操做:
1 若是被选中节点不存在于接收者的已知节点列表,那么说明接收者是第一次接触到被选中节点,接收者将根据结构中记录的IP地址和端口号等信息,与被选择节点进行握手。 2 若是被选中节点已经存在于接收者的已知节点列表,那么说明接收者以前已经与被选中节点进行过接触,接收者将根据clusterMsgDataGossip结构记录的信息,对被选中节点对应的clusterNode结构进行更新。
2.5 数据结构
clusterNode 结构保存了一个节点的当前信息, 如记录了节点负责处理那些槽、建立时间、节点的名字、节点当前的配置纪元、节点的 IP 和端口等:
1 slots:位图,由当前clusterNode负责的slot为1 2 salve, slaveof:主从关系信息 3 ping_sent, pong_received:心跳包收发时间 4 clusterLink *link:节点间的链接 5 list *fail_reports:收到的节点不可达投票
clusterState 结构记录了在当前节点的集群目前所处的状态还有全部槽的指派信息:
1 myself:指针指向本身的clusterNode 2 currentEpoch:当前节点的最大epoch,可能在心跳包的处理中更新 3 nodes:当前节点记录的全部节点的字典,为clusterNode指针数组 4 slots:slot与clusterNode指针映射关系 5 migrating_slots_to,importing_slots_from:记录slots的迁移信息 6 failover_auth_time,failover_auth_count,failover_auth_sent,failover_auth_rank,failover_auth_epoch:Failover相关信息
clusterLink 结构保存了链接节点的有关信息, 好比套接字描述符, 输入缓冲区和输出缓冲区。
3、数据分布及槽信息
3.1 槽(slot)概念
Redis Cluster中有一个16384长度的槽的概念,他们的编号为0、一、二、3……1638二、16383。这个槽是一个虚拟的槽,并非真正存在的。正常工做的时候,Redis Cluster中的每一个Master节点都会负责一部分的槽,当有某个key被映射到某个Master负责的槽,那么这个Master负责为这个key提供服务,至于哪一个Master节点负责哪一个槽,这是能够由用户指定的,也能够在初始化的时候自动生成。在Redis Cluster中,只有Master才拥有槽的全部权,若是是某个Master的slave,这个slave只负责槽的使用,可是没有全部权。
3.2 数据分片
在Redis Cluster中,拥有16384个slot,这个数是固定的,存储在Redis Cluster中的全部的键都会被映射到这些slot中。数据库中的每一个键都属于这 16384 个哈希槽的其中一个,集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪一个槽,其中 CRC16(key) 用于计算键 key 的 CRC16 校验和,集群中的每一个节点负责处理一部分哈希槽。
3.3 节点的槽指派信息
clusterNode结构的slots属性和numslot属性记录了节点负责处理那些槽:
struct clusterNode { //… unsignedchar slots[16384/8]; };
Slots属性是一个二进制位数组(bitarray),这个数组的长度为16384/8=2048个字节,共包含16384个二进制位。Master节点用bit来标识对于某个槽本身是否拥有。好比对于编号为1的槽,Master只要判断序列的第二位(索引从0开始)是否是为1便可。时间复杂度为O(1)。
3.4 集群全部槽的指派信息
经过将全部槽的指派信息保存在clusterState.slots数组里面,程序要检查槽i是否已经被指派,又或者取得负责处理槽i的节点,只须要访问clusterState.slots[i]的值便可,复杂度仅为O(1)。
3.5 请求重定向
因为每一个节点只负责部分slot,以及slot可能从一个节点迁移到另外一节点,形成客户端有可能会向错误的节点发起请求。所以须要有一种机制来对其进行发现和修正,这就是请求重定向。有两种不一样的重定向场景:
a) MOVED错误
-
请求的key对应的槽不在该节点上,节点将查看自身内部所保存的哈希槽到节点 ID 的映射记录,节点回复一个 MOVED 错误。
-
须要客户端进行再次重试。
b) ASK错误
-
请求的key对应的槽目前的状态属于MIGRATING状态,而且当前节点找不到这个key了,节点回复ASK错误。ASK会把对应槽的IMPORTING节点返回给你,告诉你去IMPORTING的节点查找。
-
客户端进行重试 首先发送ASKING命令,节点将为客户端设置一个一次性的标志(flag),使得客户端能够执行一次针对 IMPORTING 状态的槽的命令请求,而后再发送真正的命令请求。
-
没必要更新客户端所记录的槽至节点的映射。
4、数据迁移
当槽x从Node A向Node B迁移时,Node A和Node B都会有这个槽x,Node A上槽x的状态设置为MIGRATING,Node B上槽x的状态被设置为IMPORTING。
MIGRATING状态
-
若是key存在则成功处理
-
若是key不存在,则返回客户端ASK,客户端根据ASK首先发送ASKING命令到目标节点,而后发送请求的命令到目标节点
-
当key包含多个:
-
若是都存在则成功处理
-
若是都不存在,则返回客户端ASK
-
若是一部分存在,则返回客户端TRYAGAIN,通知客户端稍后重试,这样当全部的key都迁移完毕的时候客户端重试请求的时候回获得ASK,而后通过一次重定向就能够获取这批键
-
此时不刷新客户端中node的映射关系
IMPORTING状态
-
若是key不在该节点上,会被MOVED重定向,刷新客户端中node的映射关系
-
若是是ASKING则命令会被执行,key不在迁移的节点已经被迁移到目标的节点
-
Key不存在则新建
Key迁移的命令:
1 DUMP:在源(migrate)上执行 2 RESTORE:在目标(importing)上执行 3 DEL:在源(migrate)上执行
通过上面三步能够将键迁移,而后再将处于MIGRATING和IMPORTING状态的槽变为常态,完成整个从新分片的过程,具体的信息可见Redis Cluster部署、管理和测试 。
4.1 读写请求
槽里面的key还未迁移,而且槽属于迁移中。
假如槽x在Node A,须要迁移到Node B上,槽x的状态为migrating,其中的key1还没轮到迁移。此时访问key1则先计算key1所在的Slot,存在key1则直接返回。
4.2 MOVED请求
槽里面的key已经迁移过去,而且槽属于迁移完。
假如槽x在Node A,须要迁移到Node B上,迁移完成。此时访问key1则先计算key1所在的Slot,由于已经迁移至Node B上,Node A上不存在,则返回 moved slotid IP:PORT,再根据返回的信息去Node B访问key1。
4.3 ASK请求
槽里面的key已经迁移完,而且槽属于迁移中的状态。
假如槽x在Node A,须要迁移到Node B上,迁移完成,但槽x的状态为migrating。此时访问key1则先计算key1所在的Slot,不存在key1则返回ask slotid IP:PORT,再根据ask返回的信息发送asking请求到Node B,没问题后则最后再去Node B上访问key1。
5、通讯故障
5.1 故障检测
集群中的每一个节点都会按期地向集群中的其余节点发送PING消息,以此交换各个节点状态信息,检测各个节点状态:在线状态、疑似下线状态PFAIL、已下线状态FAIL。
当主节点A经过消息得知主节点B认为主节点D进入了疑似下线(PFAIL)状态时,主节点A会在本身的clusterState.nodes字典中找到主节点D所对应的clusterNode结构,并将主节点B的下线报告(failure report)添加到clusterNode结构的fail_reports链表中。
struct clusterNode { //... //记录全部其余节点对该节点的下线报告 list*fail_reports; //... };
若是集群里面,半数以上的主节点都将主节点D报告为疑似下线,那么主节点D将被标记为已下线(FAIL)状态,将主节点D标记为已下线的节点会向集群广播主节点D的FAIL消息,全部收到FAIL消息的节点都会当即更新nodes里面主节点D状态标记为已下线。
将 node 标记为 FAIL 须要知足如下两个条件:
1 有半数以上的主节点将 node 标记为 PFAIL 状态。 2 当前节点也将 node 标记为 PFAIL 状态。
5.2 多个从节点选主
选新主的过程基于Raft协议选举方式来实现的:
1 当从节点发现本身的主节点进行已下线状态时,从节点会广播一条CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST消息,要求全部收到这条消息,而且具备投票权的主节点向这个从节点投票 2 若是一个主节点具备投票权,而且这个主节点还没有投票给其余从节点,那么主节点将向要求投票的从节点返回一条,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,表示这个主节点支持从节点成为新的主节点 3 每一个参与选举的从节点都会接收CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,并根据本身收到了多少条这种消息来统计本身得到了多少主节点的支持 4 若是集群里有N个具备投票权的主节点,那么当一个从节点收集到大于等于集群N/2+1张支持票时,这个从节点就成为新的主节点 5 若是在一个配置纪元没有从可以收集到足够的支持票数,那么集群进入一个新的配置纪元,并再次进行选主,直到选出新的主节点为止
5.3 故障转移
错误检测用于识别集群中的不可达节点是否已下线,若是一个master下线,会将它的slave提高为master,在gossip消息中,NODE flags的值包括两种PFAIL和FAIL。
PFAIL flag:
若是一个节点发现另一个节点不可达的时间超过NODE_TIMEOUT ,则会将这个节点标记为PFAIL,即Possible failure(可能下线)。节点不可达是说一个节点发送了ping包,可是等待了超过NODE_TIMEOUT时间仍然没有收到回应(NODE_TIMEOUT必须大于一个网络包来回的时间)。
FAIL flag:
PFAIL标志只是一个节点本地的信息,为了使slave提高为master,须要将PFAIL升级为FAIL。PFAIL升级为FAIL须要知足一些条件:
1 A节点将B节点标记为PFAIL 2 A节点经过gossip消息收集其余大部分master节点标识的B节点的状态 3 大部分master节点在NODE_TIMEOUT * FAIL_REPORT_VALIDITY_MULT(2s)时间段内,标识B节点为PFAIL或FAIL
若是知足以上条件,A节点会将B节点标识为FAIL而且向全部节点发送B节点FAIL的消息。收到消息的节点也都会将B标为FAIL。
注意:FAIL状态是单向的,只能从PFAIL升级为FAIL,而不能从FAIL降为PFAIL。
清除FAIL状态:
- 节点从新可达,而且是slave节点
- 节点从新可达,而且是master节点,可是不提供任何slot服务
- 节点从新可达,而且是master节点,可是长时间没有slave被提高为master来顶替它
PFAIL提高到FAIL使用的是一种弱协议:
- 节点收集的状态不在同一时间点,会丢弃时间较早的报告信息,可是也只能保证节点的状态在一段时间内大部分master达成了一致
- 检测到一个FAIL后,须要通知全部节点,可是没有办法保证每一个节点都能成功收到消息
当从节点发现本身的主节点变为已下线(FAIL)状态时,便尝试进Failover,成为新的主。如下是故障转移的执行步骤:
1 在下线主节点的全部从节点中选中一个从节点 2 被选中的从节点执行SLAVEOF NO NOE命令,成为新的主节点 3 新的主节点会撤销全部对已下线主节点的槽指派,并将这些槽所有指派给本身 4 新的主节点对集群进行广播PONG消息,告知其余节点已经成为新的主节点 5 新的主节点开始接收和处理槽相关的请求
相关结构
clusterNode:

typedef struct clusterNode { mstime_t ctime; // 该node建立时间 char name[CLUSTER_NAMELEN]; // 40位的node名字 // node 状态标识经过投CLUSTER_NODE 定义。 // 包括 maser slave self pfail fail handshake noaddr meet migrate_to null_name 这些状态 int flags; // 本节点最新epoch uint64_t configEpoch; // 当前node负责的slot 经过bit表示 unsigned char slots[CLUSTER_SLOTS/8]; int numslots; int numslaves; struct clusterNode **slaves; // 若是该node为从 则指向master节点 struct clusterNode *slaveof; mstime_t ping_sent; /* Unix time we sent latest ping */ mstime_t pong_received; /* Unix time we received the pong */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ mstime_t orphaned_time; /* Starting time of orphaned master condition */ long long repl_offset; /* Last known repl offset for this node. */ char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known port of this node */ clusterLink *link; // 将该节点标记为失败的node list // 节点收到gossip消息后,若是gossip里标记该节点为pfail则加入改list // 好比:节点a向b发送gossip,消息包含了 c 节点且出于pfail,则a将被加入c的link。 list *fail_reports; } clusterNode;
clusterMsgData:节点间通信的数据结构 包含了 ping、fail、publish、update四种类型

struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1]; } ping;
节点间经过ping保持心跳以及进行gossip集群状态同步,每次心跳时,节点会带上多个clusterMsgDataGossip(其余节点)消息体,通过屡次心跳,该节点包含的其余节点信息将同步到其余节点。
clusterState:定义了完整的集群信息

struct clusterState{ // 集群最新的epoch,为64位的自增序列 uint64_t currentEpoch; // 包含的全部节点信息 dict *nodes; // 每一个slot所属于的节点,包括处于migrating和importinng状态的slot clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; // 当前节点所包含的key 用于在getkeysinslot的时候返回key信息 zskiplist *slots_to_keys; ... }
redis启动,判断是否容许cluster模式,若是容许,则调用clusterInit进行cluster信息的初始化。clusterState被初始化为初始值。在后续节点meet及ping过程逐步更新clusterState信息。
Send Ping:
节点建立成功后,节点会向已知的其余节点发送ping消息保持心跳,ping消息体同时会携带已知节点的信息,并经过gossip同步到集群的其余节点。
node的ping由clusterCron负责调用,服务启动时,在serverCron内部会注册clusterCron,该函数每秒执行10次,在clusterCron内部,维护着static变量iteration记录该函数被执行的次数:经过if (!(iteration % 10)){}的判断,使得各节点每秒发送一次心跳。ping节点选择的代码逻辑以下:
clusterCron:

void clusterCron(void) { // ... // 若是没有设置handshake超时,则默认超时未1s handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000; // 遍历nodes列表 while ((de = dictNext(di)) != NULL) { // 删除handshake超时的节点 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { clusterDelNode(node); continue; } // 若是该节点的link为空,则为该节点新建链接,而且初始化ping初始时间 if (node->link == NULL) { // 若是该节点处于meet状态,则直接发送meet让节点加入集群 // 不然发送向该节点发送ping clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); } // 函数每被调动10次,则发送一次ping,所以ping间隔为1s if (!(iteration % 10)) { int j; for (j = 0; j < 5; j++) { // 随机选取节点并过滤link为空的以及self de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); if (this->link == NULL || this->ping_sent != 0) continue; if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; // 挑选距离上次pong间隔最久的节点 // redis会尽可能选择距离上次ping间隔最久的节点, // 以此防止随机不均匀致使某些节点一直收不到ping if (min_pong_node == NULL || min_pong > this->pong_received) { min_pong_node = this; min_pong = this->pong_received; } } } }
发送ping的时候,会优先给新加入的节点发送ping,其实再选择最久没被更新的节点,经过对旧节点选择的加权,尽量地保证了集群最新状态的一致。
每次ping请求,node会从已知的nodes列表里随机选取n个节点(n=1/10*len(nodes)&& n>=3),一段时间后,该节点已知的nodes将被同步到集群的其余节点,集群状态信息达成最终一致。具体实现代码以下(只列出部分代码,完整代码见cluster.c/clusterSendPing)
clusterSendPing:

void clusterSendPing(clusterLink *link, int type) { // 选取1/10 的节点数而且要求大于3. // 1/10是个魔数,为啥是1/10在源码里有解释 int freshnodes = dictSize(server.cluster->nodes) - 2; wanted = floor(dictSize(server.cluster->nodes) / 10); if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; while (freshnodes > 0 && gossipcount < wanted && maxiterations--) { // 经过随机函数随机选择一个节点,保证全部节点尽量被同步到整个集群 dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); // 为了保证失败的节点尽量快地同步到集群其余节点, // 优先选取处于pfail以及fail状态的节点 if (maxiterations > wanted * 2 && !(this->flags & (CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) continue; } // 若是被选中的节点处于 // 1.handshake 而且noaddr状态 // 2.其余节点没有包含该节点的信息,而且该节点没有拥有slot // 则跳过该节点而且将可用的节点数减1,以较少gossip数据同步的开销 if (this->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_NOADDR) || (this->link == NULL && this->numslots == 0)) { freshnodes--; /* Tecnically not correct, but saves CPU. */ continue; } }
经过随机选取合适数量的节点,以及对节点状态的过滤,保证了尽量快的达成最终一致性的同时,减小gossip的网络开销。
cluster监听cluster端口,并经过clusterAcceptHandler接受集群节点发起的链接请求,经过aeCreateFileEvent将clusterReadHandler注册进事件回调,读取node发送的数据包。clusterReadHandler读取到完整的数据包后,调用clusterProcessPacket处理包请求。clusterProcessPacket包含收到数据包后完整的处理逻辑。
clusterProcessPacket:

int clusterProcessPacket(clusterLink *link) { // 判断是否为ping请求并校验数据包长度 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { uint16_t count = ntohs(hdr->count); uint32_t explen; /* expected length of this packet */ explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); explen += (sizeof(clusterMsgDataGossip) * count); if (totlen != explen) return 1; } // ... // 是否为已知节点 sender = clusterLookupNode(hdr->sender); if (sender && !nodeInHandshake(sender)) { // 比较epoch并更新为最大的epoch if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; /* Update the sender configEpoch if it is publishing a newer one. */ if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); } } // 回复pong数据包 clusterSendPing(link, CLUSTERMSG_TYPE_PONG); // 获取gossip消息并处理gossip请求 if (sender) clusterProcessGossipSection(hdr, link); }
clusterProcessGossipSection 读取携带的gossip node内容,并判断这些node是否failover:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { // ... if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) { if (clusterNodeAddFailureReport(node, sender)) { serverLog(LL_VERBOSE, "Node %.40s reported node %.40s as not reachable.", sender->name, node->name); } markNodeAsFailingIfNeeded(node); } else { // 若是该node并不是出于fail状态,则从fail link里删除该node if (clusterNodeDelFailureReport(node, sender)) { serverLog(LL_VERBOSE, "Node %.40s reported node %.40s is back online.", sender->name, node->name); } } }

int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) { while ((ln = listNext(&li)) != NULL) { fr = ln->value; if (fr->node == sender) break; } if (!ln) return 0; // 若是以前被标记为失败,则从失败list里删除 listDelNode(l, ln); }
cluster会根据收到的gossip包里的msgdata来更新集群的状态信息,包括epoch,以及其他节点的状态。若是node被标记为pfail或fail,则被加入fail_reports,当fail_reports长度超过半数节点数量时,该节点及被标记为failover。
总结:
Redis Cluster是去中心化的结构,集群元数据信息分布在每一个节点上,主备切换依赖于多个节点协商选主。采用无中心节点方式实现,无需proxy代理,客户端直接与redis集群的每一个节点链接,根据一样的hash算法计算出key对应的slot,而后直接在slot对应的Redis上执行命令。从CAP定理来看,Cluster支持了AP(Availability&Partition-Tolerancy),这样让Redis从一个单纯的NoSQL内存数据库变成了分布式NoSQL数据库。