上次撸完一个ZkClient以后(手把手教你用netty撸一个ZkClient), 忽然想起我以前写过一篇redis通信协议的文章(redis通信协议(RESP )是什么). 既然通信协议都弄清楚了, 那么撸一个redis的客户端是否是也是手到擒来?github
说干就干, 今天咱们就来手动撸一个redis的客户端redis
你们都知道, redis中无论key也好, value也好, 真正存到内容中去时都是byte数组. 可是对于调用方来讲(以java来举例), redis的key类型实际上是有两种的:数组
一种是String类型的key, 另一种是Object类型的key.bash
jedis应该是java世界里应用最广的redis客户端了. 它虽然是同时支持String类型和Object类型的key. 可是它在把数据发往服务器以前, 都是先把String类型或者Object类型的key序列化成了byte数组后,再发起请求.服务器
然而正如我在redis通信协议(RESP )是什么那篇文章里实验过的, 其实直接把RESP格式的字符串发给redis服务器,它也是能处理的,并且返回的也是RESP格式的字符串.网络
所以在本文的实现里面, 我把String类型和Object类型(其实最终都是byte[])的客户端分开来实现, 分别是RedisStringClient和RedisBinaryClient.socket
RedisStringClient和RedisBinaryClient的处理逻辑基本一致, 惟一的区别是一个是RESP格式的字符串, 一个是RESP格式的byte数组.ide
RedisStringClient的继承结构
RedisBinaryClient的继承结构
能够看到, 主要的业务逻辑都是在AbstractRedisClient中
public abstract class AbstractRedisClient<T> implements RedisClient<T> {
private ConnectionPool<T> connectionPool;
protected AbstractRedisClient(ClientType clientType){
connectionPool = new ConnectionPool<>(clientType);
}
protected <RETURN> RETURN invokeCmd(Cmd<T> cmd, CmdResp<T, RETURN> cmdResp) throws FailedToGetConnectionException{
RedisConnection<T> connection = null;
try{
T data = cmd.build();
// 从链接池中borrow链接
connection = connectionPool.borrowConnection();
if(connectionPool.checkChannel(connection)){
// 要锁定这个链接
connection.lock();
try{
// 发送命令
connection.writeAndFlush(data).sync();
// 获取命令的返回结果
return cmdResp.parseResp(connection.getResp(RedisConfig.TIME_OUT_MS));
}catch (Exception e){
e.printStackTrace();
}finally {
// 释放锁
connection.unlock();
}
}else{
throw new FailedToGetConnectionException("can not get connection form connection pool");
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(connectionPool.checkChannel(connection)){
// 把链接放回到链接池
connectionPool.returnConnection(connection);
}
}
return null;
}
}
复制代码
因为RedisStringClient更容易理解和表述, 因此本文主要介绍RedisStringClient(可是其实介绍过程,RedisBinaryClient的逻辑也差很少全提到了的)
AbstractRedisClient类的逻辑其实也不复杂:
因此实际上主要的逻辑仍是在Cmd和CmdResp两个接口上.
Cmd接口用来抽象一个redis命令, 例如set, get等.它只有一个方法:
public interface Cmd<PT> {
/**
* 构建RESP 协议
* @return
*/
PT build();
}
复制代码
Cmd接口下面有一个抽象类AbstractCmd, 它进一步丰富Cmd接口:
public abstract class AbstractCmd<T> implements Cmd<T> {
/**
* 具体是什么命令, 例如get set等待
*/
protected String cmd;
/**
* 这个命令后面的参数
*/
protected List<T> paramList;
/**
* redis命令
* @return
*/
protected abstract String getCmd();
}
复制代码
举个例子说, 若是咱们想实现 set key val, 这个命令, 那么cmd就是"set", "key"和"val"都是 paramList的元素
CmdResp用来抽象解析redis返回报文的逻辑, 它没有抽象类,由具体的类(例如SetStringCmd)直接实现该接口
public interface CmdResp<PARAM,RETURN> {
/**
* 解析redis的resp
* @param resp
* @return
*/
RETURN parseResp(PARAM resp);
}
复制代码
CmdResp有两个泛型参数PARAM和RETURN:
PARAM其实就是redis返回的RESP的类型,若是是RedisStringClient的话, 那么redis会返回String类型的RESP报文. 若是是RedisBinaryClient的话, 那么redis会返回byte数组类型的RESP报文
RETURN实际上是咱们最近想解析出来的数据类型. 这个是根据命令的不一样而不一样的, 例如set命令, 是须要返回true 或false的, 因此它Boolean. 而其余命令可能就不同了,是String或Integer等等
先来看一下redis的set命令的参数是怎么样:
set key value [EX seconds] [PX milliseconds] [NX|XX]
复制代码
能够看到set的cmd就是"set", 有两个必选参数key和value, 以及三个可选参数. 所以, 能够抽象出一个AbstractSetCmd类:
/**
* set命令的抽象类
* 命令参数
* set key value [EX seconds] [PX milliseconds] [NX|XX]
* @author laihaohua
*/
public abstract class AbstractSetCmd<T> extends AbstractCmd<T> {
public AbstractSetCmd(T key, T value, T expireMode, T expireTime, T xmode){
super();
super.paramList = new ArrayList<>();
paramList.add(key);
paramList.add(value);
// 设置了过时时间
if(expireMode != null){
paramList.add(expireMode);
paramList.add(expireTime);
}
// 设置了 XX或NX
if(xmode != null){
paramList.add(xmode);
}
}
@Override
protected String getCmd() {
return super.cmd = "set";
}
}
复制代码
该抽象类能够有String 和 Binary两种实现. 咱们来看看String的实现SetStringCmd
**
* 命令参数
* set key value [EX seconds] [PX milliseconds] [NX|XX]
* @author laihaohua
*/
public class SetStringCmd extends AbstractSetCmd<String> implements CmdResp<String, Boolean> {
/**
* 没有过时时间
* @param key
* @param value
*/
public SetStringCmd(String key, String value){
this(key, value, null, 0, null);
}
/**
*
* @param key
* @param value
* @param expireMode
* @param expireTime
*/
public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime){
this(key, value, expireMode, expireTime, null);
}
public SetStringCmd(String key, String value, Xmode xmode){
this(key, value, null, 0, xmode);
}
public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime, Xmode xmode){
super( key,
value ,
expireMode == null ? null : expireMode.getType(),
String.valueOf(expireTime),
xmode == null ? null : xmode.getType() );
}
/**
* 构建请求参数RESP
* @return
*/
@Override
public String build() {
return CmdBuildUtils.buildString(getCmd(), paramList);
}
/**
* 解析redis返回的RESP
* @param resp
* @return
*/
@Override
public Boolean parseResp(String resp) {
char ch = resp.charAt(0);
// 通常返回 +OK 就是成功
if(ch == SymbolUtils.OK_PLUS.charAt(0)){
return true;
}
// 其余的都是失败
return false;
}
}
复制代码
能够看到该类的实现也是很是的简洁的(除了构造方法有点多).
CmdBuildUtils.buildString其实就是把cmd和paramList按顺序拼接成RESP格式.
parseResp就更简洁了.根据RESP协议, 返回"+OK"的就是成功, 不然就是失败
到这里, 一个redis命令就完成了, 是否是很是的简单? 其实GET命令更简单, 由于它只有一个参数.有兴趣的能够到个人github看看GET命令的实现, 这里就再也不累赘了.
最后附上它的类继承结构
有些心细的同窗可能已经发现了, redis的RESP里面并无像ZkClient那样有一个xid,那么当一个客户端收到一个redis响应的时候, 它怎么知道是哪次请求的响应呢.
理论上说, redis是单线程模型,处理顺序确定是按照接受到的请求的顺序的,因此本地把请求队列化感受就能够解决问题了.然而因为网络延时的存在, 服务端接受到的请求的顺序, 实际上是有可能跟发送的顺序不同的.
因此我这里换了一种方式实现(其实也就是jedis的实现方式), 使用链接池.
所谓的链接,其实就是对应netty里面的一个channel. 所谓的链接池, 其实就是在client初始化的时候, 一次建立批量的channel. 而后在发送一个命令以前, 须要向链接池获取链接, 获取到链接后,把这个链接锁定,发送请求,接受响应,再释放锁, 把链接归还给链接池. 这样就能够有效的 解决了上面所说的问题.
redis链接池涉及到两个类:RedisConnection和ConnectionPool
RedisConnection是一个假的代理类, 它内部持有一个NioSocketChannel对象, 并代理里NioSocketChannel对象的writeAndFlush(msg),disconnect(),close(),isActive()四个方法:
public class RedisConnection<T>{
private NioSocketChannel socketChannel;
private Lock lock = new ReentrantLock();
private SynchronousQueue<T> synchronousQueue;
private String name;
public RedisConnection(String name, NioSocketChannel socketChannel, SynchronousQueue<T> synchronousQueue){
this.name = name;
this.socketChannel = socketChannel;
this.synchronousQueue = synchronousQueue;
}
public void cleanUp(){
}
public T getResp(long timeout) throws InterruptedException {
return synchronousQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
/***********************代理channel的几个方法*******************************/
public ChannelFuture writeAndFlush(Object msg) {
return socketChannel.writeAndFlush(msg);
}
public void disconnect() {
socketChannel.disconnect();
}
public void close() {
socketChannel.close();
}
public boolean isActive() {
return socketChannel.isActive();
}
}
复制代码
除了代理channel的方法以外, RedisConnection也用于本身的属性和方法:
1. name
链接的名称
2. lock
用于锁定该链接
3. synchronousQueue
一个同步队列, 用于从netty中的handler中获取redis的返回
4. public T getResp(long timeout)
从synchronousQueue获取redis的返回内容,在AbstractRedisClient中被调用
复制代码
ConnectionPool相对就比较简单了, 它只作了4件事:
就这样, 一个简陋的链接池就完成了.
跟ZkClient同样, 这个项目一样提供了体验用的单元测试RedisStringClientTest和RedisBinaryClientTest, 直接运行这两个单元测试便可.
不过须要注意的是, 该client只在单机环境下验证过, 在哨兵模式和集群模式下都没有验证过, 有可能会有异常状况.不过只有一个节点的redis-cluster的服务端是有测试经过的
至于redis版本, 理论上是redis2.6以上都是没有问题的, 由于redis2.6之后就都是统一为RESP协议了
最后附上github源码地址:
感兴趣的同窗能够参考一下,共同窗习进步.