Netty5 Read事件处理过程_源码讲解

欢迎你们关注个人微博 http://weibo.com/hotbain 会将发布的开源项目技术贴经过微博通知你们,但愿你们可以互勉共进!谢谢!也很但愿可以获得你们对我博文的反馈,写出更高质量的文章!!java

Netty是对Nio的一个封装,关于网络的全部操做都是经过事件的方式完成的。例如链接建立、read事件、write事件都是经过Nio来完成 的。那netty是怎么启动监听的呢? 在什么地方启动的呢?此处不为你们设置悬念,一次性告诉你们。经过循环扫描的方式来实现监听的。具体的方法类位于NioEventLoop的run方法中 (赶忙进去看看吧!! 浅显易懂)。
git

下面是netty的acceptor线程建立链接的代码。位于类NioEventLoop的processSelectedKey中(至于 processSelectedKey是怎么被调用的,本身看看调用链就好了(eclipse用ctrl+Shift+H就能够查看到选中方法的调用 链))。github

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
 
        try {
            //获得当前的key关注的事件
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //一个刚刚建立的NioServersocketChannel感兴趣的事件是0。
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//能够读取操做  --对于serverSocket来讲就是acceptor事件、对于socketChannel来讲就是read事件 
                //INFO: channel类型为io.netty.channel.socket.nio.NioSocketChannel unsafe类型为io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe
                Object obj = k.attachment();//获得NioServerSocketChannel或者NioSocketChannel
                if(obj instanceof NioServerSocketChannel){
                    System.out.println(obj.getClass().getName()+ " 开始接收链接");
                }else{
                    System.out.println(obj.getClass().getName()+ " 开始接收字节");
                }
                //不一样的socketChannel对于那个的unsafe是不一样的。例如Server端的是messageUnsafe 而 clinet端是byteUnsafe
                unsafe.read();//对于接受连接或者read兴趣都会添加进入read操做调用serverSocket->NioMessageUnsafe
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {//对于半包消息进行输出操做
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
 
                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

这里咱们以Read事件的处理(NioByteUnsafe)为线索进行讲解。后续会有基于byte的unsafe进行讲解的(Unsafe不知道为啥要这 么叫,本人也感到挺费解的,不过如今看来感受就是一个工具对象。不要从名称上害怕它)。下面来看NioByteUnsafe(该类是AbstractNioByteChannel的一个内部类)的read方法进行讲 解。直接讲代码(后面也会有图形讲解,方便你们理解):算法

public void read() {
            //获得config对象、pipeline对象
            final ChannelConfig config = config();
            //获得对应的管道对象
            final ChannelPipeline pipeline = pipeline();
            //实际的内存分配器---
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                //建立一个allocHandle对象--AdaptiveRecvByteBufAllocator
                //RecvByteBufAllocator负责内存分配的算法问题 
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }
            if (!config.isAutoRead()) {
                removeReadOp();
            }

            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                int byteBufCapacity = allocHandle.guess();
                int totalReadAmount = 0;
                do {
                    //多是 direct或者 heap  从与当前socket相关的allocator获得byteBuf数组
//                    byteBuf =allocHandle.allocate(allocator);
                    //每次从内核中读取数据netty都会分配内存
                    byteBuf = allocator.ioBuffer(byteBufCapacity);
                     //得到能够写入的容量的大小
                    int writable = byteBuf.writableBytes(); //分一个多大的内存就从socket中读取多大的数据
                    int localReadAmount = doReadBytes(byteBuf);//从socket中读取数据到bytebuf中
                    if (localReadAmount <= 0) {//发生了读取事件,可是读取的长度是负数,
                        // not was read release the buffer
                        byteBuf.release();//释放到Thread Cache中
                        close = localReadAmount < 0;//是否进行关闭,关键要看读取到的数据的长度是否为-1;
                        break;
                    }
                    //发起读取事件---若是是第一次积累数据的话,那么就会将当前的bytebuf做为累积对象,供继续使用
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;//由pipeline进行byteBuf的释放
                    //避免内存溢出,
                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }

                    totalReadAmount += localReadAmount;
                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);//每次读取的消息的数量都会有限制的,也就说,每次处理read事件的消息量是能够配置的
                //读取完成---处理完一次 读取事件
                pipeline.fireChannelReadComplete();
                //对本次读取的数据量进行记录,便于下一次为当前的Channel分配合适大小的buffer
                allocHandle.record(totalReadAmount);

                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            }
        }
    }
//上述代码段说明:
/**
 config.getRecvByteBufAllocator().newHandle(); 负责内存分配算法
    而
ByteBufAllocator 负责具体的内存分配-分配到堆仍是直接内存
*/

这就是对一个read的处理基本流程,就是将从socket中读取到的放入到分配器分配的bytebuf,而后将其传入到pipeline.fireChannelRead(byteBuf);中,至于在pipeline是怎样的传递的,咱们从这个方法中是没法查看到的。这也是咱们这篇文章的主要内容(别的内存也很重要哦!关键是我已经添加了不少注释了!)。就是要看看在获得bytebuf后,pipeline是怎么处理传入进去的bytebuf的。咱们来对pipeline.fireChannelRead(byteBuf);穷追(ctrl+shift+H eclipse)到具体的实现,数组

咱们发现,最终会调用到的ChannelHandler接口的网络

void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

ChannelHandler有不少咱们具体选用哪个呢?动动脑子就知道,咱们pipeline中存储的都是ChannelHandler,有哪些个Handler,就要看咱们在启动代码中是怎样设置了。来看看个人启动代码(精简版,没有写全,因此这里看不懂得话,建议你写个Netty的小demo).上代码:less

ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
//             .option(ChannelOption.ALLOCATOR, )//设置内存分配器
              .option(ChannelOption.SO_SNDBUF, 1024)//发送缓冲器
              .option(ChannelOption.SO_RCVBUF, 1024)
            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//接收缓冲器
             .handler(new LoggingHandler(LogLevel.INFO))//serverSocketChannel对应的ChannelPipeline的handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//客户端新接入的链接socketChannel对应的ChannelPipeline的Handler
                 @Override
                 public void initChannel(SocketChannel ch) {
                     SocketChannelConfig config=ch.config();
                     ChannelPipeline p = ch.pipeline();
                     p
                     .addLast(new LineBasedFrameDecoder(30))//也会将回车符删除掉--是以换行符做为分隔的
                     .addLast(new DiscardServerHandler());
                 }
             });

由此能够看到,这里第一个被调用的ChannelHandler是LineBasedFrameDecoder。看看LineBasedFrameDecoder是怎么实现ChannelRead方法的。翻看了弗雷以后,咱们终于找到了channelRead方法。由此能够看到,在AbstractNioByteChannel的read方法中的pipeline.fireChannelRead(byteBuf);按照个人启动代码(虽说是按照个人,可是按照大家的也是这样,由于byte在经过网络接收以后,都要进行decode,第一个通过的channelHandler确定是ByteToMessageDecoder,不信,你看看本身的启动代码试试),最终调用的是ByteToMessageDecoder.channelRead()  ,上代码:eclipse

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    //缓冲区的大小没有超过须要写入的数据的大小
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());//扩展缓冲区--查看实现后,就是经过分配一个更大的,而后复制一下字节数据
                    }
                    cumulation.writeBytes(data);//将数据写入到积累对象中
                    data.release();//释放bytebuffer(heap或者direct)--经过引用的方式进行释放缓冲区(至于什么是引用方式释放,咱们会有一个特定的章节进行讲解)
                }
                //收集完毕以后解析收集到的字符串---一般调用子类的方法实现,在具体实现中,用out来承载解析出来的msg
                callDecode(ctx, cumulation, out);//实现的时候,不要释放咱们的累积对象cumulation
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
            if (cumulation != null && !cumulation.isReadable()) {//若是累积对象为null或者没有可读内容的话,那么就将累积对象释放掉(由于空了或者为null了)
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();//代码 11
                decodeWasNull = size == 0;
                //针对解析后的out结果中的msg的对象,将解析出来的message(具体的类型,请本身看实现.是怎样作的)传递到pipeline中。
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();//代码  22
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

提示: 一个pipeline,为某个socketChannel全部,也就是说pipeline里的channelHandler,也是为某个socketchannel所享用的。不会出现多个线程共享一个channelHanler的状况(咱们可让他们共享一个handler,可是咱们得保证这个共享的handler是一个无状态的handler,例如咱们如今就要讲解的ByteToMessageDecoder就是一个有状态的handler,因此就不能共享,就要在每次初始化socketChannel的pipeline时,都要从新new一个ByteToMessageDecoder,不信你们,能够能够看一下ByteToMessageDecoder的实现。我直接粘贴代码吧!!(看看个人注释哦)以下:).socket

public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {

    ByteBuf cumulation;//由于单词cumulation --累积 意思,也就是,这个成员对象,就是用来做为半包的累积存储的对象来使用的
    private boolean singleDecode;
    private boolean decodeWasNull;
    private boolean first;
}

下面咱们看一下callDecode()是怎样完成的,上代码,ide

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {//传入的字节是否有可读数据
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {//若是此handler被移除
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

上面的代码,很容易易理解,就是进行必要的校验,其中最惹人眼的就是decode()方法,而decode方法该类中是抽象方法:

/**
     * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
     * {@link ByteBuf} has nothing to read anymore, till nothing was read from the input {@link ByteBuf} or till
     * this method returns {@code null}.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added

     * @throws Exception    is thrown if an error accour
     */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

咱们来看一下具体实现有哪些?见下图

咱们发现ByteToMessageDecoder的的decode子类实现有好多,咱们为了讲解的方便咱们选择使用,FixedLengthFrameDecoder做为研究对象。至于别的decoder你们有时间本身去看一下吧1!!(很简单的,不要惧怕).

上代码(FixedLengthFrameDecoder.decode方法):

 protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);//out是外界传入的一个用来盛放解析出来的message对象的容器
        }
    }

此处调用了自有的decode方法,上代码:

这里咱们看到若是能够读取的数据长度没有要求的长度搞的话,那么就会以传入的ByteBuf参数(其实这里就是那个累积对象)为基础,构建一个新的ByteBuf。

下面咱们这里特地大概翻译下in.readBytes(frameLength)方法的注释是怎样的状况.

ByteBuf io.netty.buffer.ByteBuf.readBytes(int length)

Transfers this buffer's data to a newly created buffer starting at the current readerIndex and increases the readerIndex by the number of the transferred bytes (= length). The returned buffer's readerIndex and writerIndex are 0 and length respectively.

  • Returns:

  • the newly created buffer which contains the transferred bytes 该方法返回的是新建立的盛有传输数据的直接缓冲对象

将当前的ByteBuf对象的数据传输到一个刚刚建立的ByteBuf,就是从readerindex开始,而后增长ReaderIndex的值,增长length个字节数。返回的字节的的readerindex和writerindex分别是0和length。

经过阅读上面的注释的阅读,咱们能够看到,就FixedLengthFrameDecoder解析器来讲,其实累积对象对readerIndex进行了改变。也就是说,累积对象能够读取的数据的数据量是发生变化的(咱们能够在源代码中看一下在decode先后,readerindex是否发生了变化,观察一下就知道了。这个很简单哦,看一下我是怎么知道这一点的,见下图)。至于除了FixedLengthFrameDecoder以外的别的decoder是否也改变了readerindex,你们能够去具体查看一下代码(不过我我的以为确定都是这么作滴!!)。

----------------------------------------------------------------------------------------------------------

累积对象的内存释放问题讲解完了(其实很简单,就是把readerindex改变了一下,具体长度就看解析出来的message的长度了,哈哈)。

讲到这里,会涉及到一个解析出来的message在被pipeline中的其它handler处理完毕后的内存释放问题。怎么解决? 何时释放这些message占用的空间呢? 

咱们从上面代码11 和代码22之间的代码能够看出,就是在将子类解析出来的msg,传入到后续的( 由于当前的decoder Handler负责将大的ByteBuf累积对象转换成小的后续handler能够理解的msg对象,数据这个msg对象是个什么类型,就要看子类是怎么将什么类型的msg放入到out盛放容器的了 )handler中。由此能够看出: 对于一个socketChannel,其message的处理顺序不会出现错乱。永远都是先处理完前一个,而后才是后一个,由于这是在一个线程里依次处理全部的msg的。

message是在何时释放呢? 仍是看  代码11 和代码22之间的代码  你们能够本身去看看代码。我发现就是被丢了,被JVM回收了,没有重复利用。我我的以为能够重复利用。关于这个问题,你们回去本身理解一下吧!!有时间得的话,我也会专门将一下的。毕竟这篇文章是讲Netty read事件处理的。不是将netty内存分配的。放心我不会忘记这个问题的。我会在后续的文章中讲解的。欢迎你们吐槽!!!

本文是本人学习Netty后的我的总结的分享,须要说明的是本人仅仅表明我的的见解,不是什么官方权威的版本,若是有不当之处,还请赐教!!欢迎吐槽!!

相关文章
相关标签/搜索