This article was first published on 泊浮目's Jianshu: https://www.jianshu.com/u/204...
Version | Date | Notes |
---|---|---|
1.0 | 2020.3.29 | First published |
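As we know, ZooKeeper is a distributed coordination system. In a large distributed system, a great many clients will inevitably connect to ZooKeeper. So how does ZooKeeper manage the lifecycle of all these sessions? With that question in mind, let's get into today's topic.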
Let's start with the abstract definition of the core session-related class, SessionTracker (the session manager):
```java
import java.io.PrintWriter;
import java.util.Map;
import java.util.Set;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;

/**
 * This is the basic interface that ZooKeeperServer uses to track sessions. The
 * standalone and leader ZooKeeperServer use the same SessionTracker. The
 * FollowerZooKeeperServer uses a SessionTracker which is basically a simple
 * shell to track information to be forwarded to the leader.
 */
public interface SessionTracker {
    public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }

    public static interface SessionExpirer {
        void expire(Session session);
        long getServerId();
    }

    long createSession(int sessionTimeout);

    /**
     * Add a global session to those being tracked.
     * @param id sessionId
     * @param to sessionTimeout
     * @return whether the session was newly added (if false, already existed)
     */
    boolean addGlobalSession(long id, int to);

    /**
     * Add a session to those being tracked. The session is added as a local
     * session if they are enabled, otherwise as global.
     * @param id sessionId
     * @param to sessionTimeout
     * @return whether the session was newly added (if false, already existed)
     */
    boolean addSession(long id, int to);

    /**
     * @param sessionId
     * @param sessionTimeout
     * @return false if session is no longer active
     */
    boolean touchSession(long sessionId, int sessionTimeout);

    /**
     * Mark that the session is in the process of closing.
     * @param sessionId
     */
    void setSessionClosing(long sessionId);

    void shutdown();

    /**
     * @param sessionId
     */
    void removeSession(long sessionId);

    /**
     * @param sessionId
     * @return whether or not the SessionTracker is aware of this session
     */
    boolean isTrackingSession(long sessionId);

    /**
     * Checks whether the SessionTracker is aware of this session, the session
     * is still active, and the owner matches. If the owner wasn't previously
     * set, this sets the owner of the session.
     *
     * UnknownSessionException should never been thrown to the client. It is
     * only used internally to deal with possible local session from other
     * machine
     *
     * @param sessionId
     * @param owner
     */
    public void checkSession(long sessionId, Object owner)
            throws KeeperException.SessionExpiredException,
            KeeperException.SessionMovedException,
            KeeperException.UnknownSessionException;

    /**
     * Strictly check that a given session is a global session or not
     * @param sessionId
     * @param owner
     * @throws KeeperException.SessionExpiredException
     * @throws KeeperException.SessionMovedException
     */
    public void checkGlobalSession(long sessionId, Object owner)
            throws KeeperException.SessionExpiredException,
            KeeperException.SessionMovedException;

    void setOwner(long id, Object owner) throws SessionExpiredException;

    /**
     * Text dump of session information, suitable for debugging.
     * @param pwriter the output writer
     */
    void dumpSessions(PrintWriter pwriter);

    /**
     * Returns a mapping of time to session IDs that expire at that time.
     */
    Map<Long, Set<Long>> getSessionExpiryMap();
}
```
Broadly speaking, this interface defines a set of methods for controlling sessions, such as creating, touching (activating), and removing them.
Now let's look at some of the more important interfaces, member variables, and methods in its implementation, `SessionTrackerImpl`.
Next, let's look at which attributes a session instance contains. Without further ado, here is the interface definition:
```java
public static interface Session {
    long getSessionId();
    int getTimeout();
    boolean isClosing();
}
```
On the server side, only three attributes are recorded for each client session: `sessionId`, `timeout`, and `isClosing`.
The client side is a bit more complex. For example, a session can be in several states:
```java
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
```
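A client can lose its connection to the server because of a brief network outage or other causes. Fortunately, the ZooKeeper client reconnects automatically. While it reconnects, the client is in CONNECTING, and once it reaches a server again it moves to CONNECTED. Other abnormal cases, such as session expiry or the client program exiting, leave the client in CLOSED. An authentication failure instead ends in AUTH_FAILED. According to `isAlive`, neither of these states is alive.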
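The transitions just described can be summarized as a small state machine. This is a hypothetical model for illustration only. `ClientStateMachine` and the `ClientEvent` values are invented here, and only the state names and `isAlive` come from `ZooKeeper.States`. The real client drives these transitions inside its connection code, and its rules are more detailed.

```java
// Hypothetical model of the client-side transitions described above.
// State names and isAlive() mirror ZooKeeper.States; the events and rules are illustrative.
enum ClientState {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }
}

// Hypothetical events that drive the transitions in this sketch.
enum ClientEvent { CONNECTION_LOST, CONNECTED_TO_SERVER, SESSION_EXPIRED, AUTH_FAILED, CLIENT_CLOSED }

final class ClientStateMachine {
    private ClientState state = ClientState.CONNECTING;

    ClientState state() {
        return state;
    }

    ClientState on(ClientEvent event) {
        if (!state.isAlive()) {
            return state; // CLOSED and AUTH_FAILED are terminal in this sketch
        }
        switch (event) {
            case CONNECTION_LOST:
                state = ClientState.CONNECTING; // automatic reconnect starts
                break;
            case CONNECTED_TO_SERVER:
                state = ClientState.CONNECTED;
                break;
            case SESSION_EXPIRED:
            case CLIENT_CLOSED:
                state = ClientState.CLOSED;
                break;
            case AUTH_FAILED:
                state = ClientState.AUTH_FAILED;
                break;
        }
        return state;
    }
}
```

A network blip sends the machine from CONNECTED back to CONNECTING and then to CONNECTED again. Once it reaches CLOSED or AUTH_FAILED, further events no longer change it.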
```java
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
        new ConcurrentHashMap<Long, SessionImpl>();

private final ExpiryQueue<SessionImpl> sessionExpiryQueue;

private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
```
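- `sessionsById`: as the name suggests, a map from a session's id to the session object itself.
- `sessionExpiryQueue`: as the name suggests, an expiry queue. Internally, though, it uses a bucketing strategy, which we will analyze shortly.
- `sessionsWithTimeout`: the name says it all. It records each session's timeout, with the sessionId as the key and the timeout as the value. This structure is shared with ZK's in-memory database and is periodically persisted into snapshots.

Any discussion of session management has to cover how sessions are created, or it would feel hollow. I won't go over the client's initialization process here. Either way, we need a connection first, since a session can't be built on thin air: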
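- The client uses `ClientCnxnSocket` to create a TCP connection to ZK.
- The `readConnectResult` method processes the result of the connect request.

That is roughly how a session is created. I have skipped the `SyncConnected-None` event notification logic, since it isn't important for today's topic.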
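Once the connection exists, the server registers the new session in the three structures described earlier. The sketch below is a hypothetical, stripped-down tracker; `MiniSessionTracker` is not a ZooKeeper class. It assumes, as I understand `SessionTrackerImpl` to work, that ids come from an incrementing counter and that the expiry time is bucketed by an interval. The caller passes in the current time so the result is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker that only shows how the three structures relate on session creation.
final class MiniSessionTracker {
    static final class SessionImpl {
        final long sessionId;
        final int timeout;
        volatile boolean isClosing;

        SessionImpl(long sessionId, int timeout) {
            this.sessionId = sessionId;
            this.timeout = timeout;
        }
    }

    // sessionId -> session object
    final ConcurrentHashMap<Long, SessionImpl> sessionsById = new ConcurrentHashMap<>();
    // sessionId -> timeout in ms (in ZooKeeper this map is shared with the in-memory database)
    final ConcurrentMap<Long, Integer> sessionsWithTimeout = new ConcurrentHashMap<>();
    // Stand-in for sessionExpiryQueue: sessionId -> bucketed expiry time in ms
    final ConcurrentHashMap<Long, Long> expiryById = new ConcurrentHashMap<>();

    private final AtomicLong nextSessionId;
    private final long expirationInterval;

    MiniSessionTracker(long initialSessionId, long expirationInterval) {
        this.nextSessionId = new AtomicLong(initialSessionId);
        this.expirationInterval = expirationInterval;
    }

    // Hypothetical createSession: takes the next id, then registers the session everywhere.
    long createSession(int sessionTimeout, long now) {
        long id = nextSessionId.getAndIncrement();
        sessionsById.putIfAbsent(id, new SessionImpl(id, sessionTimeout));
        sessionsWithTimeout.put(id, sessionTimeout);
        // Round now + timeout up to the next interval boundary.
        expiryById.put(id, ((now + sessionTimeout) / expirationInterval + 1) * expirationInterval);
        return id;
    }
}
```

Consider a tracker that starts at id 100 with a 2000ms interval. A session created at 1000ms with a 3000ms timeout gets id 100 and lands in the 6000ms bucket. The next session gets id 101.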
Session expiry checking is done in `SessionTrackerImpl.run`, which is the core method of a thread. In other words, ZK checks for expired sessions on a dedicated thread.
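Simply put, `ExpiryQueue` groups the sessions that are about to expire by their expiry time. Suppose session1, session2, and session3 expire at 12:12:54, and session4, session5, and session6 expire at 12:12:55. The time then serves as the key, the sessions expiring at that time form an array, and a dictionary maps one to the other: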
key | value |
---|---|
12:12:54 | [session1,session2,session3] |
12:12:55 | [session4,session5,session6] |
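The table above is simply a map from an expiry time to the sessions that expire then. The following is a minimal sketch of it, using `java.time.LocalTime` keys to mirror the example. ZooKeeper itself keys its `expiryMap` by millisecond timestamps, as the `update` code later shows. `ExpiryBuckets` is a hypothetical class.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Hypothetical illustration of the key/value table: expiry time -> sessions expiring then.
final class ExpiryBuckets {
    // Sorted by time, so the earliest bucket is always firstKey().
    private final TreeMap<LocalTime, List<String>> buckets = new TreeMap<>();

    // File a session under the time at which it expires.
    void add(LocalTime expiryTime, String session) {
        buckets.computeIfAbsent(expiryTime, t -> new ArrayList<>()).add(session);
    }

    // Everything that expires at `time`: one lookup instead of a scan over all sessions.
    List<String> expiringAt(LocalTime time) {
        List<String> sessions = buckets.get(time);
        return sessions == null ? Collections.<String>emptyList() : sessions;
    }

    // The earliest time that still has sessions waiting, or null if there are none.
    LocalTime earliest() {
        return buckets.isEmpty() ? null : buckets.firstKey();
    }
}
```

With session1 to session3 filed under 12:12:54 and session4 to session6 under 12:12:55, `expiringAt(12:12:54)` returns the first three in a single lookup.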
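In practice the interval is not 1s; it is written that way here only to keep the example easy to read. What actually happens is that ZK computes each session's expiry time and files the session into the corresponding bucket. The expiry time is CurrentTime + SessionTimeout rounded up to the next interval boundary (see `ExpiryQueue.update`):

$$\text{ExpirationTime} = \left(\left\lfloor \frac{\text{CurrentTime} + \text{SessionTimeout}}{\text{ExpirationInterval}} \right\rfloor + 1\right) \times \text{ExpirationInterval}$$

Let's look at a few examples. Assume an expiration interval of 2000ms. ZK uses the server's `tickTime` as this interval, and 2000ms is the value in its sample configuration.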
0 | 2000ms | 4000ms | 6000ms | 8000ms |
---|---|---|---|---|
sessionB | sessionA |
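The bucketing arithmetic is easy to check by hand with a tiny sketch. `ExpiryRounding.expiryBucket` is a hypothetical helper. It applies the rounding that, as far as I can tell, `roundToNextInterval` performs inside `ExpiryQueue.update`, namely `(time / interval + 1) * interval`.

```java
// Hypothetical helper reproducing the bucket rounding used when a session's expiry is updated.
final class ExpiryRounding {
    // Round (now + timeout) up to the next multiple of the interval.
    // Integer division floors the value, and "+ 1" moves to the next boundary,
    // so a time that already sits on a boundary still goes to the following bucket.
    static long expiryBucket(long now, long timeout, long interval) {
        return ((now + timeout) / interval + 1) * interval;
    }
}
```

Take an interval of 2000ms and some made-up sessions. One created at 1000ms with a 3000ms timeout goes to the 6000ms bucket. One created at 1500ms with a 2000ms timeout goes to the 4000ms bucket. One created exactly at 2000ms with a 2000ms timeout also goes to 6000ms, because 4000ms is itself a boundary. Because every session lands on a boundary, the expiry thread only ever has to look at one bucket per interval.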
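This way, the thread does not have to iterate over every session and check each one's expiry time, which is rather clever. While we are here, let's briefly go over the session cleanup steps: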
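1. Mark the session as closing.
2. Submit a close-session request to `PrepRequestProcessor` so that it takes effect across the whole ZK cluster.
3. Delete the session's ephemeral nodes from the in-memory database, which `sessionsWithTimeout` is shared with (`FinalRequestProcessor.processRequest`).
4. Remove the session from `sessionsById`, `sessionExpiryQueue`, and `sessionsWithTimeout`.
5. Close the session's connection (`FinalRequestProcessor.closeSession`).

From the above, it may look as if a session simply expires when its precomputed time arrives. That is not actually the case. The client keeps its session valid by sending requests or heartbeats, which pushes back the expiry time. This process is usually called TouchSession (and yes, that is what the code calls it too). Let's briefly walk through the flow: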
The starting sessionId itself is produced by `initializeNextSession`:

```java
/**
 * Generates an initial sessionId. High order byte is serverId, next 5
 * 5 bytes are from timestamp, and low order 2 bytes are 0s.
 */
public static long initializeNextSession(long id) {
    long nextSid;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid = nextSid | (id << 56);
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid;  // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}
```
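In short, the high-order byte (8 bits) holds the `serverId`, which identifies the machine. The next 5 bytes (40 bits) come from the current time in milliseconds, and the low-order 2 bytes (16 bits) start at 0. Nothing here is random. This is only the starting value, and each new session on that server takes the next id in sequence.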
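To see the bit layout concretely, here is a small sketch that repeats the shift arithmetic of `initializeNextSession` and decodes the three fields. The timestamp is passed in as a parameter, whereas the real method reads `Time.currentElapsedTime()`. The `SessionIdLayout` class and its helpers are hypothetical, and the container-owner edge case is omitted.

```java
// Hypothetical helper: same shifts as initializeNextSession, with the time passed in,
// plus decoders for the serverId, timestamp and counter fields.
final class SessionIdLayout {
    private static final long TIMESTAMP_MASK = 0xFF_FFFF_FFFFL; // low 40 bits

    static long initialSessionId(long serverId, long nowMillis) {
        // << 24 discards everything above bit 39 of the time; >>> 8 then places those
        // 40 bits at positions 16..55 and zero-fills the top byte.
        long nextSid = (nowMillis << 24) >>> 8;
        // The server id occupies the high-order byte (bits 56..63).
        return nextSid | (serverId << 56);
    }

    static long serverIdOf(long sid) {
        return sid >>> 56; // unsigned shift, so serverIds >= 128 decode correctly
    }

    static long timestampBitsOf(long sid) {
        return (sid >>> 16) & TIMESTAMP_MASK;
    }

    static long counterOf(long sid) {
        return sid & 0xFFFFL; // low 2 bytes: 0 for the initial id, then incremented
    }
}
```

For serverId 5, `serverIdOf` returns 5 and `counterOf` returns 0. The id of the next session, `sid + 1`, has counter 1 and the same server and timestamp fields.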
```java
@Override
public void run() {
    try {
        while (running) {
            long waitTime = sessionExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }

            for (SessionImpl s : sessionExpiryQueue.poll()) {
                setSessionClosing(s.sessionId);
                expirer.expire(s);
            }
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}
```
The logic is simple. The thread asks `sessionExpiryQueue` how long remains until the nearest expiry time and, if there is time left, sleeps until then.
Each session in the due bucket is then marked as closing, and the expiration operation begins.
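The wait-then-poll loop in `run` relies on the queue being able to say how long until the next bucket is due, and to hand that bucket out in one step. Below is a simplified sketch modeled on that behavior of `getWaitTime()` and `poll()`. It is not the actual `ExpiryQueue` source. `SimpleExpiryQueue` is a hypothetical name, and passing `now` explicitly instead of reading a clock is an assumption made so the example is deterministic.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified expiry queue showing how an expiry thread consumes buckets.
final class SimpleExpiryQueue<E> {
    private final long interval;
    private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<>();
    // The next bucket boundary that the expiry thread will hand out.
    private final AtomicLong nextExpirationTime;

    SimpleExpiryQueue(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = new AtomicLong((now / interval + 1) * interval);
    }

    void add(E elem, long bucketTime) {
        expiryMap.computeIfAbsent(bucketTime, t -> ConcurrentHashMap.newKeySet()).add(elem);
    }

    // How long the expiry thread should sleep before the next bucket is due (0 if overdue).
    long getWaitTime(long now) {
        long next = nextExpirationTime.get();
        return now < next ? next - now : 0L;
    }

    // Hand out the due bucket (possibly empty) and advance exactly one interval.
    Set<E> poll(long now) {
        long expirationTime = nextExpirationTime.get();
        if (now < expirationTime) {
            return Collections.emptySet();
        }
        Set<E> set = null;
        // compareAndSet ensures only one caller takes this bucket.
        if (nextExpirationTime.compareAndSet(expirationTime, expirationTime + interval)) {
            set = expiryMap.remove(expirationTime);
        }
        if (set == null) {
            return Collections.emptySet();
        }
        return set;
    }
}
```

Each `poll` advances exactly one interval. A caller that falls behind therefore sees `getWaitTime() == 0` and keeps polling, one bucket at a time, until it catches up. This matches the `continue`/`poll` structure of the `run` loop.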
Next, let's look at `expirer.expire`:
```java
public void expire(Session session) {
    long sessionId = session.getSessionId();
    LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
            + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(sessionId);
}
```
It then calls `close`:
```java
private void close(long sessionId) {
    Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
    setLocalSessionFlag(si);
    submitRequest(si);
}
```
This simply builds a new `closeSession` request and sets its local-session flag. The key method is `submitRequest`:
```java
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}
```
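The first part waits until the processor chain is ready. Next, the session is touched (activated). If it has already been removed or has expired, an exception is thrown. That is perfectly normal, because the client's session activity and this removal request are not handled at the same moment.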
Finally, the close-session request is submitted to the processor chain.
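The first part of `submitRequest` is a classic guarded wait. It blocks while the state is `INITIAL`, re-checks after every `wait(1000)`, and fails if the chain ends up anything other than `RUNNING`. The sketch below isolates that pattern. `ProcessorChainGate` and its methods are hypothetical, and the list append stands in for `firstProcessor.processRequest`. Unlike the original, this sketch propagates `InterruptedException` instead of logging it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical gate illustrating the wait/notify pattern at the top of submitRequest.
final class ProcessorChainGate {
    enum State { INITIAL, RUNNING, SHUTDOWN }

    private State state = State.INITIAL;
    private final List<String> processed = new CopyOnWriteArrayList<>();

    // Called once the (hypothetical) processor chain has been set up.
    synchronized void markRunning() {
        state = State.RUNNING;
        notifyAll(); // wake up any submitters blocked in wait()
    }

    synchronized void shutdown() {
        state = State.SHUTDOWN;
        notifyAll();
    }

    void submit(String request) throws InterruptedException {
        synchronized (this) {
            // Loop: wait(1000) may return on timeout or spuriously, so re-check the state.
            while (state == State.INITIAL) {
                wait(1000);
            }
            if (state != State.RUNNING) {
                throw new IllegalStateException("Not started");
            }
        }
        processed.add(request); // stand-in for firstProcessor.processRequest(si)
    }

    List<String> processed() {
        return processed;
    }
}
```

A request submitted before `markRunning()` simply blocks and goes through once the chain is up. A request submitted after `shutdown()` fails immediately with "Not started".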
```java
synchronized public boolean touchSession(long sessionId, int timeout) {
    SessionImpl s = sessionsById.get(sessionId);

    if (s == null) {
        logTraceTouchInvalidSession(sessionId, timeout);
        return false;
    }

    if (s.isClosing()) {
        logTraceTouchClosingSession(sessionId, timeout);
        return false;
    }

    updateSessionExpiry(s, timeout);
    return true;
}
```
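I won't dwell on the lookup and validation logic. Let's go straight to the core method, `ExpiryQueue.update`, which `updateSessionExpiry` calls: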
```java
/**
 * Adds or updates expiration time for element in queue, rounding the
 * timeout to the expiry interval bucketed used by this queue.
 * @param elem     element to add/update
 * @param timeout  timout in milliseconds
 * @return time at which the element is now set to expire if
 *         changed, or null if unchanged
 */
public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);

    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }

    // First add the elem to the new expiry time bucket in expiryMap.
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(
                new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(elem);

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}
```
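The logic is very simple. It computes the new expiry time, places the session into the corresponding new bucket, and then removes the session from its old bucket.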
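To watch a touch move a session between buckets, here is a simplified sketch of the same add-to-new-then-remove-from-old sequence as `ExpiryQueue.update`. `TouchableExpiryQueue` is hypothetical, and `now` is passed in explicitly instead of being read from a clock. The rounding uses `(t / interval + 1) * interval`, which is how I understand `roundToNextInterval` to work.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified version of the touch path in ExpiryQueue.update.
final class TouchableExpiryQueue<E> {
    private final long interval;
    // element -> bucket it currently lives in
    private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<>();
    // bucket time -> elements expiring at that time
    private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<>();

    TouchableExpiryQueue(long interval) {
        this.interval = interval;
    }

    // Returns the new bucket time, or null if the element stays in the same bucket.
    Long update(E elem, int timeout, long now) {
        Long prev = elemMap.get(elem);
        Long next = ((now + timeout) / interval + 1) * interval;
        if (next.equals(prev)) {
            return null; // same bucket, nothing to move
        }
        // Add to the new bucket first, so the element is never in no bucket at all.
        expiryMap.computeIfAbsent(next, t -> ConcurrentHashMap.newKeySet()).add(elem);
        prev = elemMap.put(elem, next);
        // Then drop it from the old bucket, if it had one.
        if (prev != null && !next.equals(prev)) {
            Set<E> old = expiryMap.get(prev);
            if (old != null) {
                old.remove(elem);
            }
        }
        return next;
    }

    Set<E> bucket(long time) {
        Set<E> s = expiryMap.get(time);
        return s == null ? Collections.<E>emptySet() : s;
    }
}
```

Take a 2000ms interval and a 3000ms timeout. A touch at 1000ms files the session under 6000ms. A second touch at 1500ms still rounds to 6000ms, so `update` returns null and nothing moves. A touch at 3200ms moves the session to the 8000ms bucket and removes it from the 6000ms one.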
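In this article, we have taken ZK's session management mechanism apart together. Its bucketing strategy is particularly useful when there are large numbers of client sessions, because it makes cleaning up timed-out sessions much more efficient.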