ZooKeeper系列之(六):请求调用链概述

ZooKeeper服务端在收到客户端请求以后,根据leader或follower角色不一样,执行一系列不一样的操做。ZooKeeper设计者将这些操做抽象成了接口叫processRequest,并提供了该接口的多个实现类。leader或follower将这些实现类串联在一块儿构成一个执行序列,leader或follower的执行序列不尽相同。经过这种接口抽象的方式提升了代码的复用性。session

服务端处理的客户端消息叫Request,Request消息的处理分散在LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer中。app

ZooKeeperServer收到客户端的操做请求后生成Request,而后是经过调用链的方式来处理Request。调用链定义了处理单元的链式执行。每一个处理单元继承了Processor接口,而且可设置nextProcessor。异步

服务端调用链都是从firstProcessor开始往下顺序执行,每一个调用模块执行完成以后沿着nextProcessor的方向依次执行下一个调用处理单元。函数

不一样类型的ZooKeeperServer的调用链是不一样的。主要有三种不一样的调用链方式:oop

A. LeaderZooKeeperServer的调用链,它的firstProcessor是LeaderRequestProcessor。this

B. FollowerZooKeeperServer的调用链,它的firstProcessor是FollowerRequestProcessor。线程

C. Observer ZooKeeperServer的调用链,它的firstProcessor是ObserverRequestProcessor。设计

ZooKeeperServer中定义的调用单元有如下这些类,它们都实现了Processor接口的processRequest方法。日志

调用单元列表:code

Ø LeaderRequesrProcessor:Leader的第一个processor

Ø FollowerRequesrProcessor: Follower的第一个processor

Ø ObserverRequesrProcessor:Observer的第一个processor

Ø ProposalProcessor:命令提议,群发Proposal到Follower

Ø CommitProcessor:事务提交,

Ø SyncRequestProcessor:持久化Proposal到磁盘

Ø AckRequestProcessor:Leader给本身ACK消息

Ø SendAckRequestProcessor:Learner回复ACK给Leader

Ø toBeAppliedProcessor:等待执行

Ø PrepRequestProcessor:Leader使用,定义在LeaderRequestProcessor和ProposalRequestProcessor之间。将写操做记录到outstandingChanges,在FinalRequestProcessor中确保按顺序执行Request,防止后续读操做在前一个写操做还未完成的状况下就被调用。

Ø FinalRequestProcessor:最后的processor,调用ZkDatabase方法执行最终读写操做

下面咱们来具体看看各个Processor的实现逻辑。

一、ProposalProcessor

用于Leader,处理写操做命令,生成PROPOSAL包通知Follower同步写操做。

两段主要代码:

1) ProposalRequestProcessor()

AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);

建立SyncProcessor,当生成PROPOSAL包时,首先经过SyncProcessor持久化到磁盘,而后经过AckRequestProcessor回复确认(ACK包)。

2)processRequest

nextProcessor.processRequest(request);
if (request.getHdr() != null) {
      try {
          zks.getLeader().propose(request);
      } catch (XidRolloverException e) {
          throw new RequestProcessorException(e.getMessage(), e);
      }
      syncProcessor.processRequest(request);
}

这里的nextProcessor在Leader中的定义是CommitProcessor,等待COMMIT。

而后调用Leader的proposal方法来处理Request。

最后调用SyncProcessor将PROPOSAL持久化,并给本身发ACK。

二、 CommitProcessor

CommitProcessor是一个很是重要的Processor,一句话归纳其做用就是:判断请求处理是否能够往下执行?

对于读请求,通常CommitProcessor直接将请求包往下执行就能够了;而对于写请求,CommitProcessor则须要等待leader发送COMMIT请求才能往下执行,那么leader什么状况下会发送COMMIT包呢?

leader在超过半数的follower已经成功写WAL日志,收到这些超过半数的follower的ACK包时,才会给这些follower发送COMMIT消息。

CommitProcessor是异步工做模式,所以设计了Queue来保存和处理Request。

对Leader发送过来的Proposal进行确认,而且确保Leader发送过来的多条写事务请求按顺序依次执行,只有前一条写事务执行完毕才继续执行下一条写事务。

CommitProcessor是处理须要确认的写事务请求,读操做不须要确认会直接忽略掉。它的nextProcessor是FinalRequestProcessor,执行具体的写事务。

定义了3个队列:

1) LinkedBlockingQueue<Request> queuedRequests ;

请求队列,全部请求经过processPacket方法首先放入该队列,等待后续处理。

2) LinkedBlockingQueue<Request> committedRequests ;

能够commit的请求包,这里的数据是能够往下走的。

3) HashMap<Long, LinkedList<Request>> pendingRequests ;

当FollowerZooKeeperServer收到Leader发来的COMMIT包时,会触发commit方法被执行,此时请求包会放入commitedRequests队列等待下一步的处理。

是否须要commit

needCommit方法判断Request是否须要Commit。只有写操做才会须要Commit过程,不须要Commit的读操做直接进入nextProcessor处理。

protected boolean needCommit(Request request) {
     switch (request.type) {
         case OpCode.create:
         case OpCode.create2:
         case OpCode.createTTL:
         case OpCode.createContainer:
         case OpCode.delete:
         case OpCode.deleteContainer:
         case OpCode.setData:
         case OpCode.reconfig:
         case OpCode.multi:
         case OpCode.setACL:
             return true;
         case OpCode.sync:
             return matchSyncs;    
         case OpCode.createSession:
         case OpCode.closeSession:
             return !request.isLocalSession();
         default:
             return false;
     }
 }

请求包处理

有Request到达时,先调用processRequest方法将请求放入queuedRequest队列。

CommitProcessor有个run方法。run方法是个循环,不断处理这三个队列。对于queuedRequest队列,则将请求包放入peddingRequest队列。

若是commit方法被触发,则会将入参的请求包放入committedRequests队列,等待下一步FinalRequestProcessor的处理。

run的主要逻辑代码归纳以下:

public void run() {
        try {         
            int requestsToProcess = 0;
            boolean commitIsWaiting = false;
	do {              
                commitIsWaiting = !committedRequests.isEmpty();
                requestsToProcess =  queuedRequests.size();
                // Avoid sync if we have something to do
                if (requestsToProcess == 0 && !commitIsWaiting){
                    // Waiting for requests to process
                    synchronized (this) {
                        while (!stopped && requestsToProcess == 0
                                && !commitIsWaiting) {
                            wait();
                            commitIsWaiting = !committedRequests.isEmpty();
                            requestsToProcess = queuedRequests.size();
                        }
                    }
                }
               
                Request request = null;
                while (!stopped && requestsToProcess > 0
                        && (request = queuedRequests.poll()) != null) {
                    requestsToProcess--;
                    if (needCommit(request)
                            || pendingRequests.containsKey(request.sessionId)) {
                        // Add request to pending
                        LinkedList<Request> requests =pendingRequests.get(request.sessionId);
                        if (requests == null) {
                            requests = new LinkedList<Request>();
                            pendingRequests.put(request.sessionId, requests);
                        }
                        requests.addLast(request);
                    }
                    else {
                        sendToNextProcessor(request);
                    }                   
                    if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){                        
                        commitIsWaiting = true;
                        break;
                    }
                }
                // Handle a single committed request
                if (commitIsWaiting && !stopped){
                    waitForEmptyPool();
                    if (stopped){
                        return;
                    }
                    // Process committed head
                    if ((request = committedRequests.poll()) == null) {
                        throw new IOException("Error: committed head is null");
                    }                 
                    LinkedList<Request> sessionQueue = pendingRequests
                            .get(request.sessionId);
                    if (sessionQueue != null) {
                        // If session queue != null, then it is also not empty.
                        Request topPending = sessionQueue.poll(); 
                        topPending.setHdr(request.getHdr());
                        topPending.setTxn(request.getTxn());
                        topPending.zxid = request.zxid;
                        request = topPending;
                    }

                    sendToNextProcessor(request);
                    waitForEmptyPool();
                    if (sessionQueue != null) {
                        while (!stopped && !sessionQueue.isEmpty()
                                && !needCommit(sessionQueue.peek())) {
                            sendToNextProcessor(sessionQueue.poll());
                        }
                        // Remove empty queues
                        if (sessionQueue.isEmpty()) {
                            pendingRequests.remove(request.sessionId);
                        }
                    }
                }
            } while (!stoppedMainLoop);
        } catch (Throwable e) {
            handleException(this.getName(), e);
        }
        LOG.info("CommitProcessor exited loop!");
    }

1) 首先等待Request的到来

2) 而后判断是否须要COMMIT,若是须要则将Request放入pendingRequests等待被commit;不然直接送给FinalRequestProcessor处理。

3) 若是pendingRequests和committedRequests都不为空,则跳出小循环准备执行committedRequests中的请求。

4) 根据committedRequests中的Request找到该Requests对应的Session,并从该Session的pendingRequests中找出第一个Request执行,这样保证Request是按照Session的顺序执行的,而不会出现后续Request反而先被执行的场景。

5) 最后再按Session中的顺序执行读操做,防止后续读操做在写操做以前被执行,从而破坏了事务的顺序性。

三、 SyncRequestProcessor

将写事务请求持久化到磁盘,防止事务丢失。它的nextProcessor分红3种场景设置:

(1)Leader

这时候nextProcessor设置为AckRequestProcessor,AckRequestProcessor直接调用Leader的processAck方法。

(2)Follower

这时候nextProcessor设置为SendAckRequestProcessor,SendAckRequestProcessor向Leader发送Ack确认包。

(3)Observer

这时候nextProcessor设置为null,表示不用回复ACK给Leader。

SyncRequestProcessor类的processRequest方法定义以下:

public void processRequest(Request request) {
     queuedRequests.add(request);
}

它的逻辑是将Request放入queueRequests队列,等待run主线程处理:

run线程主要逻辑:

while (true) {
   Request si = null;
   if (toFlush.isEmpty()) {
        si = queuedRequests.take();
   } else {
       si = queuedRequests.poll();
       if (si == null) {
             flush(toFlush);
             continue;
       }
   }
   if (si == requestOfDeath) {
        break;
   }
   if (si != null) {
      if (zks.getZKDatabase().append(si)) {
          logCount++;
          if (logCount > (snapCount / 2 + randRoll)) {
             randRoll = r.nextInt(snapCount/2);
             // roll the log
             zks.getZKDatabase().rollLog();
             // take a snapshot
             if (snapInProcess != null && snapInProcess.isAlive()) {
                LOG.warn("Too busy to snap, skipping");
             } else {
                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                   public void run() {
                      try {
                           zks.takeSnapshot();
                      } catch(Exception e) {
                           LOG.warn("Unexpected exception", e);
                      }
                   }
            };
            snapInProcess.start();
         }
         logCount = 0;
      }
  } else if (toFlush.isEmpty()) {
        if (nextProcessor != null) {
            nextProcessor.processRequest(si);
            if (nextProcessor instanceof Flushable) {
                 ((Flushable)nextProcessor).flush();
            }
        }
        continue;
  }
  toFlush.add(si);
  if (toFlush.size() > 1000) {
       flush(toFlush);
  }
}

将Request写到事务日志文件中,根据条件决定是否重启新的日志滚动文件;日志文件写入完成后调用nextProcessor处理Request。

四、 AckRequestProcessor

专门给Leader使用的ACK处理单元,当Follower收到Proposal时都是经过SendAckRequestProcessor发送ACK包给Leader,而Leader在发出Proposal命令时,会调用Sync过程将Proposal写到Leader的磁盘,而后Sync过程调用AckRequestProcessor直接通知Leader收到Leader本身的ACK包,直接调用Leader的processAck方法。

public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if(self != null)
            leader.processAck(self.getId(), request.zxid, null);
        else
            LOG.error("Null QuorumPeer");
}

五、SendAckRequestProcessor

SyncRequestProcessor的nextProcessor是SendAckRequestProcessor。

当Follower持久化写事务请求到磁盘后,会当即调用SendAckRequestProcessor的processRequest方法给Leader发送ACK消息,等待Leader的COMMIT消息。

SendAckRequestProcessor的代码示例:

public void processRequest(Request si) {
   if(si.type != OpCode.sync){
       QuorumPacket qp = new QuorumPacket(Leader.ACK, 
                       si.getHdr().getZxid(), null,   null);
       try {
           learner.writePacket(qp, false);
       } catch (IOException e) {
       }
   }
}

六、 toBeAppliedProcessor

最简单的一个处理单元,只是维护一个toBeApplied列表,而后每次将列表中一条记录发送到FinalRequestProcess or处理,同时删除该记录。

七、 PrepRequestProcessor

主要目的是将写操做请求记录在ZooKeeperServer的outstandingChanges队列中, outstandingChanges主要用于FinalRequestProcessor的processRequest方法中,用来确保FinalRequestProcessor是按照outstandingChanges的顺序来执行Request。

八、FinalRequestProcessor

最终的一个调用链处理单元。

该类用于执行最终的命令,经过ZkDatabase相关接口完成操做。

不论是Leader仍是Follower,最后的处理单元必定是FinalRequestProcessor。

主函数是processRequest,真正处理客户端请求的代码。

主要代码可分红两部分。

第一部分是写操做,代码逻辑片断以下:

synchronized (zks.outstandingChanges) {
      rc = zks.processTxn(request);
}

该方法主要是调用ZooKeeperServer的processTxn方法来处理。具体逻辑可参考ZooKeeperServer一节。

第二部分是读操做,直接经过ZKDatabase等接口实现,好比getData和getChildren两个读操做的代码逻辑以下:

case OpCode.getData: {
     lastOp = "GETD";
     GetDataRequest getDataRequest = new GetDataRequest();
     ByteBufferInputStream.byteBuffer2Record(request.request,getDataRequest);
     DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());    
     Stat stat = new Stat();
     byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                   getDataRequest.getWatch() ? cnxn : null);
     rsp = new GetDataResponse(b, stat);
     break;
}
case OpCode.getChildren: {
     lastOp = "GETC";
     GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
     ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
     DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());     
     List<String> children = zks.getZKDatabase().getChildren(
                        getChildrenRequest.getPath(), null, getChildrenRequest
                                .getWatch() ? cnxn : null);
     rsp = new GetChildrenResponse(children);
     break;
}
相关文章
相关标签/搜索