投票选举能够说是ZooKeeper中的难点所在,也是精华所在了。数据库
ZooKeeper服务端集群在正常工做时须要一个Leader和无数个Follower,他们都对外提供统一的服务接口,客户端链接任意一台服务端发送命令其结果都是相同的,而且结果都是严格按照客户端提交读写命令的顺序来执行。服务器
其中Leader就负责维持整个集群数据的一致性和写操做的顺序执行。网络
固然集群中老是可能发生故障,当某个Follower发生故障时,该Follower会从新启动选举流程并从新链接到Leader,成功链接后会同步Leader的数据,这样就算以前有写操做失败了,经过同步过程最后会恢复完整的数据库。Leader/Follower的数据同步过程会在后面详细论述。数据结构
集群经过投票选举过程确认同一时刻集群最多只会存在一个Leader。当有新节点加入集群时,新节点会启动FastLeaderElection过程来从新选举,寻找新的Leader。框架
1、QuorumPeerui
每台服务端主机都会启动一个QuorumPeer进程,它负责投票选举,从QuorumPeerMain类中启动QuorumPeer。spa
QuorumPeer共有4种工做模式,不一样的模式启动不一样的代码:线程
QuorumPeer刚启动时首先设置为LOOKING工做模式,它会启动投票选举过程,调用FastLeadElection类,启动Messenger接收选举信息。code
它的几种工做状态的切换流程是这样的:server
QuorumPeer知道集群各个机器的IP地址,当经过FastLeaderElection完成选举过程后会肯定本身是Leader仍是Follower,若是是Leader则调用lead方法进入领导者工做流程;若是是Follower则调用followLeader方法进入追随者工做流程。
QuorumPeer维持在4种状态之间不停切换。在LOOKING状态时有一个细节要关注一下,若是shuttingDownLE为true,则表示要关闭FastLeaderElection线程从新启动,这时候会经过Messenger来关闭消息接收,再也不接收集群的选举信息。
startLeaderElection方法是在LOOKING状态启动选举过程的。四种状态的切换过程代码以下:
while (running) { switch (getPeerState()) { case LOOKING: try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { setPeerState(ServerState.LOOKING); } break; case OBSERVING: try { setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setObserver(null); updateServerState(); } break; case FOLLOWING: try { setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } }
2、QuorumCnxManager
QuorumCnxManager是专门用于选举信息交换的Socket框架,Messenger接收的报文就是从QuorumCnxManager的接收队列中获取的。
QuorumCnxManager一般采用Netty框架负责底层Socket链接管理,这是一种非阻塞的Socket方式,提供Select方法在多个Socket之间轮询数据,以先到先得的方式处理来自不一样QuorumPeer的选举交换消息。
这套东西和客户端服务端的工做模式是同样的,区别只是这里是专门给选举过程使用的。
发送线程定义为SendWorker,接收线程定义为RecvWorker。对于每个QuorumPeer链接都会建立一个SendWorkder和一个RecvWorker。
发送队列是一个HashMap,定义以下:
ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
接收队列定义为:
ArrayBlockingQueue<Message> recvQueue;
Messenger发送消息时调用QuorumCnxManager的toSend方法,将发送消息添加到queueSendMap中,这里咱们看到只有发送给远方的消息才会真正放入queueSendMap,发给本身的消息直接放到接收队列recvQueue中。功能很清晰,具体代码这里就不看了。
3、Messenger
Messenger定义在FastLeaderElection类中,专用于QuorumPeer之间选举消息的传递,交换toSend/Notification消息体。
它由FastLeaderElection负责启动,负责选举过程当中QuorumPeer间传递选举消息。Messenger包括WorkerReceiver和WorkerSender两个子线程,一个负责接收QuorumPeer消息,另外一个负责发送QuorumPeer消息。
WorkerSender使用ToSend结构发送消息,而WorkerReceiver使用Notification结构接收消息。toSend和Notification消息体的属性基本是一致性的。
Messenger底层采用QuorumCnxManager做为Socket链接池,每一个QuorumPeer都会建立一个QuorumCnxManager,用于与集群其余全部的QuorumPeer的信息交互。
Notification类用于接收Vote信息的消息接收体,在选举过程当中集群交换的包都是Notification类型。
ToSend类和Nofitication相似,发送Vote信息的消息定义体。
Notification和ToSend只在Messenger中处理,除此以外的地方看不到这两种数据结构。
3.1 WorkerReceiver
WorkerReceiver是接收线程,专门接收参与选举的服务器返回的通知,进行相应的处理。
从QuorumCnxManager获取网络接收到的Message包,而后组装成Notification消息,放入recvqueue队列,等待后续处理。
若是消息来自观察者或者追随者,则将当前的领导者的zxid和sid当即回复给对方;
若是消息来自另外一个LOOKING状态的参与选举者,则将对方发来包里的zxid和sid等信息组装Nofitication消息,而后判断:
其余的包则忽略。
主要代码以下:
while (!stop) { response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ recvqueue.offer(n); if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){ Vote v = getVote(); QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } } else { Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } } }
3.2 WorkerSender
WorkerSender是发送线程,专门负责发送ToSend消息给参与选举的其余机器。
WorkerSender的流程比较简单,就是将ToSend消息放入发送队列,发送线程循环发送。
主要代码以下:
public void run() { while (!stop) { try { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); manager.toSend(m.sid, requestBuffer); }
manager就是指QuorumCnxManager。