Netty源码分析——flush流程

Netty源码分析——flush流程java

前言数组

承接上篇写流程,这篇看下flush流程。以前文章中咱们已经提到过,writeAndFlush操做其实是经过pipeline分别进行了write和flush操做。具体咱们就不看了,咱们直接看下flush。promise

flush缓存

flush操做一样是经过pipeline最终传递给HeadContext:unsafe.flush();:socket

123456789101112public final void flush() { //确保不是外部调用 assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } //添加flush节点 outboundBuffer.addFlush(); //把节点里的数据写到socket里 flush0();}oop

最主要的实际上是两个步骤,上文已经标注了,一个就是添加flush节点,一个就是真正的写操做。源码分析

添加flush节点优化

追进去看下:this

1234567891011121314151617public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; }}.net

咱们按照上篇文章的状态来讲,当前调用了两次write的状态是这样的:

 

 

调用完addFlush以后是这样的:

 

 

看到了吗,其实是把flushedEntry和unFlushedEntry交换了一下。

再假设一下,若是咱们在调用flush以前调用了三次write,再调用flush,链表是这样的:

 

 

添加节点以后会继续执行flush0:

123if (!isFlushPending()) { super.flush0();}

看下这个isFlushPending:

12SelectionKey selectionKey = selectionKey();return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;

这里实际上是在校验,当前这个Channel是否有OP_WRITE,若是当前selectionKey是写事件,说明有线程执行flush过程,结合上面的一句:!isFlushPending(),说明若是有线程在进行flush过程,就直接返回。

这里其实我还没吃透,个人疑问是这里是否有必要进行!isFlushPending()判断。由于咱们以前已经说过了,任何flush操做开头都进行了一个校验:assertEventLoop(),说白了,只有Reactor线程能够调用flush,那么当前线程在执行的时候,怎么可能有别的线程进行了flush操做呢?

这个问题我会去debug一下,而后求证一下做者,有结果了会在文章后面加上。

继续看,若是当前channel没有被其余线程操做,这里会调用super.flush0,回到io.netty.channel.AbstractChannel.AbstractUnsafe#flush0里:

12345678910111213141516171819202122232425262728293031323334353637383940if (inFlush0) { // 防止重复调用 return;}// 若是没有数据要flush就返回final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null || outboundBuffer.isEmpty()) { return;}inFlush0 = true;// 若是channel失效,把全部待刷的数据设置为失败if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return;}try { // 真正的写操做 doWrite(outboundBuffer);} catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } }} finally { inFlush0 = false;}

继续看doWrite操做,这里会直接走到io.netty.channel.socket.nio.NioSocketChannel#doWrite里:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849SocketChannel ch = javaChannel();// 获取循环次数,至关于一个自旋,保证写成功int writeSpinCount = config().getWriteSpinCount();do { // 若是buffer里空的,则清理OP_WRITE,防止Reacotr线程再次处理这个Channel if (in.isEmpty()) { clearOpWrite(); return; } // 聚合 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: writeSpinCount -= doWrite0(in); break; case 1: { // 若是只有一个buffer的状况下,直接把这个buffer写进去 ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // 多个buffer的状况下,写nioBufferCnt个buffer进去 long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } }} while (writeSpinCount > 0);// 是否写完成incompleteWrite(writeSpinCount < 0);

上面的过程当中,我只是粗略的写了一下过程,其实里面的细节很是多,咱们一点一点来看。

先看着几句:

123int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);int nioBufferCnt = in.nioBufferCount();

首先获取聚合写的最大字节数,聚合写,在原生NIO的概念中,是把几个Buffer的数据写到一个Channel里,就是说一次最多写多少字节数据。而后进入到nioBuffers方法,这个方法作什么注释上有说:若是缓冲区里全都是ByteBuf,则返回直接NIO缓冲区的Buffer数组(其实就是把ByteBuf里的数据写到原生Buffer里),nioBufferCount和nioBufferSize分别表明返回数组中原生NIO Buffer的数量和NIO缓冲区的可读字节总数。看代码,有点长拆开看:

123456789long nioBufferSize = 0;int nioBufferCount = 0;final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);Entry entry = flushedEntry;while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { //...}return nioBuffers;

先是从当前线程里获取ByteBuffer,这里能够看出,其实ByteBuffer是被缓存了的(若是没有建立一个长度为1024的ByteBuffer数组),不须要每次建立。

而后是循环全部的flushedEntry,这里,咱们回顾一下上面addFlush以后的图,其实循环中会不停地把flushedEntry前移,直到flushedEntry和tailEntry中的节点所有都被处理。isFlushEntry的代码:e != null && e != unflushedEntry;,其实就是,不是unflushedEntry的都是flushedEntry。

这里咱们能够看到,另一个循环的条件就是entry.msg instanceof ByteBuf,说明这个方法只处理ByteBuf。

继续看循环里:

1234567891011121314151617181920212223ByteBuf buf = (ByteBuf) entry.msg;final int readerIndex = buf.readerIndex();final int readableBytes = buf.writerIndex() - readerIndex;// 若是有数据if (readableBytes > 0) { if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) { break; } // NIO Buffer可读字节数+ByteBuf的可读字节数 nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { // 初始化entry的count entry.count = count = buf.nioBufferCount(); } int neededSpace = min(maxCount, nioBufferCount + count); if (neededSpace > nioBuffers.length) { //若是实际须要的空间,比以前获得的ByteBuffer数组数量大,就扩容,而后缓存起来 nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); }}

这里说下这个流程,声明一下,ByteBuffer指的是原生Buffer,ByteBuf是Netty本身封装的Buffer。

先拿出来一个Entry的ByteBuf,看看可读的字节多少。而后初始化entry的count,用的是组成这个ByteBuf的ByteBuffer(原生)的数量(这里这个nioBufferCount大部分状况下返回1,及一个ByteBuf对应一个ByteBuffer)。

而后看看须要的空间是多少,须要的空间默认状况下是1024和已有须要的ByteBuffer数量+count(及ByteBuf.nioBufferCount)两者之间的最小值,及,最多搞1024个ByteBuffer。

而后对比须要空间和提供空间,及对比以前分配的ByteBuffer[]的length和须要的ByteBuffer的数量,若是须要的空间大,就扩容(这里能够类比一下ArrayList的扩容)。也就是,无论怎么样,都会分配足够的ByteBuffer使用。

可能这么看起来有点绕,我举个例子:加群617434785获取文中知识点。

假设咱们有八个节点,每一个节点的ByteBuf在Flush的时候,数据都会写入到1个ByteBuffer里,而后咱们开始循环这个八个节点,循环以前我记录一下一共须要多少个ByteBuffer数组(好比叫count,循环前就是0),而后咱们有一个分配给咱们的ByteBuffer数组(好比叫fenpei)。

代码应该是这样的:

123456789101112// 循环八个节点int count = 0;for (Entry e : entries) { ByteBuf b = e.msg; // 这里大部分ByteBuf会返回1 int c = b.nioBufferCount(); count += c; if (count > fenpei.length) { //扩容 expandNioBufferArray(fenpei) }}

这样就比较直接了,再看不懂的。。。emmm,哈哈哈哈哈~

继续看:

12345678910111213141516171819202122232425if (count == 1) { // 1个ByteBuff对应1个ByteBuffer ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // 初始化ByteBuffer, entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } // 放到数组里 nioBuffers[nioBufferCount++] = nioBuf;} else { // 一个ByteBuf对应多个ByteBuffer,初始化多个ByteBuffer,循环放到数组里 ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { entry.bufs = nioBufs = buf.nioBuffers(); } for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) { ByteBuffer nioBuf = nioBufs[i]; if (nioBuf == null) { break; } else if (!nioBuf.hasRemaining()) { continue; } nioBuffers[nioBufferCount++] = nioBuf; }}

这个方法咱们总结一下,就是分配数组,这个数组一开始会初始化一个长度为1024的ByteBuffer数组,若是不够用就扩容,里面的ByteBuffer能容纳的数据,对应每一个节点ByteBuf里有的数据。这个地方其实并无看到ByteBuf向对应的ByteBuffer里写数据的地方,关于这个问题,你们能够跟一下buf.internalNioBuffer(readerIndex, readableBytes)这里,这里是会把数据搞到ByteBuffer里的。

至此分配就结束了,而后继续往下看doWrite,接下来进入了一个switch,条件就是有多少个原生ByteBuffer要写,咱们看看default的状况:

12345678910111213141516// 整个Entry链的可读数据long attemptedBytes = in.nioBufferSize();// 向管道中写数据final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);// 若是写失败了,这个要细说if (localWrittenBytes <= 0) { incompleteWrite(true); return;}// 调整最大聚合写的字节数adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);// remove节点 in.removeBytes(localWrittenBytes);// 自旋次数-1--writeSpinCount;

向channel中写入数据,若是localWrittenBytes < 0,这里是说明这个Channel不可用,其实就是写失败了(或者说没有所有写到Channel里?这个地方存疑,我没有验证过)。

这里写失败了怎么办,咱们看下incompleteWrite,注意入参是true,下面会说到:

1234567891011121314151617// 这里这个setOpWrite就是入参trueif (setOpWrite) { setOpWrite();} else { clearOpWrite(); eventLoop().execute(flushTask);}setOpWrite方法:final SelectionKey key = selectionKey();if (!key.isValid()) { return;}final int interestOps = key.interestOps();if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE);}

若是失败了,就去setOpWrite,其实就是给Channel注册了一个OP_WRITE,而后就return了。

这里为何要打上OP_WRITE呢,打上有什么用呢?还记得上文中我提出的问题么,关于为何flush0以前要进行判断isFlushPending,这里让我细细说来(为何判断isFlushPending的疑惑已经解开了,这篇文章拖了几天,这几天中来来回回的看源码、请教大神、debug。。终于有所突破)。

先放下isFlushPending,继续说这个OP_WRITE。这里还记得何时打上OP_WRITE么,是Channel写失败的时候!咱们先无论这个OP_WRITE的具体含义,就认为是一个标记,标记这个管道不可用,这时候,请问:管道不可用的状况下,若是我还想进行Flush操做,即向管道中写数据,这时候能成功么?答案是不行!怎么优化?太简单了,提早返回就能够了,每次Flush的时候先看看管道是否可用!

到这,isFlushPending的做用就体现出来了!OP_WRITE咱们就把它当成一个普通的标记,若是Channel上有这个标记,就表示不可写。

那么为何用OP_WRITE标识不可写呢,命名OP_WRITE的含义就是可写啊!这里就要说回到Reacotr要作的三件事中的处理select到的事件了,看一下processSelectedKey:

1234if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush();}

其中有这么一句,这里的注释我也抄过来了,就是说,若是轮训到写事件,就去执行forceFlush,而后清理掉OP_WRITE。

回想咱们写失败的地方。若是写失败,就给管道注册OP_WRITE,而后Reacotr线程会不断地去select,一旦Channel可用,那么这个Channel因为以前注册了OP_WRITE,就会被Reactor线程select出来,而后进行forceFlush,这个forceFlush其实就是调用了flush0,从新走了一遍flush操做,注意两个地方:

forceFlush不会添加flush节点(不会调用addFlush)

forceFlush不会进行isFlushPending校验

为何不进行isFlushPending:咱们说过了,OP_WRITE的意思是管道不可用,那么被select出来的管道必定是可用的,直接进行写操做。

这样咱们就理顺了写失败的流程:

若是写失败,给管道注册一个OP_WRITE

其余的flush操做都会直接返回(被isFlushPending)拦截

管道可用后,被Reacotr线程select到,进行forceFlush操做

收一下,看回到doWrite方法里,若是写成功,进行in.removeBytes(localWrittenBytes);操做,remove掉这个节点。

注意,正常状况下,结束doWrite操做是在:

1234if (in.isEmpty()) { clearOpWrite(); return;}

这里返回的。若是乐观锁的默认16次都循环完,操做还没结束,又会进行incompleteWrite(writeSpinCount < 0)操做,若是执行了16次循环之后,ChannelOutboundBuffer中还有Entry,writeSpinCount < 0成立,设置一个OP_WRITE,而后等着被Reacotr线程select。若是大于0,这种状况比较特殊,写入的ByteBuf或者FileRegion只有一个,可是这个ByteBuf是不可读的,或者region.transferred() >= region.count()。这时候会走到incompleteWrite(false)里,这里执行clearOpWrite();和eventLoop().execute(flushTask);,清理掉OP_WRITE让通道继续可写,而后再次扔了一个flushTask到NioEventLoop里,这里其是让出资源,让Reacotr能够处理其余的task。至此,整个写流程就结束了,写的比较细,你们多多琢磨,多多思考,才会有更多收获!

相关文章
相关标签/搜索