Follower使用的ZooKeeperServer的子类FollowerZooKeeperServer,同标准的ZooKeeperServer很类似,除了更换了调用链。node
首先看下它的调用链的定义:缓存
FollowerZooKeeperServer的调用链的初始化代码以下:session
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower())); syncProcessor.start(); }
从源码看出LeaderZooKeeperServer的调用链顺序以下:异步
Follower的调用链第一个处理单元,FollowerZooKeeper的firstProcessor指向。FollowerRequestProcessor是异步方式处理请求,因此用本地队列缓存一下。FollowerRequestProcessor首先将请求包转发给leader处理,而后等待leader处理以后的返回结果。oop
它的processRequest方法只是将请求放入Queue,真正处理是run方法来执行的。run方法循环不断的从Queue取出下一条请求,而后交给nextProcessor处理。this
public void run() { try { while (!finished) { Request request = queuedRequests.take(); if (request == Request.requestOfDeath) { break; } nextProcessor.processRequest(request); switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); zks.getFollower().request(request); break; case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: case OpCode.delete: case OpCode.deleteContainer: case OpCode.setData: case OpCode.reconfig: case OpCode.setACL: case OpCode.multi: case OpCode.check: zks.getFollower().request(request); break; case OpCode.createSession: case OpCode.closeSession: // Don't forward local sessions to the leader. if (!request.isLocalSession()) { zks.getFollower().request(request); } break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("FollowerRequestProcessor exited loop!"); }
从FollowerZooKeeperServer的调用链设置方法中能够看到,这个nextProcessor是CommitProcessor。spa
要注意的是若是请求是写请求类(好比建立、设置、修改znode数据等),会额外增长一个动做,就是调用Follower类的的request方法(即父类Learner的request方法)处理。request方法会发送REQUEST消息给Leader。Leader收到REQUEST后触发LeaderZooKeeperServer的submitLearnerRequestrequest方法执行。日志
若是不是写请求类,就不须要调用Follower的request方法。code
当客户端链接的是FollowerZooKeeperServer时,写事务请求首先须要被Follower转换成REQUEST消息发给Leader,Leader收到REQUEST后从调用链的PrepRequestProcessor开始处理,后续流程就和LeaderZooKeeperServer同样了。blog
全部Follower自己是不直接处理写请求的!
读操做不涉及数据和状态的变动,所以不须要维护集群数据的一致性,流程相对于写操做要简单些。
具体读操做流程:
1. Request会发送到Follower的firstProcessor处理,这里是FollowerRequestProcessor
2. FollowerRequestProcessor首先经过nextProcessor处理该请求,这里nextProcessor是CommitProcessor,这里CommitProcessor对读操做不作处理直接将命令发送到下个处理环节(FinalRequestProcessor直接执行)。
3. FinalRequestProcessor执行读操做请求,返回结果给客户端,本次客户端命令结束。
Follower接收到Leader发送的PROPOSAL包时,会触发logRequest方法,记录到WAL日志。
logRequest中主要代码片断以下:
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); if ((request.zxid & 0xffffffffL) != 0) { pendingTxns.add(request); } syncProcessor.processRequest(request);
首先将PROPOSAL携带的事务添加到pendingTxns队列等待执行(等待COMMIT消息),而后经过syncProcessor将PROPOSAL持久化到磁盘,接着调用syncProcessor的nextProcessor处理单元,这里的nextProcessor是SendAckRequestProcessor。
SendAckRequestProcessor的processRequest方法回复一条Leader.ACK类型的QuorumPacket包给Leader。Leader经过LearnerHandler收到ACK包并调用Leader的processAck方法处理。当Leader收到足够的ACK后会发送COMMIT消息给Follower,此时Follower执行pendingTxns中等待被执行的proposal。