首先, 咱们想经过服务端,往客户端发送数据, 一般咱们会调用**ctx.writeAndFlush(数据)
**的方式, 入参位置的数据多是基本数据类型,也可能对象html
其次,编码器一样属于handler,只不过他是特化的专门用于编码做用的handler, 在咱们的消息真正写入jdk底层的ByteBuffer时前,数据须要通过编码处理, 不是说不进行编码就发送不出去,而是不通过编码,客户端可能接受到的是乱码java
而后,咱们知道,ctx.writeAndFlush(数据)
它实际上是出站处理器特有的行为,所以注定了它须要在pipeline中进行传递,从哪里进行传递呢? 从tail节点开始,一直传播到header以前的咱们本身添加的自定义的解码器
中git
WriteAndFlush()
的逻辑咱们跟进源码WriteAndFlush()
相对于Write()
,它的flush字段是truegithub
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //todo 由于flush 为 true next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); }
因而就会这样设计模式
write()
flush()
知道这一点很重要,这意味这咱们知道了,事件传播分红两波进行, 一波write,一波flush, 这两波事件传播的大致流程我写在这里, 在下面api
writepromise
flush缓存
ByteBuffer
/** * @Author: Changwu * @Date: 2019/7/21 20:49 */ public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> { // todo write动做会传播到 MyPersonEncoder的write方法, 可是咱们没有重写, 因而就执行 父类 MessageToByteEncoder的write, 咱们进去看 @Override protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception { System.out.println("MyPersonEncoder...."); // 消息头 长度 out.writeInt(msg.getLength()); // 消息体 out.writeBytes(msg.getContent()); } }
选择继承MessageToByteEncoder<T>
从消息到字节的编码器数据结构
ok,如今来到了咱们自定义的 解码器MyPersonEncoder
,架构
可是,并没看到正在传播的writeAndFlush()
,不要紧, 咱们本身的解码器继承了MessageToByteEncoder
,这个父类中实现了writeAndFlush()
,源码以下:解析写在源码后面
// todo 看他的write方法 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {// todo 1 判断当前是否能够处理这个对象 @SuppressWarnings("unchecked") I cast = (I) msg; // todo 2 内存分配 buf = allocateBuffer(ctx, cast, preferDirect); try { // todo 3 调用本类的encode(), 这个方法就是咱们本身实现的方法 encode(ctx, cast, buf); } finally { // todo 4 释放 ReferenceCountUtil.release(cast); } if (buf.isReadable()) { // todo 5. 往前传递 ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { // todo 释放 buf.release(); } }
encode()
方法,这是个抽象方法,由咱们自定义的编码器实现
write()
事件到这里为止,编码器的执行流程已经完成了,咱们能够看到,和解码器的架构逻辑类似,相似于模板设计模式,对咱们来讲,只不过是作了个填空题
其实到上面的最后一步 释放第一步建立的ByteBuf
以前 ,消息已经被写到jdk底层的 ByteBuffer 中了,怎么作的呢? 别忘了它的上一步, 继续向前传递write()
事件,再往前其实就是HeaderContext
了,和HeaderContext
直接关联的就是unsafe类, 这并不奇怪,咱们都知道,netty中不管是客户端仍是服务端channel底层的数据读写,都依赖unsafe
下面开始分析,
WriteAndFlush()
底层的两波任务细节
write()
咱们跟进HenderContext的write()
,而HenderContext的中依赖的是unsafe.wirte()
因此直接去 AbstractChannel
的Unsafe 源码以下:
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // todo 缓存 写进来的 buffer ReferenceCountUtil.release(msg); return; } int size; try { // todo buffer Dirct化 , (咱们查看 AbstractNioByteBuf的实现) msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // todo 插入写队列 将 msg 插入到 outboundBuffer // todo outboundBuffer 这个对象是 ChannelOutBoundBuf类型的,它的做用就是起到一个容器的做用 // todo 下面看, 是如何将 msg 添加进 ChannelOutBoundBuf中的 outboundBuffer.addMessage(msg, size, promise); }
参数位置的msg,就是通过咱们自定义解码器的父类进行包装了的ByteBuf
类型消息
这个方法主要作了三件事
filterOutboundMessage(msg);
将ByteBuf转换成DirctByteBuf
当咱们进入查看他的实现时,idea会提示,它的子类重写了这个方法, 是谁重写的呢? 是AbstractNioByteChannel
这个类实际上是属于客户端阵营的类,和服务端的AbstractNioMessageChannel
相提并论
源码以下:
protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
DirectBuffer
插入到写队列中它其实就是一个netty自定义的容器,使用的单向链表的结构,为何要有这个容器呢? 回想一下,服务端须要向客户端发送消息,消息进而被封装进ByteBuf
,可是呢, 往客户端写的方法有两个
这个方法的区别是有的,前者只是进行了写,(写到了ByteBuf) 却没有将内容刷新到ByteBuffer
,没有刷新到缓存中,就没办法进一步把它写入jdk原生的ByteBuffer
中, 而 writeAndFlush()
就比较方便,先把msg写入ByteBuf
,而后直接刷进socket,一套带走,打完收工
可是若是客户端恰恰就是不使用writeAndFlush()
,而使用前者,那么盛放消息的ByteBuf
被传递到handler的最开始的位置,怎么办? unsafe也没法把它写给客户端, 难道丢弃不成?
因而写队列就解决了这个问题,它以链表当作数据结构,新传播过来的ByteBuf
就会被他封装成一个一个的节点(entry)进行维护,为了区分这个链表中,哪一个节点是被使用过的,哪一个节点是没有使用过的,他就用三个标记指针进行标记,以下:
下面咱们看一下,它如何将一个新的节点,添加到写队列
addMessage(Object msg, int size, ChannelPromise promise)
添加写队列public void addMessage(Object msg, int size, ChannelPromise promise) { // todo 将上面的三者封装成实体 // todo 调用工厂方法, 建立 Entry , 在 当前的ChannelOutboundBuffer 中每个单位都是一个 Entry, 用它进一步包装 msg Entry entry = Entry.newInstance(msg, size, total(msg), promise); // todo 调整三个指针, 去上面查看这三个指针的定义 if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 // todo 跟进这个方法 incrementPendingOutboundBytes(entry.pendingSize, false); }
看他的源码,其实就是简单的针对链表进行插入的操做,尾插入法, 一直往最后的位置插入,链表的头被标记成unflushedEntry
这两个节点之间entry,表示是能够被flush的节点
在每次添加新的 节点后都调用incrementPendingOutboundBytes(entry.pendingSize, false)
方法, 这个方法的做用是设置写状态, 设置怎样的状态呢? 咱们看它的源码, 能够看到,它会记录下累计的ByteBuf
的容量,一旦超出了阈值,就会传播channel不可写的事件
write()
的第三件事private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } // todo TOTAL_PENDING_SIZE_UPDATER 当前缓存中 存在的代写的 字节 // todo 累加 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); // todo 判断 新的将被写的 buffer的容量不能超过 getWriteBufferHighWaterMark() 默认是 64*1024 64字节 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { // todo 超过64 字节,进入这个方法 setUnwritable(invokeLater); } }
到目前为止,第一波write()
事件已经完成了,咱们能够看到了,这个事件的功能就是使用ChannelOutBoundBuf
将write事件传播过去的单个ByteBuf
维护起来,等待 flush事件的传播
flush()
咱们从新回到,AbstractChannel
中,看他的第二波flush事件的传播状态, 源码以下:它也是主要作了下面的三件事
// todo 最终传递到 这里 @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // todo 添加刷新标志, 设置写状态 outboundBuffer.addFlush(); // todo 遍历buffer队列, 过滤byteBuf flush0(); }
什么是添加刷新标志呢? 其实就是更改链表中的指针位置,三个指针之间的能够完美的把entry
划分出曾经flush过的和未flush节点
ok,继续
下面看一下如何设置状态,addflush() 源码以下:
* todo 给 ChannelOutboundBuffer 添加缓存, 这意味着, 原来添加进 ChannelOutboundBuffer 中的全部 Entry, 所有会被标记为 flushed 过 */ public void addFlush() { // todo 默认让 entry 指向了 unflushedEntry ==> 其实链表中的最左边的 未被使用过的 entry // todo Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); // todo 跟进这个方法 decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
目标是移动指针,改变每个节点的状态, 哪个指针呢? 是 flushedEntry
, 它指向读被flush的节点,也就是说,它左边的,都被处理过了
下面的代码,是选出一开始位置, 由于, 若是flushedEntry == null,说明没有任何一个曾经被flush过的节点,因而就将开始的位置定位到最左边开始,
if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; }
紧接着一个do-while循环,从最后一个被flushedEntry
的地方,到尾部,挨个遍历每个节点, 由于这些节点要被flush进缓存,咱们须要把write时累加的他们的容量减掉, 源码以下
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } // todo 每次 减去 -size long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); // todo 默认 getWriteBufferLowWaterMark() -32kb // todo newWriteBufferSize<32 就把不可写状态改成可写状态 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }
一样是使用原子类作到的这件事, 此外,通过减小的容量,若是小于了32kb就会传播 channel可写的事件
这是flush的重头戏,它实现了将数据写入socket的操做
咱们跟进它的源码,doWrite(ChannelOutboundBuffer in)
这是本类AbstractChannel
的抽象方法, 写如的逻辑方法,被设计成抽象的,具体往那个channel写,和具体的实现有关, 当前咱们想往客户端写, 它的实现是AbstractNioByteChannel
,咱们进入它的实现,源码以下
boolean setOpWrite = false; // todo 总体是无限循环, 过滤ByteBuf for (;;) { // todo 获取第一个 flushedEntity, 这个entity中 有咱们须要的 byteBuf Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } if (msg instanceof ByteBuf) { // todo 第三部分,jdk底层, 进行自旋的写 ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0) { // todo 当前的 ByteBuf 中,没有可写的, 直接remove掉 in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { // todo 获取自旋锁, netty使用它进行 writeSpinCount = config().getWriteSpinCount(); } // todo 这个for循环是在自旋尝试往 jdk底层的 ByteBuf写入数据 for (int i = writeSpinCount - 1; i >= 0; i --) { // todo 把 对应的 buf , 写到socket中 // todo localFlushedAmount就是 本次 往jdk底层的 ByteBuffer 中写入了多少字节 int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } // todo 累加一共写了多少字节 flushedAmount += localFlushedAmount; // todo 若是buf中的数据所有写完了, 设置完成的状态, 退出循环 if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // todo 自旋结束,写完了 done = true if (done) { // todo 跟进去 in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; } ....
这一段代码也是很是长, 它的主要逻辑以下:
经过一个无限循环,保证能够拿到全部的节点上的ByteBuf
,经过这个函数获取节点, Object msg = in.current();
咱们进一步看它的实现,以下,它只会取出咱们标记的节点
public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }
下一步, 使用jdk的自旋锁,循环16次,尝试往jdk底层的ByteBuffer中写数据, 调用函数doWriteBytes(buf);
他是本类的抽象方法, 具体的实现是,客户端chanel的封装类NioSocketChannel
实现的源码以下:
// todo @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); // todo 将字节数据, 写入到 java 原生的 channel中 return buf.readBytes(javaChannel(), expectedWrittenBytes); }
这个readBytes()
依然是抽象方法,由于前面咱们曾经把从ByteBuf
转化成了Dirct类型的, 因此它的实现类是PooledDirctByteBuf
继续跟进以下: 终于见到了亲切的一幕
// todo @Override public int readBytes(GatheringByteChannel out, int length) throws IOException { checkReadableBytes(length); //todo 关键的就是 getBytes() 跟进去 int readBytes = getBytes(readerIndex, out, length, true); readerIndex += readBytes; return readBytes; } 跟进getBytes(){ index = idx(index); // todo 将netty 的 ByteBuf 塞进 jdk的 ByteBuffer tmpBuf; tmpBuf.clear().position(index).limit(index + length); // todo 调用jdk的write()方法 return out.write(tmpBuf); }
private void removeEntry(Entry e) { if (-- flushed == 0) { // todo 若是是最后一个节点, 把全部的指针所有设为 null // processed everything flushedEntry = null; if (e == tailEntry) { tailEntry = null; unflushedEntry = null; } } else { //todo 若是 不是最后一个节点, 把当前节点,移动到最后的 节点 flushedEntry = e.next; } }
到这里, 第二波任务的传播就完成了
write
flush
ByteBuffer
原文出处:https://www.cnblogs.com/ZhuChangwu/p/11228433.html