netty---------write flush两个方法到底作了什么?

上一篇已经看到:netty的读,是调用unsafe的read方法,把channel中的数据read到byteBuff中的byteBuffer里,也是封装了nio的读。java

那么根据猜测,netty的写应该也是调用nio 的 channel的write(byteBuffer),将用户空间的数据,写到内核空间。并且nio的write方法对应的netty应该是flush方法,看看是否是这样。数组

@Override
-----------ChannelHandlerContext的write方法实际上是调用的pipeline的write方法,到最后调用的是tail的write方法,
public final ChannelFuture write(Object msg) { return tail.write(msg); }
--------------------这里根据传入的boolean值,决定是write仍是writeAndflush,先看write
private
void write(Object msg, boolean flush, ChannelPromise promise) {
-----------------由于这里执行的是tail的write方法, 因此要看tail的findContextOutbound的逻辑,tail这个context在初始化的时候inbound是true,可是outbound属性是false,因此要从tail往前,找outboundHandler,而head的outbound属性是true的,看它的write方
法,调用的是unsafe的write方法,是AbstractChannel的write方法。看下一段
AbstractChannelHandlerContext next
= findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }

 

@Override
-----------------unsafe的write方法
public final void write(Object msg, ChannelPromise promise) { assertEventLoop();
----------------ChannelOutboundBuffer这个类,维护这一个由他本身的内部类entry组成的一个单链表,entry有着指向byteBuffer和byteBuffer数组的指针,本方法的最下面的addMessage,就是新建一个entry,而后放到链表里,ChannelOutboundBuffer本身有三个属性:
----------------flushedEntry :指向第一个已经被flush的entry
----------------unflushedEntry :指向第一个没有被flush的entry
----------------tailEntry : 队尾的指针。这三个其实很简单
ChannelOutboundBuffer outboundBuffer
= this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }

 因此能够看到,所谓的netty的channelHandlerContext的write方法,其实并非向内核写入数据,而是把msg放入一个链表中,等待flush,而flush方法也是在headContext中,调用的unsafe的flush方法,promise

注意这里的unsafe是socket

@Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; }
------------这个方法是把flushedEntry指针置空,把unFlushedEntry指针放到队列头部 outboundBuffer.addFlush();
------------下文 flush0(); }
@Override
-----------------从flush0到这一步的逻辑比较复杂,涉及到各类判断,简单起见,直接看这一步NioSocketChannel的doWriteBytes方法,netty的写,对应的是buf的readBytes,这其实只是方法名字取的问题。回忆以前的doReadBytes对应的是buf的writeBytes方法,
当时是把channel的数据读到nio
的ByteBuffer中,如今应该是把byteBuffer中的数据write到channel中了,写到内核
protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
---------------仍是找到以前的那个byteBuf的实现类,能够看出,仍是调用nio的socketChannel的write方法,把tmpBuf(一个Nio的ByteBuffer)写到channel中。
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { ensureAccessible(); if (length == 0) { return 0; } ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = buffer.duplicate(); } tmpBuf.clear().position(index).limit(index + length); return out.write(tmpBuf); }
相关文章
相关标签/搜索