ChannelOutboundBuffer是Netty发送缓存,当Netty调用write时数据不会真正的去发送而是写入到ChannelOutboundBuffer缓存队列,直到调用flush方法Netty才会从ChannelOutboundBuffer取数据发送。每一个Unsafe都会绑定一个ChannelOutboundBuffer,也就是说每一个客户端链接上服务端都会建立一个ChannelOutboundBuffer绑定客户端Channel。Netty设计ChannelOutboundBuffer是为了减小TCP缓存的压力提升系统的吞吐率。java
先来看下ChannelOutboundBuffer的4个重要字段数组
private Entry flushedEntry; 待发送数据起始节点 private Entry unflushedEntry;暂存数据起始节点 private Entry tailEntry;尾节点 private int flushed;待发送数据个数
Entry(flushedEntry) -->Entry--> ... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)
flushedEntry(包括)到unflushedEntry之间的就是待发送数据,unflushedEntry(包括)到tailEntry就是暂存数据,flushed就是待发送数据个数。promise
正常状况下待发送数据发送完成后会flushedEntry指向unflushedEntry位置,并将unflushedEntry指空变成以下状况:缓存
Entry(flushedEntry) -->Entry ... Entry--> Entry(tailEntry)
可是若是出现TCP缓存满的致使的半包状况,flushedEntry不会向后移动或移动发送成功的个数个位置,例如发送成功了一个数据,就会向前移动一个位置,出现以下状况:源码分析
Entry(flushedEntry) -->... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)
下面介绍ChannelOutboundBuffer中几个主要的方法this
addMessage方法,功能是添加数据到队列的队尾。线程
addFlush方法,准备待发送的数据,在flush前须要调用。设计
nioBuffers方法,获取待发送数据,发送数据的时候须要调用拿数据。指针
removeBytes方法,发送完成后须要调用删除已经写入TCP缓存成功的数据。code
下面对几个方法源码进行分析
addMessage方法是在系统调用write方法的时候调用
public void addMessage(Object msg, int size, ChannelPromise promise) { //将消息数据包装成Entry对象 Entry entry = Entry.newInstance(msg, size, total(msg), promise); //队列为空的状况 if (tailEntry == null) { flushedEntry = null; tailEntry = entry; //非空状况,将新节点放尾部 } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } //若是unflushedEntry为空,设置暂时还不须要数据起始节点 if (unflushedEntry == null) { unflushedEntry = entry; } // 增长待发送的总字节数 incrementPendingOutboundBytes(size, false); }
流程以下
第1步消息数据包装成Entry对象,内部实现不是直接建立一个新的Entry,而是对已经回收的Entry的重复利用,来看下代码:
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; } public final T get() { if (maxCapacity == 0) { return newObject(NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
看下RECYCLER.get()实现,若是maxCapacity配置成0就直接建立一个新的Entry,默认maxCapacity默认是256,因此默认状况下会用ThreadLocalMap获取一个stack,stack里存的都是原先回收的handle,stack线回到pop一个被回收的handle,若是stack为空则建立一个新的handle,而后返回handle.value即Entry对象。RECYCLER.get()获取到entry后会对entry从新赋值。
addFlush方法是在系统调用flush方法的时候调用
public void addFlush() { //获取暂存数据 Entry entry = unflushedEntry; //暂存数据不为空,说明还有数据能够发送 if (entry != null) { //将待发送数据起始指针flushedEntry指向暂存起始节点 if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { //增长发送节点个数 flushed ++; //锁定当前发送节点,防止其取消 if (!entry.promise.setUncancellable()) { //若是锁定失败,关闭节点,获取节点时会自动过滤 int pending = entry.cancel(); // 减小待发送的总字节数跟incrementPendingOutboundBytes方法想对应 decrementPendingOutboundBytes(pending, false, true); } //获取下个节点 entry = entry.next; } while (entry != null); //清空unflushedEntry指针 unflushedEntry = null; } }
以上方法的主要功能就是将暂存数据节点变成待发送节点,从上文内容知道须要发送的数据,是flushedEntry指向的节点到unflushedEntry指向的节点(不包含unflushedEntry)的之间的节点数据,因此下次发送要将flushedEntry指向unflushedEntry指向的节点做为发送数据的起始节点。
结合代码:
nioBuffers方法是在系统调用addFlush方法完成后调用
public ByteBuffer[] nioBuffers() { long nioBufferSize = 0; int nioBufferCount = 0; final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); //获取原生ByteBuffer数组,这里的ByteBuffer是相同线程共享的 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); //获取待发送数据起始节点 Entry entry = flushedEntry; //循环取数据,isFlushedEntry是判断待发送数据节点 while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { //若是节点被关闭则忽略次节点 if (!entry.cancelled) { //获取节点里的ByteBuf ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); //获取可发送ByteBuf总字节数 final int readableBytes = buf.writerIndex() - readerIndex; //可发送字节大于0继续不然跳过 if (readableBytes > 0) { //每次累计的发送字节数,不能大于Integer.MAX_VALUE if (Integer.MAX_VALUE - readableBytes < nioBufferSize) { break; } //累计的发送字节数 nioBufferSize += readableBytes; //获取entry中ByteBuffer的个数 int count = entry.count; if (count == -1) { entry.count = count = buf.nioBufferCount(); } int neededSpace = nioBufferCount + count; //nioBuffers数组没法知足存放个数需求扩容处理 if (neededSpace > nioBuffers.length) { nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } //若是只有1个直接获取ByteBuffer放入nioBuffers数组中 if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount ++] = nioBuf; ///若是有多个循环获取ByteBuffer放入nioBuffers数组中 } else { ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { entry.bufs = nioBufs = buf.nioBuffers(); } nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; }
以上方法的主要功能就是获取须要发送数据并转成原生的ByteBuffer数组类型,ByteBuffer数组这里是相同线程共享的,也就是说一个客户端跟服务端通信会使用相同的ByteBuffer数组来发送数据,这样减小了空间建立和销毁时间消耗。
结合代码:
第1步的相同线程数据共享的实现原理是一种类ThreadLocal的实现,原生的ThreadLocal里是使用ThreadLocalMap来存储数据,而Netty设计了一种读取更快的InternalThreadLocalMap来存数据,ThreadLocalMap里存储数据是用线性探测法解决冲突,致使的结果就是一次hash不必定找到数据。而InternalThreadLocalMap里数据存储的位置是固定不变的,因此一次就能获取数据,然而致使的结果就是部分空间的浪费,很明显,这是一种空间换时间的作法。
removeBytes方法是在系统调用nioBuffers方法并完成发送后调用
public void removeBytes(long writtenBytes) { for (;;) { //获取flushedEntry指向的节点数据 Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; //获取读取的起始位置 final int readerIndex = buf.readerIndex(); //计算整个节点的数据字节长度 final int readableBytes = buf.writerIndex() - readerIndex; //若是整个节点的数据字节长度比发送成功的总字节长度小,删除整个节点 if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { progress(readableBytes); writtenBytes -= readableBytes; } remove(); //不然缩小当前节点的可发送字节长度 } else { // readableBytes > writtenBytes if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } //清理ByteBuffer数组 clearNioBuffers(); }
以上方法的主要功能就是移除已经发送成功的数据,移除的数据是从flushedEntry指向的节点开始遍历链表移除,移除数据分2种状况:
结合代码:
ChannelOutboundBuffer是没有容量限制的,在极端状况下若是ChannelOutboundBuffer消耗比较慢而ChannelOutboundBuffer写入过大会致使OOM,Netty在处理里提供了ChannelWritabilityChanged方法,此方法会在ChannelOutboundBuffer的容量超过最高限额或者小于最低限额会被调用,用户能够实现次方法来监控容量的报警,来解决容量过大问题。