Netty整理(三)

Netty整理(二)web

ByteBuf:是数据容器(字节容器)算法

        JDK ByteBuffer
            共用读写索引,每次读写操做都须要Flip()
            扩容麻烦,并且扩容后容易形成浪费 关于ByteBuffer的使用方法能够参考序列化和反序列化的三种方法 ,里面有Netty 3的ChannelBuffer,由于如今Netty 3用的比较少,看成参考就好。缓存

        Netty ByteBuf
            读写使用不一样的索引,因此操做便捷
            自动扩容,使用便捷服务器

咱们如今来看一下ByteBuf的源码。websocket

public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>

咱们能够看到ByteBuf是一个抽象类,它里面几乎大部分都是抽象方法。socket

继承ByteBuf的是一个AbstractByteBuf的抽象类,而读写索引的属性就在该类中。ide

public abstract class AbstractByteBuf extends ByteBuf
int readerIndex;  //读索引
int writerIndex;  //写索引
private int markedReaderIndex;  //标记当前读索引的位置
private int markedWriterIndex;  //标记当前写索引的位置
private int maxCapacity;  //最大容量

而支持自动扩容的部分能够由ensureWritable追踪看到oop

@Override
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }
    ensureWritable0(minWritableBytes);
    return this;
}

final void ensureWritable0(int minWritableBytes) {
    ensureAccessible();
    if (minWritableBytes <= writableBytes()) {
        return;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // Adjust to the new capacity.
    capacity(newCapacity);
}

在AbstractByteBufAllocator中性能

static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
/**
 * 计算新容量
 */
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

这个newCapacity就是扩容的新容量,因此咱们通常不须要担忧ByteBuf容量不够的问题。this

ByteBuf的建立方法
                1)ByteBufAllocator
                     池化(Netty4.x版本后默认使用 PooledByteBufAllocator
                         提升性能而且最大程度减小内存碎片

                    可使用Channel处理器上下文来建立

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf headBuffer = ctx.alloc().buffer();
    ByteBuf directBuffer = ctx.alloc().directBuffer();
    CompositeByteBuf byteBufs = ctx.alloc().compositeBuffer();
    byteBufs.addComponents(headBuffer,directBuffer);
}

                     非池化UnpooledByteBufAllocator: 每次返回新的实例

                2)Unpooled: 提供静态方法建立未池化的ByteBuf,能够建立堆内存和直接内存缓冲区

    
            ByteBuf使用模式
                堆缓存区HEAP BUFFER:  ByteBuf headBuffer = Unpooled.buffer(6);
                    优势:存储在JVM的堆空间中,能够快速的分配和释放
                    缺点:每次使用前会拷贝到直接缓存区(也叫堆外内存)   

                直接缓存区DIRECR BUFFER:  ByteBuf directBuffer = Unpooled.directBuffer();
                    优势:存储在堆外内存上,堆外分配的直接内存,不会占用堆空间
                    缺点:内存的分配和释放,比在堆缓冲区更复杂   

                复合缓冲区COMPOSITE BUFFER:   CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();

                                                                 compositeByteBuf.addComponents(headBuffer,directBuffer);
                    能够建立多个不一样的ByteBuf,而后放在一块儿,可是只是一个视图
                选择:大量IO数据读写,用“直接缓存区”; 业务消息编解码用“堆缓存区”

心跳检测

以前有说过Netty Channel处理器的生命周期,如今咱们在此基础上增长心跳检测的部分

咱们须要修改一下EchoServer

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法;
                    //若是要减小发送次数,就设置为false,会累积必定大小后再发送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必需要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将咱们本身编写的事件处理器添加到客户端的链接管道中
                            //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel
                            //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用
                            //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //做业的工人,咱们能够往一条流水线上投放不少的工人
                            //IdleStateHandler是一个作空闲检测的ChannelInboundHandler
                            //针对客户端,若是10秒钟时没有向服务端发送读写心跳(All),则主动断开
                            //若是是读空闲或者是写空闲,不处理
                            socketChannel.pipeline().addLast(new IdleStateHandler(2,4,10));
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

再修改EchoServerHandler以下

/**
 * 事件处理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    //用于记录和管理全部客户端的channel
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 监听读取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
    }

    /**
     * 监听读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 监听异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        clients.remove(ctx.channel());
    }

    /**
     * 将channel注册到EventLoop的Selector多路复用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未注册到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有链接,变为活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 没有链接,非活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }

    /**
     * 用户事件追踪器
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //IdleStateEvent是一个用户事件,包含读空闲/写空闲/读写空闲
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("进入读空闲");
            }else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("进入写空闲");
            }else if (event.state() == IdleState.ALL_IDLE) {
                log.info("channel关闭前,链接数为" + clients.size());
                ctx.channel().close();
                log.info("channel关闭后,链接数为" + clients.size());
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded");
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端断开,channel对应的长id为:" + ctx.channel().id().asLongText());
        log.info("客户端断开,channel对应的短id为:" + ctx.channel().id().asShortText());
    }
}

在这里,咱们只是简单打印一下客户端链接时发送过来的一段字符串

分别启动服务端和客户端,服务端的日志以下

2019-10-22 08:13:31.615  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : handlerAdded
2019-10-22 08:13:31.616  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelRegistered
2019-10-22 08:13:31.616  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelActive
2019-10-22 08:13:31.626  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : Starting EchoclientApplication &_on admindeMBP.lan with PID 741 &_(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_/Users/admin/Downloads/nettyecho)&_
2019-10-22 08:13:31.631  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-22 08:13:33.637  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 进入读空闲
2019-10-22 08:13:35.632  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 进入写空闲
2019-10-22 08:13:35.635  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 进入读空闲
2019-10-22 08:13:37.639  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 进入读空闲
2019-10-22 08:13:39.634  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 进入写空闲
2019-10-22 08:13:39.641  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 进入读空闲
2019-10-22 08:13:41.635  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channel关闭前,链接数为1
2019-10-22 08:13:41.638  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channel关闭后,链接数为0
2019-10-22 08:13:41.638  INFO 541 --- [ntLoopGroup-3-1] cd.g.websocket.netty.EchoServerHandler    : channelInactive
2019-10-22 08:13:41.638  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelUnregistered
2019-10-22 08:13:41.648  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 客户端断开,channel对应的长id为:acde48fffe001122-0000021d-00000001-a2651c27e64fc656-a01ff1fe
2019-10-22 08:13:41.648  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 客户端断开,channel对应的短id为:a01ff1fe

从日志能够看到,处理的流程为先将handler添加到管道pipeline中(handlerAdded),而后是注册到EventLoop的Seletor多路复用器里(channelRegistered),而后是有链接进来,激活状态(channelActive),读取客户端发送过来的消息,而后是读取完毕,次数客户端再也不发送消息,服务端每2秒进入一次读空闲状态,每4秒进入一次写空闲状态,等到第10秒的时候,读写都没有,服务端主动断开客户端的链接,进入无链接,非活跃状态。channel未注册到EventLoop线程中。最终removed完成后,打印断开的channel的id。此处能够看到,客户端要想不被断开链接,就必须主动发送心跳链接的检测信号。

相关文章
相关标签/搜索