Netty源码分析之ChannelPipeline—入站事件的传播

以前的文章中咱们说过ChannelPipeline做为Netty中的数据管道,负责传递Channel中消息的事件传播,事件的传播分为入站和出站两个方向,分别通知ChannelInboundHandler与ChannelOutboundHandler来触发对应事件。这篇文章咱们先对Netty中入站事件的传播,也就是ChannelInboundHandler进行下分析:html

一、入站事件传播示例

咱们经过一个简单的例子看下ChannelPipeline中入站事件channelRead的传播bootstrap

public class ServerApp { public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup work = new NioEventLoopGroup(2); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // p.addLast(new LoggingHandler(LogLevel.INFO)); // 向ChannelPipeline中添加自定义channelHandler
                            p.addLast(new ServerHandlerA()); p.addLast(new ServerHandlerB()); p.addLast(new ServerHandlerC()); } }); bootstrap.bind(8050).sync(); } catch (Exception e) { // TODO: handle exception
 } } } public class ServerHandlerA  extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { System.out.println(this.getClass().getName() + "--"+object.toString()); ctx.fireChannelRead(object); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.channel().pipeline().fireChannelRead("hello word"); } } public class ServerHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { System.out.println(this.getClass().getName() + "--"+object.toString()); ctx.fireChannelRead(object); } } public class ServerHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { System.out.println(this.getClass().getName() + "--"+object.toString()); ctx.fireChannelRead(object); } }

客户端链接服务后可看到输出结果微信

io.netty.example.echo.my.ServerHandlerA--hello word io.netty.example.echo.my.ServerHandlerB--hello word io.netty.example.echo.my.ServerHandlerC--hello word

经过输出结果咱们能够看到,消息会根据向ChannelPipeline中添加自定义channelHandler的顺序传递,并经过实现channelRead接口处理消息接收事件的。在例子中channelRead事件的传递是经过ctx.fireChannelRead(object)方法实现,接下来咱们就从这里入手看下ChannelPipeline事件传递的具体实现。ide

二、channelRead事件的传播

首先这里须要注意的是咱们例子中第一个节点的传递与实际应用中入站数据的传递是经过ChannelPipeline的fireChannelRead方法实现的,由于在实际的应用中,入站事件的传递是由NioUnsafe的read接口实现发起的,须要保证消息是从head结点开始传递的,例子中是为了模拟这一过程。oop

ctx.channel().pipeline().fireChannelRead("hello word");
 @Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg);//默认传入head节点
        return this; } 

进入invokeChannelRead方法内部看下具体实现;this

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { //ObjectUtil.checkNotNull 判断传入的消息数据是否为空 //next.pipeline.touch 对消息类型进行判断
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor();//获取ChannelHandlerContext对应的线程
        if (executor.inEventLoop()) {//是否为当前线程
            next.invokeChannelRead(m);//调用ChannelHandlerContext中invokeChannelRead的回调方法
        } else { executor.execute(new Runnable() {//若是线程不是当前线程
 @Override public void run() { next.invokeChannelRead(m); } }); } }

其中invokeChannelRead方法会获取该ChannelHandlerContext所封装的handler实现;spa

private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //获取封装的ChannelInboundHandler实现,并调用咱们实现的channelRead方法,
                ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }

前面咱们知道首先传入的ChannelPipeline中ChannelHandlerContext链表的head头部节点HeadContext,看下其channelRead的方法实现;.net

 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }

调用当前ChannelHandlerContext的fireChannelRead方法,进入ctx.fireChannelRead(object)方法内部看下具体的源码实现;线程

 @Override public ChannelHandlerContext fireChannelRead(final Object msg) { //开始消息传递,findContextInbound方法按顺序获取当前ChannelHandlerContext的next节点
 invokeChannelRead(findContextInbound(), msg); return this; }

findContextInbound方法获取的是HeadContext的下一个节点,也就是咱们例子中向ChannelPipeline中添加自定义ServerHandlerA;debug

到这里其实就能够看出Pipeline中channelRead事件的传播主要就是经过ctx.fireChannelRead(msg),获取当前ChannelHandlerContext下一个节点中封装的ChannelInboundHandler来实现的,最后一步一步传递到Tail尾部节点。

三、资源的释放及SimpleChannelInboundHandler

Netty中对象的生命周期由它们的引用计数管理的,为保证入站对象资源被释放,咱们须要经过ReferenceCountUtil.release方法减小引用计数,确保对象的的最终计数器最后被置为0,从而被回收释放。咱们看下Netty在入站事件中默认是如何减小引用计数的。

第一种方法,若是咱们跟上面示例同样,在实现的每个ChannelInboundHandler中都调用了ctx.fireChannelRead(msg),最后消息会被传递到Tail尾节点,咱们看下Tail节点中的channelRead方法

 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { onUnhandledInboundMessage(msg); } protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }

Tail节点的channelRead方法最终会调用ReferenceCountUtil.release方法来减小引用计数的,因此若是你在处理入站消息的过程当中没有增长引用而且经过ctx.fireChannelRead(msg)方法把消息传到了Tail节点,你就不须要本身显式调用ReferenceCountUtil.release方法了。

其次若是继承的是SimpleChannelInboundHandler,能够看到SimpleChannelInboundHandler的channelRead方法实现中也已经调用了ReferenceCountUtil.release方法来减小引用计数;

 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }

因此关于入站消息的资源释放方式总结以下:

  • 一、继承ChannelInboundHandlerAdapter ,在channelRead的方法实现中调用ctx.fireChannelRead(object)方法,把消息一直向下传递,直到传递到Tail尾部节点,由Tail节点执行 ReferenceCountUtil.release来减小计数器,保证资源释放;
  • 二、继承SimpleChannelInboundHandler,SimpleChannelInboundHandler自己的ChannelRead方法中会执行 ReferenceCountUtil.release来减小引用;
  • 三、若是以上两点都没有作到,那就须要手动调用ReferenceCountUtil.release来减小引用来释放资源;

 

到这里咱们基本了解了ChannelPipeline中入站事件是如何传播与相应的的,以及Netty中入站消息的资源释放机制。其中若有不足与不正确的地方还望指出与海涵。

 

关注微信公众号,查看更多技术文章。

 

 

原文出处:https://www.cnblogs.com/dafanjoy/p/12274701.html

相关文章
相关标签/搜索