接Netty整理(二)web
ByteBuf:是数据容器(字节容器)算法
JDK ByteBuffer
共用读写索引,每次读写操做都须要Flip()
扩容麻烦,并且扩容后容易形成浪费 关于ByteBuffer的使用方法能够参考序列化和反序列化的三种方法 ,里面有Netty 3的ChannelBuffer,由于如今Netty 3用的比较少,看成参考就好。缓存
Netty ByteBuf
读写使用不一样的索引,因此操做便捷
自动扩容,使用便捷服务器
咱们如今来看一下ByteBuf的源码。websocket
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
咱们能够看到ByteBuf是一个抽象类,它里面几乎大部分都是抽象方法。socket
继承ByteBuf的是一个AbstractByteBuf的抽象类,而读写索引的属性就在该类中。ide
public abstract class AbstractByteBuf extends ByteBuf
int readerIndex; //读索引 int writerIndex; //写索引 private int markedReaderIndex; //标记当前读索引的位置 private int markedWriterIndex; //标记当前写索引的位置 private int maxCapacity; //最大容量
而支持自动扩容的部分能够由ensureWritable追踪看到oop
@Override public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } ensureWritable0(minWritableBytes); return this; } final void ensureWritable0(int minWritableBytes) { ensureAccessible(); if (minWritableBytes <= writableBytes()) { return; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // Normalize the current capacity to the power of 2. int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); // Adjust to the new capacity. capacity(newCapacity); }
在AbstractByteBufAllocator中性能
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
/** * 计算新容量 */ @Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
这个newCapacity就是扩容的新容量,因此咱们通常不须要担忧ByteBuf容量不够的问题。this
ByteBuf的建立方法
1)ByteBufAllocator
池化(Netty4.x版本后默认使用 PooledByteBufAllocator
提升性能而且最大程度减小内存碎片
可使用Channel处理器上下文来建立
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf headBuffer = ctx.alloc().buffer(); ByteBuf directBuffer = ctx.alloc().directBuffer(); CompositeByteBuf byteBufs = ctx.alloc().compositeBuffer(); byteBufs.addComponents(headBuffer,directBuffer); }
非池化UnpooledByteBufAllocator: 每次返回新的实例
2)Unpooled: 提供静态方法建立未池化的ByteBuf,能够建立堆内存和直接内存缓冲区
ByteBuf使用模式
堆缓存区HEAP BUFFER: ByteBuf headBuffer = Unpooled.buffer(6);
优势:存储在JVM的堆空间中,能够快速的分配和释放
缺点:每次使用前会拷贝到直接缓存区(也叫堆外内存)
直接缓存区DIRECR BUFFER: ByteBuf directBuffer = Unpooled.directBuffer();
优势:存储在堆外内存上,堆外分配的直接内存,不会占用堆空间
缺点:内存的分配和释放,比在堆缓冲区更复杂
复合缓冲区COMPOSITE BUFFER: CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(headBuffer,directBuffer);
能够建立多个不一样的ByteBuf,而后放在一块儿,可是只是一个视图
选择:大量IO数据读写,用“直接缓存区”; 业务消息编解码用“堆缓存区”
心跳检测
以前有说过Netty Channel处理器的生命周期,如今咱们在此基础上增长心跳检测的部分
咱们须要修改一下EchoServer
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //欢迎线程组(其实就是一个线程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做线程组(其实就是一个线程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //将两个线程组添加到启动对象中 serverBootstrap.group(bossGroup,workGroup) //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的请求的等待队列的最大长度; .option(ChannelOption.SO_BACKLOG,1024) //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法; //若是要减小发送次数,就设置为false,会累积必定大小后再发送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个 //必需要实现的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //将咱们本身编写的事件处理器添加到客户端的链接管道中 //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用 //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上 //做业的工人,咱们能够往一条流水线上投放不少的工人 //IdleStateHandler是一个作空闲检测的ChannelInboundHandler //针对客户端,若是10秒钟时没有向服务端发送读写心跳(All),则主动断开 //若是是读空闲或者是写空闲,不处理 socketChannel.pipeline().addLast(new IdleStateHandler(2,4,10)); socketChannel.pipeline().addLast(new EchoServerHandler()); } }); log.info("服务器启动中"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
再修改EchoServerHandler以下
/** * 事件处理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { //用于记录和管理全部客户端的channel private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 监听读取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); } /** * 监听读取完毕事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 监听异常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); clients.remove(ctx.channel()); } /** * 将channel注册到EventLoop的Selector多路复用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未注册到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有链接,变为活跃状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 没有链接,非活跃状态 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } /** * 用户事件追踪器 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //IdleStateEvent是一个用户事件,包含读空闲/写空闲/读写空闲 if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("进入读空闲"); }else if (event.state() == IdleState.WRITER_IDLE) { log.info("进入写空闲"); }else if (event.state() == IdleState.ALL_IDLE) { log.info("channel关闭前,链接数为" + clients.size()); ctx.channel().close(); log.info("channel关闭后,链接数为" + clients.size()); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded"); clients.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("客户端断开,channel对应的长id为:" + ctx.channel().id().asLongText()); log.info("客户端断开,channel对应的短id为:" + ctx.channel().id().asShortText()); } }
在这里,咱们只是简单打印一下客户端链接时发送过来的一段字符串
分别启动服务端和客户端,服务端的日志以下
2019-10-22 08:13:31.615 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : handlerAdded
2019-10-22 08:13:31.616 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-10-22 08:13:31.616 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-10-22 08:13:31.626 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : Starting EchoclientApplication &_on admindeMBP.lan with PID 741 &_(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_/Users/admin/Downloads/nettyecho)&_
2019-10-22 08:13:31.631 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-22 08:13:33.637 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 进入读空闲
2019-10-22 08:13:35.632 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 进入写空闲
2019-10-22 08:13:35.635 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 进入读空闲
2019-10-22 08:13:37.639 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 进入读空闲
2019-10-22 08:13:39.634 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 进入写空闲
2019-10-22 08:13:39.641 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 进入读空闲
2019-10-22 08:13:41.635 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channel关闭前,链接数为1
2019-10-22 08:13:41.638 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channel关闭后,链接数为0
2019-10-22 08:13:41.638 INFO 541 --- [ntLoopGroup-3-1] cd.g.websocket.netty.EchoServerHandler : channelInactive
2019-10-22 08:13:41.638 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregistered
2019-10-22 08:13:41.648 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 客户端断开,channel对应的长id为:acde48fffe001122-0000021d-00000001-a2651c27e64fc656-a01ff1fe
2019-10-22 08:13:41.648 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 客户端断开,channel对应的短id为:a01ff1fe
从日志能够看到,处理的流程为先将handler添加到管道pipeline中(handlerAdded),而后是注册到EventLoop的Seletor多路复用器里(channelRegistered),而后是有链接进来,激活状态(channelActive),读取客户端发送过来的消息,而后是读取完毕,次数客户端再也不发送消息,服务端每2秒进入一次读空闲状态,每4秒进入一次写空闲状态,等到第10秒的时候,读写都没有,服务端主动断开客户端的链接,进入无链接,非活跃状态。channel未注册到EventLoop线程中。最终removed完成后,打印断开的channel的id。此处能够看到,客户端要想不被断开链接,就必须主动发送心跳链接的检测信号。