写数据是NIO Channel实现的另外一个比较复杂的功能。每个channel都有一个outboundBuffer,这是一个输出缓冲区。当调用channel的write方法写数据时,这个数据被一系列ChannelOutboundHandler处理以后,它被放进这个缓冲区中,并无真正把数据写到socket channel中。而后再调用channel的flush方法,flush会把outboundBuffer中数据真正写到socket channel。正常状况下flush以后,数据已经真正写完了。但使用Selector加非阻塞socket的方式写数据,让写操做变得复杂了。操做系统为每一个socket维护了一个数据发送缓冲区,它的长度SO_SNDBUF, 每次发送数据,先把数据写到这个缓冲区中,操做系统负责把这个发送缓冲区中的数据发送出去,并清理这个缓冲区。当向缓冲区写的速率大于系统的发送速率时,它会被填满,在非阻塞模式下的表现为: 调用socket的write方法写入长度为n数据,实际写入的数据长度m的范围是:0=<m<n。这个时候还剩下长度为n-m的数据没有写入到socket,而数据必须以正确的顺序完整地写入到socket中。 outboundBuffer正是为解决这个问题而设计的,没写进socket的剩余数据会以正确的顺序保存在outboundBuffer中,当发送缓冲区中有空间能够写时,能够从outboundBuffer中取出剩余的数据继续写入到socket中。java
Channel write实现: 把数据写到outboundBuffer中git
write调用栈:github
1 io.netty.channel.AbstractChannel#write(java.lang.Object) 2 io.netty.channel.DefaultChannelPipeline#write(java.lang.Object) 3 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object) 4 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise) 5 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise) 6 io.netty.channel.AbstractChannelHandlerContext#invokeWrite 7 io.netty.channel.DefaultChannelPipeline.HeadContext#write 8 io.netty.channel.AbstractChannel.AbstractUnsafe#write
write的主要逻辑在io.netty.channel.AbstractChannel.AbstractUnsafe#write中实现,这个方法把要写的数据msg对象放到outboundBuffer中。在执行close时,netty不但愿有但愿写新的数据,避免引发不可预料的错误,所以会把outboundBuffer置为null。这里在向outboundBuffer写数据以前会把对它进行检查,若是是null就抛出错误。下面是这个write方法的实现。promise
1 @Override 2 public final void write(Object msg, ChannelPromise promise) { 3 assertEventLoop(); 4 5 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; 6 if (outboundBuffer == null) { 7 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); 8 ReferenceCountUtil.release(msg); 9 return; 10 } 11 12 int size; 13 try { 14 msg = filterOutboundMessage(msg); 15 size = pipeline.estimatorHandle().size(msg); 16 if (size < 0) { 17 size = 0; 18 } 19 } catch (Throwable t) { 20 safeSetFailure(promise, t); 21 ReferenceCountUtil.release(msg); 22 return; 23 } 24 25 outboundBuffer.addMessage(msg, size, promise); 26 }
第5-9行,对outboudBuffer进行检查,若是是null抛出错误。这个里有个小细节,用一个局部变量引用outboundBuffer,避免由其余线程对this.outboundBuffer置空引起错误。socket
14行,调用filterOutboundMessage对msg进行过滤。这是一个protected方法,默认实现是什么都没作,返回输入的msg参数。子类能够覆盖这个方法,把msg转换成指望的类型。ide
15行,计算msg的长度。oop
25行,把放入到outboundBuffer中。this
Channel flush实现:把数据真正写到channelspa
flush调用栈:操作系统
1 io.netty.channel.AbstractChannel#flush 2 io.netty.channel.DefaultChannelPipeline#flush 3 io.netty.channel.AbstractChannelHandlerContext#flush 4 io.netty.channel.AbstractChannelHandlerContext#invokeFlush 5 io.netty.channel.DefaultChannelPipeline.HeadContext#flush 6 io.netty.channel.AbstractChannel.AbstractUnsafe#flush 7 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0 8 io.netty.channel.socket.nio.NioSocketChannel#doWrite 9 io.netty.channel.nio.AbstractNioByteChannel#doWrite 10 io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
以上是io.netty.channel.socket.nio.NioSocketChannel的flush调用栈,对于io.netty.channel.socket.nio.NioDatagramChannel来讲,从第8行开始变得不一样:
7 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0 8 io.netty.channel.nio.AbstractNioMessageChannel#doWrite 9 io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage
把Byte数据流写入channel
io.netty.channel.socket.nio.NioSocketChannel#doWrite是Byte数据流的写逻辑,io.netty.channel.nio.AbstractNioByteChannel#doWrite也是,这二者不一样的地方在于前者是在outboundBuffer能够转换成java.nio.ByteBuffer的状况下执行,后者是在outboundBuffer中的msg是ByteBuf或FileRegin类型时执行。除此以外其余逻辑都同样:
下面来看看io.netty.channel.socket.nio.NioSocketChannel#doWrite的实现代码:
1 @Override 2 protected void doWrite(ChannelOutboundBuffer in) throws Exception { 3 for (;;) { 4 int size = in.size(); 5 if (size == 0) { 6 // All written so clear OP_WRITE 7 clearOpWrite(); 8 break; 9 } 10 long writtenBytes = 0; 11 boolean done = false; 12 boolean setOpWrite = false; 13 14 // Ensure the pending writes are made of ByteBufs only. 15 ByteBuffer[] nioBuffers = in.nioBuffers(); 16 int nioBufferCnt = in.nioBufferCount(); 17 long expectedWrittenBytes = in.nioBufferSize(); 18 SocketChannel ch = javaChannel(); 19 20 // Always us nioBuffers() to workaround data-corruption. 21 // See https://github.com/netty/netty/issues/2761 22 switch (nioBufferCnt) { 23 case 0: 24 // We have something else beside ByteBuffers to write so fallback to normal writes. 25 super.doWrite(in); 26 return; 27 case 1: 28 // Only one ByteBuf so use non-gathering write 29 ByteBuffer nioBuffer = nioBuffers[0]; 30 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { 31 final int localWrittenBytes = ch.write(nioBuffer); 32 if (localWrittenBytes == 0) { 33 setOpWrite = true; 34 break; 35 } 36 expectedWrittenBytes -= localWrittenBytes; 37 writtenBytes += localWrittenBytes; 38 if (expectedWrittenBytes == 0) { 39 done = true; 40 break; 41 } 42 } 43 break; 44 default: 45 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { 46 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); 47 if (localWrittenBytes == 0) { 48 setOpWrite = true; 49 break; 50 } 51 expectedWrittenBytes -= localWrittenBytes; 52 writtenBytes += localWrittenBytes; 53 if (expectedWrittenBytes == 0) { 54 done = true; 55 break; 56 } 57 } 58 break; 59 } 60 61 // Release the fully written buffers, and update the indexes of the partially written buffer. 62 in.removeBytes(writtenBytes); 63 64 if (!done) { 65 // Did not write all buffers completely. 66 incompleteWrite(setOpWrite); 67 break; 68 } 69 } 70 }
第5-7行,若是outboundBuffer中已经没有数据了,调用clearOpWrite方法清除channel SelectionKey上的OP_WRITE事件。
第15-17行,把outboundBuffer转换成ByteBuffer类型,并获得数据长度。
25行,outboundBuffer不能转换成ByteBuffer, 调用io.netty.channel.nio.AbstractNioByteChannel#doWrite执行写操做。
29-42,45-57的逻辑基本已经,都是尽可能把ByteBuffer中的数据写到channel中,知足下列条件中的任意一个时,结束本次写操做:
1. ByteBuffer中的数据已经写完,正常结束。
2. channel已经不能写入数据,须要在channel能够写是继续执行写操做。
3. 者超过channel config中写入次数限制,须要选择合适的实际继续执行写操做。
62行,把已经写入到channel的数据从outboundBuffer中删除。
64-66行, 若是数据没写完,调用incompleteWrite处理没写完的状况。当setOpWrite==true时,在channel的SelectionKey上设置OP_WRITE事件,等eventLoop触发这个事件时再继续执行flush操做。不然,把flush包装成task放到eventLoop中排队执行。
当NioEventLoop检测到OP_WRITE事件时,会调用processSelectedKey方法处理:
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); }
forceFlush的调用栈以下:
1 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#forceFlush 2 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0 3 io.netty.channel.socket.nio.NioSocketChannel#doWrite 4 io.netty.channel.nio.AbstractNioByteChannel#doWrite 5 io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
把数据写入UDP类型的channel
io.netty.channel.nio.AbstractNioMessageChannel#doWrite是数据报的写逻辑。相较于Byte流类型的数据,数据报的写逻辑简单一些。它只是把outboundBuffer中的数据报依次写入到channel中,若是channel写满了,在channel的SelectionKey上设置OP_WRITE事件随后退出,其后OP_WRITE事件处理逻辑和Byte流写逻辑同样。 真正的写操做在io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage中实现,这个方法的实现以下:
1 @Override 2 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { 3 final SocketAddress remoteAddress; 4 final ByteBuf data; 5 if (msg instanceof AddressedEnvelope) { 6 @SuppressWarnings("unchecked") 7 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg; 8 remoteAddress = envelope.recipient(); 9 data = envelope.content(); 10 } else { 11 data = (ByteBuf) msg; 12 remoteAddress = null; 13 } 14 15 final int dataLen = data.readableBytes(); 16 if (dataLen == 0) { 17 return true; 18 } 19 20 final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen); 21 final int writtenBytes; 22 if (remoteAddress != null) { 23 writtenBytes = javaChannel().send(nioData, remoteAddress); 24 } else { 25 writtenBytes = javaChannel().write(nioData); 26 } 27 return writtenBytes > 0; 28 }
5-9行,处理AddressedEnvelope类型的数据报,获得数据报的远程地址和数据。
10-12行,发送的是一个ByteBuf。没有指定远程地址。这种状况下须要先调用channel的connect方法。
20-26行,分别针对两种状况发送数据报. 23行指定了远程地址,25行没有指定远程地址,但调用过了connect方法。