手把手教你用netty撸一个ZkClient

原文地址: https://juejin.im/post/5dd296c0e51d4508182449a6html

前言

有这个想法的原因是前一阵子突发奇想, 想尝试能不能直接利用js链接到zookeeper, 从而获取到dubbo的注册信息.java

后来一番查找资料后, 发现因为纯js不支持tcp socket通信, 因此纯js是没法实现的. 可是发现有些大神却使用nodeJs实现zk的客户端. 这就成功地激起了个人兴趣. 简单地研究了一下zk通讯协议后, 我开始尝试徒手撸一个zk的客户端.固然是用java实现node

构思

zookeeper的通讯协议是一种典型的"header/content"结构, 在header里面指定了content的字节数, content就是具体的报文数据.mysql

既然是header/content结构, 那么很容易就能想到利用netty的LengthFieldPrepender来进行编码, 以及利用LengthFieldBasedFrameDecoder来进行解码. 有了netty这一大神器, 作什么事情都能事半功倍.所以决定了使用netty来进行开发.git

客户端选型决定好了以后, 还得须要有个服务端来进行调试. 从协议上看, zk不一样的版本之间应该不会存在太多的兼容性问题, 可是差别确定是存在的. 因此为了方便起见,咱们这里限定了服务端的zookeeper的版本是3.4.12, 更高的版本或更低的版本没有作过严格的兼容测试.github

备注: 为了简化工做量, 除了版本外, 该客户端也只在单机模式测试过, 并无验证过在集群模式上是否能跑通.[捂脸]redis

准备工做

stat path [watch]
    set path data [version]
    ls path [watch]
    delquota [-n|-b] path
    ls2 path [watch]
    setAcl path acl
    setquota -n|-b val path
    history 
    redo cmdno
    printwatches on|off
    delete path [version]
    sync path
    listquota path
    rmr path
    get path [watch]
    create [-s] [-e] path data acl
    addauth scheme auth
    quit 
    getAcl path
    close 
    connect host:port

如上面列表所示, zkCli为咱们提供了不少命令, 本文咱们将实现三个具备表明性的命令:sql

1. connect host:port

这个命令实际上是用来跟服务端创建会话的.
  为了不跟socket创建tcp链接的connect方法相混淆, 我更愿意把它称做"login", 
  因此在实现的时候, 它对应的方法名也就是login

2. create [-s] [-e] path data acl

这个命令是用来建立zk的node节点的.
   其中包括持久性节点, 持久性顺序节点, 临时性节点, 临时性顺序节点等,
   还能够指定ACL权限

3. ls path [watch]

ls命令就是列举zk某个路径下的全部子路径,
   在具体实现里, 我把这个命令叫作getChildren

在zookeep的通讯协议里面, connect命令(login)是其余全部命令的必要前置条件. 由于做为一个客户端, 你必须跟服务端创建了会话以后,下面的命令请求才能被服务端接受和处理.数组

而除了connect命令以外, 其余的全部的命令其实都是截然不同的. 所以你会发现, 理解了create命令和ls命令以后, 再实现其余命令也是很简单的事情的, 只须要了解它们的通讯协议,其余的都是照葫芦画瓢的事情了.缓存

固然了解它们的通讯协议并非个简单的事情, 并且每个命令的报文结构都不大相同. 实际上在码代码的时候, 百分之七八十的精力基本都耗在了理解每一个命令的报文结构上面.

代码实现

来看具体实现以前, 先来看一下项目的总结结构:

image

1. bean包
     封装了每一个命令须要的字段参数, 在序列化报文时只须要序列化对应的bean便可. 一样, 在服务端返回内容时, 也只须要把报文序列化成对应的对象便可.
  2. factories包
      上面提到过, zk的每一个命令的报文结构都是不同的,因此在序列化和反序列化时, 对应到netty的codec也是不同的.这个实现了一个codec静态方法工厂, 须要的时候直接从codec工厂拿对应的codec便可.
  3. registrys包
     其实就是一个缓存中心, 缓存了每一个命令对应的requestId和codec, 在服务端返回时, 从这个缓存中心根据requestId拿到对应codec来进行反序列化
  4. utils包
     一些工具类, 不须要多解释
  
  5. zkcodec包
     每一个命令对应的codec和handler实现
  6. NettyZkClient类
     就是本文要介绍的zk客户端了
  7. test    
     为了方便调试准备的单元测试, 先了解代码实现原理的可用直接从这个单元测试入手

看完代码结构后, 咱们再来看每一个命令的具体实现.

login命令

首先来看一下login命令的通讯报文结构体, 以下图所示:

image

简单介绍一下每一个字段的含义, 具体的含义你们能够在网上搜索作更深刻的了解:

1. header
    上面提到, zk的每一个报文都是header/content模式, 其中header占用4个字节, 表示接下来的content的长度(字节数)
 2. protocolVersion
    顾名思义, 这个字段表示协议的版本号,用4个字节表示. 这里咱们写死为0便可(好浪费~~~)
 3. lastZxidSeen
    等下咱们会看到, zk服务端每次的响应都会返回一个zxid.顾名思义, 这个结构就是表示客户端接收到最新的zxid.用8个字节表示. 因为login通常都是第一次跟服务端通信, 因此这里也是写死为0便可
 4. timeout 
    login请求的目的是为了跟zk服务端创建会话, 这个参数表示的是会话的有效时间, 用四个字节表示
 5. senssionId
    会话ID, 用8个字节表示, 用于咱们尚未创建会话,因此这个也是直接写死为0便可
 6. passwordLen
    用4个字节来表示接下来的密码的字节数
 7. password
    passwordLen个字节的密码, 用bytes[]表示
 8. readOnly
    boolean类型的,因此用一个字节表示便可

知道了报文结构以后, 咱们就能够开始写代码了.
首先定义一个ZkLoginRequest的bean.在java里面, 8个字节能够用long类型表示, 4个字节能够用int表示, String类型能够很简单地转换成byte数组. 因此最后定义的bean以下所示:

public class ZkLoginRequest implements Serializable {
  private Integer protocolVersion;
  private Long lastZxidSeen;
  private int timeout;
  private Long sessionId;
  private String password;
  private boolean readOnly = true;
    
}

由于zk的通信是基于字节的, 因此咱们仅仅定义java对象是不行的, 还须要把java对象转换成字节, 才能发送服务端.并且服务端只会接收header/content形式的报文, 因此咱们还得计算出整个java对象序列化以后的字节数, 把它赋值到header中去.

幸运的是, 这两个工做netty都为咱们提供了很好的工具, 咱们直接使用就能够了.

实现ZkLoginCodec把java对象转换成ByteBuf

ZkLoginCodec包括encode和decode两部分, 其中decode是用于解码服务端的响应的, encode是用于编码发送请求的, 以下所示, 把ZkLoginRequest转换成netty的ByteBuf

@Override
 protected void encode(ChannelHandlerContext ctx, ZkLoginRequest msg, ByteBuf outByteBuf) throws Exception {
     outByteBuf.writeInt(msg.getProtocolVersion());
     outByteBuf.writeLong(msg.getLastZxidSeen());
     outByteBuf.writeInt(msg.getTimeout());
     outByteBuf.writeLong(msg.getSessionId());
     String passWord = msg.getPassword();
     SerializeUtils.writeStringToBuffer(passWord, outByteBuf);
     outByteBuf.writeBoolean(msg.isReadOnly());
 }
直接使用netty内置的LengthFieldPrepender

netty内置的LengthFieldPrepender能够把报文转换成header/content形式的结构, 其中构造参数表示header所占的字节数, 咱们这里是4个字节, 因此是4.

// 编码器, 给报文加上一个4个字节大小的HEADER
   nioSocketChannel.pipeline().addLast(new LengthFieldPrepender(4))

ZkLoginRequest对象通过这两个codec编码以后,zk服务端就能正确解析它的报文了, 如无心外的话, 服务端会针对咱们的这个socket创建会话, 而后给咱们响应一个报文, 报文中会包含sessionId. 响应的结构体以下所示:

image

能够看到, 响应报文跟咱们的请求报文是差很少的,除了sessionId, 其余的基本是原封不动地给咱们返回来了. 因此咱们这里对响应报文含义很少作解释. 直接来看看如何解析服务端返回的响应报文.

从上图咱们能够看到, 返回的报文也是header/content形式, 因此咱们仍是可使用netty内置的解码器来获取header和content.

使用LengthFieldBasedFrameDecoder跳过header

不熟悉LengthFieldBasedFrameDecoder的同窗能够先看看netty的官网, 这里不对这个类的参数作过多解释, 只须要知道咱们是想跳过header,只获取响应报文的content部分便可

nioSocketChannel.pipeline()
                        // 解码器, 将HEADER-CONTENT格式的报文解析成只包含CONTENT
        .addLast(ZkLoginHandler.LOGIN_LENGTH_FIELD_BASED_FRAME_DECODER,new LengthFieldBasedFrameDecoder(2048, 0, 4, 0, 4))
ZkLoginCodec把content反序列成ZkLoginResp

也就是ZkLoginCodec的decode部分

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ZkLoginResp zkLoginResp = new ZkLoginResp();
        zkLoginResp.setProtocolVersion(in.readInt());
        zkLoginResp.setTimeout(in.readInt());
        zkLoginResp.setSessionId(in.readLong());
        String password = SerializeUtils.readStringToBuffer(in);
        zkLoginResp.setPassword(password);
        zkLoginResp.setReadOnly(in.readBoolean());
        out.add(zkLoginResp);
    }

完成这一步后, 咱们的客户端就成功地跟服务端创建了会话了, 后面就能够愉快地发送其余请求了

create命令

opCode和requestHeader

在开始实现create命令以前, 先来了解一下两个术语,这两个术语不单是create命令须要用到的, 等下要实现的getChildren命令也一样须要用到.

  1. opCode

zk的服务端跟客户端约定好了, 每个命令都对应一个opCode, 客户端发送命令请求时必须带上这个opCode, 服务端才能知道如何去处理这个命令. 例如create命令对应的opCode是1, 等下要实现的getChildren命令的opCode是8

  1. requestHeader

这个header不要跟header/content中的header混淆了. requestHeader仍是content的一部分, 它包含了两个字段, 每一个字段占4个字节, 以下图:

image

1. xid, 通俗点理解的就是requestId, 客户端维护这个xid的惟一性, 服务端返回响应时会原封不动的返回这个xid,
   这样客户端就能够知道服务端返回的报文时对应哪一个请求的了.毕竟socket通信都是异步的.
   
   2. type
      这个更好理解, 就是上面的opcode
create命令的报文结构

说真的, create命令的报文有一丁点复杂, 因此
在看createRequest的报文结构以前,还得先了解另一个概念: ACL权限;

zookeeper的ACL权限涉及到如下3个点:

  1. scheme 身份的认证有4种方式:
  2. id 受权的对象
  3. permission 权限

具体能够看这篇博客, 说得比较清楚.

在本文中咱们写死了scheme是"world", id是"anyone", permission是31(转成二进制即11111,拥有CREATE、READ、WRITE、DELETE、ADMIN五种权限)

zk中的ACL的报文结构以下所示:

image

1. perms是permission的简写, 4个字节表示
  2. 由于scheme是用字符串表示的, 因此须要用4个字节表示scheme字符串的长度, 用schemelen个字节表示scheme
  3. id也是用字符串表示的, 跟scheme同理

了解了requestHeader和ACL的结构体以后, createRequest的报文结构也就比较好理解了, 以下图所示:

image

1. requestHeader
   包含了xid和type
2. pathLen
   要建立的path的字符串长度
3. path
   要建立的path, 例如你想在zk上建立一个/dubbo节点, path就是"/path"
4. dataLen
   要建立的path下的data的长度
5. data
   要建立的path下的数据, 例如"192.168.99.101:2181"
6. aclListSize
   zk的ACL权限是list形式的,表示不一样的权限控制维度
7. aclList
   aclListSize个ACL结构体
8. flags
   该节点的类型, 能够是持久性节点, 持久性顺序节点, 临时节点, 临时顺序节点4种

接下来的工做就是login差很少了:

  1. 实现一个ZkCreateCodec把ZkCreateRequest转化成ByteBuf
  2. 利用LengthFieldPrepender把ByteBuf构建成Header/content的报文结构
  3. 利用LengthFieldBasedFrameDecoder从服务端的响应解析出content的内容
  4. 把content内容的ByteBuf转换成ZkCreateResponse

其中createRes的报文结构以下图所示:

image

1. xid
   这个就是createReq中requestHeader的xid
2. zxid
   这个能够跟login报文中的lastZxidSeen关联起来, 能够理解为服务端的xid
3. errcode
   errcode为0表示正常响应, 若是不为0,说明有异常出现, 后面的path字段也会为空
4. pathLen和path
   其实也是createReuest中的path和pathLen

3. getChildren命令(ls path)

若是上面的create命令能理解的话, 那getChildren命令也就很容易理解了, 二者只有报文结构不同,于是codec也会有一点点不一样.

getChildrenRequest的报文结构:

image

响应的结构体:

image

运行代码

要想快速地体验一下这个简陋的zkClient的话, 能够直接从单元测试入手:

public class ZkClientTest {
    @Test
    public void testZkClient() throws Exception {
        // NettyZkClient的构造方法里面会调用login() 跟服务端创建会话
        NettyZkClient nettyZkClient = new NettyZkClient(30000);

        // 建立一个临时顺序节点
        ZkCreateResponse createResponse = nettyZkClient.create("/as", 12312, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(new Gson().toJson(createResponse));

        // 获取/下的全部子路径
        List<String> list =  nettyZkClient.getChildren("/");
        System.out.println(new Gson().toJson(list));

    }
}

实现新的命令

由于不一样的命令之间的逻辑大多数是相同的, 所以我已经把一些通用的逻辑抽象了出来, 若是想要实现其余命令的话, 只须要作几步简单的工做. 例如,我想实现一个get path命令, 那么只须要:

  1. 查找文档,肯定get命令的报文结构, 这一步是最麻烦的
  2. 建立GetRequest类, 继承RequestHeader类, 实现ZkRequest接口
  3. 建立GetResp类, 继承AbstractZkResonse类, 实现ZkResponse
  4. 编写GetRequestCodec, 实现ByteBuf和GetRequest, ZkResponse的转换
  5. 修改ZkCodecFactories类, 把GetRequest和GetRequestCodec关联起来

这样就能够实现了一个新的命令.固然还得必须再提一下, 第一步是最麻烦的, 可能要花掉百分之七八十的工做量.

说到这, 可能会有人问, 去哪里了解每一个命令的报文结构呢? 方法其实有不少的, 可能官方文档就有(然而我暂时没找到). 个人办法是最简单的, 就是阅读现有ZkClient的代码. 可是现有的zkClient并不能很是直观地体现出来, 还得结合调试代码, 阅读服务端代码(解析报文),抓包等等方法.

源码

撸完了这个zkClient客户端后, 发现不够过瘾,因此后来又撸了一个redis客户端和kafka的producer/consumer.

撸完后, 发现只要了解通讯协议, netty基本上能够实现任何C/S架构的客户端. 所以把这几个客户端整理到了一块儿,放到了github上面.

后面还想继续实现elastic-search, mysql, dubbo等等客户端(其实调研过了是可行的, 只是尚未精力去实现)

最后附上github源码地址:

https://github.com/NorthWard/awesome-netty

感兴趣的同窗能够参考一下,共同窗习共同进步.

相关文章
相关标签/搜索