Leader经过启动LeaderZooKeeperServer来接收客户端请求。node
首先看下它的处理链的定义,从源码看出LeaderZooKeeperServer的处理链顺序以下:算法
SyncRequestProcessor和AckRequestProcessor是在ProposalRequestProcessor内部被建立的,当Leader要处理PROPOSAL命令时先本身调用SyncRequestProcessor持久化,而后经过AckRequestProcessor直接告诉Leader处理ACK逻辑(不通过QuorumPacket传递)。数据库
LeaderRequestProcessor是leader处理请求的第一个processor,所以他不须要转发到其余机器,只须要往下执行就好了。这里就是简单的调用nextProcessor的processRequest方法。session
主要处理代码以下:this
public void processRequest(Request request) throws RequestProcessorException { Request upgradeRequest = null; try { upgradeRequest = lzks.checkUpgradeSession(request); } catch (KeeperException ke) { request.setException(ke); } if (upgradeRequest != null) { nextProcessor.processRequest(upgradeRequest); } nextProcessor.processRequest(request); }
Leader收到Follower的REQUEST请求后调用submitLearnerRequest处理该写事务。spa
该方法就一条语句,调用PrepRequestProcessor的processRequest方法处理Request请求。方法定义以下:日志
public void submitLearnerRequest(Request request) { prepRequestProcessor.processRequest(request); }
写请求是要广播到整个集群作数据一致性的,因此涉及到多台集群的交互。code
这里咱们以create事务为例说明。orm
首先客户端链接到Leader,而后Leader发送proposal消息给集群中全部Follower,proposal消息带有本次create事务的数据。server
Follower收到proposal后首先保存到磁盘防止proposal丢失,而后回复ACK给Leader。
Leader收集到足够多的ACK后再次发送COMMIT给全部Follower,同时Leader也在本地提交proposal执行,Follower收到COMMIT以后也在本地执行proposal事务。
Leader将执行结果返回给客户端。
经过以上过程集群中全部机器都会维护同一个完整的数据库,保证了数据一致性。
这里须要注意的一点是:写操做时Zk会调用Sync过程将写操做持久化到磁盘。
具体流程以下:
上述就是leader处理写Request请求的完整过程。
leader收到REQUEST包时,会调用submitLearnerRequest方法;收到ACK包时,会调用processAck方法,leader接收包的部分代码以下。
while (true) { qp = new QuorumPacket(); ia.readRecord(qp, "packet"); ByteBuffer bb; long sessionId; int cxid; int type; switch (qp.getType()) { case Leader.ACK: if (this.learnerType == LearnerType.OBSERVER) { } syncLimitCheck.updateAck(qp.getZxid()); leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; case Leader.REQUEST: bb = ByteBuffer.wrap(qp.getData()); sessionId = bb.getLong(); cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); Request si; if(type == OpCode.sync){ si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()); } else { si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); } si.setOwner(this); leader.zk.submitLearnerRequest(si); break; default: LOG.warn("unexpected quorum packet, type: {}", packetToString(qp)); break; } }
proposal
当LeaderZooKeeperServer收到客户端写事务请求时,会触发Leader的proposal方法执行,发送PROPOSAL消息给Follower。同时维护一个outstandingProposals字典表保存PROPOSA消息。
proposal方法的主要代码以下:
ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); try { request.getHdr().serialize(boa, "hdr"); if (request.getTxn() != null) { request.getTxn().serialize(boa, "txn"); } baos.close(); } catch (IOException e) { LOG.warn("This really should be impossible", e); } QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos.toByteArray(), null); Proposal p = new Proposal(); p.packet = pp; p.request = request; synchronized(this) { p.addQuorumVerifier(self.getQuorumVerifier()); if (request.getHdr().getType() == OpCode.reconfig){ self.setLastSeenQuorumVerifier(request.qv, true); } lastProposed = p.packet.getZxid(); outstandingProposals.put(lastProposed, p); sendPacket(pp); }
inform
和proposal相似,只不过是将写事务发给Observer,而且不须要Observer回复ACK。
public void inform(Proposal proposal) { QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null); sendObserverPacket(qp); }
processAck
Follower持久化PROPOSAL写事务请求到磁盘后会回复ACK消息给Leader,Leader经过LearnerHandler接收该包,并触发Leader的processAck方法。
Leader发送到Follower的PROPOSAL消息,需等待全部Follower回复Ack消息后判断是否知足COMMIT条件,若是知足COMMIT条件则发送COMMIT消息给全部Follower,Leader再往下执行。
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { if (!allowedToCommit) return; if ((zxid & 0xffffffffL) == 0) { return; } if (lastCommitted >= zxid) { return; } Proposal p = outstandingProposals.get(zxid); p.addAck(sid); boolean hasCommitted = tryToCommit(p, zxid, followerAddr); if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){ long curZxid = zxid; while (allowedToCommit && hasCommitted && p!=null){ curZxid++; p = outstandingProposals.get(curZxid); if (p !=null) hasCommitted = tryToCommit(p, curZxid, null); } } }
ProcessAck会统计多少Follower回复了Leader.ACK类型的QuorumPacket包,而后就走到了tryToCommit方法,tryToCommit判断是否知足COMMIT条件。
下面看看tryToCommit方法执行了哪些操做。
tryToCommit
确保写操做是按顺序被确认的。
尝试对Proposal按顺序进行Commit。Commit过的事务才是真正有效的事务。
事务确认必须按顺序进行,outstandingProposals中记录了全部等待确认的事务,只要前一条事务还未确认,则以后的事务都禁止确认,以确保事务的按顺序进行。
确认过的事务放入toBeApplied队列中等待下一步处理。
if (outstandingProposals.containsKey(zxid - 1)) return false; if (!p.hasAllQuorums()) { return false; } outstandingProposals.remove(zxid); if (p.request != null) { toBeApplied.add(p); } if (p.request == null) { LOG.warn("Going to commmit null: " + p); } else if (p.request.getHdr().getType() == OpCode.reconfig) { Long designatedLeader = getDesignatedLeader(p, zxid); QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier(); self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) { allowedToCommit = false; } commitAndActivate(zxid, designatedLeader); informAndActivate(p, designatedLeader); } else { commit(zxid); inform(p); } zk.commitProcessor.commit(p.request); if(pendingSyncs.containsKey(zxid)){ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) { sendSync(r); } }
commit
通知Follower提交proposal执行。
一般只有当Leader从Follower收到超过半数(默认算法,固然你也能够实现本身的判断逻辑,好比改为超过2/3人数)的ACK才会发起commit,发送Leader.COMMIT包给Follower。
public void commit(long zxid) { synchronized(this){ lastCommitted = zxid; } QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null); sendPacket(qp); }
读操做不涉及数据和状态的变动,所以不须要维护集群数据的一致性,流程相对于写操做要简单些。
具体读操做流程:
1. Request会发送到Leader的firstProcessor处理,这里是LeaderRequestProcessor
2. LeaderRequestProcessor对读操做会进入到PrepRequestProcessor
3. PrepRequestProcessor对读操做不执行事务相关处理,而后处理链一样进入ProposalRequestProcessor
4. ProposalRequestProcessor调用Leader的propose方法,propose方法会直接将读操做请求送达nextProcessor。
5. nextProcessor到达CommitProcessor。
6. CommitProcessor一样会直接将读操做请求送达toBeAppliedProcessor。
7. toBeAppliedProcessor将读操做请求直接发送到下一步FinalRequestProcessor。
8. FinalRequestProcessor在本机执行最终的create命令,调用ZkDataBase的processTxn方法,通过DataTree完成最终的节点建立。