当ZooKeeper集群启动以后,须要完成leader和follower之间的数据同步。数据库
首先leader和observer有一个共同的父类learner,里面定义了一些公共方法。集群正常运行后会有一个leader和多个follower(这里observer就不单独说了,和follower的行为是相似的)。缓存
一、 注册过程oop
follower在提供服务给客户端以前必须完成注册到leader的动做。this
注册分为如下3个主要步骤:设计
a) 调用connectToLeader方法链接到Leader。code
b) 调用registerWithLeader方法注册到Leader,交换各自的sid、zxid和Epoch等信息,Leader以此决定事务同步的方式。server
c) 调用SyncWithLeader跟Leader进行事务数据同步,处理SNAP/DIFF/TRUNC包。blog
这3个方法都定义在父类Learner类中。下面咱们以Follower做为例子说明注册到Leader的完整流程。事务
二、connectToLeaderrem
connectToLeader方法功能较简单,建立Socket链接到Leader。该方法定义在Follower的父类Learner中。它加了重试机制,具体的代码这里就不给出了。
最多能够尝试5次链接。链接成功后Leader会建立一个LearnerHandler专门处理与该Follower之间的QuorunPacket消息的传递。
三、registerWithLeader
Follower链接Leader成功以后,立刻调用registerWithLeader方法。
registerWithLeader方法首先发送FOLLOWERINFO包给Leader,告诉Leader本身的身份属性(Follower的zxid,sid)。而后等待Leader回复的LEADINFO包,获取Leader的Epoch和zxid值,并更新Follower的Epoch和zxid值,以Leader信息为准。
最后给Leader发ACKEPOCH包,告诉Leader此次Follower已经与Leader的zxid同步了。
这里acceptedEpoch就是Leader的Epoch。
整个resigerWithLeader流程以下图所示:
接下来Follower就要进入syncWithLeader方法来与Leader同步数据了。
四、SyncWithLeader
SyncWithLeader方法同步Leader的事务到Follower,该方法较长,这里分段介绍其整个过程。
首先读取同步数据包,主要代码以下:
QuorumPacket qp = new QuorumPacket(); readPacket(qp); if (qp.getType() == Leader.SNAP){ zk.getZKDatabase().deserializeSnapshot(leaderIs); }else if (qp.getType() == Leader. TRUNC) { boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { // not able to truncate the log LOG.error("Not able to truncate the log " + Long.toHexString(qp.getZxid())); System.exit(13); } }else if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", ong.toHexString(qp.getZxid())); snapshotNeeded = false; }
同步方式分红3种:
当前面都执行完成后,还有一段代码处理后续消息(这里是QuorumPacket类型),好比:PROPOSAL、COMMIT、NEWLEADER等。例如PROPOSAL是指同步期间收到的leader发送的写请求信息,缓存在packetsNotCommitted里,等后续处理,这这部分代码能够先无论。
这部分的主要代码是这样的:
while (self.isRunning()) { readPacket(qp); switch(qp.getType()) { case Leader.PROPOSAL: packetsNotCommitted.add(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: pif = packetsNotCommitted.peekFirst(); if (!writeToTxnLog) { zk.processTxn(pif.hdr, pif.rec); packetsNotCommitted.remove(); } else { packetsCommitted.add(qp.getZxid()); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { packetsNotCommitted.add(packet); packetsCommitted.add(qp.getZxid()); } break; case Leader.UPTODATE: if (isPreZAB1_0) { zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: if (snapshotNeeded) { zk.takeSnapshot(); } self.setCurrentEpoch(newEpoch); writeToTxnLog = true; isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } }
而后Leader会发送NEWLEADER包,Follower收到NEWLEADER包后回复ACK给Leader。
最后Leader发UPTODATE包表示同步完成,Follower这时启动服务端并跳出本次循环,准备结束整个注册过程。
五、 Follower主流程
Follower是Learner的子类,Follower的启动方法就是followLeader。
followLeader的主要代码片断以下:
connectToLeader(addr); long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); processPacket(qp); }
启动时首先与Leader同步数据,而后启动FollowerZooKeeperServer,在FollowerZooKeeperServer运行的同时,额外启动while循环等待Peer的QuorumPacket包,调用processPacket方法处理这些包。
processPacket处理QuorumPeer传送的QuorumPacket,最主要是处理两种QuorumPacket:PROPOSAL和COMMIT。固然还有PING、COMMITANDACTIVATE等包类型,为便于简化梳理代码设计思路,这里就再也不详述了。
该方法在收到Leader发送过来的QuorumPacket时被调用,主要是响应PROPOSAL和COMMIT两种类型的消息。
PROPOSAL是Leader将要执行的写事务命令;COMMIT是提交命令。Follower只有在收到COMMIT消息后才会让PROPOSAL命令的内容生效。
同一个写事务命令会在Leader和多个Follower上都执行一次,保证集群数据的一致性。
代码片断:
case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); lastQueued = hdr.getZxid(); fzk.logRequest(hdr, txn); break; case Leader.COMMIT: fzk.commit(qp.getZxid()); break;
Follower收到PROPOSAL消息后调用FollowerZooKeeperServer的logRequest方法;收到COMMIT消息后调用FollowerZooKeeperServer的commit方法。
Leader发送给集群中全部follower的写请求包。
Leader执行写操做时须要告之集群中的Learner,让你们也执行写操做,保证集群数据的一致性。PROPOSAL是严格按照顺序执行的,这也是ZOOKEEPER的核心设计思想之一。
当Leader认为一个Proposal已被大多数Follower持久化并等待执行后会发送COMMIT包,通知各Follower能够提交执行该Proposal了,最后调用到FinalRequestProcessor执行写操做,经过这种机制保证写操做能被大半数集群机器执行。
六、 Observer主流程
Observer和Follower功能相似,主要的差异就是不参与选举。
Observer的入口方法是observerLeader。当QuorumPeer的状态是OBSERVING时会启动Observer并调用observerLeader方法。
observerLeader同Follower的followLeader方法相似,首先注册到Leader,事务同步后进入QuorumPacket包循环处理过程,调用processPacket方法处理QuorumPacket。
processPacket比Follower要简单许多,最主要是处理INFORM包来执行Leader的写请求命令。
这里处理的是INFORM消息,Leader群发写事务时,给Follower发的是PROPOSAL并要等待Follower确认;而给Observer发的则是INFORM消息而且不须要Obverver回复ACK消息来确认。