粘包和拆包是TCP网络编程中不可避免的,不管是服务端仍是客户端,当咱们读取或者发送消息的时候,都须要考虑TCP底层的粘包/拆包机制。编程
TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际状况进行包的划分,因此在业务上认为,一个完整的包可能会被TCP拆分红多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。markdown
如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,因为服务端一次读取到的字节数是不肯定的,故可能存在如下4种状况。网络
若是此时服务端TCP接收滑窗很是小,而数据包D1和D2比较大,颇有可能会发生第五种可能,即服务端分屡次才能将D1和D2包接收彻底,期间发生屡次拆包。app
数据从发送方到接收方须要通过操做系统的缓冲区,而形成粘包和拆包的主要缘由就在这个缓冲区上。粘包能够理解为缓冲区数据堆积,致使多个请求数据粘在一块儿,而拆包能够理解为发送的数据大于缓冲区,进行拆分处理。tcp
详细来讲,形成粘包和拆包的缘由主要有如下三个:ide
因为底层的TCP没法理解上层的业务数据,因此在底层是没法保证数据包不被拆分和重组的,这个问题只能经过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,能够概括以下。oop
针对上一小节描述的粘包和拆包的解决方案,对于拆包问题比较简单,用户能够本身定义本身的编码器进行处理,Netty并无提供相应的组件。对于粘包的问题,因为拆包比较复杂,代码比较处理比较繁琐,Netty提供了4种解码器来解决,分别以下:ui
以上解码器在使用时只须要添加到Netty的责任链中便可,大多数状况下这4种解码器均可以知足了,固然除了以上4种解码器,用户也能够自定义本身的解码器进行处理。具体能够参考如下代码示例:编码
// Server主程序 public class XNettyServer { public static void main(String[] args) throws Exception { // accept 处理链接的线程池 NioEventLoopGroup acceptGroup = new NioEventLoopGroup(); // read io 处理数据的线程池 NioEventLoopGroup readGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(acceptGroup, readGroup) .channel(NioServerSocketChannel.class) .childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 增长解码器 pipeline.addLast(new XDecoder()); // 打印出内容 handdler pipeline.addLast(new XHandler()); } }); System.out.println("启动成功,端口 7777"); serverBootstrap.bind(7777).sync().channel().closeFuture().sync(); } finally { acceptGroup.shutdownGracefully(); readGroup.shutdownGracefully(); } } } // 解码器 public class XDecoder extends ByteToMessageDecoder { static final int PACKET_SIZE = 220; // 用来临时保留没有处理过的请求报文 ByteBuf tempMsg = Unpooled.buffer(); /** * @param ctx * @param in 请求的数据 * @param out 将粘在一块儿的报文拆分后的结果保留起来 * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes()); // 合并报文 ByteBuf message = null; int tmpMsgSize = tempMsg.readableBytes(); // 若是暂存有上一次余下的请求报文,则合并 if (tmpMsgSize > 0) { message = Unpooled.buffer(); message.writeBytes(tempMsg); message.writeBytes(in); System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes()); } else { message = in; } int size = message.readableBytes(); int counter = size / PACKET_SIZE; for (int i = 0; i < counter; i++) { byte[] request = new byte[PACKET_SIZE]; // 每次从总的消息中读取220个字节的数据 message.readBytes(request); // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理 out.add(Unpooled.copiedBuffer(request)); } // 多余的报文存起来 // 第一个报文: i+ 暂存 // 第二个报文: 1 与第一次 size = message.readableBytes(); if (size != 0) { System.out.println("多余的数据长度:" + size); // 剩下来的数据放到tempMsg暂存 tempMsg.clear(); tempMsg.writeBytes(message.readBytes(size)); } } } // 处理器 public class XHandler extends ChannelInboundHandlerAdapter { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] content = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(content); System.out.println(Thread.currentThread() + ": 最终打印" + new String(content)); ((ByteBuf) msg).release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }