ZooKeeper系列之(九):投票选举(1)

投票选举能够说是ZooKeeper中的难点所在,也是精华所在了。数据库

ZooKeeper服务端集群在正常工做时须要一个Leader和无数个Follower,他们都对外提供统一的服务接口,客户端链接任意一台服务端发送命令其结果都是相同的,而且结果都是严格按照客户端提交读写命令的顺序来执行。服务器

其中Leader就负责维持整个集群数据的一致性和写操做的顺序执行。网络

固然集群中老是可能发生故障,当某个Follower发生故障时,该Follower会从新启动选举流程并从新链接到Leader,成功链接后会同步Leader的数据,这样就算以前有写操做失败了,经过同步过程最后会恢复完整的数据库。Leader/Follower的数据同步过程会在后面详细论述。数据结构

集群经过投票选举过程确认同一时刻集群最多只会存在一个Leader。当有新节点加入集群时,新节点会启动FastLeaderElection过程来从新选举,寻找新的Leader。框架

1、QuorumPeerui

每台服务端主机都会启动一个QuorumPeer进程,它负责投票选举,从QuorumPeerMain类中启动QuorumPeer。spa

QuorumPeer共有4种工做模式,不一样的模式启动不一样的代码:线程

  1. LOOKING:选举模式,启动FastLeaderElection
  2. LEADING:领导者模式,启动Leader
  3. FOLLOWING:跟随者模式,启动Follower
  4. OBSERVING:旁观者模式,启动Observer

QuorumPeer刚启动时首先设置为LOOKING工做模式,它会启动投票选举过程,调用FastLeadElection类,启动Messenger接收选举信息。code

它的几种工做状态的切换流程是这样的:server

 

QuorumPeer知道集群各个机器的IP地址,当经过FastLeaderElection完成选举过程后会肯定本身是Leader仍是Follower,若是是Leader则调用lead方法进入领导者工做流程;若是是Follower则调用followLeader方法进入追随者工做流程。

QuorumPeer维持在4种状态之间不停切换。在LOOKING状态时有一个细节要关注一下,若是shuttingDownLE为true,则表示要关闭FastLeaderElection线程从新启动,这时候会经过Messenger来关闭消息接收,再也不接收集群的选举信息。

startLeaderElection方法是在LOOKING状态启动选举过程的。四种状态的切换过程代码以下:

while (running) {
       switch (getPeerState()) {
       case LOOKING:       
           try {
              reconfigFlagClear();
              if (shuttingDownLE) {
                    shuttingDownLE = false;
                    startLeaderElection();
              }
              setCurrentVote(makeLEStrategy().lookForLeader());
           } catch (Exception e) {
                  setPeerState(ServerState.LOOKING);
            }
            break;
       case OBSERVING:
            try {
                 setObserver(makeObserver(logFactory));
                 observer.observeLeader();
            } catch (Exception e) {
                 LOG.warn("Unexpected exception",e );
            } finally {
                 observer.shutdown();
                 setObserver(null);  
                 updateServerState();
            }
            break;
       case FOLLOWING:
            try {
                 setFollower(makeFollower(logFactory));
                 follower.followLeader();
            } catch (Exception e) {
                 LOG.warn("Unexpected exception",e);
            } finally {
                 follower.shutdown();
                 setFollower(null);
                 updateServerState();
            }
            break;
       case LEADING:
            try {
                 setLeader(makeLeader(logFactory));
                 leader.lead();
                 setLeader(null);
            } catch (Exception e) {
                 LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                updateServerState();
            }
            break;
     }
 }

2、QuorumCnxManager

QuorumCnxManager是专门用于选举信息交换的Socket框架,Messenger接收的报文就是从QuorumCnxManager的接收队列中获取的。

QuorumCnxManager一般采用Netty框架负责底层Socket链接管理,这是一种非阻塞的Socket方式,提供Select方法在多个Socket之间轮询数据,以先到先得的方式处理来自不一样QuorumPeer的选举交换消息。

这套东西和客户端服务端的工做模式是同样的,区别只是这里是专门给选举过程使用的。

发送线程定义为SendWorker,接收线程定义为RecvWorker。对于每个QuorumPeer链接都会建立一个SendWorkder和一个RecvWorker。

发送队列是一个HashMap,定义以下:

ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;

接收队列定义为:

ArrayBlockingQueue<Message> recvQueue;

Messenger发送消息时调用QuorumCnxManager的toSend方法,将发送消息添加到queueSendMap中,这里咱们看到只有发送给远方的消息才会真正放入queueSendMap,发给本身的消息直接放到接收队列recvQueue中。功能很清晰,具体代码这里就不看了。

3、Messenger

Messenger定义在FastLeaderElection类中,专用于QuorumPeer之间选举消息的传递,交换toSend/Notification消息体。

它由FastLeaderElection负责启动,负责选举过程当中QuorumPeer间传递选举消息。Messenger包括WorkerReceiver和WorkerSender两个子线程,一个负责接收QuorumPeer消息,另外一个负责发送QuorumPeer消息。

WorkerSender使用ToSend结构发送消息,而WorkerReceiver使用Notification结构接收消息。toSend和Notification消息体的属性基本是一致性的。

Messenger底层采用QuorumCnxManager做为Socket链接池,每一个QuorumPeer都会建立一个QuorumCnxManager,用于与集群其余全部的QuorumPeer的信息交互。

Notification类用于接收Vote信息的消息接收体,在选举过程当中集群交换的包都是Notification类型。

ToSend类和Nofitication相似,发送Vote信息的消息定义体。

Notification和ToSend只在Messenger中处理,除此以外的地方看不到这两种数据结构。

3.1 WorkerReceiver

WorkerReceiver是接收线程,专门接收参与选举的服务器返回的通知,进行相应的处理。

从QuorumCnxManager获取网络接收到的Message包,而后组装成Notification消息,放入recvqueue队列,等待后续处理。

若是消息来自观察者或者追随者,则将当前的领导者的zxid和sid当即回复给对方;

若是消息来自另外一个LOOKING状态的参与选举者,则将对方发来包里的zxid和sid等信息组装Nofitication消息,而后判断:

  1. 若是接收者是Looking状态,则将Nofitication放入FastLeaderElection的recvqueue等待处理(这种状况下,若是对方也是Looking状态,则判断哪方的Epoch和Zxid最大,大的为新的Leader候选,小的抛弃)。
  2. 若是接收者不是Looking状态而发送者是LOOKING,则将当前接收者认为的Leader发送给对方。

其余的包则忽略。

主要代码以下:

while (!stop) {
    response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
        if((ackstate == QuorumPeer.ServerState.LOOKING)
                             && (n.electionEpoch < logicalclock.get())){
             Vote v = getVote();
             QuorumVerifier qv = self.getQuorumVerifier();
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                                        v.getId(),
                                        v.getZxid(),
                                        logicalclock.get(),
                                        self.getPeerState(),
                                        response.sid,
                                        v.getPeerEpoch(),
                                        qv.toString().getBytes());
              sendqueue.offer(notmsg);
          }
    } else {
        Vote current = self.getCurrentVote();
        if(ackstate == QuorumPeer.ServerState.LOOKING){
             QuorumVerifier qv = self.getQuorumVerifier();
             ToSend notmsg = new ToSend( ToSend.mType.notification,
                                         current.getId(),
                                         current.getZxid(),
                                         current.getElectionEpoch(),
                                         self.getPeerState(),
                                         response.sid,
                                         current.getPeerEpoch(),
                                         qv.toString().getBytes());
             sendqueue.offer(notmsg);
          }
     }
}

3.2 WorkerSender

WorkerSender是发送线程,专门负责发送ToSend消息给参与选举的其余机器。

WorkerSender的流程比较简单,就是将ToSend消息放入发送队列,发送线程循环发送。

主要代码以下:

public void run() {
      while (!stop) {
           try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if(m == null) continue;
                process(m);
            } catch (InterruptedException e) {
                break;
            }
      }
      LOG.info("WorkerSender is down");
}
void process(ToSend m) {
      ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                                    m.leader,
                                                    m.zxid,
                                                    m.electionEpoch,
                                                    m.peerEpoch,
                                                    m.configData);
      manager.toSend(m.sid, requestBuffer);
}

manager就是指QuorumCnxManager。

相关文章
相关标签/搜索