Netty源码分析七章: 编码器和写数据html
第三节: 写buffer队列promise
以前的小节咱们介绍过, writeAndFlush方法其实最终会调用write和flush方法缓存
write方法最终会传递到head节点, 调用HeadContext的write方法:oop
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
这里经过unsafe对象的write方法, 将消息写入到缓存中, 具体的执行逻辑, 咱们在这个小节进行剖析源码分析
咱们跟到AbstractUnsafe的write方法中:this
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //负责缓冲写进来的byteBuf ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { //非堆外内存转化为堆外内存 msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //插入写队列 outboundBuffer.addMessage(msg, size, promise); }
首先看 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer 编码
ChannelOutboundBuffer的功能就是缓存写入的ByteBufspa
咱们继续看try块中的 msg = filterOutboundMessage(msg) 指针
这步的意义就是将非对外内存转化为堆外内存code
filterOutboundMessage方法方法最终会调用AbstractNioByteChannel中的filterOutboundMessage方法:
protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; //是堆外内存, 直接返回 if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
首先判断msg是否byteBuf对象, 若是是, 判断是否堆外内存, 若是是堆外内存, 则直接返回, 不然, 经过newDirectBuffer(buf)这种方式转化为堆外内存
回到write方法中:
outboundBuffer.addMessage(msg, size, promise)将已经转化为堆外内存的msg插入到写队列
咱们跟到addMessage方法当中, 这是ChannelOutboundBuffer中的方法:
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(size, false); }
首先经过 Entry.newInstance(msg, size, total(msg), promise) 的方式将msg封装成entry
而后经过调整tailEntry, flushedEntry, unflushedEntry三个指针, 完成entry的添加
这三个指针均是ChannelOutboundBuffer的成员变量
flushedEntry指向第一个被flush的entry
unflushedEntry指向第一个未被flush的entry
也就是说, 从flushedEntry到unflushedEntry之间的entry, 都是被已经被flush的entry
tailEntry指向最后一个entry, 也就是从unflushedEntry到tailEntry之间的entry都是没flush的entry
咱们回到代码中:
建立了entry以后首先判断尾指针是否为空, 在第一次添加的时候, 均是空, 因此会将flushedEntry设置为null, 而且将尾指针设置为当前建立的entry
最后判断unflushedEntry是否为空, 若是第一次添加这里也是空, 因此这里将unflushedEntry设置为新建立的entry
第一次添加以下图所示
7-3-1
若是不是第一次调用write方法, 则会进入 if (tailEntry == null) 中else块:
Entry tail = tailEntry 这里tail就是当前尾节点
tail.next = entry 表明尾节点的下一个节点指向新建立的entry
tailEntry = entry 将尾节点也指向entry
这样就完成了添加操做, 其实就是将新建立的节点追加到原来尾节点以后
第二次添加 if (unflushedEntry == null) 会返回false, 因此不会进入if块
第二次添加以后指针的指向状况以下图所示:
7-3-4
之后每次调用write, 若是没有调用flush的话都会在尾节点以后进行追加
回到代码中, 看这一步incrementPendingOutboundBytes(size, false)
这步时统计当前有多少字节须要被写出, 咱们跟到这个方法中:
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } //TOTAL_PENDING_SIZE_UPDATER当前缓冲区里面有多少待写的字节 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); //getWriteBufferHighWaterMark() 最高不能超过64k if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); } }
看这一步:
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size)
TOTAL_PENDING_SIZE_UPDATER表示当前缓冲区还有多少待写的字节, addAndGet就是将当前的ByteBuf的长度进行累加, 累加到newWriteBufferSize中
在继续看判断 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark())
channel.config().getWriteBufferHighWaterMark() 表示写buffer的高水位值, 默认是64k, 也就是说写buffer的最大长度不能超过64k
若是超过了64k, 则会调用setUnwritable(invokeLater)方法设置写状态
咱们跟到setUnwritable(invokeLater)方法中:
private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(invokeLater); } break; } } }
这里经过自旋和cas操做, 传播一个ChannelWritabilityChanged事件, 最终会调用handler的channelWritabilityChanged方法进行处理
以上就是写buffer的相关逻辑