做者:屈鹏git
在 《TiKV 源码解析(二)raft-rs proposal 示例情景分析》 中,咱们主要介绍了 raft-rs 的基本 API 使用,其中,与应用程序进行交互的主要 API 是:github
熟悉了以上 3 个 API,用户就能够写出基本的基于 Raft 的分布式应用的框架了,而 Raft 协议中将写入同步到多个副本中的任务,则由 raft-rs 库自己的内部实现来完成,无须应用程序进行额外干预。本文将对数据冗余复制的过程进行详细展开,特别是关于 snapshot 及流量控制的机制,帮助读者更深入地理解 Raft 的原理。网络
在 Raft leader 上,应用程序经过 RawNode::propose 发起的写入会被处理成一条 MsgPropose 类型的消息,而后调用 Raft::append_entry 和 Raft::bcast_append 将消息中的数据追加到 Raft 日志中并广播到其余副本上。总体流程如伪代码所示:app
fn Raft::step_leader(&mut self, mut m: Message) -> Result<()> { if m.get_msg_type() == MessageType::MsgPropose { // Propose with an empty entry list is not allowed. assert!(!m.get_entries().is_empty()); self.append_entry(&mut m.mut_entries()); self.bcast_append(); } }
这段代码中 append_entry
的参数是一个可变引用,这是由于在 append_entry
函数中会为每个 Entry 赋予正确的 term 和 index。term 由选举产生,在一个 Raft 系统中,每选举出一个新的 Leader,便会产生一个更高的 term。而 index 则是 Entry 在 Raft 日志中的下标。Entry 须要带上 term 和 index 的缘由是,在其余副本上的 Raft 日志是可能跟 Leader 不一样的,例如一个旧 Leader 在相同的位置(即 Raft 日志中具备相同 index 的地方)广播了一条过时的 Entry,那么当其余副本收到了重叠的、可是具备更高 term 的消息时,即可以用它们替换旧的消息,以便达成与最新的 Leader 一致的状态。框架
在 Leader 将新的写入追加到本身的 Raft log 中以后,即可以调用 bcast_append
将它们广播到其余副本了。注意这个函数并无任何参数,那么 Leader 如何知道应该给每个副本从哪个位置开始广播呢?原来在 Leader 上对每个副本,都关联维护了一个 Progress,该结构体定义以下:分布式
pub struct Progress { pub matched: u64, // 该副本指望接收的下一个 Entry 的 index pub next_idx: u64, // 未 commit 的消息的滑动窗口 pub ins: Inflights, // ProgressState::Probe:Leader 每一个心跳间隔中最多发送一条 MsgAppend // ProgressState::Replicate:Leader 在每一个心跳间隔中能够发送多个 MsgAppend // ProgressState::Snapshot:Leader 没法再继续发送 MsgAppend 给这个副本 pub state: ProgressState, // 是否暂停给这个副本发送 MsgAppend 了 pub paused: bool, // 一些其余字段…… }
如代码注释中所说的那样,Leader 在给副本广播新的日志时,会从对应的副本的 next_idx
开始。这就蕴含了两个问题:ide
next_idx
应该如何设置?next_idx
?第一个问题的答案在 Raft::reset
函数中。这个函数会在 Raft 完成选举以后选出的 Leader 上调用,会将 Leader 的全部其余副本的 next_idx
设置为跟 Leader 相同的值。以后,Leader 就能够会按照 Raft 论文里的规定,广播一条包含了本身的 term 的空 Entry 了。函数
第二个问题的答案在 Raft::handle_append_response
函数中。咱们继续考察上面的情景,Leader 的其余副本在收到 Leader 广播的最新的日志以后,可能会采起两种动做:优化
fn Raft::handle_append_entries(&mut self, m: &Message) { let mut to_send = Message::new_message_append_response(); match self.raft_log.maybe_append(...) { // 追加日志成功,将最新的 last index 上报给 Leader Some(last_index) => to_send.set_index(last_index), // 追加日志失败,设置 reject 标志,并告诉 Leader 本身的 last index None => { to_send.set_reject(true); to_send.set_reject_hint(self.raft_log.last_index()); } } } self.send(to_send);
其余副本调用 maybe_append
失败的缘由多是比 Leader 的日志更少,可是 Leader 在刚选举出来的时候将全部副本的 next_idx
设置为与本身相同的值了。这个时候这些副本就会在 MsgAppendResponse 中设置拒绝的标志。在 Leader 接收到这样的反馈以后,就能够将对应副本的 next_idx
设置为正确的值了。这个逻辑在 Raft::handle_append_response
中:spa
fn Raft::handle_append_response(&mut self, m: &Message, …) { if m.get_reject() { let pr: &mut Progress = self.get_progress(m.get_from()); // 将副本对应的 `next_idx` 回退到一个合适的值 pr.maybe_decr_to(m.get_index(), m.get_reject_hint()); } else { // 将副本对应的 `next_idx` 设置为 `m.get_index() + 1` pr.maybe_update(m.get_index()); } }
以上伪代码中咱们省略了一些丢弃乱序消息的代码,避免过多的细节形成干扰。
上一节咱们重点观察了 MsgAppend 及 MsgAppendResponse 消息的处理流程,原理是很是简单、清晰的。然而,这个未经任何优化的实现可以工做的前提是在 Leader 收到某个副本的 MsgAppendResponse 以前,再也不给它发送任何 MsgAppend。因为等待响应的时间取决于网络的 TTL,这在实际应用中是很是低效的,所以咱们须要引入 pipeline 优化,以及配套的流量控制机制来避免“优化”带来的网络壅塞。
Pipeline 在 Raft::prepare_send_entries
函数中被引入。这个函数在 Raft::send_append
中被调用,内部会直接修改对目标副本的 next_idex
值,这样,后续的 MsgAppend 即可以在此基础上继续发送了。而一旦以前的 MsgAppend 被该目标副本拒绝掉了,也能够经过上一节中介绍的 maybe_decr_to
机制将 next_idx
重置为正确的值。咱们来看一下这段代码:
// 这个函数在 `Raft::prepare_send_entries` 中被调用 fn Progress::update_state(&mut self, last: u64) { match self.state { ProgressState::Replicate => { self.next_idx = last + 1; self.ins.add(last); }, ProgressState::Probe => self.pause(), _ => unreachable!(), } }
Progress 有 3 种不一样的状态,如这个结构体的定义的代码片断所示。其中 Probe 状态和 Snapshot 状态会在下一节详细介绍,如今只须要关注 Replicate 状态。咱们已经知道 Pipeline 机制是由更新 next_idx
的那一行引入的了,那么下面更新 ins
的一行的做用是什么呢?
从 Progress 的定义的代码片断中咱们知道,ins
字段的类型是 Inflights,能够想象成一个相似 TCP 的滑动窗口:全部 Leader 发出了,可是还没有被目标副本响应的消息,都被框在该副本在 Leader 上对应的 Progress 的 ins
中。这样,因为滑动窗口的大小是有限的,Raft 系统中任意时刻的消息数量也会是有限的,这就实现了流量控制的机制。更具体地,Leader 在给某一副本发送 MsgAppend 时,会检查其对应的滑动窗口,这个逻辑在 Raft::send_append
函数中;在收到该副本的 MsgAppendResponse 以后,会适时调用 Inflights 的 free_to
函数,使窗口向前滑动,这个逻辑在 Raft::handle_append_response
中。
咱们已经在 Progress 结构体的定义以及上面一些代码片断中见过了 ProgressState 这个枚举类型。在 3 种可能的状态中,Replicate 状态是最容易理解的,Leader 能够给对应的副本发送多个 MsgAppend 消息(不超过滑动窗口的限制),并适时地将窗口向前滑动。然而,咱们注意到,在 Leader 刚选举出来时,Leader 上面的全部其余副本的状态却被设置成了 Probe。这是为何呢?
从 Progress 结构体的字段注释中,咱们知道当某个副本处于 Probe 状态时,Leader 只能给它发送 1 条 MsgAppend 消息。这是由于,在这个状态下的 Progress 的 next_idx
是 Leader 猜出来的,而不是由这个副本明确的上报信息推算出来的。它有很大的几率是错误的,亦即 Leader 极可能会回退到某个地方从新发送;甚至有可能这个副本是不活跃的,那么 Leader 发送的整个滑动窗口的消息均可能浪费掉。所以,咱们引入 Probe 状态,当 Leader 给处于这一状态的副本发送了 MsgAppend 时,这个 Progress 会被暂停掉(源码片断见上一节),这样在下一次尝试给这个副本发送 MsgAppend 时,会在 Raft::send_append
中跳过。而当 Leader 收到了这个副本上报的正确的 last index 以后,Leader 便知道下一次应该从什么位置给这个副本发送日志了,这一过程在 Progress::maybe_update
函数中:
fn Progress::maybe_update(&mut self, n: u64) { if self.matched < n { self.matched = n; self.resume(); // 取消暂停的状态 } if self.next_idx < n + 1 { self.next = n + 1; } }
ProgressState::Snapshot 状态与 Progress 中的 pause 标志十分类似,一个副本对应的 Progress 一旦处于这个状态,Leader 便不会再给这个副本发送任何 MsgAppend 了。可是仍有细微的差异:事实上在 Leader 收到 MsgHeartbeatResponse 时,也会调用 Progress::resume
来将取消对该副本的暂停,然而对于 ProgressState::Snapshot 状态的 Progress 则没有这个逻辑。这个状态会在 Leader 成功发送完成 Snapshot,或者收到了对应的副本的最新的 MsgAppendResponse 以后被改变,详细的逻辑请参考源代码,这里就不做赘述了。
咱们把篇幅留给在 Follower 上收到 Snapshot 以后的处理逻辑,主要是 Raft::restore_raft
和 RaftLog::restore
两个函数。前者中主要包含了对 Progress 的处理,由于 Snapshot 包含了 Leader 上最新的信息,而 Leader 上的 Configuration 是可能跟 Follower 不一样的。后者的主要逻辑伪代码以下所示:
fn RaftLog::restore(&mut self, snapshot: Snapshot) { self.committed = snapshot.get_metadata().get_index(); self.unstable.restore(snapshot); }
能够看到,内部仅更新了 committed,并无更新 applied。这是由于 raft-rs 仅关心 Raft 日志的部分,至于如何把日志中的内容更新到真正的状态机中,是应用程序的任务。应用程序须要从上一篇文章中介绍的 Ready 接口中把 Snapshot 拿到,而后自行将其应用到状态机中,最后再经过 RawNode::advance
接口将 applied 更新到正确的值。
Raft 日志复制及相关的流量控制、Snapshot 流程就介绍到这里,代码仓库仍然在 https://github.com/pingcap/raft-rs,source-code 分支。下一期 raft-rs 源码解析咱们会继续为你们带来 configuration change 相关的内容,敬请期待!