做者:喵叔
原文:blog.betacat.io/post/raft-i…node
Raft是近年来比较流行的一个一致性算法。它的原理比较容易理解,网上也有不少相关的介绍,所以这里我就再也不啰嗦原理了,而是打算以raft在etcd中的实现[1]为例,从工程的角度来说讲这个算法的一个具体实现,毕竟了解原理只算是“纸上谈兵”,离真正能把它应用起来还有很长一段距离。git
若是你还不熟悉raft,这个经典的动画演示、它的论文以及这个lecture可能会对你有帮助。或者你也能够直接观看下面的视频,这是我做的一次技术分享,讲的是etcd中raft模块的源码解析。说句题外话,不少Conference和Meetup都会把视频录像上传到YouTube上,YouTube简直就是程序员的衣柜,每逛一次都有新收获。www.youtube.com/watch?v=sL0…程序员
Etcd将raft协议实现为一个library,而后自己做为一个应用使用它。固然,多是为了推广它所实现的这个library,etcd还额外提供了一个叫raftexample的示例程序,向用户展现怎样在它所提供的raft library的基础上构建出一个分布式的KV存储引擎。github
在etcd中,raft做为底层的共识模块,运行在一个goroutine
里,经过channel
接受上层(etcdserver)传来的消息,并将处理后的结果经过另外一个channel
返回给上层应用,他们的交互过程大概是这样的:算法
这种全异步的交互方式好处就是它提升了性能,但坏处就是难以调试,代码看起来会很绕。拿etcd举例,不少时候你只看到它把一个消息push到一个slice/channel里面,而后这部分函数调用链就结束了,你没法直观的追踪到,究竟是谁最后处理了这个消息。数组
咱们来看一下这个raft library里面都有哪些文件:bash
$ tree --dirsfirst -L 1 -I '*test*' -P '*.go'
.
├── raftpb
├── doc.go
├── log.go
├── log_unstable.go
├── logger.go
├── node.go
├── progress.go
├── raft.go
├── rawnode.go
├── read_only.go
├── status.go
├── storage.go
└── util.go
复制代码
下面按功能模块依次介绍:网络
Raft中的序列化是借助于Protocol Buffer来实现的,这个文件夹就定义了须要序列化的几个数据结构,咱们先从Entry
和Message
开始看起:数据结构
从总体上来讲,一个集群中的每一个节点都是一个状态机,而raft管理的就是对这个状态机进行更改的一些操做,这些操做在代码中被封装为一个个Entry
。app
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L203
type Entry struct {
Term uint64
Index uint64
Type EntryType
Data []byte
}
复制代码
Term
和Index
以后,一个log entry就能被惟一标识。Type
是EntryNormal
,那这里的Data就多是具体要更改的key-value pair,若是Type
是EntryConfChange
,那Data就是具体的配置更改项ConfChange。raft算法自己并不关心这个数据是什么,它只是把这段数据当作log同步过程当中的payload来处理,具体对这个数据的解析则有上层应用来完成。Raft集群中节点之间的通信都是经过传递不一样的Message
来完成的,这个Message
结构就是一个很是general的大容器,它涵盖了各类消息所需的字段。
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L239
type Message struct {
Type MessageType
To uint64
From uint64
Term uint64
LogTerm uint64
Index uint64
Entries []Entry
Commit uint64
Snapshot Snapshot
Reject bool
RejectHint uint64
Context []byte
}
复制代码
MsgVote
会用到这个字段。MsgVote
的话,表明这个candidate最后一条日志的索引号,它跟上面的LogTerm
一块儿表明这个candidate所拥有的最新日志信息,这样别人就能够比较本身的日志是否是比candidata的日志要新,从而决定是否投票。MsgSnap
合用,用来放置具体的Snapshot值。顾名思义,unstable数据结构用于尚未被用户层持久化的数据,它维护了两部份内容snapshot
和entries
:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log_unstable.go#L23
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
logger Logger
}
复制代码
entries
表明的是要进行操做的日志,但日志不可能无限增加,在特定的状况下,某些过时的日志会被清空。那这就引入一个新问题了,若是此后一个新的follower
加入,而leader
只有一部分操做日志,那这个新follower
不是无法跟别人同步了吗?因此这个时候snapshot
就登场了 - 我没法给你以前的日志,但我给你全部以前日志应用后的结果,以后的日志你再以这个snapshot
为基础进行应用,那咱们的状态就能够同步了。所以它们的结构关系能够用下图表示[3]:
这里的前半部分是快照数据,然后半部分是日志条目组成的数组entries,另外unstable.offset成员保存的是entries数组中的第一条数据在raft日志中的索引,即第i条entries在raft日志中的索引为i + unstable.offset
。
这个文件定义了一个Storage接口,由于etcd中的raft实现并不负责数据的持久化,因此它但愿上面的应用层能实现这个接口,以便提供给它查询log的能力。
另外,这个文件也提供了Storage
接口的一个内存版本的实现MemoryStorage,这个实现一样也维护了snapshot
和entries
这两部分,他们的排列跟unstable
中的相似,也是snapshot
在前,entries
在后。从代码中看来etcdserver
和raftexample
都是直接用的这个实现来提供log的查询功能的。
有了以上的介绍unstable、Storage的准备以后,下面能够来介绍raftLog的实现,这个结构体承担了raft日志相关的操做。
raftLog由如下成员组成:
须要说明的是,一条日志数据,首先须要被提交(committed)成功,而后才能被应用(applied)到状态机中。所以,如下不等式一直成立:applied <= committed
。
raftLog结构体中,几部分数据的排列以下图所示[3:1]:
这个数据排布的状况,能够从raftLog的初始化函数中看出来:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log.go#L45
// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
lastIndex, err := storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
return log
}
复制代码
所以,从这里的代码能够看出,raftLog的两部分,持久化存储和非持久化存储,它们之间的分界线就是lastIndex,在此以前都是Storage
管理的已经持久化的数据,而在此以后都是unstable
管理的尚未持久化的数据。
以上分析中还有一个疑问,为何并无初始化unstable.snapshot成员,也就是unstable结构体的快照数据?缘由在于,上面这个是初始化函数,也就是节点刚启动的时候调用来初始化存储状态的函数,而unstable.snapshot数据,是在启动以后同步数据的过程当中,若是须要同步快照数据时才会去进行赋值修改的数据,所以在这里并无对它进行操做的地方。
Leader经过Progress
这个数据结构来追踪一个follower的状态,并根据Progress
里的信息来决定每次同步的日志项。这里介绍三个比较重要的属性:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/progress.go#L37
// Progress represents a follower’s progress in the view of the leader. Leader maintains
// progresses of all followers, and sends entries to the follower based on its progress.
type Progress struct {
Match, Next uint64
State ProgressStateType
ins *inflights
}
复制代码
用来保存当前follower节点的日志状态的属性:
Next
开始发送日志。在正常状况下,Next = Match + 1
,也就是下一个要同步的日志应当是对方已有日志的下一条。
State
属性用来保存该节点当前的同步状态,它会有一下几种取值[4]:
探测状态,当follower拒绝了最近的append消息时,那么就会进入探测状态,此时leader会试图继续往前追溯该follower的日志从哪里开始丢失的。在probe状态时,leader每次最多append一条日志,若是收到的回应中带有RejectHint
信息,则回退Next
索引,以便下次重试。在初始时,leader会把全部follower的状态设为probe,由于它并不知道各个follower的同步状态,因此须要慢慢试探。
当leader确认某个follower的同步状态后,它就会把这个follower的state切换到这个状态,而且用pipeline
的方式快速复制日志。leader在发送复制消息以后,就修改该节点的Next
索引为发送消息的最大索引+1。
接收快照状态。当leader向某个follower发送append消息,试图让该follower状态跟上leader时,发现此时leader上保存的索引数据已经对不上了,好比leader在index为10以前的数据都已经写入快照中了,可是该follower须要的是10以前的数据,此时就会切换到该状态下,发送快照给该follower。当快照数据同步追上以后,并非直接切换到Replicate状态,而是首先切换到Probe状态。
ins
属性用来作流量控制,由于若是同步请求很是多,再碰上网络分区时,leader可能会累积不少待发送消息,一旦网络恢复,可能会有很是大流量发送给follower,因此这里要作flow control。它的实现有点相似TCP的滑动窗口,这里再也不赘述。
综上,Progress
其实也是个状态机,下面是它的状态转移图:
前面铺设了一大堆概念,如今终于轮到实现逻辑了。从名字也能够看出,raft协议的具体实现就在这个文件里。这其中,大部分的逻辑是由Step
函数驱动的。
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raft.go#L752
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case pb.MsgHup:
//...
case pb.MsgVote, pb.MsgPreVote:
//...
default:
r.step(r, m)
}
}
复制代码
Step
的主要做用是处理不一样的[消息]({{< relref "#message" >}}),因此之后当咱们想知道raft对某种消息的处理逻辑时,到这里找就对了。在函数的最后,有个default
语句,即全部上面不能处理的消息都落入这里,由一个小写的step
函数处理,这个设计的缘由是什么呢?
实际上是由于这里的raft也被实现为一个状态机,它的step
属性是一个函数指针,根据当前节点的不一样角色,指向不一样的消息处理函数:stepLeader/stepFollower/stepCandidate。与它相似的还有一个tick
函数指针,根据角色的不一样,也会在tickHeartbeat和tickElection之间来回切换,分别用来触发定时心跳和选举检测。这里的函数指针感受像实现了OOP
里的多态。
node
的主要做用是应用层(etcdserver)和共识模块(raft)的衔接。将应用层的消息传递给底层共识模块,并将底层共识模块共识后的结果反馈给应用层。因此它的初始化函数建立了不少用来通讯的channel
,而后就在另外一个goroutine
里面开始了事件循环,不停的在各类channel
中倒腾数据(貌似这种由for-select-channel
组成的事件循环在Go里面很受欢迎)。
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L286
for {
select {
case m := <-propc:
r.Step(m)
case m := <-n.recvc:
r.Step(m)
case cc := <-n.confc:
// Add/remove/update node according to cc.Type
case <-n.tickc:
r.tick()
case readyc <- rd:
// Cleaning after result is consumed by application
case <-advancec:
// Stablize logs
case c := <-n.status:
// Update status
case <-n.stop:
close(n.done)
return
}
}
复制代码
propc
和recvc
中拿到的是从上层应用传进来的消息,这个消息会被交给raft层的Step
函数处理,具体处理逻辑我上面有过介绍。
下面来解释下readyc
的做用。在etcd的这个实现中,node
并不负责数据的持久化、网络消息的通讯、以及将已经提交的log应用到状态机中,因此node
使用readyc
这个channel
对外通知有数据要处理了,并将这些须要外部处理的数据打包到一个Ready
结构体中:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L52
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
复制代码
应用程序获得这个Ready
以后,须要:
node.ApplyConfChange()
方法让node
知道。node.Advance()
告诉raft,这批状态更新处理完了,状态已经演进了,能够给我下一批Ready让我处理。前面咱们把整个包的结构过了一遍,下面来结合具体的代码看看raft对一个请求的处理过程是怎样的。我一直以为,若是能从代码的层面追踪到一个请求的处理过程,那不管是从宏观仍是微观的角度,对理解整个系统都是很是有帮助的。
首先,在node
的大循环里,有一个会定时输出的tick channel
,它来触发raft.tick()
函数,根据上面的介绍可知,若是当前节点是follower,那它的tick
函数会指向tickElection
。tickElection
的处理逻辑是给本身发送一个MsgHup
的内部消息,Step
函数看到这个消息后会调用campaign
函数,进入竞选状态。
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case pb.MsgHup:
r.campaign(campaignElection)
}
}
复制代码
campaign
则会调用becomeCandidate
把本身切换到candidate模式,并递增Term
值。而后再将本身的Term
及日志信息发送给其余的节点,请求投票。
func (r *raft) campaign(t CampaignType) {
//...
r.becomeCandidate()
// Get peer id from progress
for id := range r.prs {
//...
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
复制代码
另外一方面,其余节点在接受到这个请求后,会首先比较接收到的Term
是否是比本身的大,以及接受到的日志信息是否是比本身的要新,从而决定是否投票。这个逻辑咱们仍是能够从Step
函数中找到:
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
} else {
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
}
}
复制代码
最后当candidate节点收到投票回复后,就会计算收到的选票数目是否大于全部节点数的一半,若是大于则本身成为leader,并昭告天下,不然将本身置为follower:
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
}
复制代码
一个写请求通常会经过调用node.Propose
开始,Propose
方法将这个写请求封装到一个MsgProp
消息里面,发送给本身处理。
消息处理函数Step
没法直接处理这个消息,它会调用那个小写的step
函数,来根据当前的状态进行处理。
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
//...
m.To = r.lead
r.send(m)
}
}
复制代码
Leader收到这个消息后(无论是follower转发过来的仍是本身内部产生的)会有两步操做:
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
//...
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
}
}
复制代码
在follower接受完这个log后,会返回一个MsgAppResp
消息。
当leader确认已经有足够多的follower接受了这个log后,它首先会commit这个log,而后再广播一次,告诉别人它的commit状态。这里的实现就有点像两阶段提交了。
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgAppResp:
//...
if r.maybeCommit() {
r.bcastAppend()
}
}
}
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
//...
mis := r.matchBuf[:len(r.prs)]
idx := 0
for _, p := range r.prs {
mis[idx] = p.Match
idx++
}
sort.Sort(mis)
mci := mis[len(mis)-r.quorum()]
return r.raftLog.maybeCommit(mci, r.Term)
}
复制代码
Etcd里的raft模块只实现了raft共识算法,而像消息的网络传输,数据存储都由上层应用来完成。这篇文章先介绍了基本的数据结构,而后在这些数据结构的基础上引入了raft算法。同时,这里还以一个投票请求和写请求为例,介绍了一个请求从接受到应答的完整处理过程。
但到目前为止,咱们还有不少细节没有涉及,好比说Linearizable Read,snapshot机制,WAL的存储与回放,因此但愿你能以这篇文章为基础,顺藤摸瓜,继续深刻研究下去。