zookeeper原理解析-客户端与服务器端交互

Zookeeper集群中server数量老是肯定的,因此集群中的server交互采用比较可靠的bio长链接模型;不一样于集群中sever间交互zookeeper客户端其实数量是未知的,为了提升zookeeper并发性能,zookeeper客户端与服务器端交互采用nio模型。下面咱们主要来说讲zookeeper的服务器端与客户端的交互。读者对nio不了解的话不妨抽点时间去了解下,对于一些nio框架如netty,mina再如一些web容器如tomcat,jetty底层都实现一套nio框架,对于实现nio框架模型你们不妨去谷歌百度搜一下Doug Lea的scalable io in Java 这个ppt。java

 

客户端web

ClientCnxnSocketNIO是zookeeper的nio通信层的客户端部分,下面伪代码示例其核心代码:数组

ClientCnxnSocketNIO{tomcat

      doTransport() {服务器

               if (若是以前链接没有立马连上,则在这里处理OP_CONNECT事件) {并发

                   sendThread.primeConnection();框架

               } else {异步

                    doIO工具

                }性能

 

             //队列中有发送的消息, 开启写

        }

 

       doIO() {

              if (sockKey.isReadable()) {

                    sendThread.readResponse(incomingBuffer);

                    updateLastHeard();

               } 

 

               if(sockKey.isWritable()) {

                      Packetp = outgoingQueue.getFirst() //从发送队列取

                      updateLastSend

                      p.requestHeader.setXid(cnxn.getXid());//设置客户端的xid

                     序列化

                     发送

                     从发送队列删除

                     加入到pendingQueue队列

                }

        }

}

 

ClientCnxn 是客户端操做ClientCnxnSocketNIO的工具,维护了发送任务线程SendThread,事件任务线程EventThead, 发送队列OutgoingQueue以及请求消息的等待队列PendingQueue。下面以伪代码来示例其核心代码

ClientCnxn {

    outgoingQueue//待向服务器端发送的队列, 客户端提交请求放入这个队列

    pendingQueue //发送之后等待响应的队列,

    

    submitRequest(){

       //client端一个封装成一个packet

        outgoingQueue.add(packet);

        selector.wakeup();

        packet.wait(); //若是是同步调用wait,应该反馈后会

    }

 

   SendThread {

       run() {

           1.设置clientCnxnSocket 最后发送时间,最后的心跳时间

           2. if(!clientCnxnSocket.isConnected()) {

                    startConnect()  //主要工做clientCnxnSocket作

              } else {

                    计算下次ping的时间, 发送心跳

                  委托给 clientCnxnSocket.doTranspor进行底层的nio传输

               } 

         }

 

        primeConnection(){

           //构建ConnectRequest

           //组合成通信层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为null

            outgoingQueue.addFirst

            clientCnxnSocket.enableReadWriteOnly();//确保读写事件都监听 

        }

 

        readResponse(){

           1.先读响应头,先对特殊的xid进行处理

           2. packet = pendingQueue.remove() //因为client和server都是单线程处理,多队列处理,因此认为全局有序

           3. 反序列化响应体response, 并设置到packet上

           4.finishPacket 1)同步notifyAll,结束 2)异步加入到event线程的队列

        }

    }

 

    EventThread{ //主要支持异步的回调

       run() {

 

        }

    }

}

 

你们观察客户端操做类Zookeeper里面的操做类主要分为两个参数不带callback的同步方法和参数带callback的异步方法。

1.      同步调用方法实现相似Future同步转异步模式实现

1)  Client提交请求对象封装成packet对象放入OutgoingQueue队列,并调用packet.wait()阻塞当前线程。

2)  每一个Client都只有一个SendThread线程是线性处理OutgoingQueue中的请求消息的,SendThread线程经过ClientCnxnSocketNIO工具顺序从OutgoingQueue队列中取请求消息发送到服务器端,同时将请求packet加入到PendingQueue中

3)  ClientCnxnSocketNIO工具接收处理服务器端响应

4)  从PendingQueue队列取出对应的packet,并调packet.notifyAll()唤醒阻塞的线程完成同步调用

2.      异步调用的整体流程跟同步相似关键区别在于

1)  向OutgoingQueue队列提交请求后,不会调用packet.wait()阻塞当前线程,主流程继续执行

2)  同同步调用

3)  同同步调用

4)  从PendingQueue队列取出对应的packet,并调packet.callback方法完成回调处理

 

 

 

Zookeeper服务器端

NIOServerCnxnFactory工厂类,zookeeperserver用来启动监听客户端链接,每当有客户端请求链接进来,NIOServerCnxnFactory都会为这个连接构建NIOServerCnxn 实例来单独处理与这个客户端的交互

NIOServerCnxn封装了处理读取客户端请求数据与及向客户端响应数据

下面经过伪代码来实例:

NIOServerCnxnFactory  {

   configure {

        绑定端口

        做为server监听

        注册selectkey 的链接时间

    }

 

    run {  //起到accept的做用

       1. OP_ACCEPT, 将NIOServerCnxn(handler) attach到selectkey以便被读写事件使用

       2. OP_READ 和 OP_WRITE取出handler NIOServerCnxn,并调doIo

    }

}

 

 

NIOServerCnxn {

 

    构造器 {

        //设置selectkey对read感兴趣

    }

 

    doIo {

        if(k.isReadable()) {

             1. 读前四个字节, 表明请求内容长度,不包括本身的4字节

             2. 读取到字节数组中

             3. zkServer.processPacket()或者zkServer.processConnectRequest()

        }

 

        if(k.isWritable()) {

              1.从outgoingBuffers取ByteBuffer

              2.发送bytes

        }

    }

}

相关文章
相关标签/搜索