目录java
用一张图来讲明session创建过程当中client和server的交互算法
主要流程apache
客户端要发起链接要先启动,不管是使用curator client仍是zkClient,都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper
。网络
Zookeeper初始化的主要工做是初始化本身的一些关键组件session
ClientCnxn包含两个线程框架
ClientCnxn初始化的过程就是初始化启动这两个线程,客户端发起链接的主要逻辑在SendThread线程中异步
// org.apache.zookeeper.ClientCnxn.SendThread#run @Override public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = System.currentTimeMillis(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds while (state.isAlive()) { try { // client是否链接到server,若是没有链接到则链接server if (!clientCnxnSocket.isConnected()) { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing if (closing || !state.isAlive()) { break; } // 这个里面去链接server startConnect(); clientCnxnSocket.updateLastSendAndHeard(); } // 省略中间代码... clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); // 省略中间代码... }
SendThread#run是一个while循环,只要client没有被关闭会一直循环,每次循环判断当前client是否链接到server,若是没有则发起链接,发起链接调用了startConnectsocket
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { // 经过hostProvider来获取一个server地址 addr = hostProvider.next(1000); } // 省略中间代码... // 创建client与server的链接 clientCnxnSocket.connect(addr); }
到这里client发起了socket链接,server监听的端口收到client的链接请求后和client创建链接。ide
socket链接创建后,client会向server发送一个ConnectRequest来创建session链接。两种状况会发送ConnectRequestthis
发送ConnectRequest是在下面的方法中
// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection void primeConnection() throws IOException { LOG.info("Socket connection established to " + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session"); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); // 省略中间代码... // 将conReq封装为packet放入outgoingQueue等待发送 outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); } clientCnxnSocket.enableReadWriteOnly(); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request sent on " + clientCnxnSocket.getRemoteSocketAddress()); } }
请求中带的参数
在server启动的过程当中除了会启动用于选举的网络组件还会启动用于处理client请求的网络组件
org.apache.zookeeper.server.NIOServerCnxnFactory
主要启动了三个线程:
client发起socket链接的时候,server监听了该端口,接收到client的链接请求,而后把创建练级的SocketChannel放入队列里面,交给SelectorThread处理
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection public boolean addAcceptedConnection(SocketChannel accepted) { if (stopped || !acceptedQueue.offer(accepted)) { return false; } wakeupSelector(); return true; }
SelectorThread是一个不断循环的线程,每次循环都会处理刚刚创建的socket链接
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run while (!stopped) { try { select(); // 处理对立中的socket processAcceptedConnections(); processInterestOpsUpdateRequests(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } // org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections private void processAcceptedConnections() { SocketChannel accepted; while (!stopped && (accepted = acceptedQueue.poll()) != null) { SelectionKey key = null; try { // 向该socket注册读事件 key = accepted.register(selector, SelectionKey.OP_READ); // 建立一个NIOServerCnxn维护session NIOServerCnxn cnxn = createConnection(accepted, key, this); key.attach(cnxn); addCnxn(cnxn); // 省略中间代码... } }
说了这么久,咱们说的session到底是什么尚未解释,session中文翻译是会话,在这里就是zk的server和client维护的一个具备一些特别属性的网络链接,网络链接这里就是socket链接,一些特别的属性包括
因此session创建的两步就是
server收到ConnectRequest以后,按照正常处理io的方式处理这个request,server端的主要操做是
整体流程是
其中有一个session生成算法咱们来看下
public static long initializeNextSession(long id) { // sessionId是long类型,共8个字节,64位 long nextSid; // 取时间戳的的低40位做为初始化sessionId的第16-55位,这里使用的是无符号右移,不会出现负数 nextSid = (Time.currentElapsedTime() << 24) >>> 8; // 使用serverId(配置文件中指定的myid)做为高8位 nextSid = nextSid | (id <<56); // nextSid为long的最小值,这中状况不可能出现,这里只是做为一个case列在这里 if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) { ++nextSid; // this is an unlikely edge case, but check it just in case } return nextSid; }
初始化sessionId的组成
myid(1字节)+截取的时间戳低40位(5个字节)+2个字节(初始化都是0)
每一个server再基于这个id不断自增,这样的算法就保证了每一个server的sessionId是全局惟一的。
session在zk框架中是一个重要概念,不少功能都依赖于session,好比临时节点,session关闭后就自动删除了。本文主要介绍了session的创建过程当中client和server各自的处理方式。