编码器实现了ChannelOutboundHandler
,并将出站数据从 一种格式转换为另外一种格式,和咱们方才学习的解码器的功能正好相反。Netty 提供了一组类, 用于帮助你编写具备如下功能的编码器:java
解码器一般须要在Channel
关闭以后产生最后一个消息(所以也就有了 decodeLast()
方法)
这显然不适于编码器的场景——在链接被关闭以后仍然产生一个消息是毫无心义的react
其接受一Short
型实例做为消息,编码为Short
的原子类型值,并写入ByteBuf
,随后转发给ChannelPipeline
中的下一个 ChannelOutboundHandler
每一个传出的 Short 值都将会占用 ByteBuf 中的 2 字节编程
Netty 提供了一些专门化的 MessageToByteEncoder
,可基于此实现本身的编码器WebSocket08FrameEncoder
类提供了一个很好的实例promise
你已经看到了如何将入站数据从一种消息格式解码为另外一种
为了完善这幅图,将展现 对于出站数据将如何从一种消息编码为另外一种。MessageToMessageEncoder
类的 encode()
方法提供了这种能力
为了演示,使用IntegerToStringEncoder
扩展了 MessageToMessageEncoder
缓存
关于有趣的 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler. codec.protobuf.ProtobufEncoder
类,它处理了由 Google 的 Protocol Buffers 规范所定义 的数据格式。markdown
pipeline中的标准链表结构
java对象编码过程
write:写队列
flush:刷新写队列
writeAndFlush: 写队列并刷新数据结构
数据从head节点流入,先拆包,而后解码成业务对象,最后通过业务Handler
处理,调用write
,将结果对象写出去
而写的过程先经过tail
节点,而后经过encoder
节点将对象编码成ByteBuf
,最后将该ByteBuf
对象传递到head
节点,调用底层的Unsafe写到JDK底层管道并发
为何咱们在pipeline中添加了encoder节点,java对象就转换成netty能够处理的ByteBuf,写到管道里?socket
咱们先看下调用write的code
业务处理器接受到请求以后,作一些业务处理,返回一个user
ide
而后,user在pipeline中传递
情形一
情形二
handler 若是不覆盖 flush 方法,就会一直向前传递直到 head 节点
落到 Encoder
节点,下面是 Encoder
的处理流程
按照简单自定义协议,将Java对象 User 写到传入的参数 out中,这个out究竟是什么?
需知User
对象,从BizHandler
传入到 MessageToByteEncoder
时,首先传到 write
encode
,这里就调回到 Encoder
这个Handler
中ByteBuf
了,那么这个对象就已经无用,释放掉 (当传入的msg
类型是ByteBuf
时,就不须要本身手动释放了)//112 若是buf中写入了数据,就把buf传到下一个节点,直到 header 节点
//115 不然,释放buf,将空数据传到下一个节点
// 120 若是当前节点不能处理传入的对象,直接扔给下一个节点处理
// 127 当buf在pipeline中处理完以后,释放
Handler
是否能处理写入的消息
Encoder
能够处理的 Response
对象ByteBuf
encoder
,即进入到 Encoder 的 encode方法,该方法是用户代码,用户将数据写入ByteBuf总结就是,Encoder
节点分配一个ByteBuf
,调用encode
方法,将Java对象根据自定义协议写入到ByteBuf,而后再把ByteBuf传入到下一个节点,在咱们的例子中,最终会传入到head节点
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
这里的msg就是前面在Encoder节点中,载有java对象数据的自定义ByteBuf对象
如下过程分三步讲解
assertEventLoop
确保该方法的调用是在reactor
线程中filterOutboundMessage()
,将待写入的对象过滤,把非ByteBuf
对象和FileRegion
过滤,把全部的非直接内存转换成直接内存DirectBuffer
想要理解上面这段代码,须掌握写缓存中的几个消息指针
ChannelOutboundBuffer 里面的数据结构是一个单链表结构,每一个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise下面分别是
ChannelOutboundBuffer
缓冲区的最后一个节点初次调用write 即 addMessage
后fushedEntry
指向空,unFushedEntry
和 tailEntry
都指向新加入节点
第二次调用 addMessage
后
第n次调用 addMessage
后
可得,调用n次addMessage
后
flushedEntry
指针一直指向null
,表此时还没有有节点需写到Socket缓冲区unFushedEntry
后有n个节点,表当前还有n个节点还没有写到Socket缓冲区统计当前有多少字节须要须要被写出
当前缓冲区中有多少待写字节
无论调用channel.flush()
,仍是ctx.flush()
,最终都会落地到pipeline
中的head
节点
以后进入到AbstractUnsafe
flush方法中,先调用
结合前面的图来看,上述过程即
首先拿到 unflushedEntry
指针,而后将flushedEntry
指向unflushedEntry
所指向的节点,调用完毕后
接下来,调用 flush0()
发现这里的核心代码就一个 doWrite
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { // 拿到第一个须要flush的节点的数据 Object msg = in.current(); if (msg instanceof ByteBuf) { boolean done = false; long flushedAmount = 0; // 拿到自旋锁迭代次数 if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } // 自旋,将当前节点写出 for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // 写完以后,将当前节点删除 if (done) { in.remove(); } else { break; } } } }
第一步,调用current()
先拿到第一个须要flush
的节点的数据
第二步,拿到自旋锁的迭代次数
第三步 调用 JDK 底层 API 进行自旋写
自旋的方式将ByteBuf
写到JDK NIO的Channel
强转为ByteBuf,若发现没有数据可读,直接删除该节点
拿到自旋锁迭代次数
在并发编程中使用自旋锁能够提升内存使用率和写的吞吐量,默认值为16
继续看源码
javaChannel()
,代表 JDK NIO Channel 已介入这次事件
获得向JDK 底层已经写了多少字节
从 Netty 的 bytebuf 写到 JDK 底层的 bytebuffer
第四步,删除该节点
节点的数据已经写入完毕,接下来就须要删除该节点
首先拿到当前被flush
掉的节点(flushedEntry
所指)
而后拿到该节点的回调对象 ChannelPromise
, 调用 removeEntry()
移除该节点
这里是逻辑移除,只是将flushedEntry指针移到下个节点,调用后
随后,释放该节点数据的内存,调用safeSuccess
回调,用户代码能够在回调里面作一些记录,下面是一段Example
ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { // 回调 } })
最后,调用 recycle
,将当前节点回收
writeAndFlush
在某个Handler
中被调用以后,最终会落到 TailContext
节点
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { write(msg, true, promise); return promise; }
最终,经过一个boolean
变量,表示是调用invokeWriteAndFlush
,仍是invokeWrite
,invokeWrite
即是咱们上文中的write过程
能够看到,最终调用的底层方法和单独调用write
和flush
同样的
由此看来,invokeWriteAndFlush
基本等价于write
以后再来一次flush
write
并无将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush
才是真正的写出writeAndFlush
等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程相似,用自旋锁保证写成功ByteBuf
,将Java对象转换为ByteBuf
,而后再把ByteBuf
继续向前传递,若没有再重写了,最终会传播到 head 节点,其中缓冲区列表拿到缓存写到 JDK 底层 ByteBuffer