This article was first published on 泊浮目's Jianshu blog: https://www.jianshu.com/u/204...
Version | Date | Notes |
---|---|---|
1.0 | 2020.5.23 | First published |
When we send a data-update request to ZooKeeper, what processing pipeline does it go through? And what consensus algorithm does ZooKeeper use to guarantee consistency? With these questions in mind, let's get into today's topic.
Before diving into the source code, a quick primer on the Chain of Responsibility pattern is in order, because it is central to this article. In short: the pattern strings multiple handler objects into a chain of responsibility, and a request is passed along that chain, in order, until the handler responsible for it is found.

What is the benefit? Think about it: the pattern is essentially an object-oriented version of if...else if...else (which is procedural); thanks to object-oriented composition, the handlers are far easier to reuse and recombine.
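To make that concrete, here is a minimal, self-contained sketch of the pattern. None of these classes are ZooKeeper's; they just mirror the shape of ZooKeeper's RequestProcessor chain, where each processor does its share of the work and then calls the next processor it was constructed with.

```java
// Minimal illustration of the Chain of Responsibility pattern; the class names are
// made up for this example and are not part of ZooKeeper's code base.
interface Handler {
    void handle(String request);
}

class LoggingHandler implements Handler {
    private final Handler next;                 // next handler in the chain, may be null

    LoggingHandler(Handler next) { this.next = next; }

    @Override
    public void handle(String request) {
        System.out.println("log: " + request);  // this handler's share of the work
        if (next != null) {
            next.handle(request);               // pass the request along the chain
        }
    }
}

class FinalHandler implements Handler {
    @Override
    public void handle(String request) {
        System.out.println("done: " + request); // end of the chain
    }
}

public class ChainDemo {
    public static void main(String[] args) {
        Handler chain = new LoggingHandler(new FinalHandler());
        chain.handle("create /foo");
    }
}
```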
Let's start from the ZooKeeperServer class and look at its subclasses. The ones we care about map onto the common ZooKeeper server roles: LeaderZooKeeperServer, FollowerZooKeeperServer and ObserverZooKeeperServer.

First, LeaderZooKeeperServer. Its setupRequestProcessors wires up the Leader's processing chain:
```java
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}
```
Putting the chain in order, a request on the Leader flows through: LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor.

From the ProposalRequestProcessor onwards the flow splits into two branches: the commit branch just listed, and a sync branch, SyncRequestProcessor -> AckRequestProcessor, which the ProposalRequestProcessor feeds in parallel so that the proposal is persisted and acknowledged.

Let's go through the processors one by one, starting with LeaderRequestProcessor:
```java
@Override
public void processRequest(Request request) throws RequestProcessorException {
    // Check if this is a local session and we are trying to create
    // an ephemeral node, in which case we upgrade the session
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        if (request.getHdr() != null) {
            LOG.debug("Updating header");
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(ke.code().intValue()));
        }
        request.setException(ke);
        LOG.info("Error creating upgrade request " + ke.getMessage());
    } catch (IOException ie) {
        LOG.error("Unexpected error in upgrade", ie);
    }
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }

    nextProcessor.processRequest(request);
}
```
The logic here is clear: it checks whether this is a local session trying to create an ephemeral node; if so, a session-upgrade request is generated and pushed down the chain ahead of the original request. If building the upgrade request fails, the error is recorded on the request. Either way, the original request is handed to the next processor.
Next up is PrepRequestProcessor. The class runs to more than a thousand lines, so I will only pick out the most representative pieces. Before that, let's look at its class comment:
This request processor is generally at the start of a RequestProcessor
change. It sets up any transactions associated with requests that change the
state of the system. It counts on ZooKeeperServer to update
outstandingRequests, so that it can take into account transactions that are
in the queue to be applied when generating a transaction.
Put simply, it usually sits at the head of the request-processing chain and sets up the transactions for requests that change the state of the system (transactional requests).
For create-type requests, the logic is roughly:
```java
case OpCode.create2:
    CreateRequest create2Request = new CreateRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
    break;
```
Which jumps to pRequest2Txn:
```java
protected void pRequest2Txn(int type, long zxid, Request request,
                            Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException {
    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                                 Time.currentWallTime(), type));

    switch (type) {
    case OpCode.create:
    case OpCode.create2:
    case OpCode.createTTL:
    case OpCode.createContainer: {
        pRequest2TxnCreate(type, request, record, deserialize);
        break;
    }
    // ... the remaining cases are omitted
```
And then into pRequest2TxnCreate:
```java
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize)
        throws IOException, KeeperException {
    if (deserialize) {
        ByteBufferInputStream.byteBuffer2Record(request.request, record);
    }

    int flags;
    String path;
    List<ACL> acl;
    byte[] data;
    long ttl;
    if (type == OpCode.createTTL) {
        CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
        flags = createTtlRequest.getFlags();
        path = createTtlRequest.getPath();
        acl = createTtlRequest.getAcl();
        data = createTtlRequest.getData();
        ttl = createTtlRequest.getTtl();
    } else {
        CreateRequest createRequest = (CreateRequest)record;
        flags = createRequest.getFlags();
        path = createRequest.getPath();
        acl = createRequest.getAcl();
        data = createRequest.getData();
        ttl = -1;
    }
    CreateMode createMode = CreateMode.fromFlag(flags);
    validateCreateRequest(path, createMode, request, ttl);
    String parentPath = validatePathForCreate(path, request.sessionId);

    List<ACL> listACL = fixupACL(path, request.authInfo, acl);
    ChangeRecord parentRecord = getRecordForPath(parentPath);

    checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) {
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    try {
        if (getRecordForPath(path) != null) {
            throw new KeeperException.NodeExistsException(path);
        }
    } catch (KeeperException.NoNodeException e) {
        // ignore this one
    }
    boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
    if (ephemeralParent) {
        throw new KeeperException.NoChildrenForEphemeralsException(path);
    }
    int newCversion = parentRecord.stat.getCversion()+1;
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                newCversion));
    }
    StatPersisted s = new StatPersisted();
    if (createMode.isEphemeral()) {
        s.setEphemeralOwner(request.sessionId);
    }
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    addChangeRecord(parentRecord);
    addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
}
```
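One small detail worth calling out in the code above is how sequential nodes get their names: the parent's current cversion is appended to the requested path as a zero-padded, 10-digit suffix. A tiny standalone illustration (the path and cversion value are made up for the example):

```java
import java.util.Locale;

public class SequentialNameDemo {
    public static void main(String[] args) {
        // Same formatting used in pRequest2TxnCreate for CreateMode.*_SEQUENTIAL:
        // the parent's current cversion is appended as a zero-padded 10-digit suffix.
        String requestedPath = "/locks/lock-";   // made-up example path
        int parentCVersion = 7;                  // made-up example cversion
        String actualPath = requestedPath + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        System.out.println(actualPath);          // prints /locks/lock-0000000007
    }
}
```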
To sum up the logic: deserialize the request, validate the path and the ACLs, build the corresponding transaction record (CreateTxn / CreateTTLTxn / CreateContainerTxn), and add ChangeRecords for the new node and its parent to the outstandingChanges queue.

Next, multi requests (ZooKeeper's batched transactions):
```java
case OpCode.multi:
    MultiTransactionRecord multiRequest = new MultiTransactionRecord();
    try {
        ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
    } catch(IOException e) {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                Time.currentWallTime(), OpCode.multi));
        throw e;
    }
    List<Txn> txns = new ArrayList<Txn>();
    //Each op in a multi-op must have the same zxid!
    long zxid = zks.getNextZxid();
    KeeperException ke = null;

    //Store off current pending change records in case we need to rollback
    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

    for(Op op: multiRequest) {
        Record subrequest = op.toRequestRecord();
        int type;
        Record txn;

        /* If we've already failed one of the ops, don't bother
         * trying the rest as we know it's going to fail and it
         * would be confusing in the logfiles.
         */
        if (ke != null) {
            type = OpCode.error;
            txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
        }

        /* Prep the request and convert to a Txn */
        else {
            try {
                pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                type = request.getHdr().getType();
                txn = request.getTxn();
            } catch (KeeperException e) {
                ke = e;
                type = OpCode.error;
                txn = new ErrorTxn(e.code().intValue());

                if (e.code().intValue() > Code.APIERROR.intValue()) {
                    LOG.info("Got user-level KeeperException when processing {} aborting" +
                            " remaining multi ops. Error Path:{} Error:{}",
                            request.toString(), e.getPath(), e.getMessage());
                }

                request.setException(e);

                /* Rollback change records from failed multi-op */
                rollbackPendingChanges(zxid, pendingChanges);
            }
        }

        //FIXME: I don't want to have to serialize it here and then
        //       immediately deserialize in next processor. But I'm
        //       not sure how else to get the txn stored into our list.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        txn.serialize(boa, "request");
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

        txns.add(new Txn(type, bb.array()));
    }

    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
            Time.currentWallTime(), request.type));
    request.setTxn(new MultiTxn(txns));

    break;
```
The code looks ugly, but the logic is actually simple: allocate a single zxid for the whole batch, snapshot the pending change records so they can be rolled back, run each sub-op through pRequest2Txn (turning failures into ErrorTxns and rolling back the pending changes), serialize every resulting txn, and finally wrap them all into one MultiTxn. Non-transactional requests are far simpler:
```java
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
    break;
```
For a non-transactional request it is enough to validate the session and then pass the request on to the next processor.
In essence, PrepRequestProcessor does the pre-processing for transactional requests: it creates the transaction header and body and performs session checks, ACL checks, version checks and so on.
Next comes ProposalRequestProcessor. For transactional requests it initiates a Proposal and forwards them to the CommitProcessor; in addition, it hands every transactional request to the SyncRequestProcessor.
```java
public void processRequest(Request request) throws RequestProcessorException {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */

    if (request instanceof LearnerSyncRequest){
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        nextProcessor.processRequest(request);
        if (request.getHdr() != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            syncProcessor.processRequest(request);
        }
    }
}
```
Now let's look at propose:
```java
/**
 * create a proposal and send it out to all the members
 *
 * @param request
 * @return the proposal that is queued to send to all the members
 */
public Proposal propose(Request request) throws XidRolloverException {
    /**
     * Address the rollover issue. All lower 32bits set indicate a new leader
     * election. Force a re-election instead. See ZOOKEEPER-1277
     */
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
                "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }

    byte[] data = SerializeUtils.serializeRequest(request);
    proposalStats.setLastBufferSize(data.length);
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;

    synchronized(this) {
        p.addQuorumVerifier(self.getQuorumVerifier());

        if (request.getHdr().getType() == OpCode.reconfig){
            self.setLastSeenQuorumVerifier(request.qv, true);
        }

        if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
            p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }

        lastProposed = p.packet.getZxid();
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }
    return p;
}
```
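The rollover guard at the top is easier to read once you remember that a zxid packs the leader epoch in its high 32 bits and a per-epoch counter in its low 32 bits. A small sketch of that check (the epoch and counter values below are made up for illustration):

```java
public class ZxidRolloverDemo {
    public static void main(String[] args) {
        // A zxid packs the leader epoch in the high 32 bits and a counter in the low 32 bits.
        long epoch = 5L;                      // made-up epoch
        long counter = 0xffffffffL;           // the per-epoch counter has reached its maximum
        long zxid = (epoch << 32) | counter;

        // Same test as in Leader.propose(): when the low 32 bits are exhausted,
        // the leader shuts down to force a re-election and start a new epoch.
        boolean mustReElect = (zxid & 0xffffffffL) == 0xffffffffL;
        System.out.println("epoch=" + (zxid >>> 32) + ", rollover=" + mustReElect);
    }
}
```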
What gets sent here is a QuorumPacket, which implements the Record interface, with its type set to PROPOSAL. Its comment reads:
```java
/**
 * This message type is sent by a leader to propose a mutation.
 */
public final static int PROPOSAL = 2;
```
Clearly, this is a mutation message that only the Leader may send. In short, propose does the following: serialize the request, wrap it in a PROPOSAL QuorumPacket, register the Proposal in the outstandingProposals map keyed by its zxid, and broadcast the packet to the quorum.

CommitProcessor is, as its name suggests, the transaction committer. It only cares about transactional requests: it holds each one until the Proposal has collected enough votes from the ensemble to be committed. With the CommitProcessor, every server can process transactions strictly in order.
The code for this part is rather unwieldy, so I won't spend space analyzing it; knowing the above is enough to understand how the whole request chain fits together.
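For readers who still want a mental model, here is a deliberately over-simplified, non-authoritative sketch of what the CommitProcessor does conceptually: queue incoming requests, let reads pass, and hold each write until its commit arrives. The real class is multi-threaded and far more involved; every name below is made up for the toy example.

```java
import java.util.concurrent.LinkedBlockingQueue;

// A toy mental model of CommitProcessor, NOT the real implementation.
public class ToyCommitProcessor {
    private final LinkedBlockingQueue<String> queuedRequests = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<String> committedRequests = new LinkedBlockingQueue<>();

    void processRequest(String request) { queuedRequests.add(request); }    // from the upstream processor
    void commit(String request)         { committedRequests.add(request); } // from the quorum protocol

    void runOnce() throws InterruptedException {
        String next = queuedRequests.take();
        if (next.startsWith("read")) {
            forward(next);                         // reads need no quorum decision
        } else {
            forward(committedRequests.take());     // block until the write has been committed
        }
    }

    private void forward(String request) { System.out.println("-> next processor: " + request); }

    public static void main(String[] args) throws InterruptedException {
        ToyCommitProcessor cp = new ToyCommitProcessor();
        cp.processRequest("write: create /foo");   // a write arrives from upstream
        cp.commit("write: create /foo");           // the ensemble later commits it
        cp.runOnce();                              // only now does it move on down the chain
    }
}
```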
Next, SyncRequestProcessor. Its logic is very simple: append the request to the transaction log and, when enough transactions have accumulated, trigger a snapshot.
```java
public void processRequest(Request request) {
    // request.addRQRec(">sync");
    queuedRequests.add(request);
}

// The core method of the worker thread: it drains and processes the queuedRequests queue.
@Override
public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount/2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally{
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
```
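The randomized roll threshold above is worth a closer look: each server rolls its log and takes a snapshot after somewhere between snapCount/2 and snapCount appended transactions, so that the members of the ensemble do not all snapshot at the same moment. A tiny numeric illustration, assuming the commonly cited default of zookeeper.snapCount = 100000 (treat that default as an assumption to verify for your version):

```java
import java.util.Random;

public class SnapThresholdDemo {
    public static void main(String[] args) {
        int snapCount = 100000;                      // assumed default of zookeeper.snapCount
        Random r = new Random();
        int randRoll = r.nextInt(snapCount / 2);     // same randomization as SyncRequestProcessor
        int threshold = snapCount / 2 + randRoll;    // roll/snapshot once logCount exceeds this
        // threshold ends up somewhere in [50000, 100000), different on every server
        System.out.println("this server will snapshot after ~" + threshold + " logged txns");
    }
}
```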
ToBeAppliedRequestProcessor is centred on the toBeApplied queue, which holds the committable Proposals that the CommitProcessor has already handled; an entry is removed only after the FinalRequestProcessor has finished processing it.
```java
/*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request);

    // The only requests that should be on toBeApplied are write
    // requests, for which we will have a hdr. We can't simply use
    // request.zxid here because that is set on read requests to equal
    // the zxid of the last write op.
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
        LOG.error("Committed request not found on toBeApplied: " + request);
    }
}
```
FinalRequestProcessor's core logic runs to roughly 500 lines, so to save space I will only describe it. Being the last processor in the chain, it is the one that sends the reply back to the client; it is also where transactional requests take effect, i.e. where the state of the in-memory database is changed.
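As a rough illustration of those two responsibilities, here is a toy sketch: apply the committed change to an in-memory store, then produce the reply. Everything in it is made up for illustration and is not ZooKeeper's API; the real method dispatches on dozens of OpCodes and also handles watches, stats and error paths.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of FinalRequestProcessor's two jobs:
// (1) mutate an in-memory store, (2) build the reply for the client.
public class ToyFinalProcessor {
    private final Map<String, byte[]> dataTree = new HashMap<>();   // stand-in for ZKDatabase

    String process(String op, String path, byte[] data) {
        if (op.equals("create")) {
            dataTree.put(path, data);                                // 1. apply to the in-memory database
            return "created " + path;                                // 2. reply sent back to the client
        }
        byte[] value = dataTree.getOrDefault(path, new byte[0]);     // reads are answered from memory
        return new String(value);
    }

    public static void main(String[] args) {
        ToyFinalProcessor fp = new ToyFinalProcessor();
        System.out.println(fp.process("create", "/foo", "bar".getBytes()));
        System.out.println(fp.process("get", "/foo", null));
    }
}
```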
Now for the Follower. Let's first look at how FollowerZooKeeperServer assembles its processors:
```java
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor((Learner)getFollower()));
    syncProcessor.start();
}
```
As you can see, there are two request chains here: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor, and SyncRequestProcessor -> SendAckRequestProcessor.
So when a request arrives, which processor handles it? Tracing it roughly: the FollowerRequestProcessor chain is driven by ZooKeeperServer and handles requests submitted by clients, while the SyncRequestProcessor chain is entered through FollowerZooKeeperServer.logRequest, which is called from the Learner code path and handles the requests coming from the leader. At this point you might ask: this is plainly a Follower, so why does the call come in through Learner? The class declaration gives the answer:
```java
/**
 * This class is the superclass of two of the three main actors in a ZK
 * ensemble: Followers and Observers. Both Followers and Observers share
 * a good deal of code which is moved into Peer to avoid duplication.
 */
public class Learner {
```
To avoid duplicating code, the logic shared by Followers and Observers was pulled up into this common superclass.
FollowerRequestProcessor is the Follower's normal processor. It decides whether a request is transactional: if it is, it forwards the request to the Leader; otherwise the Follower handles it itself.
FollowerRequestProcessor.run
```java
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getFollower().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getFollower().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("FollowerRequestProcessor exited loop!");
}
```
Note that the request is handed to the CommitProcessor before being shipped to the leader — the comment in the code spells out why: so that the follower is ready to receive the response. As mentioned earlier, that processor waits for the Proposal to collect enough votes in the ensemble before it can be committed, so at this point the Follower's CommitProcessor is already waiting for the Proposal to be committed.

Next, SendAckRequestProcessor:
```java
public void processRequest(Request si) {
    if(si.type != OpCode.sync){
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
            null);
        try {
            learner.writePacket(qp, false);
        } catch (IOException e) {
            LOG.warn("Closing connection to leader, exception during packet send", e);
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException e1) {
                // Nothing to do, we are shutting things down, so an exception here is irrelevant
                LOG.debug("Ignoring error closing the connection", e1);
            }
        }
    }
}
```
Its logic is very simple: it replies to the leader with an ACK, indicating that this server has finished writing the transaction to its log.

Finally, the Observer. ObserverZooKeeperServer sets up its processors as follows:
```java
/**
 * Set up the request processors for an Observer:
 * firstProcesor->commitProcessor->finalProcessor
 */
@Override
protected void setupRequestProcessors() {
    // We might consider changing the processor behaviour of
    // Observers to, for example, remove the disk sync requirements.
    // Currently, they behave almost exactly the same as followers.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    /*
     * Observer should write to disk, so that the it won't request
     * too old txn from the leader which may lead to getting an entire
     * snapshot.
     *
     * However, this may degrade performance as it has to write to disk
     * and do periodic snapshot which may double the memory requirements
     */
    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}
```
The logic is clear (probably because the Observer code arrived after 3.3.0). The normal request chain is ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
If syncRequestProcessorEnabled is on (it is on by default), the Observer will also write the transaction log and take snapshots. This costs some performance and increases the memory requirements.
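If I recall the configuration correctly, this switch corresponds to the observer's syncEnabled option (Java system property zookeeper.observer.syncEnabled); treat the exact key below as an assumption to verify against your version's documentation. Disabling it in zoo.cfg would look like:

```
# zoo.cfg on an Observer (assumed key; verify against your version's docs)
# equivalent Java system property: zookeeper.observer.syncEnabled
syncEnabled=false
```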
Then look at ObserverRequestProcessor — it is practically identical to FollowerRequestProcessor. Couldn't they have reused the code?
```java
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this Observer has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getObserver().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getObserver().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getObserver().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("ObserverRequestProcessor exited loop!");
}
```
That concludes the source-code walkthrough, which is based on version 3.5.7.
Having walked through the source, you should now have a fair picture of how ZooKeeper handles a request. Let's recap the life of a transactional request. Starting from the Leader's ProposalRequestProcessor, it goes through roughly three phases.

Sync phase: driven mainly by the ProposalRequestProcessor; every machine that takes part in the Proposal (Leader and Followers) writes the transaction to its log.

Proposal/ACK phase: every transactional request must be acknowledged by more than half of the voting members (Leader + Followers); a minimal sketch of that majority check follows below.
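As a rough illustration of what "more than half" means here, the sketch below just counts the distinct server ids that have ACKed a proposal and compares the count with a simple majority. The real check lives in QuorumVerifier / Leader.tryToCommit and also has to cope with reconfiguration, so treat this purely as a toy:

```java
import java.util.HashSet;
import java.util.Set;

// Toy majority check in the spirit of the leader's ACK counting; not the real implementation.
public class QuorumAckDemo {
    public static void main(String[] args) {
        int votingMembers = 5;                  // Leader + Followers (Observers do not vote)
        Set<Long> ackedServerIds = new HashSet<>();

        ackedServerIds.add(1L);                 // the leader ACKs its own proposal
        ackedServerIds.add(2L);                 // ACK from follower 2
        ackedServerIds.add(3L);                 // ACK from follower 3

        boolean committable = ackedServerIds.size() > votingMembers / 2;
        System.out.println("quorum reached: " + committable);   // true: 3 out of 5
    }
}
```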
Commit phase: once a quorum has acknowledged, the Proposal is placed on the toBeApplied queue (see org.apache.zookeeper.server.quorum.Leader.tryToCommit). The Leader then sends COMMIT packets to the Followers and INFORM packets to the Observers, which causes them to commit the Proposal (see org.apache.zookeeper.server.quorum.Leader.commit && inform). Up to this point we have covered the SyncRequestProcessor -> AckRequestProcessor side; what remains is the CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor part.
Once the Leader decides a Proposal can be committed, the request lands in the CommitProcessor's committedRequests queue and is then handed, one by one, to the ToBeAppliedRequestProcessor. After the FinalRequestProcessor has finished with a request, the corresponding Proposal is removed from the toBeApplied queue. The FinalRequestProcessor also walks the outstandingChanges queue, whose elements were added back in the PrepRequestProcessor: if the zxid at the head of the queue is smaller than the zxid of the current transaction, that change record belongs to an earlier request that never got this far (it probably failed along the way), so it is discarded. Finally, the transaction is applied to the in-memory database; that database instance also maintains a committedLog, capped at 500 entries by default, which holds the most recently committed Proposals so that lagging servers can sync up quickly.