翻译自Eli Bendersky的系列博客,已得到原做者受权。git
本文是系列文章中的第一部分,本系列文章旨在介绍Raft分布式一致性协议及其Go语言实现。文章的完整列表以下:github
在这一部分,咱们会大幅强化Raft的实现,作到可以实际处理客户端提交的指令,并在Raft集群中复制它们。代码结构与第一部分相同,会有一些新的结构体和函数定义,对旧代码也会有一些改动——我会对这些作简短的解释。算法
本部分的全部代码都在这个目录。数据库
咱们在序言中对客户端交互进行了简短的讨论,我强烈建议您返回去从新读一下对应章节。接下来,咱们不会关注客户端如何找到领导者,相反,咱们讨论的是当他已经找到领导者时,会发生什么。数组
首先说明一下术语。如前所述,客户端使用Raft协议来复制一系列的指令,这些指令能够视为通用状态机的输入。就咱们的Raft实现而言,这些指令能够是彻底任意的,咱们使用Go中的空指针类型(interface{})来进行表示。在Raft的一致性历程中,一条指令会经历如下步骤:安全
注意在呈递和提交指令之间的不对称性——在检查咱们即将讨论的实现策略时,牢记这一点很重要。一条指令会呈递给单个Raft服务器,可是一段时候后多个服务器(特别是已链接/活动的同伴服务器)都会提交这个指令并通知各自的客户端。服务器
回顾一下序言中的示意图:网络
状态机表明使用Raft协议进行复制的任意服务,如键值数据库。已提交的指令会改变服务的状态(如:在数据库中新增一个键值对)。app
当咱们在Raft ConsensusModule
上下文中讨论客户端时,一般指的是(上面说的)服务,由于这也是提交所通知的对象。换句话说,上图中的一致性模块指向服务状态机的黑色箭头就是这里所说的通知。分布式
客户端还有另外的含义,就是该服务的客户端(好比键值数据库的用户)。服务与其客户端之间的交互是服务自身的业务,在本文中,咱们只关注Raft与服务之间的交互。
译者注:做者在这里对于指令的阶段分别使用了submit和commit进行描述,在一般的翻译中,这两个词都表示提交。我我的理解,
submit
表示提交时,倾向于对象A向对象B提交某些内容,而commit
表示提交时,倾向于本地记录的确认。为了不歧义,这里将submit
翻译为呈递
,表示服务向Raft一致性模块发送了指令;commit
仍译为提交
,表示Raft一致性模块确认本地的指令记录。在这里特别解释一下,若有更好的建议,能够联系我进行修改。
在咱们的实现中,在新建ConsensusModule
时会接收一个commit channel
做为参数——CM可使用该通道向调用方发送已提交的指令:commitChan chan<- CommitEntry
。
CommitEntry
定义以下:
/* CommitEntry就是Raft向提交通道发送的数据。每一条提交的条目都会通知客户端, 代表指令已知足一致性,能够应用到客户端的状态机上。 */
type CommitEntry struct {
// Command 是被提交的客户端指令
Command interface{}
// Index 是被提交的客户端指令对应的日志索引
Index int
// Term 是被提交的客户端指令对应的任期
Term int
}
复制代码
使用channel是一种设计选择,可是这不是惟一的解决方法。咱们也能够改用回调;在建立ConsensusModule
时调用方会注册一个回调函数,一旦咱们须要提交指令,就能够执行这个回调函数。
咱们很快会看到经过channel发送日志条目的代码,在此以前,咱们必须讨论Raft服务器如何复制命令并决定是否提交。
在这个系列中,Raft日志已经被说起不少次了,可是咱们尚未对此进行过多的介绍。日志就是要应用于状态机的指令的线性序列,若是须要的话,日志要可以从某个起始状态开始”重放“状态机。正常运行时,全部Raft同伴的日志的相同的。当领导者收到新指令时,会先加入本身的日志中,而后将其复制给全部的追随者。追随者将命令放在日志中并向领导者确认,后者会记录已安全复制到集群中多数服务器的最新日志索引。
Raft论文中有一些日志的示意图,相似:
每一个方格是一条日志条目。方格顶部的数字是该 条目添加到日志时的任期(也就是第一部分所说的任期);方格底部是日志条目包含的键-值指令。每一个日志条目都有一个线性索引[2],方格的颜色用另外一种方式体现了任期。
若是上面的日志应用到一个空的键-值存储中,最终的结果就是x = 4, y = 7
。
在咱们的实现中,日志条目的结构以下:
type LogEntry struct {
Command interface{}
Term int
}
复制代码
每一个ConsensusModule
的日志属性就是数组log []LogEntry
。客户端一般不在意任期,可是,任期对于Raft的正确性相当重要,所以在阅读代码时请务必牢记。
咱们首先看一下新加的方法Submit
,客户端经过该方法呈递新指令:
/* Submit方法会向CM呈递一条新的指令。这个函数是非阻塞的; 客户端读取构造函数中传入的commit channel,以得到新提交条目的通知。 若是当前CM是领导者返回true——表示指令被接受了。 若是返回false,客户端会寻找新的服务器呈递该指令。 */
func (cm *ConsensusModule) Submit(command interface{}) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("Submit received by %v: %v", cm.state, command)
if cm.state == Leader {
cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
cm.dlog("... log=%v", cm.log)
return true
}
return false
}
复制代码
逻辑很简单,若是CM是领导者,则将新指令添加到日志中,并返回true
。不然,忽略请求并返回false
。
Q:Submit
方法返true
是否足以代表客户端已经将指令呈递到领导者?
A:很遗憾并非。在极少数状况下,领导者可能会与其它Raft服务器之间出现网络分区,而其它服务器很快会从新选举新的领导者,可是客户端可能仍然链接在旧的领导者。客户端对于其呈递的指令应该等待一段合理的时间,检查该指令是否出如今commit channel中;若是没有的话,就代表它链接的是错误的领导者,应该链接其它领导者进行重试。
咱们刚刚看到呈递给领导者的指令被追加到了日志的末尾,可是这条新指令如何传给追随者呢?领导者的执行步骤在Raft论文的Figure 2
中的服务器规则
部分有详细描述(能够返回第一部分的附言查看详细内容)。咱们在leaderSendHeartbeats
方法中完成该逻辑,这是一个新方法[3]:
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
go func(peerId int) {
cm.mu.Lock()
ni := cm.nextIndex[peerId]
prevLogIndex := ni - 1
prevLogTerm := -1
if prevLogIndex >= 0 {
prevLogTerm = cm.log[prevLogIndex].Term
}
entries := cm.log[ni:]
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: cm.commitIndex,
}
cm.mu.Unlock()
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in heartbeat reply")
cm.becomeFollower(reply.Term)
return
}
if cm.state == Leader && savedCurrentTerm == reply.Term {
if reply.Success {
cm.nextIndex[peerId] = ni + len(entries)
cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)
savedCommitIndex := cm.commitIndex
for i := cm.commitIndex + 1; i < len(cm.log); i++ {
if cm.log[i].Term == cm.currentTerm {
matchCount := 1
for _, peerId := range cm.peerIds {
if cm.matchIndex[peerId] >= i {
matchCount++
}
}
if matchCount*2 > len(cm.peerIds)+1 {
cm.commitIndex = i
}
}
}
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
} else {
cm.nextIndex[peerId] = ni - 1
cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
}
}
}
}(peerId)
}
}
复制代码
这确实比第一部分的逻辑复杂得多,但它实际上也就是按照论文的图2所写的。关于这段代码的几点说明:
success
字段,用于告知领导者,其请求的追随者是否获得了匹配的prevLogIndex
和prevLogTerm
。根据该字段,领导者会更新追随者对应的nextIndex
。commitIndex
是根据已复制某天日志条目的追随者数量来更新的,若是某条索引已复制到集群中的多数服务器,commitIndex
就会修改成该索引值。代码的这一部分对于前面讨论发客户端交互很是重要:
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
复制代码
newCommitReadyChan
是CM内部使用的一个通道,用来通知在commit channel
上有新条目能够发生给客户端。它是经过下面的方法起做用的,CM启动时会在goroutine运行该方法:
/* commitChanSender负责在cm.commitChan上发送已提交的日志条目。 它会监听newCommitReadyChan的通知并检查哪些条目能够发送(给客户端)。 该方法应该在单独的后台goroutine中运行;cm.commitChan可能会有缓冲来限制客户端消费已提交指令的速度。 当newCommitReadyChan关闭时方法结束。 */
func (cm *ConsensusModule) commitChanSender() {
for range cm.newCommitReadyChan {
// 查找须要执行哪些指令
cm.mu.Lock()
savedTerm := cm.currentTerm
savedLastApplied := cm.lastApplied
var entries []LogEntry
if cm.commitIndex > cm.lastApplied {
entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
cm.lastApplied = cm.commitIndex
}
cm.mu.Unlock()
cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)
for i, entry := range entries {
cm.commitChan <- CommitEntry{
Command: entry.Command,
Index: savedLastApplied + i + 1,
Term: savedTerm,
}
}
}
cm.dlog("commitChanSender done")
}
复制代码
该方法会更新lastApplied
状态变量,以了解哪些条目已发送到客户端,并保证只发送新的条目。
咱们已经讨论过领导者如何处理新日志条目,如今来查看一下追随者的代码,尤为是其中的AppendEntries
方法。
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
// 请求中的任期大于本地任期,转换为追随者状态
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
// 若是当前状态不是追随者,则变为追随者
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
// 如下代码为第二部分新增
// 检查本地的日志在索引PrevLogIndex处是否包含任期与PrevLogTerm匹配的记录?
// 注意在PrevLogIndex=-1的极端状况下,这里应该是true
if args.PrevLogIndex == -1 ||
(args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
reply.Success = true
// 找到插入点 —— 索引从PrevLogIndex+1开始的本地日志与RPC发送的新条目间出现任期不匹配的位置。
logInsertIndex := args.PrevLogIndex + 1
newEntriesIndex := 0
for {
if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
break
}
if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
break
}
logInsertIndex++
newEntriesIndex++
}
/* 循环结束时: - logInsertIndex指向本地日志结尾,或者是与领导者发送日志间存在任期冲突的索引位置 - newEntriesIndex指向请求条目的结尾,或者是与本地日志存在任期冲突的索引位置 */
if newEntriesIndex < len(args.Entries) {
cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
cm.dlog("... log is now: %v", cm.log)
}
// Set commit index.
if args.LeaderCommit > cm.commitIndex {
cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
cm.dlog("... setting commitIndex=%d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
}
}
reply.Term = cm.currentTerm
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
复制代码
这段代码严格遵循了论文图2中的算法(AppendEntries的Received implementation
部分),并且也给出了很好的注释。
注意代码当中,若是领导者的LeaderCommit
大于自身的cm.commitIndex
时,会在cm.newCommitReadyChan
通道发送数据。这就是追随者从领导者处知道新增的日志条目能够提交的时间。
当领导者在AE请求中发送新的日志条目时,会出现如下状况:
success=true
matchIndex
。当有足够的追随者的matchIndex
指向下一索引时,领导者会更新commitIndex
并在下一次AE请求中发给全部追随者(在leaderCommit
字段中)leaderCommit
,它们会意识到有新的日志条目被提交了,它们就会经过commit channel把这些指令发给其客户端。**Q:**提交一条新指令须要多少次RPC往返?
**A:**2次。第一次请求中,领导者发送下一条日志条目给追随者,追随者进行确认。领导者处理AE应答时,可能会根据返回结果更新commit index。第二次RPC请求中,领导者会发送更新后的commit index给追随者,以后追随者会将这些日记条目标记为已提交并经过commit channel将它们发送给客户端。做为练习,请回到上面的示例代码中,找到这些步骤对应的片断。
到目前为止,咱们已经研究了为支持日志复制而添加的新代码。可是,日志也会对Raft选举产生影响。Raft论文中在5.4.1小节(选举约束)中进行了描述。除非候选人的日志与集群中多数同伴服务器同样新,不然Raft的选举程序会阻止其胜选[4]。
所以,RV请求中包含lastLogIndex
和lastLogTerm
字段。当候选人发送RV请求时,它会填入其最新日志条目的相关信息。追随者会与自己的属性进行对比,并决定该候选人是否有资格当选。
下面是最新的startElection
代码:
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
var votesReceived int32 = 1
// Send RequestVote RPCs to all other servers concurrently.
for _, peerId := range cm.peerIds {
go func(peerId int) {
/*---------如下代码为新增--------*/
cm.mu.Lock()
savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()
cm.mu.Unlock()
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
LastLogIndex: savedLastLogIndex,
LastLogTerm: savedLastLogTerm,
}
/*---------以上代码为新增--------*/
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
var reply RequestVoteReply
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votes := int(atomic.AddInt32(&votesReceived, 1))
if votes*2 > len(cm.peerIds)+1 {
// Won the election!
cm.dlog("wins election with %d votes", votes)
cm.startLeader()
return
}
}
}
}
}(peerId)
}
// Run another election timer, in case this election is not successful.
go cm.runElectionTimer()
}
复制代码
其中是lastLogIndexAndTerm
是一个新的辅助方法:
// lastLogIndexAndTerm方法返回服务器最新的日志索引及最新的日志条目对应的任期
// (若是没有日志返回-1)要求cm.mu锁定
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {
if len(cm.log) > 0 {
lastIndex := len(cm.log) - 1
return lastIndex, cm.log[lastIndex].Term
} else {
return -1, -1
}
}
复制代码
提醒一下,咱们实现中的索引是从0开始的,而不像Raft论文中是从1开始的,所以-1常常做为一个标记值。
下面是更新后的RV处理逻辑,实现了选举安全性检查:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
// 任期相同,未投票或已投票给当前请求同伴,且候选人的日志知足安全性要求, 则返回同意投票;
// 不然,返回反对投票。
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
(args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
复制代码
在第一部分中,咱们讨论了一个场景。在具备三台服务器的集群中,服务器B断开链接几秒钟,致使其变为候选人而且每隔150-300ms就发起一轮选举。当它从新连入集群中时,其任期要比留在集群中不知道有新一轮选举的同伴服务器高不少。
如今正好能够回顾这个场景,并考虑一下,若是链接正常的同伴服务器在此期间复制了新的日志条目,将会发生什么。
虽然B重返集群时会引起从新选举(领导者会在AE的应答中看到更高的任期而转换为追随者),可是由于它的日志不如A和C完整,因此B不可能胜选。这就是由于上一节所说的选举安全性检查。A或C会赢得新一轮的选举,所以(此次选举)对集群的破坏力相对较小。
若是您仍然担忧这个没必要要的影响(为何要改选?),Ongaro的论文在Preventing disruptions when a server rejoins a cluster
一节讨论了这个确切的问题。这个问题的经常使用解决方案就是”预投票“,即服务器在成为候选人以前先执行一些检查。
由于这是真的很是规情形的优化,我就不在这个主题上花费太多时间。你们能够去查看论文——Raft网站提供了连接。
结束本节以前,咱们看一些在学习和实现Raft时常见的问题。若是你有其它问题,请随时给我发邮件——我会收集最多见的问题并更新文章。
Q:为何commitIndex
和lastApplied
是分开的?咱们能不能只记录由于RPC请求(或RPC响应)致使commitIndex
变化了多少,而后只将这些变化的指令发给客户端?
A:这二者分开是为了将快速操做(RPC处理)与较慢的操做(向客户端发送命令)进行解耦。考虑一下,当追随者收到AE请求,发现领导者的commitIndex
比本身大时,会发生什么?此时,它能够向commit channel发送一下日志指令。可是在channel发送数据(或执行回调函数)多是一个潜在的阻塞操做,而咱们但愿尽量快地应答RPC请求。lastApplied
就能够帮助咱们将两者进行解耦,RPC方法只须要更新commitIndex
,后台的commitChanSender
goroutine会观察这些变化,并在空闲时把新提交的指令发送给客户端。
那你可能会问对于newCommitReadyChan
通道的操做是否是也存在这个问题?观察很仔细,可是通道是有缓冲的,并且因为通道两边都是咱们控制的,咱们能够设置一个小的缓冲区,来保证在绝大多数状况下不会阻塞。尽管如此,在极少数状况下,由于Raft代码中不可能提供无限的缓冲区,很是慢的客户端会拖延RPC请求。这未必是一件坏事,由于它会造成一种天然的背压机制。
Q:咱们在领导者中须要为每一个同伴都保存nextIndex
和matchIndex
吗?
A:只有matchIndex
时算法也仍然是有效的,可是在有些状况下效率会很低。考虑一个领导改变的状况,新的领导者不能假设任何关于其同伴的最新状况,因此将matchIndex
初始化为-1,所以就会尝试向每一个追随者都发送整个日志。可是追随者(至少大部分)极可能拥有几乎相同的日志条目;nextIndex
帮助领导者从日志末尾开始探查追随者(所需的日志),而没必要复制大量的日志。
我再一次强烈建议您研究一下代码——运行测试用例,观察输出日志。
到目前为止,咱们已经有了一个基本可使用的Raft实现,除了尚未处理持久性。这意味着咱们的实现难以应对崩溃故障,即服务器崩溃并重启。
Raft算法对此作了规定,这是第三部分将讨论的内容。增长持久性会使咱们可以应对更严格的测试,包括最坏状况下的服务器崩溃。
此外,第三部分会讨论这里提到的一些优化。更重要的是,若是领导者有新消息要发送给追随者时,应该更及时地发送AE请求,可是如今领导者只会在每50ms发送一次AE。这个也会在下一部分被修正。
举例来讲,在规模为5的集群中,领导者指望获得2个追随者的确认回复,这样总数就是3个(2个追随者加领导者自身),也就知足了多数的要求。 ↩︎
这里须要注意,虽然Raft论文中的日志索引是从1开始的,在咱们的实现中索引是从0开始的,由于这样的代码感受更天然。这些索引对于ConsensusModule
的客户端/用户没有任何实质性影响。 ↩︎
在这里将这个方法命名为leaderSendHeartbeats
有点不恰当,由于它不仅是发送心跳。可是,由于在这一部分中,该方法每隔50ms都须要发送AE请求,因此保留了这个名字。在第三部分中咱们会修正。 ↩︎
这里用了一个很是简单的解释,实际状况很复杂。这里对于正确性的推理至关复杂,我建议阅读论文以获取更多详细信息。若是你是形式主义的拥趸,Ongaro的毕业论文有章节TLA+ spec of Raft来证实这些不变式的正确性。 ↩︎