etcd raft如何实现成员变动

成员变动在一致性协议里稍复杂一些,因为不一样的成员不可能在同一时刻从旧成员组切换至新成员组,因此可能出现两个不相交的majority,从而致使同一个term出现两个leader,进而致使同一个index的日志不一致,违反一致性协议。下图是个例子:
node

raft做者提出了一种比较简单的方法,一次只增长或减小一个成员,这样可以保证任什么时候刻,都不可能出现两个不相交的majority,因此,能够从旧成员组直接切到新成员组。以下图:
算法

切换的时机是把成员变动日志写盘的时候,无论是否commit。这个切换时机带来的问题是若是这条成员变动日志最终没有commit,在发生leader切换的时候,成员组就须要回滚到旧的成员组。app

etcd raft为了实现简单,将切换成员组的实机选在apply成员变动日志的时候。ui

下面看看etcd raft library如何实现的:日志

应用调用code

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
    data, err := cc.Marshal()
    if err != nil {
        return err
    }
    return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}

能够看出,ConfChange是和普通的log entry同样封装在MsgProp消息中,进入propc,orm

跑raft算法的goroutine从propc中拿到消息后,会作以下判断:blog

for i, e := range m.Entries {
            if e.Type == pb.EntryConfChange {
                if r.pendingConf {
                    r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
                    m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
                }
                r.pendingConf = true
            }
}

检查已经有成员变动正在作,就忽略新的成员变动。而后将pendingConf置为true,意味着目前有成员变动正在作了,从这里能够看出,多个成员变动不能同时进行。follower接收端的处理和普通log entry同样。接口

若是成员变动日志达成了一致,则会被封装在Ready中,应用拿到后,作以下处理:rem

if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
}

ApplyConfChange:

func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
    var cs pb.ConfState
    select {
    case n.confc <- cc:
    case <-n.done:
    }
    select {
    case cs = <-n.confstatec:
    case <-n.done:
    }
    return &cs
}

讲ConfChange放入confc,而后阻塞在confstatec上,跑raft协议的goroutine从confc中拿出ConfChange,作相应的增长/删除节点操做,而后将成员组放入confstatec。

switch cc.Type {
            case pb.ConfChangeAddNode:
                r.addNode(cc.NodeID)
            case pb.ConfChangeRemoveNode:
                // block incoming proposal when local node is
                // removed
                if cc.NodeID == r.id {
                    propc = nil
                }
                r.removeNode(cc.NodeID)
            case pb.ConfChangeUpdateNode:
                r.resetPendingConf()
            default:
                panic("unexpected conf type")
            }
            select {
            case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
            case <-n.done:
}

增长/删除节点操做都只是更新prs,map的每一个元素保存一个peer的状态,其中最重要的状态莫过于

Match, Next uint64

看过raft小论文的人一看变量名就很明确意义,Match表明最大的已经落盘的log index,Next表明下一条须要发给这个peer的log index。而后将pendingConf置为false,表明成员变动结束。

重启如何恢复成员组:

hs, cs, err := c.Storage.InitialState()

Storage接口中:

// InitialState returns the saved HardState and ConfState information.
    InitialState() (pb.HardState, pb.ConfState, error)

Storage是个接口,其中InitialState()用于恢复成员组,须要应用本身实现,一般将ConfState记在最后一次Snapshot的Metadata中:

message SnapshotMetadata {
    optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
    optional uint64    index      = 2 [(gogoproto.nullable) = false];
    optional uint64    term       = 3 [(gogoproto.nullable) = false];
}

ConfState:

message ConfState {
    repeated uint64 nodes = 1;
}

拿到ConfState后就能够初始化上面提到的prs,snapshot后续的已经commit的log entry同样,经过Ready封装,应用进行apply,若是其中有ConfChange,则调用

s.Node.ApplyConfChange(cc)
相关文章
相关标签/搜索