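Back in November 2013, when the raft paper was only available online as a draft, I wrote a blog post giving a brief analysis of it. Four years later, explanations of the raft protocol are everywhere, and raft has indeed become widely used. Its best-known user is probably etcd. etcd implements the raft protocol as a standalone library, located at https://github.com/coreos/etcd/tree/master/raft, and etcd itself then uses that library as an application.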
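This article does not explain the core of the raft protocol. Instead, it takes the perspective of a user of the etcd raft library and covers what you need to know to put the library to use.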
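The library is still somewhat cumbersome to use. There is an official usage example at https://github.com/coreos/etcd/tree/master/contrib/raftexample. Overall, the library implements the core of the raft protocol, such as log appending, leader election, snapshots and membership changes.

One thing must be clear: the library does not send or receive messages over the network. It only keeps outbound messages in memory. A user-defined network transport layer takes those messages and sends them out. On the receiving end, the application must call a library function to pass incoming messages into the library, as described in detail later.

In addition, the library defines a Storage interface that its users must implement themselves.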
The Storage interface is as follows:
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}
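The index rules in these comments are easier to see in a small in-memory sketch. It is loosely modeled on the idea behind the library's MemoryStorage but is not its actual code. Entry, memLog, errCompacted and errUnavailable are hypothetical local names. InitialState and Snapshot are omitted, and FirstIndex/LastIndex drop the error return because they cannot fail here.

The key trick is a dummy entry at position 0 that remembers the index and term of the last compacted entry. That is what lets Term(FirstIndex()-1) keep working after compaction.

```go
package main

import "errors"

// Entry is a hypothetical stand-in for raftpb.Entry.
type Entry struct {
	Index, Term uint64
	Data        []byte
}

// Hypothetical error values for this sketch only.
var (
	errCompacted   = errors.New("requested index is compacted")
	errUnavailable = errors.New("requested entry is unavailable")
)

// memLog stores entries in a slice whose element 0 is a dummy entry:
// it records the index and term of the last entry folded into a snapshot.
type memLog struct {
	ents []Entry
}

func newMemLog() *memLog { return &memLog{ents: []Entry{{}}} }

// FirstIndex is the first index that Entries may return.
func (m *memLog) FirstIndex() uint64 { return m.ents[0].Index + 1 }

// LastIndex is the index of the last stored entry.
func (m *memLog) LastIndex() uint64 { return m.ents[0].Index + uint64(len(m.ents)) - 1 }

// Term answers for any i in [FirstIndex()-1, LastIndex()].
func (m *memLog) Term(i uint64) (uint64, error) {
	off := m.ents[0].Index
	if i < off {
		return 0, errCompacted
	}
	if i-off >= uint64(len(m.ents)) {
		return 0, errUnavailable
	}
	return m.ents[i-off].Term, nil
}

// Entries returns entries in [lo, hi). Once the total Data size would exceed
// maxSize it stops, but it always returns at least one entry if any exist.
func (m *memLog) Entries(lo, hi, maxSize uint64) ([]Entry, error) {
	off := m.ents[0].Index
	if lo <= off {
		return nil, errCompacted
	}
	if hi > m.LastIndex()+1 {
		return nil, errUnavailable
	}
	ents := m.ents[lo-off : hi-off]
	var size uint64
	for i, e := range ents {
		size += uint64(len(e.Data))
		if i > 0 && size > maxSize {
			return ents[:i], nil
		}
	}
	return ents, nil
}

// Append adds entries that directly follow LastIndex (no conflict handling).
func (m *memLog) Append(ents ...Entry) { m.ents = append(m.ents, ents...) }

// Compact discards entries up to and including index i, which must lie in
// [FirstIndex(), LastIndex()]; entry i becomes the new dummy entry.
func (m *memLog) Compact(i uint64) {
	off := m.ents[0].Index
	dummy := Entry{Index: i, Term: m.ents[i-off].Term}
	m.ents = append([]Entry{dummy}, m.ents[i-off+1:]...)
}
```

Here is an example. After appending entries 1 to 3 and calling Compact(2), FirstIndex() is 3 and Term(2) still returns the term of entry 2. Entries(2, ...), however, reports the range as compacted.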
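The library calls these methods internally, and anyone familiar with the raft protocol will find them easy to follow. The official example mentioned above, https://github.com/coreos/etcd/tree/master/contrib/raftexample, uses the library's built-in MemoryStorage together with etcd's wal and snap packages for persistence. On restart, it reads the log back from wal and snap to rebuild the MemoryStorage.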
For IO- and network-intensive work like this, the best way to raise throughput is batching, and that is exactly what the etcd raft library does.
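The general group-commit idea fits in a few lines of Go. This illustrates batching in general and is not code from etcd; drainBatch is a hypothetical helper. It waits for one item, then takes whatever else is already queued, up to a limit, so that a single write plus fsync can cover many requests.

```go
package main

// drainBatch blocks until at least one item is available on ch, then keeps
// taking items without blocking until ch is momentarily empty or max items
// have been collected. The caller can persist the whole batch with one
// write + fsync instead of one fsync per item.
// It returns nil if ch is closed and already drained.
func drainBatch(ch <-chan []byte, max int) [][]byte {
	first, ok := <-ch
	if !ok {
		return nil
	}
	batch := [][]byte{first}
	for len(batch) < max {
		select {
		case item, ok := <-ch:
			if !ok {
				return batch
			}
			batch = append(batch, item)
		default:
			// Nothing else queued right now: ship what we have.
			return batch
		}
	}
	return batch
}
```

Under load, many items are queued at once and each batch grows toward max. Under light load, each batch holds a single item, so latency does not suffer.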
To support this, etcd provides a core abstraction, the Ready struct:
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
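In other words, a Ready encapsulates a batch of updates. These include:
- SoftState: the node's volatile state (the current leader and this node's role); it does not need to be stored.
- HardState: the term, vote and commit index, which must be saved to stable storage before Messages are sent.
- ReadStates: used to serve linearizable read requests locally.
- Entries: new log entries to be saved to stable storage before Messages are sent.
- Snapshot: a snapshot to be saved to stable storage.
- CommittedEntries: committed entries to be applied to the state machine.
- Messages: outbound messages to be sent to other peers after Entries have been persisted.
- MustSync: whether HardState and Entries must be written to disk synchronously.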
The library user keeps popping Ready values off a ready channel provided by the node struct and processes them one by one. The channel is obtained with:
func (n *node) Ready() <-chan Ready { return n.readyc }
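Processing a Ready involves the following steps:
1. Save HardState, Entries and Snapshot to stable storage.
2. Only after step 1, send Messages to the other peers through the transport. If a MsgSnap is among them, report the outcome to raft via ReportSnapshot.
3. Apply CommittedEntries to the state machine. For a committed membership-change entry, call ApplyConfChange (described below).
4. Call Advance() to tell raft that this Ready has been handled, so it can deliver the next one.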
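Here is a minimal sketch of a Ready-handling loop. Everything in it is a hypothetical, simplified stand-in:
- fakeNode only mimics the library's Ready()/Advance() pair.
- The local Ready type keeps just three fields.
- The slices in app take the place of a real WAL, transport and state machine.

What matters is the ordering: persist first, then send, then apply, then Advance.

```go
package main

// Hypothetical, simplified stand-ins for the library's types.
type Entry struct {
	Index uint64
	Data  string
}

type Message struct{ To uint64 }

type Ready struct {
	Entries          []Entry   // save BEFORE sending Messages
	CommittedEntries []Entry   // apply to the state machine
	Messages         []Message // send AFTER Entries are persisted
}

// fakeNode mimics only the Ready()/Advance() pair of the real node.
type fakeNode struct {
	readyc   chan Ready
	advancec chan struct{}
}

func (n *fakeNode) Ready() <-chan Ready { return n.readyc }
func (n *fakeNode) Advance() { n.advancec <- struct{}{} }

// app stands in for the application; the slices replace a real WAL,
// transport and state machine, and events records the order of steps.
type app struct {
	events  []string
	storage []Entry
	outbox  []Message
	applied map[uint64]string
}

func (a *app) handle(rd Ready) {
	// 1. Persist first (a real application also saves HardState/Snapshot).
	if len(rd.Entries) > 0 {
		a.storage = append(a.storage, rd.Entries...)
		a.events = append(a.events, "persist")
	}
	// 2. Only then hand messages to the transport.
	if len(rd.Messages) > 0 {
		a.outbox = append(a.outbox, rd.Messages...)
		a.events = append(a.events, "send")
	}
	// 3. Apply committed entries to the state machine.
	for _, e := range rd.CommittedEntries {
		a.applied[e.Index] = e.Data
	}
	if len(rd.CommittedEntries) > 0 {
		a.events = append(a.events, "apply")
	}
}

// serveReady processes one Ready at a time until done is closed.
func (a *app) serveReady(n *fakeNode, done <-chan struct{}) {
	for {
		select {
		case rd := <-n.Ready():
			a.handle(rd)
			a.events = append(a.events, "advance")
			n.Advance() // 4. tell raft this batch is handled
		case <-done:
			return
		}
	}
}
```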
The application starts a raft replica by calling raft.StartNode(). Internally, this function launches a goroutine running
func (n *node) run(r *raft)
which serves the replica from then on.
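The run-in-a-goroutine pattern can be sketched as follows. This is a simplified, hypothetical node, not the library's. run() owns all mutable state and is reached only through channels. Stop() uses a stop/done pair, mirroring the stop and done channels of the library's node struct. The real library's tick channel is buffered; here tickc is unbuffered so that the example is deterministic.

```go
package main

// node is a hypothetical, heavily simplified node: run() owns all state
// and the outside world reaches it only through channels.
type node struct {
	tickc chan struct{} // unbuffered here, unlike the real library
	stop  chan struct{}
	done  chan struct{}
	ticks int // only touched by run(); safe to read after done is closed
}

// startNode mimics the shape of raft.StartNode: build the node,
// then launch run() in its own goroutine.
func startNode() *node {
	n := &node{
		tickc: make(chan struct{}),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	go n.run()
	return n
}

func (n *node) run() {
	defer close(n.done)
	for {
		select {
		case <-n.tickc:
			n.ticks++ // the real library would advance raft's logical clock here
		case <-n.stop:
			return
		}
	}
}

// Tick hands one logical clock tick to run(); it returns at once if the
// node has already stopped.
func (n *node) Tick() {
	select {
	case n.tickc <- struct{}{}:
	case <-n.done:
	}
}

// Stop asks run() to exit and waits until it has; calling it again is a no-op.
func (n *node) Stop() {
	select {
	case n.stop <- struct{}{}:
	case <-n.done:
		return
	}
	<-n.done
}
```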
The application calls
func (n *node) Propose(ctx context.Context, data []byte) error
to propose a request to raft. The call returns once raft has started processing the request, not after the request is committed.
To add or remove nodes, the application calls
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
The node struct contains several important channels:
// node is the canonical implementation of the Node interface
type node struct {
	propc      chan pb.Message
	recvc      chan pb.Message
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	logger Logger
}
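propc: when the application calls Propose, the request is wrapped in a Message and pushed into propc. node.run() pops the Message from propc and hands it to the raft state machine. The state machine puts any Messages that must go to other peers into the raft struct's mailbox (its msgs field). These msgs are later packed into the Messages field of a Ready, which the application takes out and sends through its custom transport.

recvc: after receiving a Message, the application's custom transport must call
func (n *node) Step(ctx context.Context, m pb.Message) error
to put the Message into recvc. After some processing, any Messages that need to be sent are likewise placed into the mailbox for the corresponding peers and later sent out through the custom transport.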
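readyc/advancec: readyc and advancec are both unbuffered channels. node.run() packs pending state updates into a Ready struct and pushes it into readyc. One of those updates is the set of outbound messages placed in the mailbox mentioned above. The application pops the Ready from readyc, processes the corresponding state, and when done calls
rc.node.Advance()
which pushes an empty struct into advancec to tell raft that the state in this Ready has been handled. Once node.run() receives this notification from advancec, it updates some internal state. For example, it removes entries that have already been persisted to storage from memory (type unstable struct).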
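The readyc/advancec handshake can be sketched with Go's nil-channel trick: a send on a nil channel never proceeds, so setting the local readyc to nil disables that select case. This is a simplified, hypothetical run loop, not the library's code. While a Ready is outstanding, no new Ready is offered and new proposals keep accumulating. Advance then drops the handed-out entries, much as the unstable log does.

```go
package main

// Ready is a hypothetical one-field stand-in for the library's Ready.
type Ready struct{ Entries []string }

type node struct {
	propc    chan string
	readyc   chan Ready
	advancec chan struct{}
	stop     chan struct{}
	done     chan struct{}
}

func newNode() *node {
	n := &node{
		propc:    make(chan string),
		readyc:   make(chan Ready),
		advancec: make(chan struct{}),
		stop:     make(chan struct{}),
		done:     make(chan struct{}),
	}
	go n.run()
	return n
}

func (n *node) run() {
	defer close(n.done)
	var unstable []string      // entries not yet confirmed persisted
	var handed int             // how many of them the outstanding Ready carried
	var advancec chan struct{} // non-nil only while a Ready is outstanding
	for {
		var readyc chan Ready // nil disables the send case below
		var rd Ready
		if advancec == nil && len(unstable) > 0 {
			readyc = n.readyc
			rd = Ready{Entries: append([]string(nil), unstable...)}
		}
		select {
		case e := <-n.propc:
			unstable = append(unstable, e) // batched into the next Ready
		case readyc <- rd:
			handed = len(rd.Entries)
			advancec = n.advancec // now wait for Advance
		case <-advancec:
			unstable = unstable[handed:] // drop entries the app has persisted
			handed = 0
			advancec = nil
		case <-n.stop:
			return
		}
	}
}
```

With this loop, proposals "a" and "b" go out together in one Ready. Proposals "c" and "d", made before Advance, wait and arrive together in the next one.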
confc/confstatec: the application takes CommittedEntries out of a Ready and checks whether any of them are membership-change entries. If so, it must call
func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState
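This function pushes the ConfChange into confc, which is also an unbuffered channel. node.run() takes the ConfChange out of confc, performs the actual peer addition or removal, and then pushes the latest membership (a ConfState) into confstatec. ApplyConfChange pops this membership from confstatec and returns it to the application.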
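The confc/confstatec round trip is a request/response over two unbuffered channels, sketched below. ConfChange, ConfState and the membership map are hypothetical simplifications. The real library updates raft's internal peer state instead of a plain map. Because only run() touches the membership, no lock is needed.

```go
package main

import "sort"

type ConfChangeType int

const (
	AddNode ConfChangeType = iota
	RemoveNode
)

// ConfChange and ConfState are hypothetical stand-ins for the raftpb types.
type ConfChange struct {
	Type   ConfChangeType
	NodeID uint64
}

type ConfState struct{ Nodes []uint64 }

type node struct {
	confc      chan ConfChange // unbuffered: request
	confstatec chan ConfState  // unbuffered: response
	stop       chan struct{}
	done       chan struct{}
}

func newNode(peers ...uint64) *node {
	n := &node{
		confc:      make(chan ConfChange),
		confstatec: make(chan ConfState),
		stop:       make(chan struct{}),
		done:       make(chan struct{}),
	}
	members := make(map[uint64]bool)
	for _, id := range peers {
		members[id] = true
	}
	go n.run(members)
	return n
}

// run owns the membership map; only this goroutine touches it.
func (n *node) run(members map[uint64]bool) {
	defer close(n.done)
	for {
		select {
		case cc := <-n.confc:
			switch cc.Type {
			case AddNode:
				members[cc.NodeID] = true
			case RemoveNode:
				delete(members, cc.NodeID)
			}
			var cs ConfState
			for id := range members {
				cs.Nodes = append(cs.Nodes, id)
			}
			sort.Slice(cs.Nodes, func(i, j int) bool { return cs.Nodes[i] < cs.Nodes[j] })
			n.confstatec <- cs // ApplyConfChange is waiting for this
		case <-n.stop:
			return
		}
	}
}

// ApplyConfChange sends the change to run() and waits for the resulting
// membership; if the node has stopped it returns an empty ConfState.
func (n *node) ApplyConfChange(cc ConfChange) *ConfState {
	var cs ConfState
	select {
	case n.confc <- cc:
	case <-n.done:
		return &cs
	}
	select {
	case cs = <-n.confstatec:
	case <-n.done:
	}
	return &cs
}
```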
As this shows, you need to understand quite a lot before you can put etcd's raft library to use.