Netty源码剖析与实战-第五周

第三章:Netty 源码 从“线”(请求处理)的角度剖析

目录

第三章:Netty 源码 从“线”(请求处理)的角度剖析

业务处理

主线

发送数据

写数据三种方式

写数据要点

主线

知识点

断开连接

主线

知识点

关闭服务

主线

知识点


 

业务处理

主线

worker thread

• 多路复用器( Selector )接收到OP_READ 事件
• 处理OP_READ 事件:NioSocketChannel.NioSocketChannelUnsafe.read()
• 分配一个初始1024 字节的byte buffer 来接受数据
• 从Channel 接受数据到byte buffer
• 记录实际接受数据大小,调整下次分配byte buffer 大小

• 触发pipeline.fireChannelRead(byteBuf) 把读取到的数据传播出去
• 判断接受byte buffer 是否满载而归:是,尝试继续读取直到没有数据或满16 次;
否,结束本轮读取,等待下次OP_READ事件

 

Handler 执行资格:
• 实现了ChannelInboundHandler
• 实现方法channelRead 不能加注解@Skip

 

知识点

• 处理业务本质:数据在pipeline 中所有的handler 的channelRead() 执行过程
Handler 要实现io.netty.channel.ChannelInboundHandler#channelRead (ChannelHandlerContext ctx,
Object msg),且不能加注解@Skip 才能被执行到。
中途可退出,不保证执行到Tail Handler。
• 默认处理线程就是Channel 绑定的NioEventLoop 线程,也可以设置其他:
pipeline.addLast(new UnorderedThreadPoolEventExecutor(10), serverHandler)

 

发送数据

写数据三种方式

写数据要点

1 对方仓库爆仓时,送不了的时候,会停止送,协商等电话通知什么时候好了,再送。
Netty 写数据,写不进去时,会停止写,然后注册一个OP_WRITE 事件,来通知什么时候可以写进去了再写。
2 发送快递时,对方仓库都直接收下,这个时候再发送快递时,可以尝试发送更多的快递试试,这样效果更好。
Netty 批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整maxBytesPerGatheringWrite)。

3 发送快递时,发到某个地方的快递特别多,我们会连续发,但是快递车毕竟有限,也会考虑下其他地方。
Netty 只要有数据要写,且能写的出去,则一直尝试,直到写不出去或者满16 次(writeSpinCount)。
4 揽收太多,发送来不及时,爆仓,这个时候会出个告示牌:收不下了,最好过2 天再来邮寄吧。
Netty 待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成
false ,让应用端自己做决定要不要发送数据了。

主线

• Write - 写数据到buffer :
ChannelOutboundBuffer#addMessage
• Flush - 发送buffer 里面的数据:
AbstractChannel.AbstractUnsafe#flush
• 准备数据: ChannelOutboundBuffer#addFlush
• 发送:NioSocketChannel#doWrite


知识点

• 写的本质:
• Single write: sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer)
• gathering write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer[], int, int)
• 写数据写不进去时,会停止写,注册一个OP_WRITE 事件,来通知什么时候可以写进去了。
• OP_WRITE 不是说有数据可写,而是说可以写进去,所以正常情况,不能注册,否则一直触发。

• 批量写数据时,如果尝试写的都写进去了,接下来会尝试写更多(maxBytesPerGatheringWrite)。
• 只要有数据要写,且能写,则一直尝试,直到16 次(writeSpinCount),写16 次还没有写完,就直
接schedule 一个task 来继续写,而不是用注册写事件来触发,更简洁有力。
• 待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成false ,
让应用端自己做决定要不要继续写。
• channelHandlerContext.channel().write() :从TailContext 开始执行;
channelHandlerContext.write() : 从当前的Context 开始。

 

断开连接

主线

 

• 多路复用器(Selector)接收到OP_READ 事件:
• 处理OP_READ 事件:NioSocketChannel.NioSocketChannelUnsafe.read():
• 接受数据
• 判断接受的数据大小是否< 0 , 如果是,说明是关闭,开始执行关闭:
• 关闭channel(包含cancel 多路复用器的key)。
• 清理消息:不接受新信息,fail 掉所有queue 中消息。
• 触发fireChannelInactive 和fireChannelUnregistered 。


知识点

 

• 关闭连接本质:
• java.nio.channels.spi.AbstractInterruptibleChannel#close
• java.nio.channels.SelectionKey#cancel
• 要点:
• 关闭连接,会触发OP_READ 方法。读取字节数是-1 代表关闭。
• 数据读取进行时,强行关闭,触发IO Exception,进而执行关闭。
• Channel 的关闭包含了SelectionKey 的cancel 。

 

关闭服务

主线

 

• bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
关闭所有Group 中的NioEventLoop:
• 修改NioEventLoop 的State 标志位
• NioEventLoop 判断State 执行退出


知识点

• 关闭服务本质: • 关闭所有连接及Selector : • java.nio.channels.Selector#keys • java.nio.channels.spi.AbstractInterruptibleChannel#close • java.nio.channels.SelectionKey#cancel • selector.close() • 关闭所有线程:退出循环体for (;;)