Netty5 Write和Flush事件处理过程_源码讲解

欢迎你们关注个人微博 http://weibo.com/hotbain 会将发布的开源项目技术贴经过微博通知你们,但愿你们可以互勉共进!谢谢!也很但愿可以获得你们对我博文的反馈,写出更高质量的文章!!java

write处理流程git


业务逻辑handler调用context的write方法,将欲发送的数据发送到带发送缓冲区中.github

看看write流程的触发代码(就是在一个业务handler中调用一下write方法便可):数组

public class DiscardServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(final ChannelHandlerContext ctx,final  Object msg) throws Exception {
        ByteBuf bufferBuf =(ByteBuf)msg;
    System.out.println(new String(bufferBuf.array()));
    ctx.channel().write(bufferBuf);
}

追踪一下,ctx.channel().write(bufferBuf)的实现(假设out pipeline中没有其余的encode handler了,),咱们会看到,最终会由AbstractUnsafe(AbstractUnsafe是channel的一个内部类对象)的write方法(很好找,顺着找就好了,记住,默认pipeline一定会有tail和head两个handler)进行处理,上代码:promise

 public void write(Object msg, ChannelPromise promise) {
            if (!isActive()) {
                // Mark the write request as failure if the channel is inactive.
                if (isOpen()) {
                    promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
                } else {
                    promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
                }
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
            } else {//往缓存中添加一个消息对象
                outboundBuffer.addMessage(msg, promise);
            }
        }

这里关注一下outboundBuffer.addMessage() 到此处,你们就会恍然大悟,知道怎么回事儿了,就是这样,仅仅将要写入的message object写入到一个buffer中。下面咱们来看一下outboundBuffer.addmessage的实现。缓存

注意: outboundBuffer是一个ChannelOutboundBuffer类型的兑现,每个channel都会一个ChannelOutboundBuffer对象与之关联,用来盛放欲发送的消息.上代码证实一切:网络

 protected abstract class AbstractUnsafe implements Unsafe {
    
        private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this);
        private boolean inFlush0;
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {


    private MessageSizeEstimator.Handle estimatorHandle;

    private final Channel parent;
    private final ChannelId id = DefaultChannelId.newInstance();
    private final Unsafe unsafe;//channel中有Unsafe引用
    private final DefaultChannelPipeline pipeline;
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
   /**省略部分代码***/
    private final EventLoop eventLoop;

Unsafe对象里有一个outboundBuffer ,而channel里有个unsafe引用,因此能够说,channel与outboundBuffer有has-a关系

socket

看一下ChannelOutboundBuffer outboundBuffer的addMessage实现:
ide

 void addMessage(Object msg, ChannelPromise promise) {
        //预测message的size
        int size = channel.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
        //建立一个Entry对象,一个entry就是一个欲发送的message以及描述信息
        Entry e = buffer[tail++];
        e.msg = msg;
        e.pendingSize = size;
        e.promise = promise;
        e.total = total(msg); //由此能够看出total和pendingSize是相等的
        tail &= buffer.length - 1;
        if (tail == flushed) {//扩展容量
            addCapacity();
        }
        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(size);//更新一下,欲发送的字节大小。
    }

都很容易看懂,就是对欲发送的message封装成entry后,将其注册到一个链表中,若是链表大小不够用的话就调用addCapacity进行扩容。下面咱们看一下,addCapaciry()方法的实现,上代码:oop

 private void addCapacity() {
//更新链表的标志位
        int p = flushed;
        int n = buffer.length;
        int r = n - p; // number of elements to the right of p 剩余没有刷出去的
        int s = size();

        int newCapacity = n << 1;//扩大一倍 注意   哈哈 
        if (newCapacity < 0) {
            throw new IllegalStateException();
        }

        Entry[] e = new Entry[newCapacity];//建立对象数组。扩容后的哦!
        System.arraycopy(buffer, p, e, 0, r);
        System.arraycopy(buffer, 0, e, r, p);//拷贝的和为拷贝的数据(不包括二进制bytebuf数据哦)都要进行复制
        for (int i = n; i < e.length; i++) {//将e数组中n到
            e[i] = new Entry();
        }

        buffer = e;
        flushed = 0;
        unflushed = s;
        tail = n;
    }

哈哈 很容易理解的,就是对数组进行扩展。而后复制

到目前为止,咱们已经讲解完了,write的的处理流程。咱们把message放入到buffer中,目的是为了将其发送到目的socket的内核缓冲区中,何时发送(固然是对应的socket发送write可写事件的时候)呢? 当writer事件发送的时候,就是咱们将缓冲起来的message flush到socket的内核缓冲区的时候了!!如今开始下一主题:

Flush处理流程

刚才已经说道flush的发生,意味着socket的write事件发生了,因而咱们天然而然的就想到了NioEventLoop的处理write事件的代码块,上代码:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            /**简洁起见,省略部分代码**/
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {//对于半包消息进行输出操做
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            /**简洁起见,省略部分代码*/
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

从上面能够看到实际上worker线程也是调用的当前遍历的socketChannel的unsafe的forceFlush方法。直接上代码看具体实现(最终会调用到AbstractUnsafe的force0方法):

protected void flush0() {
            if (inFlush0) { //若是对于一个channel正在进行刷新
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;//若是缓存为空,则直接返回
            }

            inFlush0 = true;//设置正在刷新标志位

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
                    } else {
                        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                outboundBuffer.failFlushed(t);
            } finally {
                inFlush0 = false;
            }
        }

不用多说,若是write事件发生,可是缓冲区为空得话,那么就会直接返回,若是不是的话,再去调用doWrite方法。直接上代码(doWrite是个抽象方法,由NioSocketChannel实现,本身想为何选这个类!!能够看源代码!从链接的建立开始看哦!我有写链接建立的博文哦! 本身看吧! ):

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {//不断写入---将全部的数据flush到内核网络缓冲区中
            // Do non-gathering write for a single buffer case.
            final int msgCount = in.size(); //对于只有一个message的状况,直接写便可
            if (msgCount <= 1) {
                super.doWrite(in);
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();//将全部的消息转换成java.nio.ByteBuf数组
            if (nioBuffers == null) {//msg不是ByteBuf类型,则也不须要采用gathering write的方式,能够直接调用父类AbstractNioByteChannel的doWrite方法
                super.doWrite(in);
                return;
            }

            int nioBufferCnt = in.nioBufferCount(); //buffer的个数
            long expectedWrittenBytes = in.nioBufferSize();//总的buffer的byte的数量

            final SocketChannel ch = javaChannel();
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                //虽然是每次都是针对的messag进行遍历,可是写入的时候确实针对的全体内部的buffer进行遍历
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//localWrittenBytes写入的字节数目可能超过了本次遍历的msg的buffer的承载的个数,因此若是写回半包得话,须要比较current.msg的大小与此值的大小
                if (localWrittenBytes == 0) {
                    setOpWrite = true;//中止写入,多是由于socket如今不可写了
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {//若是但愿写入的数量为0,代表所有写入完毕
                    done = true;
                    break;
                }
            }

            if (done) {//若是所有发送完毕
                // Release all buffers
                for (int i = msgCount; i > 0; i --) {
                    in.remove();//删除buffer数组中的元素--挨个删除 代码 888
                }

                // Finish the write loop if no new messages were flushed by in.remove().
                if (in.isEmpty()) {//若是所有完成的话,,那么就将其写半包标志删除
                    clearOpWrite();
                    break;
                }
            } else {
                // Did not write all buffers completely.
                // Release the fully written buffers and update the indexes of the partially written buffer.
                //若是没有所有刷写完成,那么就释放已经写入的buffer,更新部分写入的buffer的索引
                for (int i = msgCount; i > 0; i --) { //代码 999
                    final ByteBuf buf = (ByteBuf) in.current();//获得当前的直接内存,current()方法调用的不是很深,由于当前的msg已经通过niobuffers转换成直接类型的了,因此基本上会直接返回。
                    final int readerIndex = buf.readerIndex();
                    final int readableBytes = buf.writerIndex() - readerIndex;//获得可写的数据字节数

                    if (readableBytes < writtenBytes) {//若是写入的部分大于当前缓存指针的大小的话,那么就将其释放
                        in.progress(readableBytes);
                        in.remove();//移动指针,移动到下一个buffer中
                        writtenBytes -= readableBytes;
                    } else if (readableBytes > writtenBytes) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);//从新设置当前的buffer的大小
                        in.progress(writtenBytes);
                        break;
                    } else { // readableBytes == writtenBytes  写入的部分
                        in.progress(readableBytes);
                        in.remove();//直接移除(实际上是删除引用个数)
                        break;
                    }//代码 1000
                }

                incompleteWrite(setOpWrite);//注册write兴趣事件
                break;
            }
        }
    }

这个方法是flush的核心代码,可是代码很长,在此咱们讲一下基本流程,而后再讲几个重要方法是干吗的!!:

流程: 

  1. 判断一下outbuffer欲发送的message的大小,若是为1的话,调用父类的doWriter方法.

  2. 而后调用outbuffer的nioBuffers方法,niobuffers方法主要就是对outbuffer中的bytebuffer解包(由于一个buffer可能会封装多个buffer)、发送字节数的统计(expectedWrittenBytes)、底层最小单位缓冲对象的个数(nioBufferCnt)统计。

  3. 调用java的原生API进行数据写入,ch.write(nioBuffers, 0, nioBufferCnt).注意 写入次数是能够经过WriteSpinCount配置项限制的哦!! 该配置项咱们能够在初始化或者运行的过程当中经过调用ctx.channel().config()进行配置或者更改。

  4. 若是全发送完成,那么就将缓冲区中的缓冲对象依次清空 见 代码 888

  5. 若是没有所有发送,就将所有刷出的bytebuf释放(至于怎样释放,会写一个专门的Netty内存管理的文章),仅仅发送一部分的byte的bytebuf的readerindex进行更改。 见代码  999和1000之间的代码

 

咱们粘贴一下nioBuffers的实现,很简单:

public ByteBuffer[] nioBuffers() {
        long nioBufferSize = 0;//用来记录全部须要发送的数据的字节大小
        int nioBufferCount = 0;//用来记录最底层的buffer的个数多少
        final int mask = buffer.length - 1;
        final ByteBufAllocator alloc = channel.alloc();//用于将heap缓冲转换成direct类型缓冲
        ByteBuffer[] nioBuffers = this.nioBuffers; //底层的niobuffer,会对buffer的进行一个解包操做
        Object m;
        int i = flushed;
        while (i != unflushed && (m = buffer[i].msg) != null) {//逐个遍历即将发送的bytebuf数据
            if (!(m instanceof ByteBuf)) {
                this.nioBufferCount = 0;
                this.nioBufferSize = 0;
                return null;
            }
            //
            Entry entry = buffer[i];
            ByteBuf buf = (ByteBuf) m;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;//能够读取的字节数组

            if (readableBytes > 0) {
                nioBufferSize += readableBytes;
                int count = entry.count;//获得低层的buffer的个数
                if (count == -1) {
                    entry.count = count = buf.nioBufferCount();
                }
                //总的buffer的个数
                int neededSpace = nioBufferCount + count;
                if (neededSpace > nioBuffers.length) {//若是buffer的个数超过了nioBuffers的length进行扩张,按照2倍的系数扩张
                    this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                }

                if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
                    if (count == 1) {//没有封装内部的缓存
                        ByteBuffer nioBuf = entry.buf;
                        if (nioBuf == null) {
                            // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                            // derived buffer
                            entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                        }
                        nioBuffers[nioBufferCount ++] = nioBuf;
                    } else {//内部有多个buffer
                        ByteBuffer[] nioBufs = entry.buffers;
                        if (nioBufs == null) {
                            // cached ByteBuffers as they may be expensive to create in terms of Object allocation
                            entry.buffers = nioBufs = buf.nioBuffers();//获得内部缓存
                        }
                        //进行解压,并返回内部的全部的缓存的个数(解压后的哦)
                        nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
                    }
                } else {
                    nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,//将heap缓存转换层direct类型
                            readableBytes, alloc, nioBuffers, nioBufferCount);
                }
            }
            i = i + 1 & mask;
        }
        this.nioBufferCount = nioBufferCount;
        this.nioBufferSize = nioBufferSize;

        return nioBuffers;

很容易看明白,就对数据的统计。例如bytebuf的个数、发送的字节数的统计

线面咱们来看一下 current()方法的实现:

  public Object current() {
        return current(true);
    }

    /**
     * 将当前即将要刷出的数据写入到directByteBuf中
     * @param preferDirect
     * @return
     */
    public Object current(boolean preferDirect) {
        if (isEmpty()) { //若是缓存为空,则直接返回
            return null;
        } else {
            // TODO: Think of a smart way to handle ByteBufHolder messages
            Object msg = buffer[flushed].msg;//获得即将要刷新的数据缓冲区--buffer[flushed]表示即将要刷新的数据缓冲去
            if (threadLocalDirectBufferSize <= 0 || !preferDirect) {//若是线程中没有直接内存缓冲区可用,不喜欢用堆外缓存
                return msg;
            }
            if (msg instanceof ByteBuf) { //由此能够看出message必须是bytebuf类修的
                ByteBuf buf = (ByteBuf) msg;
                if (buf.isDirect()) {//是否是直接内存中分配的
                    //对于nioBuffers以后,已经将全部bytebuf所有转换成direct类型的了!! 
                    return buf;
                } else {
                    int readableBytes = buf.readableBytes();
                    if (readableBytes == 0) { //若是没有了的话,就直接返回
                        return buf;
                    }

                    // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
                    // We can do a better job by using our pooled allocator. If the current allocator does not
                    // pool a direct buffer, we use a ThreadLocal based pool.
                    /**
                     * 非直接内存会被拷贝的jdk本身的内部的直接缓冲区中。咱们能够经过具备池子功能的分配器来将工做作的更好。
                     * 若是当前的分配器没有起到一个缓冲直接内存的做用得话,那么咱们就会使用基于线程的threadLocal的池子
                     * */
                    ByteBufAllocator alloc = channel.alloc();//获得内存分配器
                    ByteBuf directBuf;
                    if (alloc.isDirectBufferPooled()) {//是否为直接内存---分配的内存是否为直接缓冲做用的
                        directBuf = alloc.directBuffer(readableBytes);
                    } else {//不然的话,就用与线程绑定的ThreadLocalPooledByteBuf进行二进制数据分配
                        directBuf = ThreadLocalPooledByteBuf.newInstance(); //从当前线程栈中获取一个bytebuf--ByteBuffer.allocateDirect(initialCapacity)
                    }
                    //进行必要的数据拷贝--将堆内的数据拷贝到直接内存中
                    directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
                    current(directBuf);//将原先非direct类型的内存释放,而且替换成 direct的bytebuf
                    return directBuf;
                }
            }
            return msg;
        }
    }

    /**
     * Replace the current msg with the given one.
     * The replaced msg will automatically be released  用一个指定的buffer去替换原先的buffer msg对象。呗替换的对象会自动释放
     */
    public void current(Object msg) {
        Entry entry =  buffer[flushed];
        safeRelease(entry.msg);//
        entry.msg = msg;
    }

从上面的代码中咱们会看到,netty会将全部的byteBuf解包后,所有转换成直接类型的内存,而后再发送。

到目前为止,咱们已经将必要的flush和write时,发生的事件进行必要的阐述。固然也有几个方法没有讲到,不过本人以为没有必要再去讲解,由于那样得话,太冗余了。

给一下write事件的流程图(稍后给出):

给一下flush处理的流程图(稍后给出):

可是还有几个方面你们煮注意一下:

  1. 发送完成的bytebuf内存是怎样释放的?

  2. 为何要推荐使用direct类型的内存进行推送?

    关于上面的两个问题不是这篇文章要讲述的内容,本人会写一个关于Netty5内存管理的博文,来详细的讲述上面两个问题,让你们吐槽一下!!

本文是本人学习Netty后的我的总结的分享,须要说明的是本人仅仅表明我的的见解,不是什么官方权威的版本,若是有不当之处,还请赐教!!欢迎吐槽!!

相关文章
相关标签/搜索