ZooKeeper服务端在收到客户端请求以后,根据leader或follower角色不一样,执行一系列不一样的操做。ZooKeeper设计者将这些操做抽象成了接口叫processRequest,并提供了该接口的多个实现类。leader或follower将这些实现类串联在一块儿构成一个执行序列,leader或follower的执行序列不尽相同。经过这种接口抽象的方式提升了代码的复用性。session
服务端处理的客户端消息叫Request,Request消息的处理分散在LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer中。app
ZooKeeperServer收到客户端的操做请求后生成Request,而后是经过调用链的方式来处理Request。调用链定义了处理单元的链式执行。每一个处理单元继承了Processor接口,而且可设置nextProcessor。异步
服务端调用链都是从firstProcessor开始往下顺序执行,每一个调用模块执行完成以后沿着nextProcessor的方向依次执行下一个调用处理单元。函数
不一样类型的ZooKeeperServer的调用链是不一样的。主要有三种不一样的调用链方式:oop
A. LeaderZooKeeperServer的调用链,它的firstProcessor是LeaderRequestProcessor。this
B. FollowerZooKeeperServer的调用链,它的firstProcessor是FollowerRequestProcessor。线程
C. Observer ZooKeeperServer的调用链,它的firstProcessor是ObserverRequestProcessor。设计
ZooKeeperServer中定义的调用单元有如下这些类,它们都实现了Processor接口的processRequest方法。日志
调用单元列表:code
Ø LeaderRequesrProcessor:Leader的第一个processor
Ø FollowerRequesrProcessor: Follower的第一个processor
Ø ObserverRequesrProcessor:Observer的第一个processor
Ø ProposalProcessor:命令提议,群发Proposal到Follower
Ø CommitProcessor:事务提交,
Ø SyncRequestProcessor:持久化Proposal到磁盘
Ø AckRequestProcessor:Leader给本身ACK消息
Ø SendAckRequestProcessor:Learner回复ACK给Leader
Ø toBeAppliedProcessor:等待执行
Ø PrepRequestProcessor:Leader使用,定义在LeaderRequestProcessor和ProposalRequestProcessor之间。将写操做记录到outstandingChanges,在FinalRequestProcessor中确保按顺序执行Request,防止后续读操做在前一个写操做还未完成的状况下就被调用。
Ø FinalRequestProcessor:最后的processor,调用ZkDatabase方法执行最终读写操做
下面咱们来具体看看各个Processor的实现逻辑。
一、ProposalProcessor
用于Leader,处理写操做命令,生成PROPOSAL包通知Follower同步写操做。
两段主要代码:
1) ProposalRequestProcessor()
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
建立SyncProcessor,当生成PROPOSAL包时,首先经过SyncProcessor持久化到磁盘,而后经过AckRequestProcessor回复确认(ACK包)。
2)processRequest
nextProcessor.processRequest(request); if (request.getHdr() != null) { try { zks.getLeader().propose(request); } catch (XidRolloverException e) { throw new RequestProcessorException(e.getMessage(), e); } syncProcessor.processRequest(request); }
这里的nextProcessor在Leader中的定义是CommitProcessor,等待COMMIT。
而后调用Leader的proposal方法来处理Request。
最后调用SyncProcessor将PROPOSAL持久化,并给本身发ACK。
二、 CommitProcessor
CommitProcessor是一个很是重要的Processor,一句话归纳其做用就是:判断请求处理是否能够往下执行?
对于读请求,通常CommitProcessor直接将请求包往下执行就能够了;而对于写请求,CommitProcessor则须要等待leader发送COMMIT请求才能往下执行,那么leader什么状况下会发送COMMIT包呢?
leader在超过半数的follower已经成功写WAL日志,收到这些超过半数的follower的ACK包时,才会给这些follower发送COMMIT消息。
CommitProcessor是异步工做模式,所以设计了Queue来保存和处理Request。
对Leader发送过来的Proposal进行确认,而且确保Leader发送过来的多条写事务请求按顺序依次执行,只有前一条写事务执行完毕才继续执行下一条写事务。
CommitProcessor是处理须要确认的写事务请求,读操做不须要确认会直接忽略掉。它的nextProcessor是FinalRequestProcessor,执行具体的写事务。
定义了3个队列:
1) LinkedBlockingQueue<Request> queuedRequests ;
请求队列,全部请求经过processPacket方法首先放入该队列,等待后续处理。
2) LinkedBlockingQueue<Request> committedRequests ;
能够commit的请求包,这里的数据是能够往下走的。
3) HashMap<Long, LinkedList<Request>> pendingRequests ;
当FollowerZooKeeperServer收到Leader发来的COMMIT包时,会触发commit方法被执行,此时请求包会放入commitedRequests队列等待下一步的处理。
是否须要commit
needCommit方法判断Request是否须要Commit。只有写操做才会须要Commit过程,不须要Commit的读操做直接进入nextProcessor处理。
protected boolean needCommit(Request request) { switch (request.type) { case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: case OpCode.delete: case OpCode.deleteContainer: case OpCode.setData: case OpCode.reconfig: case OpCode.multi: case OpCode.setACL: return true; case OpCode.sync: return matchSyncs; case OpCode.createSession: case OpCode.closeSession: return !request.isLocalSession(); default: return false; } }
请求包处理
有Request到达时,先调用processRequest方法将请求放入queuedRequest队列。
CommitProcessor有个run方法。run方法是个循环,不断处理这三个队列。对于queuedRequest队列,则将请求包放入peddingRequest队列。
若是commit方法被触发,则会将入参的请求包放入committedRequests队列,等待下一步FinalRequestProcessor的处理。
run的主要逻辑代码归纳以下:
public void run() { try { int requestsToProcess = 0; boolean commitIsWaiting = false; do { commitIsWaiting = !committedRequests.isEmpty(); requestsToProcess = queuedRequests.size(); // Avoid sync if we have something to do if (requestsToProcess == 0 && !commitIsWaiting){ // Waiting for requests to process synchronized (this) { while (!stopped && requestsToProcess == 0 && !commitIsWaiting) { wait(); commitIsWaiting = !committedRequests.isEmpty(); requestsToProcess = queuedRequests.size(); } } } Request request = null; while (!stopped && requestsToProcess > 0 && (request = queuedRequests.poll()) != null) { requestsToProcess--; if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) { // Add request to pending LinkedList<Request> requests =pendingRequests.get(request.sessionId); if (requests == null) { requests = new LinkedList<Request>(); pendingRequests.put(request.sessionId, requests); } requests.addLast(request); } else { sendToNextProcessor(request); } if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){ commitIsWaiting = true; break; } } // Handle a single committed request if (commitIsWaiting && !stopped){ waitForEmptyPool(); if (stopped){ return; } // Process committed head if ((request = committedRequests.poll()) == null) { throw new IOException("Error: committed head is null"); } LinkedList<Request> sessionQueue = pendingRequests .get(request.sessionId); if (sessionQueue != null) { // If session queue != null, then it is also not empty. Request topPending = sessionQueue.poll(); topPending.setHdr(request.getHdr()); topPending.setTxn(request.getTxn()); topPending.zxid = request.zxid; request = topPending; } sendToNextProcessor(request); waitForEmptyPool(); if (sessionQueue != null) { while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) { sendToNextProcessor(sessionQueue.poll()); } // Remove empty queues if (sessionQueue.isEmpty()) { pendingRequests.remove(request.sessionId); } } } } while (!stoppedMainLoop); } catch (Throwable e) { handleException(this.getName(), e); } LOG.info("CommitProcessor exited loop!"); }
1) 首先等待Request的到来
2) 而后判断是否须要COMMIT,若是须要则将Request放入pendingRequests等待被commit;不然直接送给FinalRequestProcessor处理。
3) 若是pendingRequests和committedRequests都不为空,则跳出小循环准备执行committedRequests中的请求。
4) 根据committedRequests中的Request找到该Requests对应的Session,并从该Session的pendingRequests中找出第一个Request执行,这样保证Request是按照Session的顺序执行的,而不会出现后续Request反而先被执行的场景。
5) 最后再按Session中的顺序执行读操做,防止后续读操做在写操做以前被执行,从而破坏了事务的顺序性。
三、 SyncRequestProcessor
将写事务请求持久化到磁盘,防止事务丢失。它的nextProcessor分红3种场景设置:
(1)Leader
这时候nextProcessor设置为AckRequestProcessor,AckRequestProcessor直接调用Leader的processAck方法。
(2)Follower
这时候nextProcessor设置为SendAckRequestProcessor,SendAckRequestProcessor向Leader发送Ack确认包。
(3)Observer
这时候nextProcessor设置为null,表示不用回复ACK给Leader。
SyncRequestProcessor类的processRequest方法定义以下:
public void processRequest(Request request) { queuedRequests.add(request); }
它的逻辑是将Request放入queueRequests队列,等待run主线程处理:
run线程主要逻辑:
while (true) { Request si = null; if (toFlush.isEmpty()) { si = queuedRequests.take(); } else { si = queuedRequests.poll(); if (si == null) { flush(toFlush); continue; } } if (si == requestOfDeath) { break; } if (si != null) { if (zks.getZKDatabase().append(si)) { logCount++; if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); } else { snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; snapInProcess.start(); } logCount = 0; } } else if (toFlush.isEmpty()) { if (nextProcessor != null) { nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } } continue; } toFlush.add(si); if (toFlush.size() > 1000) { flush(toFlush); } }
将Request写到事务日志文件中,根据条件决定是否重启新的日志滚动文件;日志文件写入完成后调用nextProcessor处理Request。
四、 AckRequestProcessor
专门给Leader使用的ACK处理单元,当Follower收到Proposal时都是经过SendAckRequestProcessor发送ACK包给Leader,而Leader在发出Proposal命令时,会调用Sync过程将Proposal写到Leader的磁盘,而后Sync过程调用AckRequestProcessor直接通知Leader收到Leader本身的ACK包,直接调用Leader的processAck方法。
public void processRequest(Request request) { QuorumPeer self = leader.self; if(self != null) leader.processAck(self.getId(), request.zxid, null); else LOG.error("Null QuorumPeer"); }
五、SendAckRequestProcessor
SyncRequestProcessor的nextProcessor是SendAckRequestProcessor。
当Follower持久化写事务请求到磁盘后,会当即调用SendAckRequestProcessor的processRequest方法给Leader发送ACK消息,等待Leader的COMMIT消息。
SendAckRequestProcessor的代码示例:
public void processRequest(Request si) { if(si.type != OpCode.sync){ QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null); try { learner.writePacket(qp, false); } catch (IOException e) { } } }
六、 toBeAppliedProcessor
最简单的一个处理单元,只是维护一个toBeApplied列表,而后每次将列表中一条记录发送到FinalRequestProcess or处理,同时删除该记录。
七、 PrepRequestProcessor
主要目的是将写操做请求记录在ZooKeeperServer的outstandingChanges队列中, outstandingChanges主要用于FinalRequestProcessor的processRequest方法中,用来确保FinalRequestProcessor是按照outstandingChanges的顺序来执行Request。
八、FinalRequestProcessor
最终的一个调用链处理单元。
该类用于执行最终的命令,经过ZkDatabase相关接口完成操做。
不论是Leader仍是Follower,最后的处理单元必定是FinalRequestProcessor。
主函数是processRequest,真正处理客户端请求的代码。
主要代码可分红两部分。
第一部分是写操做,代码逻辑片断以下:
synchronized (zks.outstandingChanges) { rc = zks.processTxn(request); }
该方法主要是调用ZooKeeperServer的processTxn方法来处理。具体逻辑可参考ZooKeeperServer一节。
第二部分是读操做,直接经过ZKDatabase等接口实现,好比getData和getChildren两个读操做的代码逻辑以下:
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request,getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; } case OpCode.getChildren: { lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest); DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath()); List<String> children = zks.getZKDatabase().getChildren( getChildrenRequest.getPath(), null, getChildrenRequest .getWatch() ? cnxn : null); rsp = new GetChildrenResponse(children); break; }