聊聊kingbus的starRaft

本文主要研究一下kingbus的starRaft

starRaft

kingbus/server/server.go

// starRaft sets up the raft layer of the kingbus server.
//
// It opens the on-disk raft storage under cfg.DataDir. It then starts or
// restarts the etcd raft node, depending on whether a log already exists in
// cfg.DataDir and whether cfg.NewCluster is set. Finally it starts the
// rafthttp transport and registers every known remote and peer with it.
//
// Three bootstrap modes are handled:
//   - no log, !cfg.NewCluster: join an existing cluster. The cluster ID is
//     taken from the cluster info fetched from the remote peer URLs.
//   - no log, cfg.NewCluster: bootstrap a new cluster seeded with all members
//     of cfg.InitialPeerURLsMap.
//   - log exists: restart from the state persisted in storage, resuming at
//     the applied index stored there.
//
// On success the server's raftNode, store, cluster, id, stats and related
// fields are populated. On any error return, the opened storage is closed and
// an already-started etcd raft node is stopped.
//
// err is a named result so that the deferred cleanup observes every error
// return, including ones that construct a new error value with fmt.Errorf.
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) (err error) {
	var (
		etcdRaftNode etcdraft.Node
		id           types.ID
		cl           *membership.RaftCluster
		remotes      []*membership.Member
		appliedIndex uint64
	)

	prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout)
	if err != nil {
		return err
	}

	store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize)
	if err != nil {
		// Hand the failure back to the caller rather than exiting the process.
		// The cleanup defer below is not registered yet, so a nil store is
		// never closed.
		return fmt.Errorf("NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
	}

	defer func() {
		// Undo partial initialisation on any error return.
		if err == nil {
			return
		}
		if etcdRaftNode != nil {
			// The raft node has been started; stop it before closing the
			// storage it was configured with.
			etcdRaftNode.Stop()
		}
		store.Close()
	}()

	logExist := utils.ExistLog(cfg.DataDir)
	switch {
	case !logExist && !cfg.NewCluster:
		// Join an existing cluster.
		if err = cfg.VerifyJoinExisting(); err != nil {
			return err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
		if err != nil {
			return err
		}
		remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name)
		existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt)
		if gerr != nil {
			return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
		}
		if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
			return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
		}

		remotes = existingCluster.Members()
		cl.SetID(existingCluster.GetID())
		cl.SetStore(store)
		// No initial peers: membership comes from the existing cluster.
		id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil)
	case !logExist && cfg.NewCluster:
		// Bootstrap a brand-new cluster.
		if err = cfg.VerifyBootstrap(); err != nil {
			return err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
		if err != nil {
			return err
		}
		m := cl.MemberByName(cfg.Name)
		if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) {
			return fmt.Errorf("member %s has already been bootstrapped", m.ID)
		}

		cl.SetStore(store)
		id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs())
	case logExist:
		// Node restart: read state back from storage.
		if err = utils.IsDirWriteable(cfg.DataDir); err != nil {
			return fmt.Errorf("cannot write to member directory: %v", err)
		}
		appliedIndex = raft.MustGetAppliedIndex(store)
		cfg.AppliedIndex = appliedIndex
		id, etcdRaftNode, cl = restartEtcdNode(cfg, store)
		cl.SetStore(store)
	default:
		// Defensive: the cases above already cover every combination.
		return fmt.Errorf("unsupported bootstrap config")
	}

	s.raftNode = raft.NewNode(
		raft.NodeConfig{
			IsIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
			Node:        etcdRaftNode,
			Heartbeat:   cfg.HeartbeatMs,
			Storage:     store,
		},
	)
	// committedIndex and term are updated later by the FSM
	// (UpdateCommittedIndex, SetTerm). appliedIndex is set here because
	// applyEntries checks entry continuity against it.
	s.raftNode.SetAppliedIndex(appliedIndex)

	s.id = id
	s.wait = wait.New()
	s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now())
	s.stopping = make(chan struct{})
	s.errorc = make(chan error)
	s.applyBroadcast = utils.NewBroadcast()
	s.stats = stats.NewServerStats(cfg.Name, id.String())
	s.lstats = stats.NewLeaderStats(id.String())
	s.store = store

	tr := &rafthttp.Transport{
		TLSInfo:     transport.TLSInfo{},
		DialTimeout: DialTimeout,
		ID:          id,
		URLs:        cfg.PeerURLs,
		ClusterID:   cl.GetID(),
		Raft:        s,
		ServerStats: s.stats,
		LeaderStats: s.lstats,
		ErrorC:      s.errorc,
	}
	if err = tr.Start(); err != nil {
		return err
	}
	// Remotes help a newly joined member catch up with the cluster's
	// progress. They support basic message sending only, with no stream
	// connection. They are not used once the latest peers have been added
	// to rafthttp.
	for _, m := range remotes {
		if m.ID != id {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range cl.Members() {
		if m.ID != id {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	s.raftNode.Transport = tr
	s.cluster = cl

	return nil
}
  • starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,之后通过storage.NewDiskStorage创建DiskStorage,然后根据logExist及cfg.NewCluster做不同处理:若两者都为false(加入已有集群),则将membership.RaftCluster的id更新为已存在集群的id,然后以nil peers执行startEtcdRaftNode;若logExist为false且cfg.NewCluster为true(新建集群),则使用cl.MemberIDs()执行startEtcdRaftNode;若logExist为true(节点重启),则执行restartEtcdNode;最后创建rafthttp.Transport,并执行tr.Start()、tr.AddRemote、tr.AddPeer

startEtcdRaftNode

kingbus/server/server.go

// startEtcdRaftNode starts a fresh etcd raft node for the member named
// cfg.Name in cluster cl.
//
// ids lists the members used to seed the raft configuration. Each one becomes
// an etcdraft.Peer whose Context is the JSON encoding of that member. A nil
// or empty ids starts the node with no initial peers.
//
// It returns the local member's ID and the started node. After starting,
// raft.AdvanceTicks(n, ElectionTick) is called. Presumably this pre-ticks the
// node so it can campaign sooner; confirm against raft.AdvanceTicks.
//
// It logs fatally if cfg.Name is not a member of cl, matching restartEtcdNode.
// It panics if a member cannot be JSON-marshalled.
func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) (
	id types.ID, n etcdraft.Node) {
	member := cl.MemberByName(cfg.Name)
	if member == nil {
		// Guard the member.ID dereference below.
		log.Log.Fatalf("startEtcdRaftNode:member not in raft cluster,cluster:%v,memberName:%s",
			cl.String(), cfg.Name)
	}

	peers := make([]etcdraft.Peer, len(ids))
	for i, peerID := range ids {
		ctx, err := json.Marshal(cl.Member(peerID))
		if err != nil {
			log.Log.Panicf("marshal member should never fail: %v", err)
		}
		peers[i] = etcdraft.Peer{ID: uint64(peerID), Context: ctx}
	}
	id = member.ID
	log.Log.Infof("starting member %s in cluster %s", id, cl.GetID())

	c := &etcdraft.Config{
		ID: uint64(id),
		// Election timeout expressed in heartbeat ticks (HeartbeatTick is 1).
		ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
		HeartbeatTick:             1,
		Storage:                   store,
		MaxSizePerMsg:             cfg.MaxRequestBytes,
		MaxInflightMsgs:           maxInflightMsgs,
		CheckQuorum:               true,
		PreVote:                   cfg.PreVote,
		DisableProposalForwarding: true,
		Logger:                    log.Log,
	}

	n = etcdraft.StartNode(c, peers)
	raft.AdvanceTicks(n, c.ElectionTick)
	return id, n
}
  • startEtcdRaftNode方法通过指定的ids创建peers,之后执行etcdraft.StartNode及raft.AdvanceTicks

restartEtcdNode

kingbus/server/server.go

// restartEtcdNode restarts the etcd raft node from the state persisted in
// store.
//
// It loads the cluster membership from store and looks up the local member by
// cfg.Name. It then restarts the node with Applied set to cfg.AppliedIndex.
// It returns the local member ID, the restarted node and the loaded cluster.
//
// It panics if the cluster cannot be loaded from storage. It logs fatally if
// cfg.Name is not a member of the stored cluster.
func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) (
	types.ID, etcdraft.Node, *membership.RaftCluster) {
	cl, err := membership.GetRaftClusterFromStorage(store)
	if err != nil {
		// Panicf, not Panic: Panic does not interpret the %s verb.
		log.Log.Panicf("GetRaftClusterFromStorage error:%s", err.Error())
	}

	log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String())

	// The local member ID comes from the persisted cluster membership.
	member := cl.MemberByName(cfg.Name)
	if member == nil {
		log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s",
			cl.String(), cfg.Name)
	}
	c := &etcdraft.Config{
		ID:            uint64(member.ID),
		ElectionTick:  int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
		HeartbeatTick: 1,
		// Resume from the last applied index read from storage by the caller.
		Applied:                   cfg.AppliedIndex,
		Storage:                   store,
		MaxSizePerMsg:             cfg.MaxRequestBytes,
		MaxInflightMsgs:           maxInflightMsgs,
		CheckQuorum:               true,
		PreVote:                   cfg.PreVote,
		DisableProposalForwarding: true,
		Logger:                    log.Log,
	}

	n := etcdraft.RestartNode(c)
	return member.ID, n, cl
}
  • restartEtcdNode方法通过membership.GetRaftClusterFromStorage(store)获取RaftCluster,之后通过cl.MemberByName(cfg.Name)获取Member,然后使用member.ID构造etcdraft.Config,最后根据etcdraft.Config执行etcdraft.RestartNode

小结

starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,之后通过storage.NewDiskStorage创建DiskStorage,然后根据logExist及cfg.NewCluster做不同处理:若两者都为false(加入已有集群),则将membership.RaftCluster的id更新为已存在集群的id,然后执行startEtcdRaftNode;若logExist为false且cfg.NewCluster为true(新建集群),则使用cl.MemberIDs()执行startEtcdRaftNode;若logExist为true(节点重启),则执行restartEtcdNode;最后创建rafthttp.Transport,并执行tr.Start()、tr.AddRemote、tr.AddPeer

doc