手把手教你用netty徒手撸一个RedisClient

手把手教你用netty撸一个ZkClientjava

redis通信协议(RESP )是什么git

上次撸完一个ZkClient以后(手把手教你用netty撸一个ZkClient), 忽然想起我以前写过一篇redis通信协议的文章(redis通信协议(RESP )是什么). 既然通信协议都弄清楚了, 那么撸一个redis的客户端是否是也是手到擒来?github

说干就干, 今天咱们就来手动撸一个redis的客户端redis

redis key的类型

你们都知道, redis中无论key也好, value也好, 真正存到内容中去时都是byte数组. 可是对于调用方来讲(以java来举例), redis的key类型实际上是有两种的:数组

一种是String类型的key, 另一种是Object类型的key.bash

jedis应该是java世界里应用最广的redis客户端了. 它虽然是同时支持String类型和Object类型的key. 可是它在把数据发往服务器以前, 都是先把String类型或者Object类型的key序列化成了byte数组后,再发起请求.服务器

然而正如我在redis通信协议(RESP )是什么那篇文章里实验过的, 其实直接把RESP格式的字符串发给redis服务器,它也是能处理的,并且返回的也是RESP格式的字符串.网络

所以在本文的实现里面, 我把String类型和Object类型(其实最终都是byte[])的客户端分开来实现, 分别是RedisStringClient和RedisBinaryClient.socket

RedisStringClient和RedisBinaryClient的处理逻辑基本一致, 惟一的区别是一个是RESP格式的字符串, 一个是RESP格式的byte数组.ide

RedisStringClient的继承结构

image

RedisBinaryClient的继承结构

image

能够看到, 主要的业务逻辑都是在AbstractRedisClient中

public abstract class AbstractRedisClient<T> implements RedisClient<T> {
    private ConnectionPool<T> connectionPool;

    protected AbstractRedisClient(ClientType clientType){
          connectionPool = new ConnectionPool<>(clientType);
    }


    protected <RETURN> RETURN invokeCmd(Cmd<T> cmd, CmdResp<T, RETURN> cmdResp) throws FailedToGetConnectionException{
        RedisConnection<T> connection = null;
        try{
            T data = cmd.build();
            // 从链接池中borrow链接
            connection = connectionPool.borrowConnection();
            if(connectionPool.checkChannel(connection)){
                // 要锁定这个链接
                connection.lock();
                try{
                    // 发送命令
                    connection.writeAndFlush(data).sync();
                    // 获取命令的返回结果
                    return cmdResp.parseResp(connection.getResp(RedisConfig.TIME_OUT_MS));
                }catch (Exception e){
                   e.printStackTrace();
                }finally {
                    // 释放锁
                    connection.unlock();
                }
            }else{
                throw new FailedToGetConnectionException("can not get connection form connection pool");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(connectionPool.checkChannel(connection)){
                // 把链接放回到链接池
                connectionPool.returnConnection(connection);
            }
        }
        return null;
    }
}
复制代码

因为RedisStringClient更容易理解和表述, 因此本文主要介绍RedisStringClient(可是其实介绍过程,RedisBinaryClient的逻辑也差很少全提到了的)

Cmd 和 CmdResp

AbstractRedisClient类的逻辑其实也不复杂:

  1. 调用Cmd类的build方法构建RESP报文
  2. 把RESP报文发送给redis服务端, 等待响应
  3. redis返回后,调用cmdResp.parseResp()方法解析返回的内容.

因此实际上主要的逻辑仍是在Cmd和CmdResp两个接口上.

Cmd

Cmd接口用来抽象一个redis命令, 例如set, get等.它只有一个方法:

public interface Cmd<PT> {
    /**
     * 构建RESP 协议
     * @return
     */
    PT build();
}
复制代码

Cmd接口下面有一个抽象类AbstractCmd, 它进一步丰富Cmd接口:

public abstract class AbstractCmd<T> implements Cmd<T> {
  /**
   * 具体是什么命令, 例如get set等待
   */
  protected String cmd;
  /**
   * 这个命令后面的参数
   */
  protected List<T> paramList;
  /**
   *  redis命令
   * @return
   */
  protected abstract String getCmd();

}
复制代码

举个例子说, 若是咱们想实现 set key val, 这个命令, 那么cmd就是"set", "key"和"val"都是 paramList的元素

CmdResp

CmdResp用来抽象解析redis返回报文的逻辑, 它没有抽象类,由具体的类(例如SetStringCmd)直接实现该接口

public interface CmdResp<PARAM,RETURN> {
   /**
    * 解析redis的resp
    * @param resp
    * @return
    */
   RETURN parseResp(PARAM resp);
}
复制代码

CmdResp有两个泛型参数PARAM和RETURN:

PARAM其实就是redis返回的RESP的类型,若是是RedisStringClient的话, 那么redis会返回String类型的RESP报文. 若是是RedisBinaryClient的话, 那么redis会返回byte数组类型的RESP报文

RETURN实际上是咱们最近想解析出来的数据类型. 这个是根据命令的不一样而不一样的, 例如set命令, 是须要返回true 或false的, 因此它Boolean. 而其余命令可能就不同了,是String或Integer等等

SET命令

先来看一下redis的set命令的参数是怎么样:

set key value [EX seconds] [PX milliseconds] [NX|XX]

复制代码

能够看到set的cmd就是"set", 有两个必选参数key和value, 以及三个可选参数. 所以, 能够抽象出一个AbstractSetCmd类:

/**
 *   set命令的抽象类
 *   命令参数
 *  set key value [EX seconds] [PX milliseconds] [NX|XX]
 * @author laihaohua
 */
public abstract class AbstractSetCmd<T> extends AbstractCmd<T> {

    public AbstractSetCmd(T  key, T  value, T expireMode, T expireTime, T xmode){
        super();
        super.paramList = new ArrayList<>();
        paramList.add(key);
        paramList.add(value);
        // 设置了过时时间
        if(expireMode != null){
            paramList.add(expireMode);
            paramList.add(expireTime);
        }
        // 设置了 XX或NX
        if(xmode != null){
            paramList.add(xmode);
        }
    }

    @Override
    protected String getCmd() {
        return super.cmd = "set";
    }
}
复制代码

该抽象类能够有String 和 Binary两种实现. 咱们来看看String的实现SetStringCmd

**
 *  命令参数
 *  set key value [EX seconds] [PX milliseconds] [NX|XX]
 * @author laihaohua
 */
public class SetStringCmd extends AbstractSetCmd<String>  implements CmdResp<String, Boolean> {
    /**
     * 没有过时时间
     * @param key
     * @param value
     */
    public SetStringCmd(String key, String value){
          this(key, value, null, 0, null);
    }

    /**
     *
     * @param key
     * @param value
     * @param expireMode
     * @param expireTime
     */
    public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime){
        this(key, value, expireMode, expireTime, null);

    }
    public SetStringCmd(String key, String value, Xmode xmode){
        this(key, value, null, 0, xmode);
    }
    public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime, Xmode xmode){
        super( key,
               value ,
               expireMode == null ? null : expireMode.getType(),
               String.valueOf(expireTime),
               xmode == null ? null : xmode.getType() );
    }


    /**
     * 构建请求参数RESP
     * @return
     */
    @Override
    public String build() {
        return CmdBuildUtils.buildString(getCmd(), paramList);
    }

    /**
     * 解析redis返回的RESP
     * @param resp
     * @return
     */
    @Override
    public Boolean parseResp(String resp) {
        char ch = resp.charAt(0);
        // 通常返回 +OK 就是成功
        if(ch == SymbolUtils.OK_PLUS.charAt(0)){
              return true;
        }
        // 其余的都是失败
        return false;
    }
}
复制代码

能够看到该类的实现也是很是的简洁的(除了构造方法有点多).

CmdBuildUtils.buildString其实就是把cmd和paramList按顺序拼接成RESP格式.

parseResp就更简洁了.根据RESP协议, 返回"+OK"的就是成功, 不然就是失败

到这里, 一个redis命令就完成了, 是否是很是的简单? 其实GET命令更简单, 由于它只有一个参数.有兴趣的能够到个人github看看GET命令的实现, 这里就再也不累赘了.

最后附上它的类继承结构

image

redis 链接池

有些心细的同窗可能已经发现了, redis的RESP里面并无像ZkClient那样有一个xid,那么当一个客户端收到一个redis响应的时候, 它怎么知道是哪次请求的响应呢.

理论上说, redis是单线程模型,处理顺序确定是按照接受到的请求的顺序的,因此本地把请求队列化感受就能够解决问题了.然而因为网络延时的存在, 服务端接受到的请求的顺序, 实际上是有可能跟发送的顺序不同的.

因此我这里换了一种方式实现(其实也就是jedis的实现方式), 使用链接池.

所谓的链接,其实就是对应netty里面的一个channel. 所谓的链接池, 其实就是在client初始化的时候, 一次建立批量的channel. 而后在发送一个命令以前, 须要向链接池获取链接, 获取到链接后,把这个链接锁定,发送请求,接受响应,再释放锁, 把链接归还给链接池. 这样就能够有效的 解决了上面所说的问题.

redis链接池涉及到两个类:RedisConnection和ConnectionPool

RedisConnection

RedisConnection是一个假的代理类, 它内部持有一个NioSocketChannel对象, 并代理里NioSocketChannel对象的writeAndFlush(msg),disconnect(),close(),isActive()四个方法:

public class RedisConnection<T>{
    private NioSocketChannel socketChannel;
    private Lock lock = new ReentrantLock();
    private SynchronousQueue<T> synchronousQueue;
    private String name;
    public RedisConnection(String name, NioSocketChannel socketChannel, SynchronousQueue<T> synchronousQueue){
        this.name = name;
           this.socketChannel = socketChannel;
           this.synchronousQueue = synchronousQueue;
    }
    public void cleanUp(){
    }
    public T getResp(long timeout) throws InterruptedException {
        return synchronousQueue.poll(timeout, TimeUnit.MILLISECONDS);
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void lock() {
        lock.lock();
    }
    public void unlock() {
         lock.unlock();
    }

    /***********************代理channel的几个方法*******************************/
    public ChannelFuture writeAndFlush(Object msg) {
        return socketChannel.writeAndFlush(msg);
    }

    public void disconnect() {
        socketChannel.disconnect();
    }

    public void close() {
        socketChannel.close();
    }

    public boolean isActive() {
       return socketChannel.isActive();
    }
}
复制代码

除了代理channel的方法以外, RedisConnection也用于本身的属性和方法:

1. name 
     链接的名称
  2. lock
     用于锁定该链接
  3. synchronousQueue
     一个同步队列, 用于从netty中的handler中获取redis的返回
  4. public T getResp(long timeout) 
     从synchronousQueue获取redis的返回内容,在AbstractRedisClient中被调用
复制代码

ConnectionPool

ConnectionPool相对就比较简单了, 它只作了4件事:

  1. 初始化时, 生成必定量的RedisConnection对象
  2. 提供检测RedisConnection对象是否活跃的方法
  3. 提供borrowConnection方法
  4. 提供returnConnection方法

就这样, 一个简陋的链接池就完成了.

运行代码

跟ZkClient同样, 这个项目一样提供了体验用的单元测试RedisStringClientTest和RedisBinaryClientTest, 直接运行这两个单元测试便可.

不过须要注意的是, 该client只在单机环境下验证过, 在哨兵模式和集群模式下都没有验证过, 有可能会有异常状况.不过只有一个节点的redis-cluster的服务端是有测试经过的

至于redis版本, 理论上是redis2.6以上都是没有问题的, 由于redis2.6之后就都是统一为RESP协议了

源码

最后附上github源码地址:

github.com/NorthWard/a…

感兴趣的同窗能够参考一下,共同窗习进步.

相关文章
相关标签/搜索