ZooKeeper系列之(五):领导者工做模式

领导者就是Leader,是整个集群的写事务流程负责人。数据库

一轮选举结束时产生新的Leader,而且Epoch加1。同时新的Leader先将本身的zxid设置为Epoch左移32位(Epoch<32),将是集群中最大的zxid。session

Leader监听Socket等待Follower的链接请求,每次新的Follower链接的时候都会启动一个LearnerHandler线程专门处理与该Follower的交互。LearnerHandler循环接收Follower的消息包,并交给Leader进行处理。并发

leader启动流程:函数

1. Leader选举完成以后,Peer确认了本身是Leader的身份,在QuromPeer的主线程中执行Leader的逻辑this

2. 建立Leader对象,并建立Server绑定在QuorumAddress上,用于和其余Follower之间相互通讯线程

3. 调用Leader::lead函数,执行Leader的真正的逻辑code

a) 调用ZooKeeperServer::loadData,从磁盘中恢复数据和session列表对象

b) 启用新的epoch,zookeeper中的zxid是64位,用于惟一标示一个操做,zxid的高32位是epoch,每次Leader切换加1,低32位为序列号,每次操做加1事务

c) 启动绑定在QuorumAddress上的Server,为每一个Follower的链接创建一个LearnerHandler,用于和Follower作交互,这里的逻辑另外单独论述get

d) 向全部的Follower发送一个NEWLEADER包,宣告本身额Leader身份,并在initLimit时间内等待大多数的Follower完成和Leader的同步,并发送ACK包,表示Follower已经和Leader完成同步并能够对外提供服务

e) 这时Leader和Client之间的交互在cnxnFactory的Server中,Leader和Follower之间的交互在LearnerHandler所属的线程中

f) 而后调用Leader::lead函数的QuromPeer线程在每一个tickTime中都会发送ping消息给其余的follower,follower在接收到ping消息后会回复一个ping消息,并附带上follower的session tracker里的全部session信息,leader收到follower的ping消息后,根据传回的session信息更新本身的session信息 。

Leader在接收到Follower的注册请求以后(Follower调用connectToLeader方法),等待收到FOLLOWERINFO包:

QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if (qp.getType() == Leader.OBSERVERINFO) {
     learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
peerLastZxid = ss.getLastZxid();

1) lastAcceptedEpoch:是Follower的Epoch值。

2) Zxid:是Follower的zxid

3) newEpoch:Leader根据FOLLOWERINF的值计算出新的Epoch

4) newLeaderZxid:根据新的Epoch计算新的Leader的zxid

而后给Follower发送LEADERINFO包,将新的zxid和Epoch告诉Follower,好让Follower知道应该要同步哪些数据。

Leader而后发送快照包给Follower,Follower根据快照包将本地数据库恢复到与Leader相同。

若是Follower的事务比Leader少一些(在minCommittedLog 和maxCommittedLog之间),则不需发SNAP包,而是发DIFF包,同时将需补充的事务经过PROPOSAL和COMMIT发给Follower执行。相关逻辑在syncFollower,queueCommittedProposals,startSendingPackets等方法中实现。这部分主要代码以下所示:

boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, 
leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
if (needSnap) {             
    try {
       long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
       oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
       bufferedOutput.flush();
       leader.zk.getZKDatabase().serializeSnapshot(oa);
       oa.writeString("BenWasHere", "signature");
       bufferedOutput.flush();
     } finally {
       snapshot.close();
     }
} 
startSendingPackets();

startSendingPackets将须要同步的事务发送给Follower,事务同步完成后,Leader发送NEWLEADER包给Follower。

而后等Follower回复第一个ACK包。收到ACK以后调用Leader的waitForNewLeaderAck方法告诉Leader该Follower已经完成同步。

当Leader收到足够多的waitForNewLeaderAck方法调用后(一般超过半数),知道大部分Follower已经注册到本Leader上来了,这时候Leader才能确保正式发挥Leader的做用了。

相关文章
相关标签/搜索