FinalRequestProcessor是请求处理链中最后的一个处理器。数据库
public class FinalRequestProcessor implements RequestProcessor { ZooKeeperServer zks; }
FinalRequestProcessor只实现了RequestProcessor接口,须要实现process Request方法和shutdown方法。服务器
核心属性为zks,表示Zookeeper服务器,能够经过zks访问到Zookeeper内存数据库。session
咱们看一下核心方法process Request代码:less
synchronized (zks.outstandingChanges) { // Need to process local session requests // 当前节点,处理请求,若为事务性请求,则提交到ZooKeeper内存数据库中。 // 对于processTxn函数而言,其最终会调用DataTree的processTxn rc = zks.processTxn(request); // request.hdr is set for write requests, which are the only ones // that add to outstandingChanges. //只有写请求才会有消息头 if (request.getHdr() != null) { TxnHeader hdr = request.getHdr(); Record txn = request.getTxn(); long zxid = hdr.getZxid(); //当outstandingChanges不为空且其首元素的zxid小于等于请求的zxid时, // 就会一直从outstandingChanges中取出首元素,而且对outstandingChangesForPath作相应的操做 while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) { ChangeRecord cr = zks.outstandingChanges.remove(); if (cr.zxid < zxid) { LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid); } if (zks.outstandingChangesForPath.get(cr.path) == cr) { zks.outstandingChangesForPath.remove(cr.path); } } } // do not add non quorum packets to the queue. //判断是否为事务性请求则是经过调用isQuorum函数 //只将quorum包(事务性请求)添加进队列 //addCommittedProposal函数将请求添加至ZKDatabase的committedLog结构中 if (request.isQuorum()) { zks.getZKDatabase().addCommittedProposal(request); } }
根据请求的建立时间来更新Zookeeper服务器的延迟,updateLatency函数中会记录最大延迟、最小延迟、总的延迟和延迟次数。
而后更新响应中的状态,如请求建立到响应该请求总共花费的时间、最后的操做类型等。而后设置响应后返回ide
case OpCode.ping: { //更新延迟 zks.serverStats().updateLatency(request.createTime); lastOp = "PING"; // 更新响应的状态 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, Time.currentElapsedTime()); // 设置响应 cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); return; }
其余请求与此相似,
最后会根据其余请求再次更新服务器的延迟,设置响应的状态等函数
// 获取最后处理的zxid long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); // 响应头 ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); // 更新服务器延迟 zks.serverStats().updateLatency(request.createTime); // 更新状态 cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
最后使用sendResponse函数将响应发送给请求方。code
try { //返回相应 cnxn.sendResponse(hdr, rsp, "response"); if (request.type == OpCode.closeSession) { //关闭会话 cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG",e); }