最近看了 Redis 的代码,感受仍是挺简单的.有冲动想用其它语言实现(抄)一个.原来想用 Python 实现来着.后来想一想试试 Netty.缘由有二java
第一:Java 的NIO 和Netty 的 EventLoop 配合起来和 Redis 的网络模型很接近.都是 Ractor 模型.甚至 Redis的模型更简单--只有一个 EventLoop 线程.写(抄)起来更方便git
第二:Netty 架构挺不错.借这个机会学习一下.redis
若是咱们从一个很抽象(简单)的角度看 Redis Server.就是一个监听在6379的程序, 本质上是一个处理单线线请求的 Hashtable. 而 Redis 的协议也是很是很是的简单.比 http 协议可简单多了.bootstrap
如下是这个协议的通常形式:网络
*<参数数量> CR LF $<参数 1 的字节数量> CR LF <参数 1 的数据> CR LF ... $<参数 N 的字节数量> CR LF <参数 N 的数据> CR LF
这基本就是一个很简单的有限状态机.架构
因此我给咱们的命令解析器设置3个状态.socket
public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA }
咱们将初始状态设置NUMBER_OF_ARGS 也就是开始那个绿色的状态.当有数据到达时.咱们不停的判断程序的状态.是哪一个状态,咱们作啥.ide
while(true){ switch (state()){ case NUMBER_OF_ARGS: //从当前数据中读取参数个数 break; case NUMBER_BYTE_OF_ARGS: //从数据中读取参数长度 break; case ARGS_DATA: //按参数长度读取参数 //判断参数个数.若是到了最后一个.则跳出,不然状态转回NUMBER_BYTE_OF_ARGS break; } }
下面咱们按着咱们上面思路实现一下.oop
package me.yunanw.redisinjava; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; /** * Created by yunanw on 2016/10/15. */ public class CommandDecoder extends ReplayingDecoder<CommandDecoder.State> { public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA } static final char CR = '\r'; static final char LF = '\n'; public CommandDecoder(){ state(State.NUMBER_OF_ARGS); } protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list); if (frame != null){ list.add(frame); } } private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { RedisFrame frame = null; int currentArgsLen = 0; int argsCount = 0; while(true){ switch (state()){ case NUMBER_OF_ARGS: if (byteBuf.readByte() != '*'){ throw new DecoderException("can not found *"); } argsCount = parseRedisNumber(byteBuf); frame = new RedisFrame(argsCount); checkpoint(State.NUMBER_BYTE_OF_ARGS); break; case NUMBER_BYTE_OF_ARGS: if (byteBuf.readByte() != '$'){ throw new DecoderException("can not found $"); } currentArgsLen = parseRedisNumber(byteBuf); checkpoint(State.ARGS_DATA);; break; case ARGS_DATA: frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array()); if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) throw new DecoderException("can not found CR OR LF"); if ((--argsCount) <=0) return frame; else { checkpoint(State.NUMBER_BYTE_OF_ARGS); } break; default: throw new DecoderException(""); } } } private int parseRedisNumber(ByteBuf byteBuf) { byte readByte = byteBuf.readByte(); boolean negative = readByte == '-'; if (negative) { readByte = byteBuf.readByte(); } int result = 0; do { int digit = readByte - '0'; if (digit >= 0 && digit < 10) { result = (result * 10) + digit; } else { throw new DecoderException("Invalid character in integer"); } } while ((readByte = byteBuf.readByte()) != CR); if ((readByte = byteBuf.readByte()) != LF){ throw new DecoderException("can not found LF"); } return (negative? -result:result); } }
写到这里有一个小问题,若是你上面代码看懂了,你就会发现一个小问题.若是因为网络缘由,有时数据能够并无接收彻底.而咱们的代码彻底没有作这方面的考虑? 而 Checkpoint 这是又什么鬼?学习
第一个问题:
事实上咱们有考虑这个问题.因此咱们继承了一个相对比较特别Decoder--ReplayingDecoder.咱们看一下ReplayingDecoder的 CallDecode 方法.(这个名字起的很是的直白.你必定明白他是干啥的)
try { decode(ctx, replayable, out); //省略 } catch (Signal replay) { replay.expect(REPLAY); //省略 // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; }
Signal replay 是 Netty 中定义的一个错误.当咱们读取错误时,Netty 会再等到下次有数据到达时,再试一次Decode 方法.看看能再解析成功.因此咱们就能够假设置咱们要的数据都已经读取了.
可是要注意: replaydecoder 的 decode 方法会被反复调用..因此咱们的代码中要作好这样的准备.
二: CheckPoint 就是为了防止若是每次反复调用 Decode 时从头执行,而设置的一个状态.让咱们这个 decode 方法有状态.
好了.如今咱们建立监部分的代码.这都是套数,直接抄下来就好了
ServerBootstrap bootstrap = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new CommandDecoder()); p.addLast(new RedisServerHandler()); } }); // Start the server. ChannelFuture f = bootstrap.bind().sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. group.shutdownGracefully(); }
咱们把 Redis 的协议解析为RedisFrame 类
package me.yunanw.redisinjava; import java.util.ArrayList; import java.util.List; /** * Created by yunanw on 2016/10/17. */ public class RedisFrame { private int argsCount = 0; List<String> ArgsData = null; public RedisFrame(int argsCount){ this.argsCount = argsCount; this.ArgsData = new ArrayList<String>(argsCount); } public void AppendArgs(byte[] args){ this.ArgsData.add(new String(args)); } public int getCommandCount(){ return ArgsData.size(); } public String GetFristCommand(){ if (ArgsData.size() > 0){ return ArgsData.get(0); } return null; } public String GetCommand(int index){ if (ArgsData.size() > index){ return ArgsData.get(index); } return null; } }
好了.这时你打开 Redis-cli 试试是否是能够连上咱们的 "假Redis" Server.有意的是---你打开 Redis-cli.他会自动发一个 "Command" 命令.而你无论回复什么,它都认为连上了.