【译】实现Raft协议:Part 3 - 持久性和优化

翻译自Eli Bendersky系列博客,已得到原做者受权。git

本文是系列文章中的第一部分,本系列文章旨在介绍Raft分布式一致性协议及其Go语言实现。文章的完整列表以下:github

在这一部分,咱们会添加持久性和一些优化来完善Raft的基础实现。全部代码已上传到这个目录算法

持久性

相似Raft这样的一致性算法的目标,就是经过在独立的服务器之间复制任务来建立一个更具高可用性的系统。在此以前,咱们主要关注的是网络分区的故障,也就是集群中一些服务器与其它服务器(或客户端)断开了链接。还有另外一种失败模式就是崩溃,也就是一台服务器中止工做并重启。数据库

对于其它服务器,这看起来很像网络分区——服务器暂时断开链接,可是对于崩溃服务器自身来讲,状况就彻底不一样了,由于重启以后其内部全部的易失性存储状态都丢失了。编程

正是因为这个缘由,Raft论文中的图2中清楚地标注了哪些状态应该持久化,持久化的状态在每次更新的时候都须要刷新到非易失性存储中。在服务器发起下一次RPC或响应正在进行的RPC以前,全部须要持久化的状态都须要保存好。安全

Raft能够经过仅持久化其状态的一个子集来实现,也就是:服务器

  • currentTerm - 此服务器观察到的最新任期
  • votedFor - 在最新任期中,此服务器投赞同票的服务器ID
  • log - Raft日志条目

Q:为何commitIndexlastApplied是易失性的?网络

AcommitIndex字段是易失性的,由于在重启以后,Raft只根据持久化状态就能够获得正确的值。一旦领导者成功提交了一条新指令,它也就知道在此以前的全部指令都已经提交了。若是一个追随者崩溃又从新接入集群中,当前领导者向其发送AE请求时,会告诉其正确的commitIndexapp

重启以后,lastApplied是从0开始的,由于基本的Raft算法假定了服务(如键-值数据库)不会保存任何持久化状态。所以,须要经过重放日志条目来从新建立它的状态。固然,这是至关低效的,也有不少可行的优化方法。Raft支持在日志变大时对其进行快照,这在Raft论文的第6章节有描述,不过这也超出了本系列的讨论范围。分布式

指令传递语义

在Raft中,根据不一样状况,一条指令会屡次发给客户端。有几种场景会出现这样的状况,包括崩溃和重启(重放日志恢复服务)。

在消息传递语义方面,Raft站在至少一次阵营。一旦一条指令被呈递,它最终会被复制给全部的客户端,可是有些客户端可能会屡次看到同一条指令。

所以,建议指令须要携带惟一的ID,而客户端要忽略已经收到的指令。这个在Raft论文的第8节有更详细的描述。

存储接口

为了实现持久性,咱们在代码中添加了以下的接口:

type Storage interface {
  Set(key string, value []byte)

  Get(key string) ([]byte, bool)

  // HasData returns true if any Sets were made on this Storage.
  HasData() bool
}
复制代码

你能够将它看做是字符串到通用字节切片的映射,由持久性存储实现。

恢复和保存状态

如今CM构造函数要接受Storage做为参数并进行调用:

if cm.storage.HasData() {
  cm.restoreFromStorage(cm.storage)
}
复制代码

这里的restoreFromStorage方法也是新加的,该方法会从存储中加载持久化的状态变量,使用标准的encoding/go包对其进行反序列化:

func (cm *ConsensusModule) restoreFromStorage(storage Storage) {
  if termData, found := cm.storage.Get("currentTerm"); found {
    d := gob.NewDecoder(bytes.NewBuffer(termData))
    if err := d.Decode(&cm.currentTerm); err != nil {
      log.Fatal(err)
    }
  } else {
    log.Fatal("currentTerm not found in storage")
  }
  if votedData, found := cm.storage.Get("votedFor"); found {
    d := gob.NewDecoder(bytes.NewBuffer(votedData))
    if err := d.Decode(&cm.votedFor); err != nil {
      log.Fatal(err)
    }
  } else {
    log.Fatal("votedFor not found in storage")
  }
  if logData, found := cm.storage.Get("log"); found {
    d := gob.NewDecoder(bytes.NewBuffer(logData))
    if err := d.Decode(&cm.log); err != nil {
      log.Fatal(err)
    }
  } else {
    log.Fatal("log not found in storage")
  }
}
复制代码

镜像方法是persistToStorage——将全部的状态变量编码并保存到提供的存储介质中。

func (cm *ConsensusModule) persistToStorage() {
  var termData bytes.Buffer
  if err := gob.NewEncoder(&termData).Encode(cm.currentTerm); err != nil {
    log.Fatal(err)
  }
  cm.storage.Set("currentTerm", termData.Bytes())

  var votedData bytes.Buffer
  if err := gob.NewEncoder(&votedData).Encode(cm.votedFor); err != nil {
    log.Fatal(err)
  }
  cm.storage.Set("votedFor", votedData.Bytes())

  var logData bytes.Buffer
  if err := gob.NewEncoder(&logData).Encode(cm.log); err != nil {
    log.Fatal(err)
  }
  cm.storage.Set("log", logData.Bytes())
}
复制代码

咱们只须要在这些状态值每次变化时都调用pesistToStorage方法便可实现持久化。若是你比对一下第二部分和本部分的CM代码,就能看的该方法的调用散步在少数几个地方。

固然,这并非最有效的持久化方式,可是它简单有效,所以也能够知足咱们的须要。效率最低的部分是保存整个日志,日志在实际的应用中可能会很大。为了真正解决整个问题,Raft论文的第7节提出了一个日志压缩机制。咱们不会实现压缩,可是能够将其做为联系添加到已有的实现中。

崩溃恢复

实现持久性以后,咱们的Raft集群能够在必定程度上应对崩溃。只要是集群中的少数服务器崩溃并在以后的某个时间重启,集群对于客户端都是一直可用的(若是领导者是崩溃的服务器之一,可能还须要等集群选举出新的领导者)。提醒一下,拥有2N+1个服务器的集群能够容忍N台服务器出现故障,而且只要其它N+1台机器保持互连,集群就是一直可用的。

不可靠的RPC传递

咱们在这一部分增强了测试,我想提醒您注意关于系统弹性的另外一个方面——不可靠的RPC传递。在此以前,咱们都假设在链接的服务器之间的RPC请求都会成功到达,可能会有很小的延时。若是你看一下server.go,你会注意到其中使用了一个RPCProxy类型来实现随机延迟。每一个RPC请求会延迟1-5ms,以模拟真实世界中同一数据中心的同伴服务器通讯延时。

RPCProxy还帮助咱们实现了可选的不可靠传递。若是启动了RAFT_UNRELIABLE_RPC系统变量,RPC会偶尔出现明显的延迟(75ms)或者被直接丢弃,用于模拟真实世界中的网络故障。

咱们能够设置RAFT_UNRELIABLE_RPC以后运行以前的测试,观察Raft集群在出现这些故障时的行为——另外一个强烈推荐的练习。若是您学有余力,能够修改一下RPCProxy,对RPC应答也进行延迟,这应该只须要改几行代码。

优化发送AE

我在第二部分提到过,目前领导者的实现效率很低。领导者在leaderSendHeartbeats方法中发送AE请求,而该方法由定时器每50ms触发一次。假设有一天新的指令被呈递,领导者不会当即通知全部的追随者,而是等待下一个50ms的触发。更糟糕的是,要通知追随者某条指令被提交须要经过两次AE请求。下图展现了目前的工做流程: tries-50ms-boundary.png)

Timing diagram with AE on 50 ms boundaries
在时刻1,领导者向追随者发送了一次心跳AE,并在几ms后收到了响应。

以后有一条新指令被提交(假设是35ms后)。

领导者等到下一个50ms计时结束,也就是时刻2再向追随者发送更新后的日志。

在时刻3,追随者回复指令已经成功添加到本地的日志中。此时,领导者已经修改了它的commit index(假定已获得多数服务器确认)并能够当即通知追随者。可是,领导者一直等到下一个50ms边界(时刻4)才这样作。

最后,当追随者收到更新后的leaderCommit时,将最新的提交指令通知到客户端。

咱们的实现中,领导者Submit(X)和追随者commitChan <-X之间等待的大部分时间都是没必要要的。

咱们真正想要的执行顺利应该像下面这样:

appendentries-immediate

这正是本部分代码所作的。咱们先从startLeader开始看一下实现中的新代码:

func (cm *ConsensusModule) startLeader() {
  cm.state = Leader

  for _, peerId := range cm.peerIds {
    cm.nextIndex[peerId] = len(cm.log)
    cm.matchIndex[peerId] = -1
  }
  cm.dlog("becomes Leader; term=%d, nextIndex=%v, matchIndex=%v; log=%v", cm.currentTerm, cm.nextIndex, cm.matchIndex, cm.log)

  /*********如下代码是新增部分********/
  /* 该goroutine在后台运行并向同伴服务器发送AE请求: - triggerAEChan通道发送任何内容时 - 若是triggerAEChan通道没有内容时,每50ms执行一次 */
  go func(heartbeatTimeout time.Duration) {
    // Immediately send AEs to peers.
    cm.leaderSendAEs()

    t := time.NewTimer(heartbeatTimeout)
    defer t.Stop()
    for {
      doSend := false
      select {
      case <-t.C:
        doSend = true

        // Reset timer to fire again after heartbeatTimeout.
        t.Stop()
        t.Reset(heartbeatTimeout)
      case _, ok := <-cm.triggerAEChan:
        if ok {
          doSend = true
        } else {
          return
        }

        // Reset timer for heartbeatTimeout.
        if !t.Stop() {
          <-t.C
        }
        t.Reset(heartbeatTimeout)
      }

      if doSend {
        cm.mu.Lock()
        if cm.state != Leader {
          cm.mu.Unlock()
          return
        }
        cm.mu.Unlock()
        cm.leaderSendAEs()
      }
    }
  }(50 * time.Millisecond)
}
复制代码

startLeader方法中的循环不仅是等待50ms的触发器,并且等待两种可能的状况之一:

  • cm.triggerAEChan通道上的数据发送
  • 50ms的定时器计时结束

咱们稍后会看到是什么触发了cm.triggerAEChan,这个信号表示如今要发送一条AE请求。不管什么时候触发该通道,定时器都会重置,从而实现心跳逻辑——若是领导者没有新信息须要发送,最多会等待50ms。

还要注意,真正发送AE请求的方法名改成了leaderSendAEs,以便更好地反映新代码的设计意图。

如咱们所料,触发cm.triggerAEChan的方法之一 就是Submit

func (cm *ConsensusModule) Submit(command interface{}) bool {
  cm.mu.Lock()
  cm.dlog("Submit received by %v: %v", cm.state, command)
  if cm.state == Leader {
    cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
    cm.persistToStorage()
    cm.dlog("... log=%v", cm.log)
    cm.mu.Unlock()
    cm.triggerAEChan <- struct{}{}
    return true
  }

  cm.mu.Unlock()
  return false
}
复制代码

更改以下:

  • 每当收到新指令时,调用cm.persistToStorage对新的日志条目进行持久化。这与心跳请求的优化无关,可是我仍是要在这里说明一下,由于第2部分的代码没有实现该功能,并且该功能是在本文的前面描述的。
  • cm.persistToStorage上发送空结构体。这会通知领导者goroutine中的循环。
  • 锁处理顺序作了轻微调整。咱们不想在使用cm.persistToStorage发送数据时持有锁,由于在某些状况下会致使死锁。

你能猜到代码中还有什么地方会通知triggerAEChan吗?

就是在领导者处理AE应答并修改commit index的代码中,我这里就不贴出整个方法,只复制了修改的部分:

if cm.commitIndex != savedCommitIndex {
    cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
    // Commit index改变:代表领导者认为新指令能够被提交了。
	// 经过commit channel向领导者的客户端发送新指令。
	// 发送AE请求通知全部的追随者
    cm.newCommitReadyChan <- struct{}{}
    cm.triggerAEChan <- struct{}{}
  }
复制代码

这是一个重要的优化,可让咱们的代码对新指令的响应速度更快。

批量化指令提交

上一节中的代码看起来可能会让你有些不舒服,如今有不少行为是每次调用Submit时触发的——领导者当即向追随者广播RPC请求。若是咱们一次提交多条命令时会怎样?链接Raft集群的网络可能被RPC请求淹没。

尽管看起来效率低下,但其实是安全的。Raft的RPC请求都是幂等的,意味着屡次收到包含相同信息的RPC请求不会形成什么危害。

若是你担忧同时提交多条指令时致使的网络拥塞,批处理是很容易实现的。最简单的方法就是提供一种将整个指令片断发送给Submit的方式。所以,Raft实现中只有不多的代码须要修改,而后客户端就能够呈递一组指令而不会产生太多RPC通讯。做为练习试试看!

优化AE冲突解决

我想在这篇文章中讨论的另外一个优化,是在一些场景中减小领导者更新追随者日志时被拒绝的AE请求数量。回想一下,nextIndex机制从日志的尾端开始,而且每次追随者拒绝AE请求时都减1。在极少数状况下,追随者会出现严重过期,由于每次RPC请求只会回退一条指令索引,因此更新该追随者日志会花费很长时间。

论文在5.3节的最后提到了这种优化,可是没有提供任何的实现细节。为了实现它,咱们在AE应答中扩展了新的字段:

type AppendEntriesReply struct {
  Term    int
  Success bool

  // Faster conflict resolution optimization (described near the end of section
  // 5.3 in the paper.)
  ConflictIndex int
  ConflictTerm  int
}
复制代码

你能够在本部分的代码中看到其它改动。有两个地方作了改动:

  • AppendEntries是AE请求处理方法,当追随者拒绝AE请求时,会填入ConflictIndexConflictTerm
  • leaderSendAEs方法在收到AE应答时进行更新,并经过ConflictIndexConflictTerm更有效地回溯nextIndex

Raft论文中写:

在实践中,咱们怀疑这种优化是否必要,由于失败不多发生,并且不大可能有不少不一致的条目。

我彻底赞成。为了测试这个优化点,我不得不想出一个至关刻意的测试。恕我直言,在现实生活中出现这种状况的几率很是低,并且一次性节省几百ms并不能保证代码复杂度。我在这里只是将它做为Raft中特殊状况下的优化案例之一。就编程而言,这是一个很好的例子,说明在某些特定状况下,能够对Raft算法进行轻微修改来调整其行为逻辑。

Raft的设计意图是保证在普通状况下有至关快的处理速度,而且以牺牲特殊状况下的性能为代价(实际发生故障的状况)。我相信这是绝对正确的设计选择。在上一节中说到的快速发送AE请求优化是颇有必要的,由于这会直接影响公共路径。

另外一方面,像快速回溯冲突索引这样的优化,虽然在技术上颇有趣,可是在实践中并不重要,由于它们只是在集群生命周期中出现时间<0.01%的特殊场景中作出了有限的优化。

总结

至此,咱们结束了有关Raft分布式一致性算法的4篇文章,感谢阅读!

若是您对于文章内容或代码有任何问题,能够给我发送邮件或者在Github发布issue。

若是您对工业级、有实战经验的Raft实现感兴趣,我向您推荐:

  • etcd/raft是分布式键值数据库etcd中的Raft模块。
  • hashicorp/raft是一个独立的Raft一致性模块,能够帮到到不一样的客户端。

它们都实现了Raft论文中的全部特性,包括:

  • Section 6: 集群成员关系变更——若是一台Raft服务器永久离线,在不关闭集群的状况下使用另外一台服务器对其替换是很实用的。
  • Section 7:日志压缩——在实际的应用程序中,日志会变得很是大,每次修改都彻底持久化日志或者在崩溃时彻底重放日志都是很不切实际的。日志压缩定义了一种检查点机制,该机制使得集群能够有效复制很是大的日志。
相关文章
相关标签/搜索