ZooKeeper系列之(七):领导者调用链

Leader经过启动LeaderZooKeeperServer来接收客户端请求。node

首先看下它的处理链的定义,从源码看出LeaderZooKeeperServer的处理链顺序以下:算法

 

  • LeaderRequestProcessor :Leader调用链开始
  • PrepRequestProcessor:写操做预准备,建立Request内容
  • ProposalRequestProcessor:将写请求封装成PROPOSAL包广播到全部的follower
  • CommitProcessor :判断写操做是否能够提交真正执行
  • toBeAppliedProcessor:将COMMIT后的Request移到,等待发送给FinalRequestProcessor。
  • FinalRequestProcessor:真正的执行Zookeeper命令的地方
  • SyncRequestProcessor:将写操做持久化到磁盘
  • AckRequestProcessor:当proposal持久化到磁盘,向本机发送ACK包

SyncRequestProcessor和AckRequestProcessor是在ProposalRequestProcessor内部被建立的,当Leader要处理PROPOSAL命令时先本身调用SyncRequestProcessor持久化,而后经过AckRequestProcessor直接告诉Leader处理ACK逻辑(不通过QuorumPacket传递)。数据库

一、 LeaderRequestProcessor

LeaderRequestProcessor是leader处理请求的第一个processor,所以他不须要转发到其余机器,只须要往下执行就好了。这里就是简单的调用nextProcessor的processRequest方法。session

主要处理代码以下:this

public void processRequest(Request request) throws RequestProcessorException {       
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {        
        request.setException(ke);
     }
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
     }
     nextProcessor.processRequest(request);
}

二、 submitLearnerRequest

Leader收到Follower的REQUEST请求后调用submitLearnerRequest处理该写事务。spa

该方法就一条语句,调用PrepRequestProcessor的processRequest方法处理Request请求。方法定义以下:日志

public void submitLearnerRequest(Request request) {      
     prepRequestProcessor.processRequest(request);
}

三、 写请求

写请求是要广播到整个集群作数据一致性的,因此涉及到多台集群的交互。code

这里咱们以create事务为例说明。orm

首先客户端链接到Leader,而后Leader发送proposal消息给集群中全部Follower,proposal消息带有本次create事务的数据。server

Follower收到proposal后首先保存到磁盘防止proposal丢失,而后回复ACK给Leader。

Leader收集到足够多的ACK后再次发送COMMIT给全部Follower,同时Leader也在本地提交proposal执行,Follower收到COMMIT以后也在本地执行proposal事务。

Leader将执行结果返回给客户端。

经过以上过程集群中全部机器都会维护同一个完整的数据库,保证了数据一致性。

这里须要注意的一点是:写操做时Zk会调用Sync过程将写操做持久化到磁盘。

具体流程以下:

  • 1. 首先,客户端建立Socket链接到Leader,发送create请求给LeaderZooKeeperServer。
  • 2. LeaderZooKeeperServer中firstProcessor被触发,执行processRequest方法。
  • 3. 调用链执行到ProposalRequestProcessor,触发Leader的proposal方法被执行。
  • 4. proposal方法发送PROPOSAL消息给集群中全部的Follower。
  • 5. Follower的processPacket方法判断是PROPOSAL消息,则调用FollowerZooKeeperServer的logRequest方法记录WAL日志。
  • 6. Follwer接着的nextProcessor是SyncRequestProcessor,SendAckRequestProcessor类发送ACK消息给Leader。
  • 7. Leader端收到ACK消息,调用Leader的processAck方法处理ACK消息。
  • 8. 调用CommitProcessor处理,判断是否能够发送COMMIT给Follower,若是收全了Follower的ACK消息,则发送COMMIT消息给Follower;同时Leader执行到最后一个处理即FinalRequestProcessor,准备结束本次请求,Znode修改完成。
  • 9. Follower收到COMMIT消息,执行FollowerZooKeeperServer的commit方法,最终也执行到FinalRequestProcessor。

上述就是leader处理写Request请求的完整过程。

leader收到REQUEST包时,会调用submitLearnerRequest方法;收到ACK包时,会调用processAck方法,leader接收包的部分代码以下。

while (true) {
     qp = new QuorumPacket();
     ia.readRecord(qp, "packet");
     ByteBuffer bb;
     long sessionId;
     int cxid;
     int type;
     switch (qp.getType()) {
         case Leader.ACK:
            if (this.learnerType == LearnerType.OBSERVER) {                
            }
            syncLimitCheck.updateAck(qp.getZxid());
            leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
            break; 
        case Leader.REQUEST:
            bb = ByteBuffer.wrap(qp.getData());
            sessionId = bb.getLong();
            cxid = bb.getInt();
            type = bb.getInt();
            bb = bb.slice();
            Request si;
            if(type == OpCode.sync){
                si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
            } else {
                si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
            }
          si.setOwner(this);
            leader.zk.submitLearnerRequest(si);
            break;
       default:
            LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
            break;
     }
 }

proposal

当LeaderZooKeeperServer收到客户端写事务请求时,会触发Leader的proposal方法执行,发送PROPOSAL消息给Follower。同时维护一个outstandingProposals字典表保存PROPOSA消息。

proposal方法的主要代码以下:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
     request.getHdr().serialize(boa, "hdr");
     if (request.getTxn() != null) {
         request.getTxn().serialize(boa, "txn");
     }
     baos.close();
} catch (IOException e) {
     LOG.warn("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
               baos.toByteArray(), null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;                
synchronized(this) {
     p.addQuorumVerifier(self.getQuorumVerifier());
    if (request.getHdr().getType() == OpCode.reconfig){
         self.setLastSeenQuorumVerifier(request.qv, true);                       
    }
    lastProposed = p.packet.getZxid();
    outstandingProposals.put(lastProposed, p);
    sendPacket(pp);
}

inform

和proposal相似,只不过是将写事务发给Observer,而且不须要Observer回复ACK。

public void inform(Proposal proposal) {
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, 
proposal.request.zxid, proposal.packet.getData(), null);
    sendObserverPacket(qp);
}

processAck

Follower持久化PROPOSAL写事务请求到磁盘后会回复ACK消息给Leader,Leader经过LearnerHandler接收该包,并触发Leader的processAck方法。

Leader发送到Follower的PROPOSAL消息,需等待全部Follower回复Ack消息后判断是否知足COMMIT条件,若是知足COMMIT条件则发送COMMIT消息给全部Follower,Leader再往下执行。

synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
   if (!allowedToCommit) return;        
   if ((zxid & 0xffffffffL) == 0) {           
       return;
   }   
   if (lastCommitted >= zxid) {
      return;
   }
   Proposal p = outstandingProposals.get(zxid);  
   p.addAck(sid);   
   boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
   if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
     long curZxid = zxid;
     while (allowedToCommit && hasCommitted && p!=null){
         curZxid++;
         p = outstandingProposals.get(curZxid);
         if (p !=null) 
             hasCommitted = tryToCommit(p, curZxid, null);             
     }
   }
}

ProcessAck会统计多少Follower回复了Leader.ACK类型的QuorumPacket包,而后就走到了tryToCommit方法,tryToCommit判断是否知足COMMIT条件。

下面看看tryToCommit方法执行了哪些操做。

tryToCommit

确保写操做是按顺序被确认的。

尝试对Proposal按顺序进行Commit。Commit过的事务才是真正有效的事务。

事务确认必须按顺序进行,outstandingProposals中记录了全部等待确认的事务,只要前一条事务还未确认,则以后的事务都禁止确认,以确保事务的按顺序进行。

确认过的事务放入toBeApplied队列中等待下一步处理。

if (outstandingProposals.containsKey(zxid - 1)) 
     return false;
if (!p.hasAllQuorums()) {
     return false;                 
}
outstandingProposals.remove(zxid);
if (p.request != null) {
     toBeApplied.add(p);
}
if (p.request == null) {
     LOG.warn("Going to commmit null: " + p);
} else if (p.request.getHdr().getType() == OpCode.reconfig) {                                   
     Long designatedLeader = getDesignatedLeader(p, zxid);
     QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
     self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
     if (designatedLeader != self.getId()) {
          allowedToCommit = false;
     }
     commitAndActivate(zxid, designatedLeader);
     informAndActivate(p, designatedLeader);
} else {
     commit(zxid);
     inform(p);
}
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
     for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
          sendSync(r);
     }               
}

commit

通知Follower提交proposal执行。

一般只有当Leader从Follower收到超过半数(默认算法,固然你也能够实现本身的判断逻辑,好比改为超过2/3人数)的ACK才会发起commit,发送Leader.COMMIT包给Follower。

public void commit(long zxid) {
     synchronized(this){
         lastCommitted = zxid;
     }
     QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
     sendPacket(qp);
}

四、 读请求

读操做不涉及数据和状态的变动,所以不须要维护集群数据的一致性,流程相对于写操做要简单些。

具体读操做流程:

1. Request会发送到Leader的firstProcessor处理,这里是LeaderRequestProcessor

2. LeaderRequestProcessor对读操做会进入到PrepRequestProcessor

3. PrepRequestProcessor对读操做不执行事务相关处理,而后处理链一样进入ProposalRequestProcessor

4. ProposalRequestProcessor调用Leader的propose方法,propose方法会直接将读操做请求送达nextProcessor。

5. nextProcessor到达CommitProcessor。

6. CommitProcessor一样会直接将读操做请求送达toBeAppliedProcessor。

7. toBeAppliedProcessor将读操做请求直接发送到下一步FinalRequestProcessor。

8. FinalRequestProcessor在本机执行最终的create命令,调用ZkDataBase的processTxn方法,通过DataTree完成最终的节点建立。

相关文章
相关标签/搜索