ZooKeeper系列之(十一):客户端实现机制

笔者为何要讲ZooKeeper的源码,对于程序员来讲,光知道用是成为不了优秀的行家的,还要知道因此然,只有知道了内部实现机制,才能开拓眼界提升自我。而笔者认为ZooKeeper是最好的入门分布式系统的敲门砖。node

好很少说,咱们这里先看看客户端是怎么运转的。程序员

一、概述算法

ZooKeeper客户端是链接到服务端集群,获取zk节点数据,监听zk节点数据变化的。zk节点就是znode,它是相似文件路径的东西,每一个znode能够设置它的文本内容,当znode的文本内容被其余客户端修改后,全部监听该znode的客户端都会实时被通知到,这样的方式实现了分布式一致性存储。缓存

在客户端里有一个ZooKeeper类,注意这里特指类名称。客户端经过Zookeeper类来发送命令给Zookeeper服务器。服务器

ZooKeeper类中还能够设置Watcher,这就是znode监听者。Watcher能够指定监听哪一个znode,当Zookeeper集群的znode节点状态发生变化时,服务端会发送通知消息给客户端的Watcher。网络

Watcher又能够细分为3种Watcher子类:DataWatcher,ExistWatcher和ChildWatcher。根据字面意思就能猜得出来,DataWatcher是znode的数据变化时触发,ExistWatcher是znode的建立删除时触发,ChildWatcher是在znode下建立子目录(也是znode)时触发。实际生产环境中用的最多的仍是DataWatcher。session

下面咱们先分析分析ZooKeeper类的实现,至于Watcher的实现后面会有专门介绍。异步

二、通讯机制分布式

客户端与服务端交互的数据流大体以下:ide

 

  1. 首先是客户端ZooKeeper类发起命令请求,而后经过ClientCntx发送给服务端集群。ClientCnxn是上层类,屏蔽了具体的网络通讯流程,网络经过是ClientCntxSocketNetty实现的,服务端是ZooKeeperServer。
  2. 以create命令(建立znode)为例,ZooKeeper类会构造Packet,将请求数据封装在Packet里。而后调用ClientCnxn的submitRequest方法。ClientCnxn的submitRequest方法调用queuePacket方法将Packet放入outgoingQueue队列中,而后线程执行wait方法挂起等待服务端返回。
  3. ClientCnxnSocketNetty和ClientCnxn共享同一个outgoingQueue,ClientCnxnSocketNetty启动了发送守护进程,当outgoingQueue队列中有Packet时,会自动将该Packet发送给ZooKeeperServer。同时ClientCnxnSocketNetty启动接收线程实时接收ZooKeeperServer的返回数据,返回数据触发ClientCnxnSocketNetty中启动的ZKClientHandler的MessageReceived事件。
  4. 在MessageReceived事件中回调ClientCnxn中的SendThread类的readResponse方法。
  5. readResponse方法中最后调用finishPacket方法唤醒在该Packet上wait的线程,也就是发起submitRequest的方法,使得submitRequest方法返回到ZooKeeper类。
  6. 客户端请求过程结束。

三、ZooKeeper类

客户端在构造函数阶段建立ClientCnxn 与服务端链接,后续命令都经过ClientCntx发送给服务端。ClientCnxn是客户端与服务端通讯的底层接口,它和ClientCnxnSocketNetty一块儿工做提供网络通讯服务。

服务端是ZooKeeperServer类,收到ClientCnxn的Request请求包后调用相应的处理逻辑,返回结果再经过ClientCnxc发送给客户端。

ClientCntx链接时能够同时指定多台服务器地址,根据必定的算法挑选某一个服务器地址进行链接,当某个服务器发生故障没法链接时,会自动链接其余的服务器。实现这一机制的是HostProdiver接口,实现用StaticHostProvider类。

ZooKeeper类的构造函数以下:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,  boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
 }

这里的connectString是链接字符串,aHostProvider是管理服务端列表的。watcher是监听器。

为何有aHostProvider?客户端能够配置多个服务端地址,这样当某个服务端挂掉的时候,客户端会自动尝试链接其余的服务端,实现分布式可靠性。

建立了ZooKeeper对象后就能够调用具体的读写数据的方法了,下面列举常见方法的实现机制。

create方法根据输入参数构造出CreateRequest包,而后经过submitRequest方法传递给服务端,submitRequest方法将CreateRequest转换成Packet包并调用sendPacket方法将发送包放入队列,等待发送线程发送给服务端。

服务端响应完成后会将返回结果填充到CreateResponse实体中返回给客户端。

四、发送命令

咱们选取getData方法,来看看客户端的内部机制,其余命令的处理过程是相似的,不一样的只是命令类型不一样而已。

getData方法从服务端读取znode的数据,入参同时包括watcher,这样在znode数据被其余客户端修改后,会实时回调watcher来使得全部客户端同步本次变化。

先给出getData的代码:

public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)  {
     final String clientPath = path;
     PathUtils.validatePath(clientPath);
     WatchRegistration wcb = null;
     if (watcher != null) {
         wcb = new DataWatchRegistration(watcher, clientPath);
     }
     final String serverPath = prependChroot(clientPath);
     RequestHeader h = new RequestHeader();
     h.setType(ZooDefs.OpCode.getData);
     GetDataRequest request = new GetDataRequest();
     request.setPath(serverPath);
     request.setWatch(watcher != null);
     GetDataResponse response = new GetDataResponse();
     cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                clientPath, serverPath, ctx, wcb);
}

这里干了几件事情呢?主要干了两件事。

(1)注册watcher,这个很好理解,至于watcher的细节会在其余文章里专栏叙说。

(2)构建完整的znode的路径名,从根目录开始。而后将znode的路径名和GetDataRequest类型打包放到ClientCnxn的发送队列里,等待排队发往服务端。

其余命令的处理过程是相似的,不一样的只是命令类型不一样而已,对应到代码里是不一样的Request对象。getData命令对应GetDataRequest类;Exists方法对应ExistsRequest类。他们的父类倒是同一个。ZooKeeper支持的Request类主要有如下这些:

  1. create:CreateRequest
  2. delete:DeleteRequest
  3. exists:ExistsRequest
  4. getData:GetDataRequest
  5. setData:SetDataRequest
  6. getChildren:GetChildrenRequest

对于create命令来讲,和GetData有一点不一样。不一样点在于如下两点:

(1)create命令是当即返回结果的,而getData等其余命令是异步返回结果的。getData入参里的DataCallback参数就是异步回调处理方法。

(2)create是调用ClientCnxn的submitRequest方法启动发送命令过程,而getData等其余方法是调用ClientCnxn的queuePacket方法将请求命令缓存在队列里,等待发送线程异步发送。

 

五、ClientCnxn

前面咱们看到ZooKeeper类的命令发送都是经过ClientCnxn类实现的。这里就谈谈ClientCnxn类干了哪些活。

Clientcnxn将客户端请求加入发送队列,等待sendThread发送。eventThread负责处理Server返回的WatchedEvent,以及回调注册的客户端事件接口处理函数。

5.1 queuePacket

这是ClientCnxn里最重要的一个方法,它将请求包放入发送队列outgoingQueue中,等待发送线程发送给服务端。

public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
       Record response, AsyncCallback cb, String clientPath,
       String serverPath, Object ctx, WatchRegistration atchRegistration,
            WatchDeregistration watchDeregistration) {
        Packet packet = null;
        packet = new Packet(h, r, request, response, watchRegistration);        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
       
        synchronized (state) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                     if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
}

最后告诉SendThread数据已经放好了,至于什么时候发送就等SendThread本身来决定了。

5.2 submitRequest

提交客户端请求到服务端,这是当即返回的方法,若是请求包没处理完则一直等待下去。submitRequest方法主要用在create命令。

ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,  null, watchRegistration, watchDeregistration);
synchronized (packet) {
     while (!packet.finished) {
           packet.wait();
     }
}
return r;

5.3 sendPacket

sendPacket构建Packet,而后调用发送线程SendThread里的同名sendPacket方法来发送数据到服务端。

public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)   throws IOException {
        int xid = getXid();
        RequestHeader h = new RequestHeader();
        h.setXid(xid);
        h.setType(opCode);
        ReplyHeader r = new ReplyHeader();
        r.setXid(xid);
        Packet p = new Packet(h, r, request, response, null, false);
        p.cb = cb;
        sendThread.sendPacket(p);
 }

5.4 finishPacket

该方法在接收到服务端的响应时,唤醒等待响应的客户端线程,经过调用Packet的notifyAll方法来唤醒wait在该Packet上的线程。

若是客户端请求指定了Watcher,则同时生成WatchedEvent事件并放入事件队列,等待eventThread线程处理。

代码片断:

private void finishPacket(Packet p) {
    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
     } else {
        p.finished = true;
        eventThread.queuePacket(p);
     }
}

5.5 readResponse

ClientCnxnSocketNetty收到服务端响应后触发ZKClientHandler的messageReceived事件,在该事件处理逻辑中调用sendThread的readResponse方法获取服务端响应。

若是服务端响应的是WatchedEvent事件,则将事件放入eventThread中等候调度执行事件方法。

若是服务端响应的是客户端命令结果,则将Packet从发送队列删除,最后调用CientCnxn的finishPacket方法完成最后的处理,finishPacket方法在前面已经讲过了。

readResponse的主要代码以下:

void readResponse(ByteBuffer incomingBuffer) throws IOException {
     ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
     BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
     ReplyHeader replyHdr = new ReplyHeader();
     replyHdr.deserialize(bbia, "header");
     if (replyHdr.getXid() == -1) {
          // -1 means notification
           WatcherEvent event = new WatcherEvent();
           event.deserialize(bbia, "response");
        // convert from a server path to a client path
           if (chrootPath != null) {
                String serverPath = event.getPath();
                if(serverPath.compareTo(chrootPath)==0)
                    event.setPath("/");
                else if (serverPath.length() > chrootPath.length())
             event.setPath(serverPath.substring(chrootPath.length()));               
            }
           WatchedEvent we = new WatchedEvent(event);          
           eventThread.queueEvent( we );
           return;
     }
     Packet packet;
     try {        
           packet.replyHeader.setXid(replyHdr.getXid());
           packet.replyHeader.setErr(replyHdr.getErr());
           packet.replyHeader.setZxid(replyHdr.getZxid());
           if (replyHdr.getZxid() > 0) {
                 lastZxid = replyHdr.getZxid();
            }           
      
      } finally {
            finishPacket(packet);
      }
 }
相关文章
相关标签/搜索