领导者就是Leader,是整个集群的写事务流程负责人。数据库
一轮选举结束时产生新的Leader,而且Epoch加1。同时新的Leader先将本身的zxid设置为Epoch左移32位(Epoch<32),将是集群中最大的zxid。session
Leader监听Socket等待Follower的链接请求,每次新的Follower链接的时候都会启动一个LearnerHandler线程专门处理与该Follower的交互。LearnerHandler循环接收Follower的消息包,并交给Leader进行处理。并发
leader启动流程:函数
1. Leader选举完成以后,Peer确认了本身是Leader的身份,在QuromPeer的主线程中执行Leader的逻辑this
2. 建立Leader对象,并建立Server绑定在QuorumAddress上,用于和其余Follower之间相互通讯线程
3. 调用Leader::lead函数,执行Leader的真正的逻辑code
a) 调用ZooKeeperServer::loadData,从磁盘中恢复数据和session列表对象
b) 启用新的epoch,zookeeper中的zxid是64位,用于惟一标示一个操做,zxid的高32位是epoch,每次Leader切换加1,低32位为序列号,每次操做加1事务
c) 启动绑定在QuorumAddress上的Server,为每一个Follower的链接创建一个LearnerHandler,用于和Follower作交互,这里的逻辑另外单独论述get
d) 向全部的Follower发送一个NEWLEADER包,宣告本身额Leader身份,并在initLimit时间内等待大多数的Follower完成和Leader的同步,并发送ACK包,表示Follower已经和Leader完成同步并能够对外提供服务
e) 这时Leader和Client之间的交互在cnxnFactory的Server中,Leader和Follower之间的交互在LearnerHandler所属的线程中
f) 而后调用Leader::lead函数的QuromPeer线程在每一个tickTime中都会发送ping消息给其余的follower,follower在接收到ping消息后会回复一个ping消息,并附带上follower的session tracker里的全部session信息,leader收到follower的ping消息后,根据传回的session信息更新本身的session信息 。
Leader在接收到Follower的注册请求以后(Follower调用connectToLeader方法),等待收到FOLLOWERINFO包:
QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); leader.waitForEpochAck(this.getSid(), ss); peerLastZxid = ss.getLastZxid();
1) lastAcceptedEpoch:是Follower的Epoch值。
2) Zxid:是Follower的zxid
3) newEpoch:Leader根据FOLLOWERINF的值计算出新的Epoch
4) newLeaderZxid:根据新的Epoch计算新的Leader的zxid
而后给Follower发送LEADERINFO包,将新的zxid和Epoch告诉Follower,好让Follower知道应该要同步哪些数据。
Leader而后发送快照包给Follower,Follower根据快照包将本地数据库恢复到与Leader相同。
若是Follower的事务比Leader少一些(在minCommittedLog 和maxCommittedLog之间),则不需发SNAP包,而是发DIFF包,同时将需补充的事务经过PROPOSAL和COMMIT发给Follower执行。相关逻辑在syncFollower,queueCommittedProposals,startSendingPackets等方法中实现。这部分主要代码以下所示:
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null); queuedPackets.add(newLeaderQP); if (needSnap) { try { long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); bufferedOutput.flush(); leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); bufferedOutput.flush(); } finally { snapshot.close(); } } startSendingPackets();
startSendingPackets将须要同步的事务发送给Follower,事务同步完成后,Leader发送NEWLEADER包给Follower。
而后等Follower回复第一个ACK包。收到ACK以后调用Leader的waitForNewLeaderAck方法告诉Leader该Follower已经完成同步。
当Leader收到足够多的waitForNewLeaderAck方法调用后(一般超过半数),知道大部分Follower已经注册到本Leader上来了,这时候Leader才能确保正式发挥Leader的做用了。