对于请求处理链而言,全部请求处理器的父接口为RequestProcessor。node
RequestProcessor内部类RequestProcessorException,用来表示处理过程当中的出现的异常,而proceequest和shutdown方法则是核心方法,是子类必需要实现的方法,处理的主要逻辑在proceequest中,经过proce***equest方法能够将请求传递到下个处理器。而shutdown表示关闭处理器,其意味着该处理器要关闭和其余处理器的链接。服务器
public interface RequestProcessor { @SuppressWarnings("serial") public static class RequestProcessorException extends Exception { public RequestProcessorException(String msg, Throwable t) { super(msg, t); } } void proce***equest(Request request) throws RequestProcessorException; void shutdown(); }
实现RequestProcessor的processor有不少,PrepRequestProcessor,一般是请求处理链的第一个处理器。session
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {}
PrepRequestProcessor继承了ZooKeeperCriticalThread类并实现了RequestProcessor接口,表示其能够做为线程使用。ide
//已提交的请求队列 LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); //下一个处理器 private final RequestProcessor nextProcessor; // zk服务器 ZooKeeperServer zks;
while (true) { //从队列获取请求 Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } //requestOfDeath类型的请求,表明当前处理器已经关闭,再也不处理请求。 if (Request.requestOfDeath == request) { break; } //调用关键函数 pRequest(request); }
pRequest会肯定请求类型,并根据请求类型不一样生成不一样的请求对象,咱们以建立节点为例子分析函数
//设置消息头和事务为空 request.setHdr(null); request.setTxn(null); try { switch (request.type) { case OpCode.createContainer: case OpCode.create: case OpCode.create2: //建立节点请求 CreateRequest create2Request = new CreateRequest(); //处理请求 pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true); break; //省略其余代码 //给请求的zxid赋值 request.zxid = zks.getZxid(); //交给下一个处理器继续处理 nextProcessor.proce***equest(request);
pRequest2Txn函数是实际的处理请求的函数,对于建立方法会调用pRequest2TxnCreate函数this
//设置请求头 request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); switch (type) { case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { pRequest2TxnCreate(type, request, record, deserialize); break; }
pRequest2TxnCreate方法以下:线程
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { if (deserialize) { //反序列化,将ByteBuffer转化为Record ByteBufferInputStream.byteBuffer2Record(request.request, record); } int flags; String path; List<ACL> acl; byte[] data; long ttl; if (type == OpCode.createTTL) { CreateTTLRequest createTtlRequest = (CreateTTLRequest)record; flags = createTtlRequest.getFlags(); path = createTtlRequest.getPath(); acl = createTtlRequest.getAcl(); data = createTtlRequest.getData(); ttl = createTtlRequest.getTtl(); } else { //转换createRequest对象 CreateRequest createRequest = (CreateRequest)record; flags = createRequest.getFlags(); path = createRequest.getPath(); acl = createRequest.getAcl(); data = createRequest.getData(); ttl = -1; } CreateMode createMode = CreateMode.fromFlag(flags); validateCreateRequest(path, createMode, request, ttl); //获取父节点路径 String parentPath = validatePathForCreate(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, acl); //获取父节点的record ChangeRecord parentRecord = getRecordForPath(parentPath); //检查ACL列表 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); //是否建立顺序节点 if (createMode.isSequential()) { //子路径后追加一串数字,顺序的 path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL; //父节点不能是临时节点 if (ephemeralParent) { throw new KeeperException.NoChildrenForEphemeralsException(path); } //新的子节点版本号 int newCversion = parentRecord.stat.getCversion()+1; //新生事务 if (type == OpCode.createContainer) { request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion)); } else if (type == OpCode.createTTL) { request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl)); } else { request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion)); } StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { //是否临时节点 s.setEphemeralOwner(request.sessionId); } //拷贝 parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); //子节点数量+1 parentRecord.childCount++; //设置新版本号 parentRecord.stat.setCversion(newCversion); //将parentRecord添加至outstandingChanges和outstandingChangesForPath中 addChangeRecord(parentRecord); // 将新生成的ChangeRecord(包含了StatPersisted信息)添加至outstandingChanges和outstandingChangesForPath中 addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL)); }
addChangeRecord函数将ChangeRecord添加至ZooKeeperServer的outstandingChanges和outstandingChangesForPath中。code
private void addChangeRecord(ChangeRecord c) { synchronized (zks.outstandingChanges) { zks.outstandingChanges.add(c); zks.outstandingChangesForPath.put(c.path, c); } }
outstandingChanges 位于ZooKeeperServer 中,用于存放刚进行更改尚未同步到ZKDatabase中的节点信息。orm
znode节点会因为用户的读写操做频繁发生变化,为了提高数据的访问效率,ZooKeeper中有一个三层的数据缓冲层用于存放节点数据。对象
outstandingChanges->ZKDatabase->FileSnap+FileTxnLog