接Netty整理 java
如今咱们来验证一下channel的生命周期。web
咱们将EchoServerHandler修改以下,增长所有的监听事件,并打印事件方法名称。算法
/** * 事件处理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 监听读取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(data); } /** * 监听读取完毕事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 监听异常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 将channel注册到EventLoop的Selector多路复用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未注册到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有链接,变为活跃状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 没有链接,非活跃状态 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
启动EchoServer,打开telnet链接到端口,咱们能够看到bootstrap
admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfs
sdfs
^]
telnet> quit
Connection closed.数组
整个过程为链接,发送字符串sdfs,退出链接promise
服务端日志为缓存
2019-10-01 05:33:36.960 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-10-01 05:33:36.960 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-10-01 05:33:54.439 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : sdfs服务器
2019-10-01 05:33:54.442 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-01 05:34:22.527 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-01 05:34:22.529 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelInactive
2019-10-01 05:34:22.529 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregisteredwebsocket
整个生命周期正如前面写到同样网络
Channel的生命周期为:(1)channelRegistered->(3)channelActive->(4)channelInactive->(2)channelUnregistered
ChannelPipeline:
比如厂里的流水线同样,能够在上面添加多个ChannelHanler,也可当作是一串 ChannelHandler 实例,拦截穿过 Channel 的输入输出 event, ChannelPipeline 实现了拦截器的一种高级形式,使得用户能够对事件的处理以及ChannelHanler之间交互得到彻底的控制权。
咱们来看一下ChannelPipeline的源码
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> { /** * 在管道的首位置添加一个channelhandler */ ChannelPipeline addFirst(String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); /** * 在管道的最末端添加一个channelhandler */ ChannelPipeline addLast(String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); /** * 在一个管道中已存在的channelhandler以前插入另一个channelhandler */ ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); /** * 在管道已有多一个channelhandler以后插入另一个channelhandler */ ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); /** * 同上,多了一个线程池参数 */ ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); /** * 在该管道的首位置放入一组channelhandler * */ ChannelPipeline addFirst(ChannelHandler... handlers); /** * 同上,多了一个线程池参数 * */ ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); /** * 在管道的最末端放入一组channelhandler * */ ChannelPipeline addLast(ChannelHandler... handlers); /** * 同上,多了一个线程池参数 * */ ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); /** * 从管道中移除一个channelhandler */ ChannelPipeline remove(ChannelHandler handler); /** * 根据名字在管道中移除一个channelhandler */ ChannelHandler remove(String name); /** * 根据类名在管道中移除一个channelhandler */ <T extends ChannelHandler> T remove(Class<T> handlerType); /** * 移除管道中首个channelhandler */ ChannelHandler removeFirst(); /** * 移除管道中末个channelhandler */ ChannelHandler removeLast(); /** * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字 */ ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); /** * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字 */ ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler); /** * 在管道中用新的channelhandler替换旧的channelhandler,中间参数都是替换者的名字 */ <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler); /** * 返回管道中首个channelhandler */ ChannelHandler first(); /** * 获取第一个管道处理器上下文 */ ChannelHandlerContext firstContext(); /** * 获取管道中最后一个channelhandler */ ChannelHandler last(); /** * 获取管道中最后一个管道处理器上下文 */ ChannelHandlerContext lastContext(); /** * 根据名字获取管道中的一个channelhandler */ ChannelHandler get(String name); /** * 根据类获取一个channelhandler */ <T extends ChannelHandler> T get(Class<T> handlerType); /** * 根据channelhandler获取一个管道处理器上下文 */ ChannelHandlerContext context(ChannelHandler handler); /** * 根据名字获取一个管道处理器上下文 */ ChannelHandlerContext context(String name); /** * 根据一个channelhandler的类名获取一个管道处理器上下文 */ ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType); /** * 返回一个管道 */ Channel channel(); /** * 返回管道中的channelhandler的名称列表 */ List<String> names(); /** * Converts this pipeline into an ordered {@link Map} whose keys are * handler names and whose values are handlers. */ Map<String, ChannelHandler> toMap(); @Override ChannelPipeline fireChannelRegistered(); @Override ChannelPipeline fireChannelUnregistered(); @Override ChannelPipeline fireChannelActive(); @Override ChannelPipeline fireChannelInactive(); @Override ChannelPipeline fireExceptionCaught(Throwable cause); @Override ChannelPipeline fireUserEventTriggered(Object event); @Override ChannelPipeline fireChannelRead(Object msg); @Override ChannelPipeline fireChannelReadComplete(); @Override ChannelPipeline fireChannelWritabilityChanged(); @Override ChannelPipeline flush(); }
ChannelHandlerContext模块的做用和分析
咱们在ChannelHandler的方法中会看到有一个参数。如
/** * 监听读取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(data); }
这个ChannelHandlerContext就是一个处理器的上下文。
一、ChannelHandlerContext是链接ChannelHandler和ChannelPipeline的桥梁
ChannelHandlerContext部分方法和Channel及ChannelPipeline重合,比如调用write方法,
Channel、ChannelPipeline、ChannelHandlerContext 均可以调用此方法,前二者都会在整个管道流里传播,而ChannelHandlerContext就只会在后续的Handler里面传播
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //第一种,会在整个管道中传播 Channel channel = ctx.channel(); channel.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); //第二种,会在整个管道中传播 ChannelPipeline pipeline = ctx.pipeline(); pipeline.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); //第三种,只会在后续的channelhandler中传播 ctx.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); // ByteBuf data = (ByteBuf) msg; // log.info(data.toString(CharsetUtil.UTF_8)); // ctx.writeAndFlush(data); }
二、AbstractChannelHandlerContext类
双向链表结构,next/prev分别是后继节点,和前驱节点
volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev;
三、DefaultChannelHandlerContext 是实现类,可是大部分都是父类那边完成,这个只是简单的实现一些方法
主要就是判断Handler的类型
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
ChannelInboundHandler之间的传递,主要经过调用ctx里面的FireXXX()方法来实现下个handler的调用
咱们修改一下EchoServerHandler,再增长一个EchoServerHandler2
/** * 事件处理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 监听读取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // //第一种,会在整个管道中传播 // Channel channel = ctx.channel(); // channel.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); // //第二种,会在整个管道中传播 // ChannelPipeline pipeline = ctx.pipeline(); // pipeline.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); // //第三种,只会在后续的channelhandler中传播 // ctx.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8)); ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("我是第一个handler", CharsetUtil.UTF_8)); ctx.fireChannelRead(msg); } /** * 监听读取完毕事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 监听异常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 将channel注册到EventLoop的Selector多路复用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未注册到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有链接,变为活跃状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 没有链接,非活跃状态 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
@Slf4j public class EchoServerHandler2 extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("我是第二个handler", CharsetUtil.UTF_8)); } /** * 监听读取完毕事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 监听异常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 将channel注册到EventLoop的Selector多路复用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未注册到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有链接,变为活跃状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 没有链接,非活跃状态 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
将EchoServerHandler2添加到管道中
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 socketChannel.pipeline().addLast(new EchoServerHandler()); socketChannel.pipeline().addLast(new EchoServerHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
咱们能够看到EchoServerHandler2是添加到EchoServerHandler后面的。
启动EchoServer,打开telnet
admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
kdfaf
我是第一个handler我是第二个handler^]
telnet> quit
Connection closed.
服务端的日志为
2019-09-29 22:42:42.626 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-09-29 22:42:42.627 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-09-29 22:42:48.108 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : kdfaf
2019-09-29 22:42:48.114 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler2 : kdfaf
2019-09-29 22:42:48.114 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-09-29 22:44:09.799 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-09-29 22:44:09.802 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelInactive
2019-09-29 22:44:09.802 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregistered
多个入站出站ChannelHandler的执行顺序
一、通常的项目中,inboundHandler和outboundHandler有多个,在Pipeline中的执行顺序?
InboundHandler顺序执行,OutboundHandler逆序执行
问题:ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
ch.pipeline().addLast(new InboundHandler2());
或者:
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
如今咱们以实际代码来验证一下
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class InboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); //在此结束调用,不会在管道中继续传递 ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class OutboundHandler1 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("OutBoundHandler1 write : " + data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutBoundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
@Slf4j public class OutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("OutboundHaneler2 write : " + data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
启动EchoServer,打开telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
dsfs
InboundHandler2 InboundHandler1 dsfs
服务端日志
2019-10-02 09:51:44.865 INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:dsfs
2019-10-02 09:51:44.869 INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 dsfs
咱们能够看到此时,事件在InboundHandler中就结束了,并无传递到OutboundHandler中。这是由于OutboundHandler在InboundHandler以后才开始监听,InboundHandler在处理中,并无监听写出站事件,因此不会执行到OutboundHandler之中的代码。
可是若是把监听事件的顺序调整一下,在入站以前就开始监听出站事件。
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
运行EchoServer,打开telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sfdfg
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
^]
telnet> quit
Connection closed.
从右到左为依次执行顺序,先是InboundHandler1->InboundHandler2->OutboundHandler2->OutboundHandler1
服务端日志为
2019-10-02 10:27:03.683 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
由于没有监听活跃,断开事件,因此不会打印相关日志。这里之因此OutboundHandler会执行是由于在入站以前就开始监听写出事件,而InboundHandler会先执行是由于只有接收到客户端的消息的时候才会进行写出操做,这个时候才会被OutboundHandler监听到,进行相关操做,可是因为OutboundHandler是从后往前执行,因此会先执行OutboundHandler2,再执行OutboundHandler1。而整个数据传输是贯穿管道始终的。
执行顺序是:
InboundHandler1 channelRead
InboundHandler2 channelRead
OutboundHandler2 write
OutboundHandler1 write
如今把InboundHandler1提到最前又是什么状况呢?
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); socketChannel.pipeline().addLast(new InboundHandler2()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
由分析可知,OutboundHandler出站是不会监听InboudHandler1的写出事件的,但能够监听到InboundHandler2的写出事件。
咱们修改一下两个InboundHandler来加以验证。
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class InboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); //在此结束调用,不会在管道中继续传递 ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 alone \n".getBytes())); ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
重启EchoServer,打开telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfljioldf
InboundHandler1 sdfljioldf //此处InboundHandler1只是简单的输出了,没有被OutboundHandler监听。
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone //此处InboundHandler2被OutboundHandler监听,因此会追加输出OutBoundHandler1 OutboundHandler2
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf //此处也是InboundHandler2被OutboundHandler监听输出的。
服务端日志
2019-10-02 11:04:09.345 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 alone
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf
跟咱们分析的同样。
但若是咱们修改一下InboundHandler1的代码以下
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
或者
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //会传递到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
如今不管InboundHandler1的位置放前、放后,是否被OutboundHandler监听,它都会流通整个管道。
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler1 alone ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg
服务端日志为
2019-10-02 11:18:10.761 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服务端收到数据:ljlfrg
2019-10-02 11:18:10.764 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler1 alone ljlfrg
2019-10-02 11:18:10.764 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler1 alone ljlfrg
2019-10-02 11:18:10.767 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服务端收到数据:InboundHandler1 ljlfrg
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 alone
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 ljlfrg
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg
结论:
1)InboundHandler顺序执行,OutboundHandler逆序执行
2)InboundHandler之间传递数据,经过ctx.fireChannelRead(msg)
3)InboundHandler经过ctx.write(msg),则会传递到outboundHandler
4) 使用ctx.write(msg)传递消息,Inbound须要放在结尾,在Outbound以后,否则outboundhandler会不执行;
可是使用channel.write(msg)、pipline.write(msg)状况会不一致,都会执行
5) outBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求再接受数据,先outbound再inbound,服务端则相反
七、Netty异步操做模块ChannelFuture
Netty中的全部I/O操做都是异步的,这意味着任何I/O调用都会当即返回,而ChannelFuture会提供有关的信息I/O操做的结果或状态。
ChannelFuture的使用主要是在EchoServer中
//绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync();
1)ChannelFuture状态:
未完成:当I/O操做开始时,将建立一个新的对象,新的最初是未完成的 - 它既没有成功,也没有被取消,由于I/O操做还没有完成。
已完成:当I/O操做完成,无论是成功、失败仍是取消,Future都是标记为已完成的, 失败的时候也有具体的信息,例如缘由失败,但请注意,即便失败和取消属于完成状态。
ChannelFuture的四种状态
* +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+
注意:不要在IO线程内调用future对象的sync或者await方法
不能在channelHandler中调用sync或者await方法
2)ChannelPromise:继承于ChannelFuture,进一步拓展用于设置IO操做的结果
其中的继承关系图如上所示,第一个Future是JDK自带的,第二个Future是netty中增长的。关于future通常的使用方法能够参考Fork/Join框架原理和使用探秘
Netty编写的网络数据传输中的编码和解码
前面说的:高性能RPC框架的3个要素:IO模型、数据协议、线程模型
最开始接触的编码码:java序列化/反序列化(就是编解码)、url编码、base64编解码
为啥jdk有编解码,还要netty本身开发编解码?
java自带序列化的缺点
1)没法跨语言
2) 序列化后的码流太大,也就是数据包太大
3) 序列化和反序列化性能比较差
业界里面也有其余编码框架: google的 protobuf(PB)、Facebook的Trift、Jboss的Marshalling、Kyro等,关于Kyro的使用方法能够参考浅析克隆 。
Netty里面的编解码:
解码器:负责处理“入站 InboundHandler”数据
编码器:负责“出站 OutboundHandler” 数据
Netty里面提供默认的编解码器,也支持自定义编解码器
Encoder:编码器
Decoder:解码器
Codec:编解码器
Netty的解码器Decoder和使用场景
Decoder对应的就是ChannelInboundHandler,主要就是字节数组转换为消息对象,在咱们以前的样例中,都是处理一些简单的字符串来进行消息传递,但若是咱们须要转换的是Java的对象的话,则须要使用到Decoder。
主要是两个方法
decode (经常使用)
decodeLast (用于最后的几个字节处理,也就是channel关闭的时候,产生的最后一个消息)
抽象解码器
1)ByteToMessageDecoder
用于将字节转为消息,须要检查缓冲区是否有足够的字节,经过源码可知,ByteToMessageDecoder实际上就是一个ChannelInboundHandler。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
2)ReplayingDecoder
继承ByteToMessageDecoder,不须要检查缓冲区是否有足够的字节,可是ReplayingDecoder速度略慢于ByteToMessageDecoder,不是全部的ByteBuf都支持
选择:项目复杂性高则使用ReplayingDecoder,不然使用 ByteToMessageDecoder
解码器具体的实现,用的比较多的是(更可能是为了解决TCP底层的粘包和拆包问题)
DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
LineBasedFrameDecoder: 以换行符为结束标志的解码器
FixedLengthFrameDecoder:固定长度解码器
LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
StringDecoder:文本解码器,将接收到的对象转化为字符串,通常会与上面的进行配合,而后在后面添加业务handle
Netty编码器Encoder
Encoder对应的就是ChannelOutboundHandler,消息对象转换为字节数组
Netty自己未提供和解码同样的编码器,是由于场景不一样,二者非对等的
1)MessageToByteEncoder
消息转为字节数组,调用write方法,会先判断当前编码器是否支持须要发送的消息类型,若是不支持,则透传;其自己其实就是一个ChannelOutboundHandler。
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
2)MessageToMessageEncoder
用于从一种消息编码为另一种消息(例如POJO到POJO)
Netty编解码器类Codec
组合解码器和编码器,以此提供对于字节和消息都相同的操做(通常不经常使用)
优势:成对出现,编解码都是在一个类里面完成
缺点:耦合在一块儿,拓展性不佳
Codec:组合编解码
1)ByteToMessageCodec
2)MessageToMessageCodec
decoder:解码
1)ByteToMessageDecoder
2)MessageToMessageDecoder
encoder:编码
1)ByteToMessageEncoder
2)MessageToMessageEncoder
什么是TCP粘包拆包
1)TCP拆包: 一个完整的包可能会被TCP拆分为多个包进行发送
2)TCP粘包: 把多个小的包封装成一个大的数据包发送, client发送的若干数据包 Server接收时粘成一包
发送方和接收方均可能出现这个缘由
发送方的缘由:TCP默认会使用Nagle算法
接收方的缘由: TCP接收到数据放置缓存中,应用程序从缓存中读取
UDP: 是没有粘包和拆包的问题,有边界协议
二、TCP半包读写常看法决方案
简介:讲解TCP半包读写常见的解决办法
发送方:能够关闭Nagle算法
接受方: TCP是无界的数据流,并无处理粘包现象的机制, 且协议自己没法避免粘包,半包读写的发生须要在应用层进行处理
应用层解决半包读写的办法
1)设置定长消息 (10字符)
xdclass000xdclass000xdclass000xdclass000
2)设置消息的边界 ($$ 切割)
sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$
3)使用带消息头的协议,消息头存储消息开始标识及消息的长度信息
Header+Body
三、Netty自带解决TCP半包读写方案
DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
LineBasedFrameDecoder: 以换行符为结束标志的解码器
FixedLengthFrameDecoder:固定长度解码器
LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
如今咱们来作一个实验,若是不使用解码器,会产生什么样的效果。
服务端的入站事件处理器
@Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; byte[] bytes = new byte[data.readableBytes()]; //将收到到字节流放入内存字节数组 data.readBytes(bytes); //将字节数组转成字符串,并截取该字符串从0到换行符(不包含换行符)到子串 //System.getProperty("line.separator")为获取操做系统的换行符,每种操做系统可能各不相同 String body = new String(bytes, CharsetUtil.UTF_8) .substring(0,bytes.length - System.getProperty("line.separator").length()); log.info("服务端收到消息内容为:" + body + "收到消息次数:" + ++counter); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端入站事件处理器
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; //定义一段包含换行符的字符串,并转化为字节数组 byte[] req = ("guanjian.net" + System.getProperty("line.separator")).getBytes(); //连续发送10条该字符串到服务端 for (int i = 0;i < 10;i++) { //申请一段内存缓冲区 msg = Unpooled.buffer(req.length); //将字节数组写入缓冲区 msg.writeBytes(req); //发送该字符串到服务端 ctx.writeAndFlush(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
分别启动服务端和客户端
咱们能够看到服务端日志为
2019-10-05 08:41:11.733 INFO 614 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net收到消息次数:1
虽然客户端是分了10次发送到数据,可是服务端倒是只接收了一次,收到消息次数1,这明显是一次粘包。
如今咱们给客户端加上.option(ChannelOption.TCP_NODELAY,true)来看一下是什么状况
@AllArgsConstructor public class EchoClient { private String host; private int port; public void run() throws InterruptedException { //客户端处理线程组(其实就是一个线程池) EventLoopGroup group = new NioEventLoopGroup(); try { //客户端netty启动对象 Bootstrap bootstrap = new Bootstrap(); //将客户端线程组添加到启动对象中 bootstrap.group(group) //给启动对象添加Socket管道 .channel(NioSocketChannel.class) //主动链接到远程服务器IP端口 .remoteAddress(new InetSocketAddress(host,port)) .option(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); //链接到服务端,connect是异步链接,在调用同步等待sync,等待链接成功 ChannelFuture channelFuture = bootstrap.connect().sync(); //阻塞直到客户端通道关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 group.shutdownGracefully(); } } }
服务端日志为
2019-10-05 08:45:52.654 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:1
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:2
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:3
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:4
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:5
2019-10-05 08:45:52.656 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net
guanjian.net收到消息次数:6
咱们能够看到,虽然加了.option(ChannelOption.TCP_NODELAY,true),但并不能保证它不产生粘包,有些包包含了两条字符串,收到的消息也就不是10次了。
如今咱们把换行解码器LineBasedFrameDecoder放入
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 //这个1024是在1024个字节内去寻找换行符,若是在1024个字节内没有找到换行符,就会报错 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
再次运行,服务端日志
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:1
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:2
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:3
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:4
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:5
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:6
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:7
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:8
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:9
2019-10-05 09:04:15.694 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.ne收到消息次数:10
LineBasedFrameDecoder是继承于ByteToMessageDecoder,是一个ChannelInboundHandler
public class LineBasedFrameDecoder extends ByteToMessageDecoder
由上面可见,粘包通过LineBasedFrameDecoder处理,再逐条发往下一个ChannelInboundHandler,而不是从客户端逐条发送过来的。
若是咱们以为ServerHandler的channelRead方法太麻烦了,还要由字节数组转成字符串,那咱们直接将收到的信息强制转成字符串会怎么样呢?
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; log.info("服务端收到消息内容为:" + body + "收到消息次数:" + ++counter); }
启动后服务端报错以下(部分)
java.lang.ClassCastException: io.netty.buffer.PooledSlicedByteBuf cannot be cast to java.lang.String
at com.guanjian.websocket.netty.packet.ServerHandler.channelRead(ServerHandler.java:29)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
说明此处没法将收到的信息直接强制转换成字符串,这个时候咱们能够加入字符串解码器StringDecoder.
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 //这个1024是在1024个字节内去寻找换行符,若是在1024个字节内没有找到换行符,就会报错 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); //字符串解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
启动运行,服务端日志为
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:1
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:2
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:3
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:4
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:5
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:6
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:7
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:8
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:9
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:guanjian.net收到消息次数:10
使用解码器LineBasedFrameDecoder解决半包读写问题
1)LineBaseFrameDecoder 以换行符为结束标志的解码器 ,构造函数里面的数字表示最长遍历的帧数
2)StringDecoder解码器将对象转成字符串
如今咱们修改一下客户端处理器ClientHandler。
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)"; ByteBuf buf = null; //申请一段内存缓冲区 buf = Unpooled.buffer(message.getBytes().length); //将字节数组写入缓冲区 buf.writeBytes(message.getBytes()); //发送字节数组到服务端 ctx.writeAndFlush(buf); }
咱们能够看到message字符串中都带有&_字符,咱们将在服务端以该字符为分隔符进行解码。
修改EchoServer到代码以下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 //使用指定消息分隔符解码器进行解码 //1024的意思是在1024个字节内查找&_,若是找不到就会抛出异常 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); //字符串解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
分别启动服务端,客户端。服务端到日志以下
2019-10-06 12:55:36.078 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:Starting EchoclientApplication 收到消息次数:1
2019-10-06 12:55:36.078 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:on admindeMBP.lan with PID 741 收到消息次数:2
2019-10-06 12:55:36.079 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次数:3
对比message
"Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)"
咱们会发现最后一句/Users/admin/Downloads/nettyecho)被丢弃了,因此要所有拿到咱们须要的消息,咱们须要在最后一段字符串中加入&_
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)&_"; ByteBuf buf = null; //申请一段内存缓冲区 buf = Unpooled.buffer(message.getBytes().length); //将字节数组写入缓冲区 buf.writeBytes(message.getBytes()); //发送字节数组到服务端 ctx.writeAndFlush(buf); }
再次启动客户端,服务端日志为
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:Starting EchoclientApplication 收到消息次数:1
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:on admindeMBP.lan with PID 741 收到消息次数:2
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次数:3
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:/Users/admin/Downloads/nettyecho)收到消息次数:4
其实DelimiterBasedFrameDecoder有不少个构造器。咱们这里使用的两参构造器其实调用到是一个三参构造器。
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) { this(maxFrameLength, true, delimiter); }
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) { this(maxFrameLength, stripDelimiter, true, delimiter); }
而三参构造器实际上是调用了一个四参构造器
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf delimiter) { this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] { delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())}); }
咱们来讲明一下这个四参构造器每一个参数到含义。
maxFrameLength:
表示一行最大的长度,若是超过这个长度依然没有检测自定义分隔符,将会抛出TooLongFrameException
stripDelimiter:
解码后的消息是否去除掉分隔符(true去掉分隔符,flase保留分隔符)
failFast:
若是为true,则超出maxLength后当即抛出TooLongFrameException,不进行继续解码
若是为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常
delimiters:
分隔符,ByteBuf类型
如今咱们来看一下第二个参数到含义。
修改EchoServer以下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 //使用指定消息分隔符解码器进行解码,1024为在1024个字节内查找分隔符(能够本身任意定义),若是 //找不到会抛出异常 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,false,true,delimiter)); //字符串解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
从新启动服务端,客户端。服务端到日志以下
2019-10-06 13:33:24.806 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:Starting EchoclientApplication &_收到消息次数:1
2019-10-06 13:33:24.807 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:on admindeMBP.lan with PID 741 &_收到消息次数:2
2019-10-06 13:33:24.807 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_收到消息次数:3
2019-10-06 13:33:24.808 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服务端收到消息内容为:/Users/admin/Downloads/nettyecho)&_收到消息次数:4
自定义长度半包读写器LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder也有不少构造器,通常咱们使用的是一个五参构造器
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this( maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); }
固然它调用的是一个六参构造器。
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { this( ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); }
这六个参数的含义以下
maxFrameLength 数据包的最大长度
lengthFieldOffset 长度字段的偏移位,长度字段开始的地方,意思是跳过指定长度个字节以后的才是消息体字段
lengthFieldLength 长度字段占的字节数, 帧数据长度的字段自己的长度
lengthAdjustment
通常 Header + Body,添加到长度字段的补偿值,若是为负数,开发人员认为这个 Header的长度字段是整个消息包的长度,则Netty应该减去对应的数字
initialBytesToStrip 从解码帧中第一次去除的字节数, 获取完一个完整的数据包以后,忽略前面的指定位数的长度字节,应用解码器拿到的就是不带长度域的数据包
failFast 是否快速失败
六参构造器调用到是一个七参构造器(不讨论)。
如今咱们来设置一个存储消息内容到消息类。(服务端和客户端相同)
public class CCMessageHeader { @Getter @Setter private byte[] messageFlag = new byte[2]; //消息长度偏移量 @Getter @Setter private int length; //消息长度 @Getter @Setter private int type; //消息类型 @Getter private String data; //消息内容 public CCMessageHeader() { //170,这两个数没有实际意义,只用来描述消息长度偏移量而填充进去的 messageFlag[0] = (byte) 0xaa; //187 messageFlag[1] = (byte) 0xbb; } public void setData(String data) { this.data = data; //消息体的长度为字符串data的长度加4,4为type整形的长度 this.length = data.length() + 4; } }
服务端入站处理器
@Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf body = (ByteBuf) msg; //若是字节流到可读字节数小于等于0,转给下一个InboundHandler处理 if (body.readableBytes() <= 0) { ctx.fireChannelRead(msg); } //初始化一个接收消息对象 CCMessageHeader recHd = new CCMessageHeader(); //字节流读取到的第一个整形赋给nLength int nLength = body.readInt(); //字节流读取到到第二个整形赋给nType int nType = body.readInt(); //获取字节流的可读字节数,并分配一个该大小的字节数组 int nDataSize = body.readableBytes(); byte[] aa = new byte[nDataSize]; //将字节流读入字节数组 body.readBytes(aa); //将该字节数组转成字符串 String myMsg = new String(aa, CharsetUtil.UTF_8); log.info("收到 " + ++counter + "次消息:[" + myMsg + "],类型为[" + nType + "]"); //初始化一个发送消息对象 CCMessageHeader hd = new CCMessageHeader(); hd.setType(2); hd.setData("server data..."); //申请一段直接缓冲空间 ByteBuf echo = Unpooled.directBuffer(); //将该发送消息对象的各属性写入到缓冲空间中 echo.writeBytes(hd.getMessageFlag()); echo.writeInt(hd.getLength()); echo.writeInt(hd.getType()); echo.writeCharSequence(hd.getData(),CharsetUtil.UTF_8); //将该发送消息对象以字节流到形式发送到客户端 ctx.writeAndFlush(echo); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoServer以下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 //65535表示对自定义长度解码器对最大处理长度, // 第二个2表示长度信息从第二个字节后开始获取, //其实这个2是messageFlag的长度,即获取长度字段的偏移量。 //4表示长度字段占4个字节,即private int length的字节数,一个整数占4个字节 //0表示添加到长度字段的补偿值,这里不须要补偿 //最后一个2表示获取咱们的消息体,要去掉2个长度字段的字节数,由于除了length字段还有一个 //type的整形字段,因此是2才能拿到data字段,即显示消息体字段,但其实消息体包含了type字段的 socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2)); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
客户端入站处理器
@Slf4j public class ClientHandler2 extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //初始化一个发送消息对象 CCMessageHeader hd = new CCMessageHeader(); //设置类型为1 hd.setType(1); for (int i = 0;i < 10;i++) { String strData = String.format("client data %d...",i + 1); //设置消息体 hd.setData(strData); //申请一段直接缓冲区 ByteBuf echo = Unpooled.directBuffer(); //将消息对象的各个属性写入该缓冲区 echo.writeBytes(hd.getMessageFlag()); echo.writeInt(hd.getLength()); echo.writeInt(hd.getType()); echo.writeCharSequence(hd.getData(), CharsetUtil.UTF_8); //将该缓冲区的字节流发送到服务端 ctx.writeAndFlush(echo); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf body = (ByteBuf) msg; //初始化接收消息对象 CCMessageHeader recHd = new CCMessageHeader(); //获取第一个整形赋给nLength,即消息长度 int nLength = body.readInt(); //获取第二个整形赋给nType int nType = body.readInt(); //获取整个字节流的可读字节数,并以此创建一个字节数组 int nDataSize = body.readableBytes(); byte[] aa = new byte[nDataSize]; //将字节流读入该字节数组 body.readBytes(aa); //将该字节数组转换为字符串 String myMsg = new String(aa,CharsetUtil.UTF_8); log.info("收到" + ++counter + "次消息[" + myMsg + "],类型为:[" + nType + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoClient代码以下
@AllArgsConstructor public class EchoClient { private String host; private int port; public void run() throws InterruptedException { //客户端处理线程组(其实就是一个线程池) EventLoopGroup group = new NioEventLoopGroup(); try { //客户端netty启动对象 Bootstrap bootstrap = new Bootstrap(); //将客户端线程组添加到启动对象中 bootstrap.group(group) //给启动对象添加Socket管道 .channel(NioSocketChannel.class) //主动链接到远程服务器IP端口 .remoteAddress(new InetSocketAddress(host,port)) .option(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2)); socketChannel.pipeline().addLast(new ClientHandler2()); } }); //链接到服务端,connect是异步链接,在调用同步等待sync,等待链接成功 ChannelFuture channelFuture = bootstrap.connect().sync(); //阻塞直到客户端通道关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 group.shutdownGracefully(); } } }
运行服务端,客户端。服务端日志以下
2019-10-06 15:27:54.601 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 1次消息:[client data 1...],类型为[1]
2019-10-06 15:27:54.604 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 2次消息:[client data 2...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 3次消息:[client data 3...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 4次消息:[client data 4...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 5次消息:[client data 5...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 6次消息:[client data 6...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 7次消息:[client data 7...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 8次消息:[client data 8...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 9次消息:[client data 9...],类型为[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 10次消息:[client data 10...],类型为[1]
客户端日志以下
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到1次消息[server data...],类型为:[2]
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到2次消息[server data...],类型为:[2]
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到3次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到4次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到5次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到6次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到7次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到8次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到9次消息[server data...],类型为:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到10次消息[server data...],类型为:[2]
这样就分别完成了自定义长度的半包解码