ZooKeeper源码研究系列(3)单机版服务器介绍

#1 系列目录node

#2 单机版服务器启动方式服务器

单机版的服务器启动,使用ZooKeeperServerMain的main函数来启动,参数分为两种:网络

  • 只有一个参数:表示为一个配置文件地址
  • 有2~4个参数:分别表示端口、dataDir、tickTime、maxClientCnxns

详细介绍见开篇的介绍运行ZooKeepersession

接下来看下启动的整个过程:socket

输入图片说明

  • 第一步:建立一个ZooKeeperServer,表明着一个服务器对象
  • 第二步:根据配置参数dataLogDir和dataDir建立出用于管理事务日志和快照的对象FileTxnSnapLog
  • 第三步:对ZooKeeperServer设置一些配置参数,如tickTime、minSessionTimeout、maxSessionTimeout
  • 第四步:建立ServerCnxnFactory,用于建立ServerSocket,等待客户端的socket链接
  • 第五步:启动ZooKeeperServer服务

后面第四部分详细说明。函数

#3 ZooKeeperServer服务器对象源码分析

ZooKeeperServer是单机版才使用的服务器对象,集群版都是使用的是它的子类,来看下继承类图.net

ZooKeeperServer类图

能够看到,集群版分别用的是LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer。后二者都属于LearnerZooKeeperServer。线程

##3.1 ZooKeeperServer的重要属性代理

ZooKeeperServer的重要属性

  • tickTime:默认3000ms,用于计算默认的minSessionTimeout、maxSessionTimeout。计算方式以下:

    public int getMinSessionTimeout() {
    	return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
    }
    
    public int getMaxSessionTimeout() {
    	return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
    }

    同时还用于指定SessionTrackerImpl的执行过时检查的周期时间,详细见说明使用sessionTracker的session过时检查

  • minSessionTimeout、maxSessionTimeout:用于限制客户段给出的sessionTimeout时间

  • SessionTracker sessionTracker:负责建立和管理session,同时负责定时进行过时检查

  • ZKDatabase zkDb:用于存储ZooKeeper树形数据的模型

  • FileTxnSnapLog txnLogFactory:负责管理事务日志和快照日志文件,能根据它加载出数据到ZKDatabase中,同时能将ZKDatabase中的数据以及session保存到快照日志文件中。后面会详细说明FileTxnSnapLog。

    操做以下:

    new ZKDatabase(txnLogFactory)
    
    txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
  • RequestProcessor firstProcessor:ZooKeeperServer请求处理器链中的第一个处理器

  • long hzxid:ZooKeeperServer最大的事务编号,每来一个事务请求,都会分配一个事务编号

  • ServerCnxnFactory serverCnxnFactory:负责建立ServerSocket,接受客户端的socket链接

  • ServerStats serverStats:负责统计server的运行状态

##3.2 ZKDatabase介绍

先来看下ZKDatabase的注释和属性

ZKDatabase的注释和属性

从注释中能够看到ZKDatabase中所包含的信息有:

  • sessions信息,即ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,也就是说仅仅会保存sessionId对应的timeout时间
  • DataTree:即ZooKeeper的内存节点信息
  • LinkedList<Proposal> committedLog:用于保存最近提交的一些事物

来重点看下DataTree的实现:

  • ConcurrentHashMap<String, DataNode> nodes =new ConcurrentHashMap<String, DataNode>();

    维护了path对应的DataNode。每一个DataNode内容以下:

    DataNode内容

    有DataNode parent和Set<String> children,同时byte data[]存储本节点的数据。StatPersisted stat存储本节点的状态信息

  • Map<Long, HashSet<String>> ephemerals =new ConcurrentHashMap<Long, HashSet<String>>()

    维护了每一个session对应的临时节点的集合

  • WatchManager dataWatches、WatchManager childWatches分别用于管理节点自身数据更新的事件触发和该节点的全部子节点变更的事件触发。

    每一个WatchManager的结构以下:

    WatchManager的结构

    watchTable维护着每一个path对应的Watcher。watch2Paths维护着每一个Watcher监控的全部path,即每一个Watcher是能够监控多个path的。在服务器端Watcher的实现实际上是ServerCnxn,以下:

    public abstract class ServerCnxn implements Stats, Watcher

    而每一个ServerCnxn则表明服务器端为每一个客户端分配的handler,负责与客户端进行通讯。客户端每次对某个path注册的Watcher,在传输给服务器端的时候仅仅是传输一个boolean值,便是否监听某个path,并无把咱们自定义注册的Watcher传输到服务器端(何况Watcher也不能序列化),而是在本地客户端进行存储,存储着对某个path注册的Watcher。服务器端接收到该boolean值以后,若是为true,则把该客户端对应的ServerCnxn做为Watcher存储到上述WatchManager中,即上述WatchManager中存储的是一个个ServerCnxn实例。一旦服务器端数据变化,触发对应的ServerCnxn,ServerCnxn而后把该事件又传递客户端,客户端这时才会真正引起咱们自定义注册的Watcher。

    上面只是简单描述了一下,以后的文章会详细源码分析整个过程。

DataTree就负责进行node的增删改查。

咱们知道node的类型分为四种类型:

  • PERSISTENT:持久型节点
  • PERSISTENT_SEQUENTIAL:持久型顺序型节点
  • EPHEMERAL:临时型节点
  • EPHEMERAL_SEQUENTIAL:临时型顺序型节点

前二者持久型节点和后二者临时型节点的不一样之处就在于,一旦当客户端session过时,则会清除临时型节点,不会清除持久型节点,除非去执行删除操做。

而顺序型节点,则是每次建立一个节点,会在一个节点路径的后面加上父节点的cversion版本号(即该父节点的全部子节点一旦发生变化,就会自增该版本号)的格式化形式,以下:

顺序型节点来来历

能够看到是将父节点的cversion版本号以10进制形式输出,宽度是10位,不足的话前面补0。因此是在执行DataTree建立node方法以前就已经定好了path路径的。

再来看下是如何区分持久型和临时型节点的呢?

在DataTree建立node方法会传递一个ephemeralOwner参数,当客户端选择的是持久型节点,给出的sessionId为0,当为临时型节点时,给出客户端的sessionId,以下:

输入图片说明

先来看下DataTree建立node方法的方法:

输入图片说明

  • 先判断父节点存不存在,不存在的话,报错。
  • 而后检查父节点的全部子节点是否已存在要建立的节点,若是存在报错。
  • 建立出节点,并存放到DataTree的ConcurrentHashMap<String, DataNode> nodes属性中,见上文描述
  • 判断该节点是不是临时节点,若是是临时节点,则ephemeralOwner参数即为客户端的sessionId。而后以sessionId为key,存储该客户端所建立的全部临时节点到DataTree的Map<Long, HashSet<String>> ephemerals属性中,见上文描述

ZKDatabase先暂时介绍到这里,以后抽出一篇文章单独介绍DataTree和FileTxnSnapLog。

##3.3 ZooKeeperServer请求处理器链介绍 ZooKeeper使用请求处理器链的方式来处理请求,先看下请求处理器的定义RequestProcessor:

RequestProcessor定义

从注释上能够看到几个要点:

  • RequestProcessor是以责任链的形式来处理事务的。
  • 请求是被顺序的进行处理的,单机版、集群版的Leader、Follower略有不一样
  • 对于请求的处理,是经过processRequest(Request request)方法来处理的。有些处理器是一个线程,即请求被扔到该线程中进行处理
  • 当调用shutdown时,也会关闭它所关联的RequestProcessor

来看下ZooKeeperServer请求处理器链的具体状况:

ZooKeeperServer请求处理器链

即PrepRequestProcessor-》SyncRequestProcessor-》FinalRequestProcessor

来一个一个具体看看:

###3.3.1 PrepRequestProcessor处理器

主要内容:对请求进行区分是不是事务请求,若是是事务请求则建立出事务请求头,同时执行一些检查操做。

大致属性以下:

PrepRequestProcessor属性

  • LinkedBlockingQueue<Request> submittedRequests:提交的用户请求
  • RequestProcessor nextProcessor:下一个请求处理器
  • ZooKeeperServer zks:服务器对象

PrepRequestProcessor所实现的processRequest接口方法即为:将该请求放入submittedRequests请求队列中。同时PrepRequestProcessor又是一个线程,在run方法中又会不断的取出上述用户提交的请求,进行处理,整个处理过程以下:

建立事务请求头

对于增删改等影响数据状态的操做都被认为是事务,须要建立出事务请求头。

只需验证session

createSession、closeSession也属于事务操做,而那些获取数据的操做则不属于事务操做,只须要验证下sessionId是否合法等

处理完成以后就交给了下一个处理器继续处理该请求。

咱们以建立session和建立节点为例,来具体看下代码:

先看下建立session,即以下代码:

session建立处理

首先会为该request获取一个事务id即zxid,该zxid的值来自于ZooKeeper服务器的一个hzxid变量,默认是0,每来一个请求就会执行自增操做。

建立事务请求头

建立session的具体内容

首先获取客户端传递过来的sessionTimeout时间,而后使用ZooKeeperServer的sessionTracker来建立一个session,同时为该session的owner属性赋值,可是对于建立session的request请求,并无为owner赋值。而是在建立其余请求的时候才会为请求的owner赋值为本机器。

代码见证以下:

建立session的request以下:

输入图片说明

建立其余的request的以下:

输入图片说明

接下来看看建立一个node的处理:

  • 首先进行的是session检查。

    session及owner的检查

    先检查服务器端该sessionId仍是否存在,若是不存在则表示已通过期,抛出SessionExpiredException异常。若是session存在,owner为空,则会对owner进行赋值。若是owner存在则进行owner核对,若是不一致抛出SessionMovedException异常。

    第一次建立session后,该session的owner是为空的,以后的请求操做owner都是有值的,此时则会为该session赋值。

    咱们想象下这样的场景:客户端链接一台服务器server1,该客户端拥有的session的owner是server1,客户端发送操做请求,因为网络缘由形成请求阻塞,客户端认为server1不稳定,则会拿着刚才的session去链接另外一台服务器server2,链接成功后,该session对应的owner被设置为null了(根据上文知道建立session的时候,owner会被清空),而后继续在server2上执行一样的操做,此时会为该session的owner属性赋值为server2,若是以前对server1的请求此时终于到达server1了,此request的owner是server1,则在检查的时候,发现该owner不一致,server1则会抛出SessionMovedException异常。即session的owner已经变化了的异常。则会阻止该请求的执行,防止了重复执行相同的操做。这里再留个疑问:为何当建立session的时候要清空owner呢?

  • 反序列化出CreateRequest对象,获取要建立的路径,同时获取父路径,检查该父路径是否存在,若是不存在抛出异常NoNodeException。若是存在获取父路径的修改记录,验证对父路径是否有修改权限

  • 从CreateRequest中获取用户建立的node的类型,若是是临时节点的话,则根据父路径的子节点的版本cversion,来生成该临时节点的路径后缀部分。而后验证该路径是否存在,若是存在则抛出NodeExistsException异常

  • 判断父节点是不是临时节点,若是是临时节点则不该该有子节点,抛出NoChildrenForEphemeralsException异常,这部分代码该判断应该是提早应该作的,而不是留到如今才来判断

  • 若是该节点是临时节点,则为该节点ephemeralOwner属性设置为对应的sessionId,若是是永久节点则设置为0。而DataTree则是依据ephemeralOwner是否为0,来判断是不是临时节点仍是持久节点,若是是临时节点,则会另外存储一份数据,以sessionId为key,即列出了每一个sessionId所包含的全部临时节点,一旦该sessionId失效,则直接拿出该列表进行清除操做便可,不用再去遍历全部的节点了。

  • 产生两条变化记录,分别是父节点的子节点列表变化的记录,和要建立的节点的建立记录。

至此便完成预处理操做。该交给下一个RequestProcessor处理器来处理了

###3.3.2 SyncRequestProcessor处理器

主要对事务请求进行日志记录,同时事务请求达到必定次数后,就会执行一次快照。

主要属性以下:

SyncRequestProcessor属性

  • ZooKeeperServer zks:ZooKeeper服务器对象

  • LinkedBlockingQueue<Request> queuedRequests:提交的请求(包括事务请求和非事务请求)

  • RequestProcessor nextProcessor:下一个请求处理器

  • Thread snapInProcess:执行一次快照任务的线程

  • LinkedList<Request> toFlush:那些已经被记录到日志文件中但还未被flush到磁盘上的事务请求

  • int snapCount:发生了snapCount次的事务日志记录,就会执行一次快照

  • int randRoll:上述是一个对全部服务器都统一的配置数据,为了不全部的服务器在同一时刻执行快照任务,实际状况为发生了(snapCount / 2 + randRoll)次的事务日志记录,就会执行一次快照。randRoll的计算方式以下:

    r.nextInt(snapCount/2)

接下来就详细看下SyncRequestProcessor(也是一个线程)的详细实现:

对于RequestProcessor定义的接口:processRequest(Request request),SyncRequestProcessor和PrepRequestProcessor同样,都是讲请求放入阻塞式队列中,而后在线程run方法中执行相应的逻辑操做。

首先仍是从LinkedBlockingQueue<Request> queuedRequests队列中取出一个Request,处理以下:

SyncRequestProcessor处理过程

  • 第一步:将该请求添加到事务日志中,这一部分会区分Request是事务请求仍是非事务请求,依据就是前一个处理器PrepRequestProcessor为Request加上的事务请求头。若是是事务请求,则添加成功后返回true,添加成功即为将该请求序列化到一个指定的文件中。若是是非事务请求,直接返回false。
  • 第二步:若是是事务请求,添加到事务日志中后,logCount++,该logCount就是用于记录已经执行多少次事务请求序列化到日志中了。
  • 第三步:一旦logCount超过(snapCount / 2 + randRoll)次后,就须要执行一次快照了。
  • 第四步:先将当前的事务日志记录flush到磁盘中,而后设置当前流为null,以便下一次事务日志记录从新开启一个新的文件来记录
  • 第五步:建立一个ZooKeeperThread线程,用于执行一次快照任务,则会把当前的dataTree和sessionsWithTimeouts信息序列化到一个文件中。
  • 第六七步:若是是非事务请求的话,则会直接交给下一个RequestProcessor处理器来处理。咱们看到这里还加上了一个toFlush.isEmpty()的判断,即以前没有请求遗留,只有在这样的条件下才会直接交给下一个RequestProcessor处理器来处理,主要是为了保证请求的顺序性。若是以前还有遗留的请求,则后来的请求不能被先处理。

上述的请求除了直接被下一个处理器处理的状况,其他大部分都会被保存到LinkedList<Request> toFlush中,何时才会被执行呢?

toFlush中的request被执行1

toFlush中的request被执行2

两种状况下会被执行flush:

  • 当request数量超过1000
  • 当没有请求到来的时候

来看下具体的flush过程:

flush过程

  • 第一步:执行事务日志文件执行commit操做。上述rollLog操做仅仅是先flush,而后设置当前日志记录流为null,以便下一次从新开启一个新的事务日志文件,同时这些流都会被保存到LinkedList<FileOutputStream> streamsToFlush属性中,commit操做则是先flush这些全部的流,而后执行这些流的close操做。
  • 第二步:即是将请求交给下一个处理器来处理

至此SyncRequestProcessor的内容也完成了,接下来就是下一个请求处理器即FinalRequestProcessor

###3.3.2 FinalRequestProcessor处理器

做为处理器链上的最后一个处理器,负责执行请求的具体任务,前面几个处理器都是辅助操做,如PrepRequestProcessor为请求添加事务请求头和执行一些检查工做,SyncRequestProcessor也仅仅是把该请求记录下来保存到事务日志中。该请求的具体内容,如获取全部的子节点,建立node的这些具体的操做就是由FinalRequestProcessor来完成的。

下面就来详细看看FinalRequestProcessor处理request的过程

FinalRequestProcessor的处理内容

  • 对于request是顺序执行,要删除那些zxid小于当前request的zxid的outstandingChanges、以及outstandingChangesForPath 。这里就有一个疑问:outstandingChanges数据是由PrepRequestProcessor在预处理事务请求头的时候产生的,他们又被谁来消费呢?他们主要做用是什么?

  • 接着就是落实具体的事务操做了,如建立节点、删除节点、设置数据等

来具体看下这个过程:

ZK执行事务过程

这些事务操做分红两种状况,一部分就是针对dataTree的增删改节点,另外一种就是建立session,关闭session。建立和关闭session都是使用sessionTracker来完成,这一部分以前已经详细描述过了。下面具体看下针对dataTree的增删改节点:

DataTree执行事务操做

根据事务请求头的不一样类型,分别执行增删改操做。对于增长节点上面也已经详细描述过了。

接下来就是开始准备返回值,而后响应给客户端。以建立session为例:

响应session的建立

使用sessionTimeout(客户端传递的sessionTimeout和服务器端协商后的),sessionId,根据sessionId获取的密码 这些数据构建一个ConnectResponse,而后进行序列化,传给客户端,并开始接收客户端的请求。

上述是session建立成功的时候。即上图中的valid为true。何时为fasle呢?

当你已经建立session了,可是同服务器的链接断开了,而后拿着该session去从新链接下一台服务器,若是密码是错误的,服务器则会这设置valid为false。若是密码是正确的,可是在于服务器端已经创建TCP链接后,此时该从新激活session了,可是发现该session已通过期了,被服务器端清除了,也会致使valid为false。

一旦valid为false,返回给客户端的sessionTimeout为0,sessionId为0,密码为空。客户端在接收到该数据后,看到sessionTimeout为0,则认为创建session关联失败,发出session过时的异常事件,开始走向死亡,即客户端的ZooKeeper对象不可用,必需要从新建立一个新的ZooKeeper对象。

##3.4 ServerStats介绍

它是用于统计服务器的运行数据的。

ServerStats属性

  • packetsSent:服务器端已发送的数据包
  • packetsReceived:服务器端已接收的数据包
  • maxLatency:处理一次请求的最大延迟
  • minLatency:处理一次请求的最小延迟
  • totalLatency:服务器端处理请求的总延迟
  • count:服务器端已经处理的请求数

ZooKeeperServer会建立一个ServerCnxnFactory,即建立了ServerSocket,等待客户端链接。每来一个客户端的TCP链接,ServerCnxnFactory就会为该链接建立一个ServerCnxn,每一个ServerCnxn也会统计上述信息,即单独针对某个客户端的数据。而ZooKeeperServer则是统计全部客户端的上述数据。

#4 单机版服务器启动概述

上面描述了ZooKeeper服务器的几个重要数据,下面就概述下单机版服务器的服务过程:

输入图片说明

  • 第一步:建立一个ZooKeeperServer,表明着一个服务器对象,同时会建立出ServerStats用于统计服务器运行数据
  • 第二步:根据配置参数dataLogDir和dataDir建立出用于管理事务日志和快照的对象FileTxnSnapLog,用于从磁盘上恢复数据和将内存数据快照到磁盘上、事务请求记录到磁盘上。
  • 第三步:对ZooKeeperServer设置一些配置参数,如tickTime、minSessionTimeout、maxSessionTimeout
  • 第四步:建立ServerCnxnFactory,用于建立ServerSocket,等待客户端的socket链接
  • 第五步:启动ZooKeeperServer服务

当客户端第一次TCP链接ZooKeeper服务器的时候:

  • 上述ServerCnxnFactory会为该客户端建立出一个ServerCnxn服务器代理对象,单独用于处理和该客户端的通讯,TCP链接创建成功

  • TCP链接创建成功后,客户端开始发送ConnectRequest请求,申请sessionId,会传递sessionTimeout时间。

  • ServerCnxn接收到该申请后,根据客户端传递过来的sessionTimeout时间以及ZooKeeperServer自己的minSessionTimeout、maxSessionTimeout参数,肯定最终的sessionTimeout时间

  • sessionTimeout肯定好了,就开始判断客户端的请求是否已经含有sessionId,若是含有,则执行sessionId的是否过时、密码是否正确等检查。若是没有sessionId,则会使用sessionTracker分配sessionId,建立一个session。该session含有上述肯定的sessionTimeout信息,即每一个session都有各自的sessionTimeout信息。

  • 以后就是构建一个Request请求,该请求的类型就是建立session。而后将该请求交给请求处理器链来处理。请求处理器链为PrepRequestProcessor-》SyncRequestProcessor-》FinalRequestProcessor。

  • 首先是PrepRequestProcessor的处理,它对请求分红事务请求和非事务请求。事务请求即能改变服务器状态数据的一些操做,即node的增删改,session的建立关闭等。为这些事务请求加上事务请求头,后面的请求处理器都是基因而否含有事务请求头来判别该请求是不是事务请求。 同时对事务请求进行一些检查,如session是否过时,session的owner是否一致、节点是否存在等。对于建立session来讲,就检查了下session是否过时。同时会对全部请求都分配zxid。

  • 接下来是SyncRequestProcessor的处理:仅仅对事务请求进行记录到事务日志中,一旦事务请求达到必定数量,就会执行一次对内存DataTree数据和session数据的快照。

  • 最后就是FinalRequestProcessor处理器:真正执行数据操做的地方。如为DataTree添加节点、删除节点、更改数据、查询数据等。同时也负责统计整个服务器端处理过程的最大延迟、最小延迟、总延迟、总处理数,每一个ServerCnxn也针对本身的客户端也进行响应的统计。最后FinalRequestProcessor处理上述事务操做的结果给客户端。如建立session,则是返回sessionTimeout、sessionId、密码数据给客户端。若是建立失败,则返回的sessionTimeout为0,sessionId为0,密码为空。

客户端对服务器端响应的结果的处理:

  • 首先判断服务器端返回的sessionTimeout是否小于等于0

    若是是,则认为申请sessionId失败,向eventThread中添加了两个事件。首先是session过时的事件即KeeperState.Expired,客户端的默认Watcher接收到以后,必须自行采起响应的处理操做(如从新建立一个ZooKeeper对象),由于客户端的ZooKeeper对象即将失效了。第二个事件就是一个死亡事件,eventThread遇到该事件后,就会跳出事件循环,eventThread线程走向结束。

    若是不是,则认为申请sessionId成功。则保存服务器端给出的sessionId、密码、sessionTimeout数据,重置客户端的readTimeout、connectTimeout。而后经过eventThread发送一个成功链接的事件即KeeperState.SyncConnected,客户端在接收到该事件后就能够执行相应的操做了。

相关文章
相关标签/搜索