在RPC框架中,粘包和拆包问题是必须解决一个问题,由于RPC框架中,各个微服务相互之间都是维系了一个TCP长链接,好比dubbo就是一个全双工的长链接。因为微服务往对方发送信息的时候,全部的请求都是使用的同一个链接,这样就会产生粘包和拆包的问题。本文首先会对粘包和拆包问题进行描述,而后介绍其经常使用的解决方案,最后会对Netty提供的几种解决方案进行讲解。css
产生粘包和拆包问题的主要缘由是,操做系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,若是一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就造成了粘包问题;若是一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为屡次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。以下图展现了粘包和拆包的一个示意图:java
上图中演示了粘包和拆包的三种状况:json
对于粘包和拆包问题,常见的解决方案有四种:bootstrap
1) FixedLengthFrameDecoder数组
对于使用固定长度的粘包和拆包场景,可使用FixedLengthFrameDecoder,该解码器会每次读取固定长度的消息,若是当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足。其使用也比较简单,只须要在构造函数中指定每一个消息的长度便可。这里须要注意的是,FixedLengthFrameDecoder只是一个解码器,Netty也只提供了一个解码器,这是由于对于解码是须要等待下一个包的进行补全的,代码相对复杂,而对于编码器,用户能够自行编写,由于编码时只须要将不足指定长度的部分进行补全便可。下面的示例中展现了如何使用FixedLengthFrameDecoder来进行粘包和拆包处理:服务器
public class EchoServer { public void bind(int port) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为20 ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); // 将前一步解码获得的数据转码为字符串 ch.pipeline().addLast(new StringDecoder()); // 这里FixedLengthFrameEncoder是咱们自定义的,用于将长度不足20的消息进行补全空格 ch.pipeline().addLast(new FixedLengthFrameEncoder(20)); // 最终的数据处理 ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new EchoServer().bind(8080); } }
上面的pipeline中,对于入栈数据,这里主要添加了FixedLengthFrameDecoder和StringDecoder,前面一个用于处理固定长度的消息的粘包和拆包问题,第二个则是将处理以后的消息转换为字符串。最后由EchoServerHandler处理最终获得的数据,处理完成后,将处理获得的数据交由FixedLengthFrameEncoder处理,该编码器是咱们自定义的实现,主要做用是将长度不足20的消息进行空格补全。下面是FixedLengthFrameEncoder的实现代码:app
public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> { private int length; public FixedLengthFrameEncoder(int length) { this.length = length; } @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { // 对于超过指定长度的消息,这里直接抛出异常 if (msg.length() > length) { throw new UnsupportedOperationException( "message length is too large, it's limited " + length); } // 若是长度不足,则进行补全 if (msg.length() < length) { msg = addSpace(msg); } ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes())); } // 进行空格补全 private String addSpace(String msg) { StringBuilder builder = new StringBuilder(msg); for (int i = 0; i < length - msg.length(); i++) { builder.append(" "); } return builder.toString(); } }
这里FixedLengthFrameEncoder实现了decode()方法,在该方法中,主要是将消息长度不足20的消息进行空格补全。EchoServerHandler的做用主要是打印接收到的消息,而后发送响应给客户端:框架
public class EchoServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("server receives message: " + msg.trim()); ctx.writeAndFlush("hello client!"); } }
对于客户端,其实现方式基本与服务端的使用方式相似,只是在最后进行消息发送的时候与服务端的处理方式不一样。以下是客户端EchoClient的代码:ide
public class EchoClient { public void connect(String host, int port) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 对服务端发送的消息进行粘包和拆包处理,因为服务端发送的消息已经进行了空格补全, // 而且长度为20,于是这里指定的长度也为20 ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); // 将粘包和拆包处理获得的消息转换为字符串 ch.pipeline().addLast(new StringDecoder()); // 对客户端发送的消息进行空格补全,保证其长度为20 ch.pipeline().addLast(new FixedLengthFrameEncoder(20)); // 客户端发送消息给服务端,而且处理服务端响应的消息 ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new EchoClient().connect("127.0.0.1", 8080); } }
对于客户端而言,其消息的处理流程其实与服务端是类似的,对于入站消息,须要对其进行粘包和拆包处理,而后将其转码为字符串,对于出站消息,则须要将长度不足20的消息进行空格补全。客户端与服务端处理的主要区别在于最后的消息处理handler不同,也即这里的EchoClientHandler,以下是该handler的源码:函数
public class EchoClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("client receives message: " + msg.trim()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("hello server!"); } }
这里客户端的处理主要是重写了channelActive()和channelRead0()两个方法,这两个方法的主要做用在于,channelActive()会在客户端链接上服务器时执行,也就是说,其连上服务器以后就会往服务器发送消息。而channelRead0()主要是在服务器发送响应给客户端时执行,这里主要是打印服务器的响应消息。对于服务端而言,前面咱们咱们能够看到,EchoServerHandler只重写了channelRead0()方法,这是由于服务器只须要等待客户端发送消息过来,而后在该方法中进行处理,处理完成后直接将响应发送给客户端。以下是分别启动服务端和客户端以后控制台打印的数据:
// server server receives message: hello server!
// client client receives message: hello client!
2) LineBasedFrameDecoder与DelimiterBasedFrameDecoder
对于经过分隔符进行粘包和拆包问题的处理,Netty提供了两个编解码的类,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。这里LineBasedFrameDecoder的做用主要是经过换行符,即\n或者\r\n对数据进行处理;而DelimiterBasedFrameDecoder的做用则是经过用户指定的分隔符对数据进行粘包和拆包处理。一样的,这两个类都是解码器类,而对于数据的编码,也即在每一个数据包最后添加换行符或者指定分割符的部分须要用户自行进行处理。这里以DelimiterBasedFrameDecoder为例进行讲解,以下是EchoServer中使用该类的代码片断,其他部分与前面的例子中的彻底一致:
@Override protected void initChannel(SocketChannel ch) throws Exception { String delimiter = "_$"; // 将delimiter设置到DelimiterBasedFrameDecoder中,通过该解码器进行处理以后,源数据将会 // 被按照_$进行分隔,这里1024指的是分隔的最大长度,即当读取到1024个字节的数据以后,若仍是未 // 读取到分隔符,则舍弃当前数据段,由于其颇有多是因为码流紊乱形成的 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(delimiter.getBytes()))); // 将分隔以后的字节数据转换为字符串数据 ch.pipeline().addLast(new StringDecoder()); // 这是咱们自定义的一个编码器,主要做用是在返回的响应数据最后添加分隔符 ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter)); // 最终处理数据而且返回响应的handler ch.pipeline().addLast(new EchoServerHandler()); }
上面pipeline的设置中,添加的解码器主要有DelimiterBasedFrameDecoder和StringDecoder,通过这两个处理器处理以后,接收到的字节流就会被分隔,而且转换为字符串数据,最终交由EchoServerHandler处理。这里DelimiterBasedFrameEncoder是咱们自定义的编码器,其主要做用是在返回的响应数据以后添加分隔符。以下是该编码器的源码:
public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> { private String delimiter; public DelimiterBasedFrameEncoder(String delimiter) { this.delimiter = delimiter; } @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { // 在响应的数据后面添加分隔符 ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes())); } }
对于客户端而言,这里的处理方式与服务端相似,其pipeline的添加方式以下:
@Override protected void initChannel(SocketChannel ch) throws Exception { String delimiter = "_$"; // 对服务端返回的消息经过_$进行分隔,而且每次查找的最大大小为1024字节 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(delimiter.getBytes()))); // 将分隔以后的字节数据转换为字符串 ch.pipeline().addLast(new StringDecoder()); // 对客户端发送的数据进行编码,这里主要是在客户端发送的数据最后添加分隔符 ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter)); // 客户端发送数据给服务端,而且处理从服务端响应的数据 ch.pipeline().addLast(new EchoClientHandler()); }
这里客户端的处理方式与服务端基本一致,关于这里没展现的代码,其与示例一中的代码彻底一致,这里则不予展现。
3) LengthFieldBasedFrameDecoder与LengthFieldPrepender
这里LengthFieldBasedFrameDecoder与LengthFieldPrepender须要配合起来使用,其实本质上来说,这二者一个是解码,一个是编码的关系。它们处理粘拆包的主要思想是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。LengthFieldBasedFrameDecoder会按照参数指定的包长度偏移量数据对接收到的数据进行解码,从而获得目标消息体数据;而LengthFieldPrepender则会在响应的数据前面添加指定的字节数据,这个字节数据中保存了当前消息体的总体字节数据长度。LengthFieldBasedFrameDecoder的解码过程以下图所示:
关于LengthFieldBasedFrameDecoder,这里须要对其构造函数参数进行介绍:
这里咱们以json序列化为例对LengthFieldBasedFrameDecoder和LengthFieldPrepender的使用方式进行讲解。以下是EchoServer的源码:
public class EchoServer { public void bind(int port) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,由于其须要对接收到的数据 // 进行长度字段解码,这里也会对数据进行粘包和拆包处理 ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段 ch.pipeline().addLast(new LengthFieldPrepender(2)); // 对通过粘包和拆包处理以后的数据进行json反序列化,从而获得User对象 ch.pipeline().addLast(new JsonDecoder()); // 对响应数据进行编码,主要是将User对象序列化为json ch.pipeline().addLast(new JsonEncoder()); // 处理客户端的请求的数据,而且进行响应 ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new EchoServer().bind(8080); } }
这里EchoServer主要是在pipeline中添加了两个编码器和两个解码器,编码器主要是负责将响应的User对象序列化为json对象,而后在其字节数组前面添加一个长度字段的字节数组;解码器主要是对接收到的数据进行长度字段的解码,而后将其反序列化为一个User对象。下面是JsonDecoder的源码:
public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class); out.add(user); } }
JsonDecoder首先从接收到的数据流中读取字节数组,而后将其反序列化为一个User对象。下面咱们看看JsonEncoder的源码:
public class JsonEncoder extends MessageToByteEncoder<User> { @Override protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf) throws Exception { String json = JSON.toJSONString(user); ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes())); } }
JsonEncoder将响应获得的User对象转换为一个json对象,而后写入响应中。对于EchoServerHandler,其主要做用就是接收客户端数据,而且进行响应,以下是其源码:
public class EchoServerHandler extends SimpleChannelInboundHandler<User> { @Override protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception { System.out.println("receive from client: " + user); ctx.write(user); } }
对于客户端,其主要逻辑与服务端的基本相似,这里主要展现其pipeline的添加方式,以及最后发送请求,而且对服务器响应进行处理的过程:
@Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); ch.pipeline().addLast(new LengthFieldPrepender(2)); ch.pipeline().addLast(new JsonDecoder()); ch.pipeline().addLast(new JsonEncoder()); ch.pipeline().addLast(new EchoClientHandler()); }
public class EchoClientHandler extends SimpleChannelInboundHandler<User> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.write(getUser()); } private User getUser() { User user = new User(); user.setAge(27); user.setName("zhangxufeng"); return user; } @Override protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception { System.out.println("receive message from server: " + user); } }
这里客户端首先会在链接上服务器时,往服务器发送一个User对象数据,而后在接收到服务器响应以后,会打印服务器响应的数据。
4) 自定义粘包与拆包器
对于粘包与拆包问题,其实前面三种基本上已经可以知足大多数情形了,可是对于一些更加复杂的协议,可能有一些定制化的需求。对于这些场景,其实本质上,咱们也不须要手动从头开始写一份粘包与拆包处理器,而是经过继承LengthFieldBasedFrameDecoder和LengthFieldPrepender来实现粘包和拆包的处理。
若是用户确实须要不经过继承的方式实现本身的粘包和拆包处理器,这里能够经过实现MessageToByteEncoder和ByteToMessageDecoder来实现。这里MessageToByteEncoder的做用是将响应数据编码为一个ByteBuf对象,而ByteToMessageDecoder则是将接收到的ByteBuf数据转换为某个对象数据。经过实现这两个抽象类,用户就能够达到实现自定义粘包和拆包处理的目的。以下是这两个类及其抽象方法的声明:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; }
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; }