一个最简单的demo以下:node
public class ZookeeperConstructorSimple implements Watcher{ private static CountDownLatch connectedSemaphone=new CountDownLatch(1); public static void main(String[] args) throws IOException { ZooKeeper zooKeeper=new ZooKeeper("127.0.0.1:2181",5000,new ZookeeperConstructorSimple()); System.out.println(zooKeeper.getState()); try { connectedSemaphone.await(); } catch (Exception e) {} System.out.println("ZooKeeper session established"); System.out.println("sessionId="+zooKeeper.getSessionId()); System.out.println("password="+zooKeeper.getSessionPasswd()); } @Override public void process(WatchedEvent event) { System.out.println("my ZookeeperConstructorSimple watcher Receive watched event:"+event); if(KeeperState.SyncConnected==event.getState()){ connectedSemaphone.countDown(); } } }
使用的maven依赖以下:apache
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
对于目前来讲,ZooKeeper的服务器端代码和客户端代码仍是混在一块儿的,估计往后能改吧。数组
使用的ZooKeeper的构造函数有三个参数构成服务器
ZooKeeper集群的服务器地址列表session
该地址是能够填写多个的,以逗号分隔。如"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",那客户端链接的时候究竟是使用哪个呢?先随机打乱,而后轮询着用,后面再详细介绍。dom
sessionTimeout异步
最终会引出三个时间设置:和服务器端协商后的sessionTimeout、readTimeout、connectTimeoutsocket
服务器端使用协商后的sessionTimeout:即超过该时间后,客户端没有向服务器端发送任何请求(正常状况下客户端会每隔一段时间发送心跳请求,此时服务器端会重新计算客户端的超时时间点的),则服务器端认为session超时,清理数据。此时客户端的ZooKeeper对象就再也不起做用了,须要再从新new一个新的对象了。maven
客户端使用connectTimeout、readTimeout分别用于检测链接超时和读取超时,一旦超时,则该客户端认为该服务器不稳定,就会重新链接下一个服务器地址。ide
Watcher
做为ZooKeeper对象一个默认的Watcher,用于接收一些事件通知。如和服务器链接成功的通知、断开链接的通知、Session过时的通知等。
同时咱们能够看到,一旦和ZooKeeper服务器链接创建成功,就会获取服务器端分配的sessionId和password,以下:
sessionId=94249128002584594 password=[B@4de3aaf6
下面就经过源码来详细说明这个创建链接的过程。
首先与ZooKeeper服务器创建链接,有两层链接要创建。
创建TCP链接以后,客户端发送ConnectRequest请求,申请创建session关联,此时服务器端会为该客户端分配sessionId和密码,同时开启对该session是否超时的检测。
当在sessionTimeout时间内,即还未超时,此时TCP链接断开,服务器端仍然认为该sessionId处于存活状态。此时,客户端会选择下一个ZooKeeper服务器地址进行TCP链接创建,TCP链接创建完成后,拿着以前的sessionId和密码发送ConnectRequest请求,若是还未到该sessionId的超时时间,则表示自动重连成功,对客户端用户是透明的,一切都在背后默默执行,ZooKeeper对象是有效的。
若是从新创建TCP链接后,已经达到该sessionId的超时时间了(服务器端就会清理与该sessionId相关的数据),则返回给客户端的sessionTimeout时间为0,sessionid为0,密码为空字节数组。客户端接收到该数据后,会判断协商后的sessionTimeout时间是否小于等于0,若是小于等于0,则使用eventThread线程先发出一个KeeperState.Expired事件,通知相应的Watcher,而后结束EventThread线程的循环,开始走向结束。此时ZooKeeper对象就是无效的了,必需要从新new一个新的ZooKeeper对象,分配新的sessionId了。
它是面向用户的,提供一些操做API。
它又两个重要的属性:
如建立某个node操做(同步方式):
ZooKeeper对象负责建立出Request,并交给ClientCnxn来执行,ZooKeeper对象再对返回结果进行处理。
下面来看下异步回调的方式建立node:
同步方式提交一个请求后,开始循环判断该请求包的状态是否结束,即处于阻塞状态,一旦结束则继续往下走下去,返回结果。异步方式则提交一个请求后,直接返回,对结果的处理逻辑包含在回调函数中。一旦该对该请求包响应完毕,则取出回调函数执行相应的回调方法。
至此简单了解了,ZooKeeper对象主要封装用户的请求以及处理响应等操做。用户请求的执行所有交给ClientCnxn来执行,那咱们就详细看下ClientCnxn的来源及大致内容。
先看看ClientCnxn是怎么来的:
第二步:将链接字符串信息交给ConnectStringParser进行解析
链接字符串好比: "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181/root"
解析结果以下:
获得两个数据String chrootPath默认的跟路径和ArrayList<InetSocketAddress> serverAddresses即多个host和port信息。
第三步:根据上述解析的host和port列表结果,建立一个HostProvider
有了ConnectStringParser的解析结果,为何还须要一个HostProvider再来包装下呢?主要是为未来留下扩展的余地
来看下HostProvider的详细接口介绍:
HostProvider主要负责不断的对外提供可用的ZooKeeper服务器地址,这些服务器地址能够是从一个url中加载得来或者其余途径得来。同时对于不一样的ZooKeeper客户端,给出就近的ZooKeeper服务器地址等。
来看下默认的HostProvider实现StaticHostProvider:
有三个属性,一个就是服务器地址列表(通过以下方式随机打乱了):
Collections.shuffle(this.serverAddresses)
另外两个属性用于标记,下面来具体看下,StaticHostProvider是如何实现不断的对外提供ZooKeeper服务器地址的:
代码也很简单,就是在打乱的服务器地址列表中,不断地遍历,到头以后,在从0开始。
上面的spinDelay是个什么状况呢?
正常状况下,currentIndex先加1,而后返回currentIndex+1的地址,当该地址链接成功后会执行onConnected方法,即lastIndex = currentIndex了。然而当返回的currentIndex+1的地址链接不成功,继续尝试下一个,仍不成功,仍继续下一个,就会遇到currentIndex=lastIndex的状况,此时即轮询了一遍,仍然没有一个地址可以链接上,此时的策略就是先暂停休息休息,而后再继续。
第四步:为建立ClientCnxn准备参数并建立ClientCnxn。
首先是经过getClientCnxnSocket()获取一个ClientCnxnSocket。来看下ClientCnxnSocket是主要作什么工做的:
A ClientCnxnSocket does the lower level communication with a socket implementation. This code has been moved out of ClientCnxn so that a Netty implementation can be provided as an alternative to the NIO socket code.
专门用于负责socket通讯的,把一些公共部分抽象出来,其余的留给不一样的实现者来实现。如能够选择默认的ClientCnxnSocketNIO,也可使用netty等。
来看下getClientCnxnSocket()的获取ClientCnxnSocket的过程:
首先获取系统参数"zookeeper.clientCnxnSocket",若是没有的话,使用默认的ClientCnxnSocketNIO,因此咱们能够经过指定该参数来替换默认的实现。
参数准备好了,ClientCnxn是如何来建立的呢?
首先就是保存一些对象参数,此时的sessionId和sessionPasswd都尚未。而后就是两个timeout参数:connectTimeout和readTimeout。在ClientCnxn的发送和接收数据的线程中,会不断的检测链接超时和读取超时,一旦出现超时,就认为服务不稳定,须要更换服务器,就会从HostProvider中获取下一个服务器地址进行链接。
最后就是两个线程,一个事件线程即EventThread,一个发送和接收socket数据的线程即SendThread。
事件线程EventThread呢就是从一个事件队列中不断取出事件并进行处理:
看下具体的处理过程,主要分红两种状况,一种就是咱们注册的watch事件,另外一种就是处理异步回调函数:
能够看到这里就是触发咱们注册Watch的,还有触发上文提到的异步回调的状况的。
明白了EventThread是如何来处理事件的,须要知道这些事件是如何来的:
对外提供了三个方法来添加不一样类型的事件,如SendThread线程就会调用这三个方法来添加事件。其中对于事件通知,会首先根据ZKWatchManager watchManager来获取关心该事件的全部Watcher,而后触发他们。
再来看看SendThread的工做内容:
sendThread = new SendThread(clientCnxnSocket); 把传递给ClientCnxn的clientCnxnSocket,再传递给SendThread,让它服务于SendThread。
在SendThread的run方法中,有一个while循环,不断的作着如下几件事:
任务1:不断检测clientCnxnSocket是否和服务器处于链接状态,若是是未链接状态,则从hostProvider中取出一个服务器地址,使用clientCnxnSocket进行链接。
和服务器创建链接成功后,开始发送ConnectRequest请求,把该请求放到outgoingQueue请求队列中,等待被发送给服务器
任务2:检测是否超时:当处于链接状态时,检测是否读超时,当处于未链接状态时,检测是否链接超时
一旦超时,则抛出SessionTimeoutException,而后看下是如何处理呢?
能够看到一旦发生超时异常或者其余异常,都会进行清理,并设置链接状态为未链接,而后发送Disconnected事件。至此又会进入任务1的流程
任务3:不断的发送ping通知,服务器端每接收到ping请求,就会从当前时间从新计算session过时时间,因此当客户端按照必定时间间隔不断的发送ping请求,就能保证客户端的session不会过时。发送时间间隔以下:
clientCnxnSocket.getIdleSend():是最后一次发送数据包的时间与当前时间的间隔。当readTimeout的时间已通过去一半多了,都没有发送数据包的话,则执行一次Ping发送。或者过去MAX_SEND_PING_INTERVAL(10s)都尚未发送数据包的话,则执行一次Ping发送。
ping发送的内容只有请求头OpCode.ping的标示,其余都为空。发送ping请求,也是把该请求放到outgoingQueue发送队列中,等待被执行。
任务4:执行IO操做,即发送请求队列中的请求和读取服务器端的响应数据。
首先从outgoingQueue请求队列中取出第一个请求,而后进行序列化,而后使用socket进行发送。
读取服务器端数据以下:
分为两种:一种是读取针对ConnectRequest请求的响应,另外一种就是其余响应,先暂时不说。
先来看看针对ConnectRequest请求的响应:
首先进行反序列化,获得ConnectResponse对象,咱们就能够获取到服务器端给咱们客户端分配的sessionId和passwd,以及协商后的sessionTimeOut时间。
首选要根据协商后的sessionTimeout时间,从新计算readTimeout和connectTimeout值。而后保留和记录sessionId和passwd。最后经过EventThread发送一个SyncConnected链接成功事件。至此,TCP链接和session初始化请求都完成了,客户端的ZooKeeper对象能够正常使用了。
至此,咱们便了解客户端与服务器端创建链接的过程。
服务器端状况分不少种,先暂时说最简单的单机版。同时也再也不给出服务器端的启动过程(后面的文章再来详细说明)。
首先介绍下服务器端的大致概况:
服务器端默认采用NIOServerCnxnFactory来负责socket的处理。每来一个客户端socket请求,为该客户端建立一个NIOServerCnxn。以后与该客户端的交互,就交给了NIOServerCnxn来处理。对于客户端的ConnectRequest请求,处理以下:
首先反序列化出ConnectRequest
而后开始协商sessionTimeout时间
即判断用户传递过来的sessionTimeout时间是否在minSessionTimeout、maxSessionTimeout之间。协商完成以后,根据用户传递过来的sessionId是不是0进行不一样的处理。客户端第一次请求,sessionId为0。当客户端已经链接过一个服务器地址,分配了sessionId,而后若是发生超时等异常,客户端会去拿着已经分配的sessionId去链接下一个服务器地址,此时的sessionId不为0。 sessionId为0,则表明着要建立session。sessionId不为0,则须要对该sessionId进行合法性检查,以及是否已通过期了的检查。 咱们先来看看sessionId为0的状况:  大致上分三大步:一、使用sessionTracker根据sessionTimeout时间建立一个新的session 二、根据sessionId建立出密码 三、提交这个建立session的请求到请求处理器链,最终将sessionId和密码返回给客户端
下面来分别详细的说明这三个过程:
SessionTracker是用来建立删除session,执行session的过时检查的。
直接看下默认使用的SessionTrackerImpl:
先看下session有哪些属性:
而后再来看看SessionTracker的几个数据:
要搞清楚的内容有:1 建立session的过程 2 session过时检查的过程
先来看看建立session的过程:
代码很简单,就是建立一个SessionImpl对象,而后存储到SessionTracker中,同时开始计算session的超时时间。这里有一个内容就是sessionId的来历,咱们能够看到就是根据nextSessionId来的,而且是不断自增的。
sessionId是一个客户端的重要标示,是全局惟一的,先来看看单机版的nextSessionId初始化:
单机版的服务器使用1经过计算来初始化nextSessionId。而集群版对应的id则分别是每一个机器指定的sid。计算过程以下:
综上所示保证了sessionId是惟一的,不会出现重复分配的状况。
搞清楚了sessionId的分配,接下来就要弄清楚如何进行session的过时检查问题:
咱们先看下,session激活过程是怎么处理的:
首先获取这个session数据,而后计算它的超期时间
long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
private long roundToInterval(long time) {
// We give a one interval grace period return (time / expirationInterval + 1) * expirationInterval;
}
便是拿当前时间加上这个session的timeout时间,而后对其进行取expirationInterval的整,即始终保持是expirationInterval的正数倍,即每一个session的过时时间点最终都会落在expirationInterval的整数倍上。
若是本来该session的超期时间就大于你所计算出的超期时间,则不作任何处理,不然设置该session的超期时间为上述计算结果的超期时间。
取出本来该session所在的超期时间,从集合里面删除
从新获取如今超期时间所在的集合,添加进去
综上所述,session的激活其实就是从新计算下超时时间,最终取expirationInterval的正数倍,而后从以前时间点的集合中移除,而后再添加到新的时间点的集合中去。
至此,session的检查就方便多了,只须要在expirationInterval整数时间点上取出集合,而后一个个标记为过时便可。而那些不断被激活的session,则不断的从一个时间点的集合中换到下一个时间点的集合中。
SessionTrackerImpl也是一个线程,该线程执行内容就是session的过时检查,以下所示:
回到建立session的三大步骤:
来看下密码是如何来产生的:
Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd);
其中superSecret为常量
static final private long superSecret = 0XB3415C00L;
使用Random的方式来随机生成字节数组。可是该字节数组,只要参数即sessionId相同,字节数组的内容就相同。即当咱们知道了sessionId,就能够利用上述方式算出对应的密码,我感受密码基本上没什么用。
再看下当客户端带着sessionId和密码进行链接的时候,这时会进行密码的检查:
看了上面的代码,就再次验证了密码没什么鸟用,知道了sessionId,就彻底知道了密码。因此这一块有待改进吧,应该不能由sessionId彻底决定吧,如再加上当前时间等等,让客户端造不出来密码,同时服务器端存储加密后的密码。
本文内容已太多,这里就先简单描述下,以后再详细的讲解
若是是成功建立session,则把sessionTimeout、sessionId、passwd传递给客户端。若是没有成功建立,上述三者的值分别是0,0,new byte[16]
以后客户端处理该响应的过程,上面已经说了,能够回头再看下。
下一篇文章开始讲述zooKeeper集群时,服务器端对用户的建立链接请求的处理。