[toc]java
程序代码来自于《Netty权威指南》第8章,已经加了注释,不过须要注意的是,使用的proto源代码是在Google Protobuf入门与使用中生成的,关于protobuf代码自动生成工具的使用能够参考这篇文章。bootstrap
例子中,经过×××ProtobufVarint32FrameDecoder
和编码器ProtobufVarint32LengthFieldPrepender
的使用已经解决了半包问题,测试时能够把其注释掉,这样就能够演示Netty中使用Protobuf出现的TCP粘包问题。数组
同时,经过protobuf的使用,也能够深入感觉到,其在Netty中的使用确实很是简单,编解码、半包问题,只须要添加相关的处理器便可,并且它能够方便地实现跨语言的远程服务调用。(protobuf自己提供了对不一样语言的支持)异步
但其实在使用时会发现有一个问题,就是编解码的对象是须要使用其生成的特定的proto对象来进行操做的,也就是说,须要编写.proto文件,再经过protoc来生成相应语言的代码文件,显然这样作仍是会有些麻烦(虽然其实也还好,不算麻烦),有没有方便点的方法呢?后面经过protostuff的使用便可解决这个问题。socket
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeReqProto; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 添加日志处理器 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ProtobufVarint32FrameDecoder,主要用于Protobuf的半包处理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 添加ProtobufDecoder×××,它的参数是com.google.protobuf.MessageLite // 实际上就是要告诉ProtobufDecoder须要解码的目标类是什么,不然仅仅从字节数组中是 // 没法判断出要解码的目标类型信息的(服务端须要解析的是客户端请求,因此是Req) ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); /** * 来自源码的代码注释,用于Protobuf的半包处理 * * An encoder that prepends the the Google Protocol Buffers * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base * 128 Varints</a> integer length field. For example: * <pre> * BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes) * +---------------+ +--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * | (300 bytes) | | 0xAC02 | (300 bytes) | * +---------------+ +--------+---------------+ * </pre> * */ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加ProtobufEncoder编码器,这样就不须要对SubscribeResp进行手工编码 ch.pipeline().addLast(new ProtobufEncoder()); // 添加业务处理handler ch.pipeline().addLast(new SubReqServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (NumberFormatException e) { // TODO: handle exception } } new SubReqServer().bind(port); } }
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeReqProto; import cn.xpleaf.protobuf.SubscribeRespProto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class SubReqServerHandler extends ChannelInboundHandlerAdapter { /** * 因为ProtobufDecoder已经对消息进行了自动解码,所以接收到的订购请求消息能够直接使用 * 对用户名进行校验,校验经过后构造应答消息返回给客户端,因为使用了ProtobufEncoder, * 因此不须要对SubscribeRespProto.SubscribeResp进行手工编码 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg; String username = req.getUserName(); if("xpleaf".equalsIgnoreCase(username)) { System.out.println("Service accept client subscribe req : [" + req.toString() + "]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } /** * 构建SubscribeRespProto.SubscribeResp对象 * @param subReqID * @return */ private SubscribeRespProto.SubscribeResp resp(int subReqID) { SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode(0); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 发生异常,关闭链路 ctx.close(); } }
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeRespProto; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqClient { public void connect(String host, int port) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) // 设置TCP链接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ProtobufVarint32FrameDecoder,主要用于Protobuf的半包处理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 添加ProtobufDecoder×××,它的参数是com.google.protobuf.MessageLite // 实际上就是要告诉ProtobufDecoder须要解码的目标类是什么,不然仅仅从字节数组中是 // 没法判断出要解码的目标类型信息的(客户端须要解析的是服务端请求,因此是Resp) ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); /** * 来自源码的代码注释,用于Protobuf的半包处理 * * An encoder that prepends the the Google Protocol Buffers * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base * 128 Varints</a> integer length field. For example: * <pre> * BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes) * +---------------+ +--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * | (300 bytes) | | 0xAC02 | (300 bytes) | * +---------------+ +--------+---------------+ * </pre> * */ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加ProtobufEncoder编码器,这样就不须要对SubscribeResp进行手工编码 ch.pipeline().addLast(new ProtobufEncoder()); // 添加业务处理handler ch.pipeline().addLast(new SubReqClientHandler()); } }); // 发起异步链接操做 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (NumberFormatException e) { // 采用默认值 } } new SubReqClient().connect("localhost", port); } }
package cn.xpleaf.subscribe; import java.util.ArrayList; import java.util.List; import cn.xpleaf.protobuf.SubscribeReqProto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class SubReqClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { for(int i = 0; i < 10; i++) { ctx.write(subReq(i)); } ctx.flush(); } /** * 构建SubscribeReqProto.SubscribeReq对象 * @param i * @return */ private SubscribeReqProto.SubscribeReq subReq(int i) { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder(); builder.setSubReqID(i); builder.setUserName("xpleaf"); builder.setProductName("Netty Book For Protobuf"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChange"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Service accept server subscribe response : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
服务端输出以下:ide
Service accept client subscribe req : [subReqID: 0 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 1 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 2 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 3 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 4 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 5 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 6 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 7 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 8 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ] Service accept client subscribe req : [subReqID: 9 userName: "xpleaf" productName: "Netty Book For Protobuf" address: "NanJing YuHuaTai" address: "BeiJing LiuLiChange" address: "ShenZhen HongShuLin" ]
客户端输出以下:工具
Service accept server subscribe response : [subReqID: 0 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 1 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 2 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 3 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 4 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 5 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 6 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 7 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 8 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ] Service accept server subscribe response : [subReqID: 9 respCode: 0 desc: "Netty book order succeed, 3 days later, sent to the designated address" ]