这一章主要介绍如何使用Netty开发自定义通讯协议。咱们知道有的时候可能咱们不想用java的序列化,由于编码效率低,而且咱们也不想使用protobuf,由于若是说咱们的通讯程序都是来自一个平台,那么用protobuf每次都须要从新利用工具生成文件也不是很好,那么就须要开发本身的通讯协议。java
在开发自定义通讯协议以前,咱们先来明确这么几个开发目标:首先,咱们但愿仍然通讯在对象和字节之间作转换,对上层来讲无感知;其次,咱们的自定义协议通讯程序也要有半包解码的能力。明确了目的以后,咱们如何处理呢?咱们须要依次开发本身的编码器和解码器,你可能会以为之前都是用Netty自带的编解码器,本身开发是否是很困难啊。编程
固然不是!Netty给咱们提供了MessageToByteEncoder和ByteToMessageDecoder这两个编解码器,他们具备半包处理的能力,咱们只须要实现协议就能够了。bootstrap
咱们来看一下客户端和服务端的代码。缓存
客户端代码:服务器
package com.dlb.note.client; import com.dlb.note.constant.ConstantValue; import com.dlb.note.doj.Request; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.MessageToByteEncoder; /** * 功能:自定义编码器客户端 * 版本:1.0 * 日期:2016/12/19 19:55 * 做者:馟苏 */ public class MySelfEncoderClient { /** * 主函数 */ public static void main(String []args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new MySelfEncoder()); channel.pipeline().addLast(new MySelfEncoderHandler()); } }); // 等待客户端连接成功 ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); System.out.println("客户端连接成功!"); // 等待客户端连接关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } /** * 请求编码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 | 模块号 | 命令号 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) * @author 馟苏 */ class MySelfEncoder extends MessageToByteEncoder { /** * 编码 * @param channelHandlerContext * @param rs * @param byteBuf * @throws Exception */ protected void encode(ChannelHandlerContext channelHandlerContext, Object rs, ByteBuf byteBuf) throws Exception { Request request = (Request)(rs); //包头 byteBuf.writeInt(ConstantValue.FLAG); //module byteBuf.writeShort(request.getModule()); //cmd byteBuf.writeShort(request.getCmd()); //长度 byteBuf.writeInt(request.getDataLength()); //data if(request.getData() != null){ byteBuf.writeBytes(request.getData()); } } } /** * 自定义处理器 */ class MySelfEncoderHandler extends ChannelHandlerAdapter { // 可读 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } // 链接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { Request request = new Request(); request.setCmd((short) 100); request.setData("nihao".getBytes()); request.setModule((short) 9000); ctx.writeAndFlush(request); } } // 关闭 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client close,ip:" + ctx.channel().remoteAddress()); ctx.close(); } // 异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.toString()); ctx.close(); } }
服务端代码:框架
package com.dlb.note.server; import com.dlb.note.doj.Request; import com.dlb.note.server.decode.MySelfDecoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 功能:自定义解码器服务端 * 版本:1.0 * 日期:2016/12/15 12:47 * 做者:馟苏 */ public class MySelfDecoderServer { /** * 主函数 */ public static void main(String []args) { // 配置服务端的NIO线程池 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度 .option(ChannelOption.SO_BACKLOG, 1024) // 为管道添加处理器 .childHandler(new ChannelInitializer() { // 初始化管道 protected void initChannel(Channel channel) throws Exception { channel.pipeline() .addLast(new MySelfDecoder()) .addLast(new MySelfDecoderHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture future = serverBootstrap.bind(8888).sync(); System.out.println("服务器在8888端口监听hello"); // 等待服务端监听端口关闭 future.channel().closeFuture().sync(); System.out.println("服务器关闭bye"); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 自定义处理器 */ class MySelfDecoderHandler extends ChannelHandlerAdapter { // 可读 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request) msg; System.out.println(request.toString()); } // 链接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client come,ip:" + ctx.channel().remoteAddress()); } // 关闭 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client close,ip:" + ctx.channel().remoteAddress()); ctx.close(); } // 异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.toString()); ctx.close(); } }
解码器:socket
package com.dlb.note.server.decode; import com.dlb.note.constant.ConstantValue; import com.dlb.note.doj.Request; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 请求解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 | 模块号 | 命令号 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */ /** * 功能:自定义消息解码器 * 版本:1.0 * 日期:2016/12/19 19:42 * 做者:馟苏 */ public class MySelfDecoder extends ByteToMessageDecoder { /** * 数据包基本长度 */ public static int BASE_LENTH = 4 + 2 + 2 + 4; /** * 解码器 * @param channelHandlerContext * @param byteBuf * @param list * @throws Exception */ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // 可读长度必须大于基本长度 if(byteBuf.readableBytes() >= BASE_LENTH){ // 防止socket字节流攻击 if(byteBuf.readableBytes() > 2048){ byteBuf.skipBytes(byteBuf.readableBytes()); } // 记录包头开始的index int beginReader; while(true){ beginReader = byteBuf.readerIndex(); byteBuf.markReaderIndex(); if(byteBuf.readInt() == ConstantValue.FLAG){ break; } // 未读到包头,略过一个字节 byteBuf.resetReaderIndex(); byteBuf.readByte(); // 长度又变得不知足 if(byteBuf.readableBytes() < BASE_LENTH){ return; } } // 模块号 short module = byteBuf.readShort(); // 命令号 short cmd = byteBuf.readShort(); // 长度 int length = byteBuf.readInt(); // 判断请求数据包数据是否到齐 if(byteBuf.readableBytes() < length){ // 还原读指针 byteBuf.readerIndex(beginReader); return; } // 读取data数据 byte[] data = new byte[length]; byteBuf.readBytes(data); Request request = new Request(); request.setModule(module); request.setCmd(cmd); request.setData(data); // 继续往下传递 list.add(request); } } }
可能你们看完之后会问这么一个问题,Netty到底是怎么给咱们解决了半包读写问题,还有关于编码器的编写时什么意思?这其实就须要咱们看一下Netty的源码,拿解码器作例子,下面是byteToMessageDecoder的关键代码。ide
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); boolean var12 = false; try { var12 = true; ByteBuf t = (ByteBuf)msg; this.first = this.cumulation == null; if(this.first) { this.cumulation = t; } else { if(this.cumulation.writerIndex() > this.cumulation.maxCapacity() - t.readableBytes()) { this.expandCumulation(ctx, t.readableBytes()); } this.cumulation.writeBytes(t); t.release(); } this.callDecode(ctx, this.cumulation, out); var12 = false; } catch (DecoderException var13) { throw var13; } catch (Throwable var14) { throw new DecoderException(var14); } finally { if(var12) { if(this.cumulation != null && !this.cumulation.isReadable()) { this.cumulation.release(); this.cumulation = null; } int size = out.size(); this.decodeWasNull = size == 0; for(int i1 = 0; i1 < size; ++i1) { ctx.fireChannelRead(out.get(i1)); } out.recycle(); } } if(this.cumulation != null && !this.cumulation.isReadable()) { this.cumulation.release(); this.cumulation = null; } int var16 = out.size(); this.decodeWasNull = var16 == 0; for(int i = 0; i < var16; ++i) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } else { ctx.fireChannelRead(msg); } }
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { this.replayable.setCumulation(in); try { while(in.isReadable()) { int cause = this.checkpoint = in.readerIndex(); int outSize = out.size(); Object oldState = this.state; int oldInputLength = in.readableBytes(); try { this.decode(ctx, this.replayable, out); if(ctx.isRemoved()) { break; } if(outSize == out.size()) { if(oldInputLength == in.readableBytes() && oldState == this.state) { throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything."); } continue; } } catch (Signal var10) { var10.expect(REPLAY); if(!ctx.isRemoved()) { int checkpoint = this.checkpoint; if(checkpoint >= 0) { in.readerIndex(checkpoint); } } break; } if(cause == in.readerIndex() && oldState == this.state) { throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something."); } if(this.isSingleDecode()) { break; } } } catch (DecoderException var11) { throw var11; } catch (Throwable var12) { throw new DecoderException(var12); } }
咱们能够看到实际上decode方法是在callDecoder中调用的,调用完了之后若是list不为空,那么就会往上传,这样看来要是想让咱们的上层handler接受对象,就必需要把咱们从字节解析的对象放入list集合中。函数
比较关键的是。工具
首先呢,查看是否是第一次接受,若是是那么把接受到的字节赋值给cumulation,若是不是那么把接收到的字节拼接到cumulation的后面。这是什么意思,咱们能够这么考虑,把cumulation想象成一个缓存,假如你是第一次接受字节,那么你的缓存就是本次接受到的字节;若是说你上次的没有处理完,那么我就把上次没处理完的字节加上本次接受的一块儿放入缓存,而后再交给本次解码器处理。
还有一个问题是怎么保证在处理的时候准备返回还未处理完的字节呢,好比说我发现这一帧不够,我但愿等帧接受足够大了再进行相应处理?其实关键在这个地方,一切都是以读指针来标识的,咱们来看一个bytebuf其实有两个指针:一个读指针readIndex和一个写指针writeIndex,这就解决了NIO编程中常常要flip的问题,那么读指针和写指针直接的字节就是还未处理的。而咱们每读一个字节,读指针都会自增,只要咱们保证在咱们的处理程序中返回读指针的正确位置就能保证Netty框架的缓存帮咱们缓存字节信息。所以,Netty就帮咱们解决了半包的解码问题,是否是很方便!