Netty源码分析之ChannelOutboundBuffer

ChannelOutboundBuffer介绍

ChannelOutboundBuffer是Netty发送缓存,当Netty调用write时数据不会真正的去发送而是写入到ChannelOutboundBuffer缓存队列,直到调用flush方法Netty才会从ChannelOutboundBuffer取数据发送。每一个Unsafe都会绑定一个ChannelOutboundBuffer,也就是说每一个客户端链接上服务端都会建立一个ChannelOutboundBuffer绑定客户端Channel。Netty设计ChannelOutboundBuffer是为了减小TCP缓存的压力提升系统的吞吐率。java

ChannelOutboundBuffer设计

先来看下ChannelOutboundBuffer的4个重要字段数组

private Entry flushedEntry; 待发送数据起始节点
private Entry unflushedEntry;暂存数据起始节点
private Entry tailEntry;尾节点
private int flushed;待发送数据个数
Entry(flushedEntry) -->Entry--> ... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)

flushedEntry(包括)到unflushedEntry之间的就是待发送数据,unflushedEntry(包括)到tailEntry就是暂存数据,flushed就是待发送数据个数。promise

正常状况下待发送数据发送完成后会flushedEntry指向unflushedEntry位置,并将unflushedEntry指空变成以下状况:缓存

Entry(flushedEntry) -->Entry ... Entry--> Entry(tailEntry)

可是若是出现TCP缓存满的致使的半包状况,flushedEntry不会向后移动或移动发送成功的个数个位置,例如发送成功了一个数据,就会向前移动一个位置,出现以下状况:源码分析

Entry(flushedEntry) -->... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)

下面介绍ChannelOutboundBuffer中几个主要的方法this

  • addMessage方法,功能是添加数据到队列的队尾。线程

  • addFlush方法,准备待发送的数据,在flush前须要调用。设计

  • nioBuffers方法,获取待发送数据,发送数据的时候须要调用拿数据。指针

  • removeBytes方法,发送完成后须要调用删除已经写入TCP缓存成功的数据。code

下面对几个方法源码进行分析

addMessage方法源码分析

addMessage方法是在系统调用write方法的时候调用

public void addMessage(Object msg, int size, ChannelPromise promise) {
        //将消息数据包装成Entry对象
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        //队列为空的状况
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        //非空状况,将新节点放尾部
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        //若是unflushedEntry为空,设置暂时还不须要数据起始节点
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // 增长待发送的总字节数
        incrementPendingOutboundBytes(size, false);
    }

流程以下

  • 1.将消息数据包装成Entry对象。
  • 2.若是队列为空,直接设置尾节点为当前节点,不然将新节点放尾部。
  • 3.unflushedEntry为空说明不存在暂时不须要发送的节点,当前节点就是第一个暂时不须要发送的节点。
  • 4.CAS方式增长未发送节点字节数。

第1步消息数据包装成Entry对象,内部实现不是直接建立一个新的Entry,而是对已经回收的Entry的重复利用,来看下代码:

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        Entry entry = RECYCLER.get();
        entry.msg = msg;
        entry.pendingSize = size;
        entry.total = total;
        entry.promise = promise;
        return entry;
    }
    
        public final T get() {
        if (maxCapacity == 0) {
            return newObject(NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

看下RECYCLER.get()实现,若是maxCapacity配置成0就直接建立一个新的Entry,默认maxCapacity默认是256,因此默认状况下会用ThreadLocalMap获取一个stack,stack里存的都是原先回收的handle,stack线回到pop一个被回收的handle,若是stack为空则建立一个新的handle,而后返回handle.value即Entry对象。RECYCLER.get()获取到entry后会对entry从新赋值。

addFlush方法源码分析

addFlush方法是在系统调用flush方法的时候调用

public void addFlush() {
        //获取暂存数据
        Entry entry = unflushedEntry;
        //暂存数据不为空,说明还有数据能够发送
        if (entry != null) {
            //将待发送数据起始指针flushedEntry指向暂存起始节点
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                //增长发送节点个数
                flushed ++;
                //锁定当前发送节点,防止其取消
                if (!entry.promise.setUncancellable()) {
                    //若是锁定失败,关闭节点,获取节点时会自动过滤
                    int pending = entry.cancel();
                    // 减小待发送的总字节数跟incrementPendingOutboundBytes方法想对应
                    decrementPendingOutboundBytes(pending, false, true);
                }
                //获取下个节点
                entry = entry.next;
            } while (entry != null);

            //清空unflushedEntry指针
            unflushedEntry = null;
        }
    }

以上方法的主要功能就是将暂存数据节点变成待发送节点,从上文内容知道须要发送的数据,是flushedEntry指向的节点到unflushedEntry指向的节点(不包含unflushedEntry)的之间的节点数据,因此下次发送要将flushedEntry指向unflushedEntry指向的节点做为发送数据的起始节点。

结合代码:

  • 1.先获取unflushedEntry指向的暂存数据的起始节点
  • 2.将待发送数据起始指针flushedEntry指向暂存起始节点
  • 3.经过promise.setUncancellable()锁定待发送数据,反正发送过程当中取消,若是锁定过程当中发现其节点已经取消,则调用entry.cancel()取消节点发送,并减小待发送的总字节数。

nioBuffers方法源码分析

nioBuffers方法是在系统调用addFlush方法完成后调用

public ByteBuffer[] nioBuffers() {
        long nioBufferSize = 0;
        int nioBufferCount = 0;
        final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        //获取原生ByteBuffer数组,这里的ByteBuffer是相同线程共享的
        ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
        //获取待发送数据起始节点
        Entry entry = flushedEntry;
        //循环取数据,isFlushedEntry是判断待发送数据节点
        while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
            //若是节点被关闭则忽略次节点
            if (!entry.cancelled) {
                //获取节点里的ByteBuf
                ByteBuf buf = (ByteBuf) entry.msg;
                final int readerIndex = buf.readerIndex();
                //获取可发送ByteBuf总字节数
                final int readableBytes = buf.writerIndex() - readerIndex;
                //可发送字节大于0继续不然跳过
                if (readableBytes > 0) {
                    //每次累计的发送字节数,不能大于Integer.MAX_VALUE
                    if (Integer.MAX_VALUE - readableBytes < nioBufferSize) {
                        break;
                    }
                    //累计的发送字节数
                    nioBufferSize += readableBytes;
                    //获取entry中ByteBuffer的个数
                    int count = entry.count;
                    if (count == -1) {
                        entry.count = count =  buf.nioBufferCount();
                    }
                    int neededSpace = nioBufferCount + count;
                    //nioBuffers数组没法知足存放个数需求扩容处理
                    if (neededSpace > nioBuffers.length) {
                        nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                        NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                    }
                    //若是只有1个直接获取ByteBuffer放入nioBuffers数组中
                    if (count == 1) {
                        ByteBuffer nioBuf = entry.buf;
                        if (nioBuf == null) {
                            entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                        }
                        nioBuffers[nioBufferCount ++] = nioBuf;
                    ///若是有多个循环获取ByteBuffer放入nioBuffers数组中
                    } else {
                        ByteBuffer[] nioBufs = entry.bufs;
                        if (nioBufs == null) {
                            entry.bufs = nioBufs = buf.nioBuffers();
                        }
                        nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
                    }
                }
            }
            entry = entry.next;
        }
        this.nioBufferCount = nioBufferCount;
        this.nioBufferSize = nioBufferSize;

        return nioBuffers;
    }

以上方法的主要功能就是获取须要发送数据并转成原生的ByteBuffer数组类型,ByteBuffer数组这里是相同线程共享的,也就是说一个客户端跟服务端通信会使用相同的ByteBuffer数组来发送数据,这样减小了空间建立和销毁时间消耗。

结合代码:

  • 1.调用NIO_BUFFERS.get获取原生ByteBuffer数组,这里的ByteBuffer是相同线程共享的。
  • 2.从待发送数据起始节点开始循环处理数据,直至处理到unflushedEntry指向的Entry,或者到最后或者累计的发送字节数大于Integer.MAX_VALUE。
  • 3.处理跳过被关闭的节点。
  • 4.若是ByteBuffer数组太小则进行扩容。
  • 5.将ByteBuf转成ByteBuffer类型存入ByteBuffer数组。
  • 6.处理下个节点。

第1步的相同线程数据共享的实现原理是一种类ThreadLocal的实现,原生的ThreadLocal里是使用ThreadLocalMap来存储数据,而Netty设计了一种读取更快的InternalThreadLocalMap来存数据,ThreadLocalMap里存储数据是用线性探测法解决冲突,致使的结果就是一次hash不必定找到数据。而InternalThreadLocalMap里数据存储的位置是固定不变的,因此一次就能获取数据,然而致使的结果就是部分空间的浪费,很明显,这是一种空间换时间的作法。

removeBytes方法源码分析

removeBytes方法是在系统调用nioBuffers方法并完成发送后调用

public void removeBytes(long writtenBytes) {
        for (;;) {
            //获取flushedEntry指向的节点数据
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }

            final ByteBuf buf = (ByteBuf) msg;
            //获取读取的起始位置
            final int readerIndex = buf.readerIndex();
            //计算整个节点的数据字节长度
            final int readableBytes = buf.writerIndex() - readerIndex;
            //若是整个节点的数据字节长度比发送成功的总字节长度小,删除整个节点
            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
            //不然缩小当前节点的可发送字节长度
            } else { // readableBytes > writtenBytes
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
        //清理ByteBuffer数组
        clearNioBuffers();
    }

以上方法的主要功能就是移除已经发送成功的数据,移除的数据是从flushedEntry指向的节点开始遍历链表移除,移除数据分2种状况:

  • 1.第一种就是当前整个节点的数据已经发送成功,这种状况的作法就是将整个节点移除便可。
  • 2.第二种就是当前节点部分发送成功,这种状况的作法就是将当前节点的可发送字节数缩短,好比说当前节点有100kb,只发送了30kb,那就将此节点缩短至70kb。

结合代码:

  • 1.获取flushedEntry指向的节点数据。
  • 2.计算整个节点的数据字节长度。
  • 3.若是当前整个节点的数据已经发送成功将整个节点移除,不然将当前节点的可发送字节数缩短。
  • 4.清理ByteBuffer数组。
  • 5.处理下个节点。

总结

ChannelOutboundBuffer是没有容量限制的,在极端状况下若是ChannelOutboundBuffer消耗比较慢而ChannelOutboundBuffer写入过大会致使OOM,Netty在处理里提供了ChannelWritabilityChanged方法,此方法会在ChannelOutboundBuffer的容量超过最高限额或者小于最低限额会被调用,用户能够实现次方法来监控容量的报警,来解决容量过大问题。

相关文章
相关标签/搜索