Netty源码分析第7章(编码器和写数据)---->第4节: 刷新buffer队列

 

Netty源码分析第七章: 编码器和写数据html

 

第四节: 刷新buffer队列java

 

上一小节学习了writeAndFlush的write方法, 这一小节咱们剖析flush方法promise

经过前面的学习咱们知道, flush方法经过事件传递, 最终会传递到HeadContext的flush方法:socket

public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }

这里最终会调用AbstractUnsafe的flush方法:oop

public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }

这里首先也是拿到ChannelOutboundBuffer对象源码分析

而后咱们看这一步:学习

outboundBuffer.addFlush();

这一步一样也是调整ChannelOutboundBuffer的指针this

跟进addFlush方法:编码

public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; } }

首先声明一个entry指向unflushedEntry, 也就是第一个未flush的entryspa

一般状况下unflushedEntry是不为空的, 因此进入if

再未刷新前flushedEntry一般为空, 因此会执行到flushedEntry = entry

也就是flushedEntry指向entry

通过上述操做, 缓冲区的指针状况如图所示:

7-4-1

而后经过do-while将, 不断寻找unflushedEntry后面的节点, 直到没有节点为止

flushed自增表明须要刷新多少个节点

循环中咱们关注这一步

decrementPendingOutboundBytes(pending, false, true);

这一步也是统计缓冲区中的字节数, 可是是和上一小节的incrementPendingOutboundBytes正好是相反, 由于这里是刷新, 因此这里要减掉刷新后的字节数,

咱们跟到方法中:

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } //从总的大小减去 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); //直到减到小于某一个阈值32个字节 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { //设置写状态  setWritable(invokeLater); } }

一样TOTAL_PENDING_SIZE_UPDATER表明缓冲区的字节数, 这里的addAndGet中参数是-size, 也就是减掉size的长度

再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) 

getWriteBufferLowWaterMark()表明写buffer的第水位值, 也就是32k, 若是写buffer的长度小于这个数, 就经过setWritable方法设置写状态

也就是通道由原来的不可写改为可写

回到addFlush方法:

遍历do-while循环结束以后, 将unflushedEntry指为空, 表明全部的entry都是可写的

通过上述操做, 缓冲区的指针状况以下图所示:

7-4-2

回到AbstractUnsafe的flush方法:

指针调整完以后, 咱们跟到flush0()方法中:

protected void flush0() { if (inFlush0) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }

 if (inFlush0) 表示判断当前flush是否在进行中, 若是在进行中, 则返回, 避免重复进入

咱们重点关注doWrite方法

跟到AbstractNioByteChannel的doWrite方法中去:

protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { //每次拿到当前节点 Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //转化成ByteBuf ByteBuf buf = (ByteBuf) msg; //若是没有可写的值 int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //移除  in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { //将buf写入到socket里面 //localFlushedAmount表明向jdk底层写了多少字节 int localFlushedAmount = doWriteBytes(buf); //若是一个字节没写, 直接break if (localFlushedAmount == 0) { setOpWrite = true; break; } //统计总共写了多少字节 flushedAmount += localFlushedAmount; //若是buffer所有写到jdk底层 if (!buf.isReadable()) { //标记全写道 done = true; break; } } in.progress(flushedAmount); if (done) { //移除当前对象  in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代码省略 } else { throw new Error(); } } incompleteWrite(setOpWrite); }

首先是一个无限for循环

 Object msg = in.current() 这一步是拿到flushedEntry指向的entry中的msg

跟到current()方法中:

public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }

这里直接拿到flushedEntry指向的entry中关联的msg, 也就是一个ByteBuf

回到doWrite方法:

若是msg为null, 说明没有能够刷新的entry, 则调用clearOpWrite()方法清除写标识

若是msg不为null, 则会判断是不是ByteBuf类型, 若是是ByteBuf, 就进入if块中的逻辑

if块中首先将msg转化为ByteBuf, 而后判断ByteBuf是否可读, 若是不可读, 则经过in.remove()将当前的byteBuf所关联的entry移除, 而后跳过此次循环进入下次循环

remove方法稍后分析, 这里咱们先继续往下看

 boolean done = false 这里设置一个标识, 标识刷新操做是否执行完成, 这里默认值为false表明走到这里没有执行完成

 writeSpinCount = config().getWriteSpinCount() 这里是得到一个写操做的循环次数, 默认是16

而后根据这个循环次数, 进行循环的写操做

在循环中, 关注这一步:

int localFlushedAmount = doWriteBytes(buf);

这一步就是将buf的内容写到channel中, 并返回写的字节数, 这里会调用NioSocketChannel的doWriteBytes

咱们跟到doWriteBytes方法中:

protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }

这里首先拿到buf的可读字节数, 而后经过readBytes将可读字节写入到jdk底层的channel中

回到doWrite方法:

将内容写的jdk底层的channel以后, 若是一个字节都没写, 说明如今channel可能不可写, 将setOpWrite设置为true, 用于标识写操做位, 并退出循环

若是已经写出字节, 则经过 flushedAmount += localFlushedAmount 累加写出的字节数

而后根据是buf是否没有可读字节数判断是否buf的数据已经写完, 若是写完, 将done设置为true, 说明写操做完成, 并退出循环

由于有时候不必定一次就能将byteBuf全部的字节写完, 因此这里会继续经过循环进行写出, 直到循环到16次

若是ByteBuf内容彻底写完, 会经过in.remove()将当前entry移除掉

咱们跟到remove方法中:

public boolean remove() { //拿到当前第一个flush的entry Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }

首先拿到当前的flushedEntry

咱们重点关注removeEntry这步, 跟进去:

private void removeEntry(Entry e) { if (-- flushed == 0) { //位置为空 flushedEntry = null; //若是是最后一个节点 if (e == tailEntry) { //所有设置为空 tailEntry = null; unflushedEntry = null; } } else { //移动到下一个节点 flushedEntry = e.next; } }

 if (-- flushed == 0) 表示当前节点是否为须要刷新的最后一个节点, 若是是, 则flushedEntry指针设置为空

若是当前节点是tailEntry节点, 说明当前节点是最后一个节点, 将tailEntry和unflushedEntry两个指针所有设置为空

若是当前节点不是须要刷新的最后的一个节点, 则经过 flushedEntry = e.nex t这步将flushedEntry指针移动到下一个节点

以上就是flush操做的相关逻辑

 

上一节: 写buffer队列

下一节: Future和Promies

相关文章
相关标签/搜索