欢迎你们关注个人微博 http://weibo.com/hotbain 会将发布的开源项目技术贴经过微博通知你们,但愿你们可以互勉共进!谢谢!也很但愿可以获得你们对我博文的反馈,写出更高质量的文章!!java
write处理流程git
业务逻辑handler调用context的write方法,将欲发送的数据发送到带发送缓冲区中.github
看看write流程的触发代码(就是在一个业务handler中调用一下write方法便可):数组
public class DiscardServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(final ChannelHandlerContext ctx,final Object msg) throws Exception { ByteBuf bufferBuf =(ByteBuf)msg; System.out.println(new String(bufferBuf.array())); ctx.channel().write(bufferBuf); }
追踪一下,ctx.channel().write(bufferBuf)的实现(假设out pipeline中没有其余的encode handler了,),咱们会看到,最终会由AbstractUnsafe(AbstractUnsafe是channel的一个内部类对象)的write方法(很好找,顺着找就好了,记住,默认pipeline一定会有tail和head两个handler)进行处理,上代码:promise
public void write(Object msg, ChannelPromise promise) { if (!isActive()) { // Mark the write request as failure if the channel is inactive. if (isOpen()) { promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); } else { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } // release message now to prevent resource-leak ReferenceCountUtil.release(msg); } else {//往缓存中添加一个消息对象 outboundBuffer.addMessage(msg, promise); } }
这里关注一下outboundBuffer.addMessage() 到此处,你们就会恍然大悟,知道怎么回事儿了,就是这样,仅仅将要写入的message object写入到一个buffer中。下面咱们来看一下outboundBuffer.addmessage的实现。缓存
注意: outboundBuffer是一个ChannelOutboundBuffer类型的兑现,每个channel都会一个ChannelOutboundBuffer对象与之关联,用来盛放欲发送的消息.上代码证实一切:网络
protected abstract class AbstractUnsafe implements Unsafe { private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this); private boolean inFlush0; }public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private MessageSizeEstimator.Handle estimatorHandle; private final Channel parent; private final ChannelId id = DefaultChannelId.newInstance(); private final Unsafe unsafe;//channel中有Unsafe引用 private final DefaultChannelPipeline pipeline; private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null); /**省略部分代码***/ private final EventLoop eventLoop;Unsafe对象里有一个outboundBuffer ,而channel里有个unsafe引用,因此能够说,channel与outboundBuffer有has-a关系
socket
看一下ChannelOutboundBuffer outboundBuffer的addMessage实现:
ide
void addMessage(Object msg, ChannelPromise promise) { //预测message的size int size = channel.estimatorHandle().size(msg); if (size < 0) { size = 0; } //建立一个Entry对象,一个entry就是一个欲发送的message以及描述信息 Entry e = buffer[tail++]; e.msg = msg; e.pendingSize = size; e.promise = promise; e.total = total(msg); //由此能够看出total和pendingSize是相等的 tail &= buffer.length - 1; if (tail == flushed) {//扩展容量 addCapacity(); } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(size);//更新一下,欲发送的字节大小。 }
都很容易看懂,就是对欲发送的message封装成entry后,将其注册到一个链表中,若是链表大小不够用的话就调用addCapacity进行扩容。下面咱们看一下,addCapaciry()方法的实现,上代码:oop
private void addCapacity() { //更新链表的标志位 int p = flushed; int n = buffer.length; int r = n - p; // number of elements to the right of p 剩余没有刷出去的 int s = size(); int newCapacity = n << 1;//扩大一倍 注意 哈哈 if (newCapacity < 0) { throw new IllegalStateException(); } Entry[] e = new Entry[newCapacity];//建立对象数组。扩容后的哦! System.arraycopy(buffer, p, e, 0, r); System.arraycopy(buffer, 0, e, r, p);//拷贝的和为拷贝的数据(不包括二进制bytebuf数据哦)都要进行复制 for (int i = n; i < e.length; i++) {//将e数组中n到 e[i] = new Entry(); } buffer = e; flushed = 0; unflushed = s; tail = n; }
哈哈 很容易理解的,就是对数组进行扩展。而后复制
到目前为止,咱们已经讲解完了,write的的处理流程。咱们把message放入到buffer中,目的是为了将其发送到目的socket的内核缓冲区中,何时发送(固然是对应的socket发送write可写事件的时候)呢? 当writer事件发送的时候,就是咱们将缓冲起来的message flush到socket的内核缓冲区的时候了!!如今开始下一主题:
Flush处理流程
刚才已经说道flush的发生,意味着socket的write事件发生了,因而咱们天然而然的就想到了NioEventLoop的处理write事件的代码块,上代码:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop /**简洁起见,省略部分代码**/ if ((readyOps & SelectionKey.OP_WRITE) != 0) {//对于半包消息进行输出操做 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } /**简洁起见,省略部分代码*/ } } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }
从上面能够看到实际上worker线程也是调用的当前遍历的socketChannel的unsafe的forceFlush方法。直接上代码看具体实现(最终会调用到AbstractUnsafe的force0方法):
protected void flush0() { if (inFlush0) { //若是对于一个channel正在进行刷新 // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return;//若是缓存为空,则直接返回 } inFlush0 = true;//设置正在刷新标志位 // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION); } else { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { outboundBuffer.failFlushed(t); } finally { inFlush0 = false; } }
不用多说,若是write事件发生,可是缓冲区为空得话,那么就会直接返回,若是不是的话,再去调用doWrite方法。直接上代码(doWrite是个抽象方法,由NioSocketChannel实现,本身想为何选这个类!!能够看源代码!从链接的建立开始看哦!我有写链接建立的博文哦! 本身看吧! ):
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) {//不断写入---将全部的数据flush到内核网络缓冲区中 // Do non-gathering write for a single buffer case. final int msgCount = in.size(); //对于只有一个message的状况,直接写便可 if (msgCount <= 1) { super.doWrite(in); return; } // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers();//将全部的消息转换成java.nio.ByteBuf数组 if (nioBuffers == null) {//msg不是ByteBuf类型,则也不须要采用gathering write的方式,能够直接调用父类AbstractNioByteChannel的doWrite方法 super.doWrite(in); return; } int nioBufferCnt = in.nioBufferCount(); //buffer的个数 long expectedWrittenBytes = in.nioBufferSize();//总的buffer的byte的数量 final SocketChannel ch = javaChannel(); long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { //虽然是每次都是针对的messag进行遍历,可是写入的时候确实针对的全体内部的buffer进行遍历 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//localWrittenBytes写入的字节数目可能超过了本次遍历的msg的buffer的承载的个数,因此若是写回半包得话,须要比较current.msg的大小与此值的大小 if (localWrittenBytes == 0) { setOpWrite = true;//中止写入,多是由于socket如今不可写了 break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) {//若是但愿写入的数量为0,代表所有写入完毕 done = true; break; } } if (done) {//若是所有发送完毕 // Release all buffers for (int i = msgCount; i > 0; i --) { in.remove();//删除buffer数组中的元素--挨个删除 代码 888 } // Finish the write loop if no new messages were flushed by in.remove(). if (in.isEmpty()) {//若是所有完成的话,,那么就将其写半包标志删除 clearOpWrite(); break; } } else { // Did not write all buffers completely. // Release the fully written buffers and update the indexes of the partially written buffer. //若是没有所有刷写完成,那么就释放已经写入的buffer,更新部分写入的buffer的索引 for (int i = msgCount; i > 0; i --) { //代码 999 final ByteBuf buf = (ByteBuf) in.current();//获得当前的直接内存,current()方法调用的不是很深,由于当前的msg已经通过niobuffers转换成直接类型的了,因此基本上会直接返回。 final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex;//获得可写的数据字节数 if (readableBytes < writtenBytes) {//若是写入的部分大于当前缓存指针的大小的话,那么就将其释放 in.progress(readableBytes); in.remove();//移动指针,移动到下一个buffer中 writtenBytes -= readableBytes; } else if (readableBytes > writtenBytes) { buf.readerIndex(readerIndex + (int) writtenBytes);//从新设置当前的buffer的大小 in.progress(writtenBytes); break; } else { // readableBytes == writtenBytes 写入的部分 in.progress(readableBytes); in.remove();//直接移除(实际上是删除引用个数) break; }//代码 1000 } incompleteWrite(setOpWrite);//注册write兴趣事件 break; } } }
这个方法是flush的核心代码,可是代码很长,在此咱们讲一下基本流程,而后再讲几个重要方法是干吗的!!:
流程:
判断一下outbuffer欲发送的message的大小,若是为1的话,调用父类的doWriter方法.
而后调用outbuffer的nioBuffers方法,niobuffers方法主要就是对outbuffer中的bytebuffer解包(由于一个buffer可能会封装多个buffer)、发送字节数的统计(expectedWrittenBytes)、底层最小单位缓冲对象的个数(nioBufferCnt)统计。
调用java的原生API进行数据写入,ch.write(nioBuffers, 0, nioBufferCnt).注意 写入次数是能够经过WriteSpinCount配置项限制的哦!! 该配置项咱们能够在初始化或者运行的过程当中经过调用ctx.channel().config()进行配置或者更改。
若是全发送完成,那么就将缓冲区中的缓冲对象依次清空 见 代码 888
若是没有所有发送,就将所有刷出的bytebuf释放(至于怎样释放,会写一个专门的Netty内存管理的文章),仅仅发送一部分的byte的bytebuf的readerindex进行更改。 见代码 999和1000之间的代码
咱们粘贴一下nioBuffers的实现,很简单:
public ByteBuffer[] nioBuffers() { long nioBufferSize = 0;//用来记录全部须要发送的数据的字节大小 int nioBufferCount = 0;//用来记录最底层的buffer的个数多少 final int mask = buffer.length - 1; final ByteBufAllocator alloc = channel.alloc();//用于将heap缓冲转换成direct类型缓冲 ByteBuffer[] nioBuffers = this.nioBuffers; //底层的niobuffer,会对buffer的进行一个解包操做 Object m; int i = flushed; while (i != unflushed && (m = buffer[i].msg) != null) {//逐个遍历即将发送的bytebuf数据 if (!(m instanceof ByteBuf)) { this.nioBufferCount = 0; this.nioBufferSize = 0; return null; } // Entry entry = buffer[i]; ByteBuf buf = (ByteBuf) m; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex;//能够读取的字节数组 if (readableBytes > 0) { nioBufferSize += readableBytes; int count = entry.count;//获得低层的buffer的个数 if (count == -1) { entry.count = count = buf.nioBufferCount(); } //总的buffer的个数 int neededSpace = nioBufferCount + count; if (neededSpace > nioBuffers.length) {//若是buffer的个数超过了nioBuffers的length进行扩张,按照2倍的系数扩张 this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); } if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { if (count == 1) {//没有封装内部的缓存 ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount ++] = nioBuf; } else {//内部有多个buffer ByteBuffer[] nioBufs = entry.buffers; if (nioBufs == null) { // cached ByteBuffers as they may be expensive to create in terms of Object allocation entry.buffers = nioBufs = buf.nioBuffers();//获得内部缓存 } //进行解压,并返回内部的全部的缓存的个数(解压后的哦) nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } } else { nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,//将heap缓存转换层direct类型 readableBytes, alloc, nioBuffers, nioBufferCount); } } i = i + 1 & mask; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers;
很容易看明白,就对数据的统计。例如bytebuf的个数、发送的字节数的统计
线面咱们来看一下 current()方法的实现:
public Object current() { return current(true); } /** * 将当前即将要刷出的数据写入到directByteBuf中 * @param preferDirect * @return */ public Object current(boolean preferDirect) { if (isEmpty()) { //若是缓存为空,则直接返回 return null; } else { // TODO: Think of a smart way to handle ByteBufHolder messages Object msg = buffer[flushed].msg;//获得即将要刷新的数据缓冲区--buffer[flushed]表示即将要刷新的数据缓冲去 if (threadLocalDirectBufferSize <= 0 || !preferDirect) {//若是线程中没有直接内存缓冲区可用,不喜欢用堆外缓存 return msg; } if (msg instanceof ByteBuf) { //由此能够看出message必须是bytebuf类修的 ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) {//是否是直接内存中分配的 //对于nioBuffers以后,已经将全部bytebuf所有转换成direct类型的了!! return buf; } else { int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //若是没有了的话,就直接返回 return buf; } // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. // We can do a better job by using our pooled allocator. If the current allocator does not // pool a direct buffer, we use a ThreadLocal based pool. /** * 非直接内存会被拷贝的jdk本身的内部的直接缓冲区中。咱们能够经过具备池子功能的分配器来将工做作的更好。 * 若是当前的分配器没有起到一个缓冲直接内存的做用得话,那么咱们就会使用基于线程的threadLocal的池子 * */ ByteBufAllocator alloc = channel.alloc();//获得内存分配器 ByteBuf directBuf; if (alloc.isDirectBufferPooled()) {//是否为直接内存---分配的内存是否为直接缓冲做用的 directBuf = alloc.directBuffer(readableBytes); } else {//不然的话,就用与线程绑定的ThreadLocalPooledByteBuf进行二进制数据分配 directBuf = ThreadLocalPooledByteBuf.newInstance(); //从当前线程栈中获取一个bytebuf--ByteBuffer.allocateDirect(initialCapacity) } //进行必要的数据拷贝--将堆内的数据拷贝到直接内存中 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); current(directBuf);//将原先非direct类型的内存释放,而且替换成 direct的bytebuf return directBuf; } } return msg; } } /** * Replace the current msg with the given one. * The replaced msg will automatically be released 用一个指定的buffer去替换原先的buffer msg对象。呗替换的对象会自动释放 */ public void current(Object msg) { Entry entry = buffer[flushed]; safeRelease(entry.msg);// entry.msg = msg; }
从上面的代码中咱们会看到,netty会将全部的byteBuf解包后,所有转换成直接类型的内存,而后再发送。
到目前为止,咱们已经将必要的flush和write时,发生的事件进行必要的阐述。固然也有几个方法没有讲到,不过本人以为没有必要再去讲解,由于那样得话,太冗余了。
给一下write事件的流程图(稍后给出):
给一下flush处理的流程图(稍后给出):
可是还有几个方面你们煮注意一下:
发送完成的bytebuf内存是怎样释放的?
为何要推荐使用direct类型的内存进行推送?
关于上面的两个问题不是这篇文章要讲述的内容,本人会写一个关于Netty5内存管理的博文,来详细的讲述上面两个问题,让你们吐槽一下!!
本文是本人学习Netty后的我的总结的分享,须要说明的是本人仅仅表明我的的见解,不是什么官方权威的版本,若是有不当之处,还请赐教!!欢迎吐槽!!