扫描下方二维码或者微信搜索公众号
菜鸟飞呀飞
,便可关注微信公众号,阅读更多Spring源码分析
和Java并发编程
文章。java
前两篇文章中分析了 netty 中解码器相关的源码,解码过程是发生在读数据这一步的,那么读到数据,通过解码器解码后,最终就会交由咱们自定义的业务处理中执行,当咱们的业务逻辑处理完成后,就须要给客户端响应消息,这就涉及到服务端如何经过 channel 将响应消息写出去的流程了,同时还会涉及到消息的编码过程,由于在 TCP 协议中,数据最终是经过字节流传输的,而咱们一般在业务代码中是返回一个对象,所以须要进行编码。接下来本文将会重点分析这两个的过程的源码实现。编程
为了方便描述,这里模拟一个简单的场景:netty 服务端在读到客户端发来的消息后,netty 服务端就经过咱们自定义的 ChannelHandler 来进行业务处理,并返回一个 Data 对象,Data 类是咱们自定义的一个类,而后咱们将 Data 对象经过咱们自定义的一个编码器 DataEncoder 进行编码,最后将消息发送出去。promise
netty 服务端启动的 demo 代码微信
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class)
.group(bossGroup,workerGroup)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 向客户端channel中添加两个channelHandler
pipeline.addLast(new DataEncoder());
pipeline.addLast(new BusinessHandler());
}
});
serverBootstrap.bind(8080).sync();
}
复制代码
能够看到,分别向客户端的 channel 的 pipeline 中添加了两个 ChannelHandler:DataEncoder 是自定义的一个编码器,负责进行编码,BusinessHandler 是负责进行业务处理的。所以建立出来的客户端 channel 的 pipeline 的结构以下,这个结构很重要,后面的源码分析全是基于这个结构来分析的。并发
DataEncoder 编码器的源码异步
/** * 实际上就是一个基于换行符的编码器 */
public class DataEncoder extends MessageToByteEncoder<Data> {
private static final String LINE = "\r\n";
@Override
protected void encode(ChannelHandlerContext ctx, Data msg, ByteBuf out) throws Exception {
if(msg instanceof Data){
out.writeBytes(msg.getMsg().getBytes());
out.writeLong(msg.getServerTime());
out.writeBytes(LINE.getBytes());
}
}
}
复制代码
为了简化分析,BusinessHandler 的代码比较简单,它就是直接向客户端返回一个 Data 对象ide
public class BusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读到客户端发来的消息后,服务端直接返回一个data对象
Data data = new Data("Hello World",System.currentTimeMillis());
ctx.channel().writeAndFlush(data);
}
}
复制代码
Data 类就是一个简单的 Bean,包含两个属性:msg,serverTimeoop
public class Data {
private String msg;
private Long serverTime;
public Data(String msg, Long serverTime) {
this.msg = msg;
this.serverTime = serverTime;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Long getServerTime() {
return serverTime;
}
public void setServerTime(Long serverTime) {
this.serverTime = serverTime;
}
}
复制代码
当服务端读到客户端发送过来的消息后,就会经过客户端 channel(即:NioSocketCahnnel)的 pipeline 进行传播执行 channelRead() 方法,也就是会执行到咱们自定义的 BusinessHaandler 中的 channelRead()方法。此时咱们直接建立了一个 Data 对象,而后经过客户端 channel 的 writeAndFlush() 方法,将 Data 对象写出去,发送给客户端。因此接下来详细分析下 writeAndFlush()方法的工做原理。源码分析
ctx.channel()获取到的是客户端 channel,调用客户端 channel 的 writeAndFlush()时,最终会执行客户端 channel 的 pipeline 的 writeAndFlush()方法。ui
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
复制代码
看到调用 pipeline 的方法,首先想到的是这个方法的调用,会沿着 pipeline 这个管道执行全部 channelHandler 的方法。进入到 pipeline 的 writeAndFlush(msg)方法的源码中,果真,它是从 tail 节点开始向前传播执行。
@Override
public final ChannelFuture writeAndFlush(Object msg) {
// 从尾结点开始向前传播
return tail.writeAndFlush(msg);
}
复制代码
最终会调用 tail 节点的 write() 方法。在调用 write()方法时,传入的第二个参数 flush 为 true,表示的是须要执行 flush()方法(注意:此时传入的是 true,记住这一点很重要,由于后面还会有一个地方也会调用 write()方法,可是传入的是 false)。
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 省略部分代码...
// 找到下一个Outbound类型的handler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
// 同步执行
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 异步执行,实际上就是将当前的写操做封装成一个task,而后提交到线程池
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
task.cancel();
}
}
}
复制代码
write()方法的逻辑能够分为三步,第一步:找到 pipeline 中下一个 OutBound 类型的 channelHandler,对于咱们 demo 中的示例,就是 DataEncoder。
第二步:根据当前的线程是不是 NioEventLoop 线程来判断是同步执行仍是异步执行 writeAndFlush,因为当前线程是 NioEventLoop,因此最终是同步执行。
第三步:根据传入的 flush 标识来判断是否须要执行 flush 方法,若是传入的是 true,表示须要执行 flush,前面已经提到了,此时传入的 flush 为 true,所以最后会调用 next.invokeWriteAndFlush(m, promise) 这一行代码。
在 invokeWriteAndFlush()中会先调用 invokeWrite0(),也就是触发执行 write()方法,而后调用 invokeFlush0(),也就是触发执行 flush()方法。今天先分析 write()方法相关的源码,flush 相关的源码下一篇文章分析。
invokeWrite0()会触发执行 channelHandler 的 write()方法,此时传播到的 handler 是咱们自定义的 DataEncoder(由于 tail 节点的前一个 OutBound 类型的节点就是 DataEncoder,注意:BusinessHandler 是 InBound 类型),因为 DataEncoder 没有重写 write()方法,因此调用的是父类 MessageToByteEncoder 的 write()方法。在 write()方法中,会调用到子类的 encode()方法,对数据进行编码。其源码以下。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 判断要写出的数据对象msg,是否须要当前的MessageToByteEncoder来处理
// 如何判断呢?就根据类中传入的泛型类型和msg的实际类型相比较
// 若是类型匹配成功,那就使用MessageToByteEncoder来处理,即会进行编码:encode()
// 若是匹配不成功,则直接使用write()方法
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
// 建立一个ByteBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 调用编码器的编码方法(在编码后,会将编码出来的字节数据存放到buf)
encode(ctx, cast, buf);
} finally {
// 将cast对象的资源释放,由于前面一步已经将对象进行编码了,后面再也不须要用到该对象了
ReferenceCountUtil.release(cast);
}
// 若是编码成功,buf中就会有数据,若是有数据,那么就就能够调用write()方法了
// 会调用父类AbstractChannelHandlerContext的write()方法
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 出现异常后,释放资源
if (buf != null) {
buf.release();
}
}
}
复制代码
write()方法的源码比较长,执行流程能够用以下流程图表示。
首先会根据 acceptOutboundMessage(msg) 方法判断要写出的数据对象 msg,是否须要当前的 MessageToByteEncoder 来处理。如何判断呢?就根据类中传入的泛型类型和 msg 的实际类型相比较,若是类型匹配成功,那就使用 MessageToByteEncoder 来处理,即会进行编码:encode()。例如本文的示例代码中,DataEncoder 这个编码器中,传入的泛型是 Data 类型,这也就意味着 DataEncoder 这个编码器只会对类型是 Data 的数据进行编码。在 BusinessHandler 中咱们发送的数据就是 Data 类型,所以此时 DataEncoder 编码器会对 Data 对象进行编码。若是类型匹配不成功,那么就会直接调用 ctx.write(msg, promise)方法,向下一个 handler 传播执行 write()方法。
当类型匹配成功之后,会先将 msg 的类型强转为指定的泛型类型,在 demo 中就是 Data 类型。而后 allocateBuffer()方法申请一块内存,因为 preferDirect 属性默认是 true,所以会建立一块堆外内存。接着调用 encode()方法进行编码,因为咱们定义的 DataEncoder 重写了 encode()方法,因此这里会调用 DataEncoder 的 encode()方法。
protected void encode(ChannelHandlerContext ctx, Data msg, ByteBuf out) throws Exception {
if(msg instanceof Data){
out.writeBytes(msg.getMsg().getBytes());
out.writeLong(msg.getServerTime());
out.writeBytes(LINE.getBytes());
}
}
复制代码
encode 中传入的第一个参数就是当前持有 DataEncoder 的 context,第二个参数 msg 就是咱们在 BusinessHandler 中建立的 Data 对象,第三个参数 out 就是经过 allocateBuffer()建立出来的一块堆外内存。
encode()方法执行完成之后,再次回到父类的 write()中,因为已经对 msg 对象编码完成了,因此后面该对象没有任何用处了,能够直接释放该对象。
若是编码成功,buf 中就会有数据,若是有数据,buf.isReadable()会返回 true,那么就接着调用 write()方法了,最终会调用父类 AbstractChannelHandlerContext 的 write()方法。若是没有数据被编码,那么就将以前申请的 ByteBuf 内存释放,而后依旧是调用父类 AbstractChannelHandlerContext 的 write()方法,只不过传入的 msg 对象是一个空的 ByteBuf。
最终又会调用到 AbstractChannelHandlerContext 类中的 write(msg,flush,promise)方法(在前面分析 tail 节点的时候已经贴出了该方法的源码),与以前不一样的是,此时传入的第二个参数 flush 为 false,表示不调用 flush。一样先经过 pipeline 找到下一个 Outbound 节点,对于本文 demo 中,下一个节点就是 Head 节点,因为 flush 传入的是 false,所以会调用 next.invokeWrite(m, promise),也就是最终会调用 head 节点的 write()方法。head 节点的 write()方法源码以下。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 对于客户端Channel而言,unsafe对象是NioSocketChannelUnsafe类型的实例
unsafe.write(msg, promise);
}
复制代码
能够看到,在 head 节点中最终是经过 unsafe 对象来写数据的,此时因为 channel 是 NioSocketChannel,因此 unsafe 是 NioSocketChannelUnsafe 对象实例。最终调用的是 AbstractChannel 类的 write()方法。源码以下。
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
// 非空判断
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// 将msg转化成DirectByteBuf
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
// 异常处理
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 将待写的数据添加到待写队列中,待写队列是一个entry组成的链表
outboundBuffer.addMessage(msg, size, promise);
}
复制代码
这该方法中有两个比较重要的逻辑,第一处逻辑:若是 msg 不是堆外内存,会先将 msg 转换为 DirectByteBuf 类型(也就是堆外内存);第二处逻辑:也就是该方法的最后一行,这一行代码的做用是啥呢?将待写的数据添加到待写队列中,待写队列是一个 entry 组成的链表,被存放在 ChannelOutboundBuffer 这个缓冲区中。下面详细分析一下这一行代码的实现原理。
netty 在发送数据时,是先 write(),而后再 flush(),这样才会把数据发送到操做系统的套接字中。若是仅仅是调用 write()方法,数据是不会被发送出去的,而是先存放在 ChannelOutboundBuffer 缓冲区,该缓冲区里面维护了一个队列,每调用一次 write()方法,就向队列中添加一个 entry。对于同一个 NioSocketChannel 而言,可能存在一边调用 write(),一边调用 flush()方法,所以就须要区分出这个缓冲队列中,哪些数据是刚刚 write 进来的,哪些数据是已经被 flush 过了,怎么区分呢?netty 为这个缓冲区提供了两个指针:flushedEntry 和 unflushedEntry,分别指向的是第一个已经被 flush 的数据和第一个没有被 flush 的数据(注意:这里强调一下一个词:第一个,由于可能连续有还几条数据等待被 flush 或者已经被 flush,因此指针指向的是第一个),另外因为是队列,咱们须要维护一个指向队列头部的指针或者尾部的指针,所以还提供了一个 tailEntry 指针用来指向队列的尾部。
初始状态下,这三个指针均指向 null,由于尚未数据没写入缓冲队列。当调用 write()方法和 flush()方法后,三个指针的指向变化关系以下图所示。
调用 write()方法时,最终会调用 outboundBuffer.addMessage()方法来移动这三个指针,下面看下该方法的源码。
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 将要写的数据封装成一个Entry
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
// 移动相关的指针
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// 累计要写的字节数
incrementPendingOutboundBytes(entry.pendingSize, false);
}
复制代码
addMessage()方法的逻辑大体能够分为两部分,第一部分:将传入的数据封装成一个 Entry,而后移动三个指针,至于如何移动的,能够参考上面的示意图,那样会更清晰。第二部分:累计写数据的字节数,由于操做系统的套接字缓冲区会有限制,咱们不能不停地向缓冲区中写入数据,所以 netty 在写数据时,会累加当前要发送数据的字节数,若是超过了限制,就会将当前 channel 设置为不可写状态,直到要写的数据量低于某个值后,channel 的状态又会被设置成可写状态。incrementPendingOutboundBytes()方法的源码以下。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 累加字节数
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 若是待写的数量超过了设置的最高水位线,那么就会将channel设置为不可写状态
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
复制代码
从代码中能够看到,若是累计的字节数超过了缓冲区配置的高水位线,那么就会将 channel 设置为不可写状态。和高水位线相对应的是低水位线,当要写的字节数低于低水位线时,channel 会被从新设置成可写状态。何时会判断低水位线呢?那就是在调用 flush()方法时,flush()方法会在下一篇文章中详细分析。
在 netty 中,默认的高水位线的值是 64KB,低水位线是 32KB。也就是说,当累计要发送的数据大于 64KB 时,channel 会被暂时设置为不可写状态,直到 channel 缓冲区中的数据低于 32KB 时,会从新变为可写状态。
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
复制代码
至此,关于写数据时的 write()方法的源码就分析完了。
最后总结一下,当调用 channel 的 writeAndFlush()方法发送数据时,会从 pipleline 的 tail 节点开始向前传播执行 writeAndFlush()方法。先从 tail 节点向前依次调用 OutBound 类型 handler 的 write()方法,对于编码器而言,在 write()方法中会调用 encode()方法,将数据进行编码,最终 write()方法会被传播执行到 head 节点中,在 head 节点中,会将要写的数据存储到缓冲区,这个缓冲区是由一个队列组成的,由 3 个指针来维护关联关系,每当调用 write()方法或者 flush()方法时,会移动这 3 个指针。最后当等待发送的数据超过 64KB 时,channel 会暂时变为不可写状态,直到堆积的数据量低于 32KB 时,才会从新变为可写状态。
关于 flush()方法的源码会在下一篇文章中详细分析。