Netty源码分析第4章(pipeline)---->第4节: 传播inbound事件

 

Netty源码分析第四章: pipelinehtml

 

第四节: 传播inbound事件ide

 

 

有关于inbound事件, 在概述中作过简单的介绍, 就是以本身为基准, 流向本身的事件, 好比最多见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程oop

 

在业务代码中, 咱们本身的handler每每会经过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到channelRead方法中了呢, 也是咱们这一小节要剖析的内容源码分析

 

在业务代码中, 传递channelRead事件方式是经过fireChannelRead方法进行传播的this

 

这里给你们看两种写法:spa

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //写法1:
 ctx.fireChannelRead(msg); //写法2
 ctx.pipeline().fireChannelRead(msg); }

这里重写了channelRead方法, 而且方法体内继续经过fireChannelRead方法进行传播channelRead事件, 那么这两种写法有什么异同?线程

咱们先以写法2为例, 将这种写法进行剖析debug

这里首先获取当前context的pipeline对象, 而后经过pipeline对象调用自身的fireChannelRead方法进行传播, 由于默认建立的DefaultChannelpipelinecode

咱们跟到DefaultChannelpipeline的fireChannelRead方法中:htm

public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }

这里首先调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息

咱们跟进invokeChannelRead方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }

这里的Object m m一般就是咱们传入的msg, 而next, 目前是head节点, 而后再判断是否为当前eventLoop线程, 若是不是则将方法包装成task交给eventLoop线程处理

咱们跟到invokeChannelRead方法中:

private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }

首先经过invokeHandler()判断当前handler是否已添加, 若是添加, 则执行当前handler的chanelRead方法, 其实这里咱们基本上就明白了, 经过fireChannelRead方法传递事件的过程当中, 其实就是找到相关handler执行其channelRead方法, 因为咱们在这里的handler就是head节点, 因此咱们跟到HeadContext的channelRead方法中:

HeadContext的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //向下传递channelRead事件
 ctx.fireChannelRead(msg); }

在这里咱们看到, 这里经过fireChannelRead方法继续往下传递channelRead事件, 而这种调用方式, 就是咱们刚才分析用户代码的第一种调用方式:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //写法1:
 ctx.fireChannelRead(msg); //写法2
 ctx.pipeline().fireChannelRead(msg); }

这里直接经过context对象调用fireChannelRead方法, 那么和使用pipeline调用有什么区别的, 我会回到HeadConetx的channelRead方法, 咱们来剖析ctx.fireChannelRead(msg)这句, 你们就会对这个问题有答案了, 跟到ctx的fireChannelRead方法中, 这里会走到AbstractChannelHandlerContext类中的fireChannelRead方法中

跟到AbstractChannelHandlerContext类中的fireChannelRead方法:

public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }

这里咱们看到, invokeChannelRead方法中传入了一个findContextInbound()参数, 而这findContextInbound方法其实就是找到当前Context的下一个节点

跟到findContextInbound方法:

private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }

这里的逻辑也比较简单, 是经过一个doWhile循环, 找到当前handlerContext的下一个节点, 这里要注意循环的终止条件, while (!ctx.inbound)表示下一个context标志的事件不是inbound的事件, 则循环继续往下找, 言外之意就是要找到下一个标注inbound事件的节点

有关事件的标注, 以前的小节已经剖析过了, 若是是用户定义的handler, 是经过handler继承的接口而定的, 若是tail或者head, 那么是在初始化的时候就已经定义好, 这里再也不赘述

回到AbstractChannelHandlerContext类的fireChannelRead方法中:

public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }

找到下一个节点后, 继续调用invokeChannelRead方法, 传入下一个和消息对象:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); //第一次执行next其实就是head
    EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }

这里的逻辑咱们又不陌生了, 由于咱们传入的是当前context的下一个节点, 因此这里会调用下一个节点invokeChannelRead方法, 因咱们刚才剖析的是head节点, 因此下一个节点有多是用户添加的handler的包装类HandlerConext的对象

这里咱们跟进invokeChannelRead方法中去:

private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { //发生异常的时候在这里捕获异常
 notifyHandlerException(t); } } else { fireChannelRead(msg); } }

又是咱们熟悉的逻辑, 调用了自身handler的channelRead方法, 若是是用户自定义的handler, 则会走到用户定义的channelRead()方法中去, 因此这里就解释了为何经过传递channelRead事件, 最终会走到用户重写的channelRead方法中去

一样, 也解释了该小节最初提到过的两种写法的区别:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //写法1:
 ctx.fireChannelRead(msg); //写法2
 ctx.pipeline().fireChannelRead(msg); }

写法1是经过当前节点往下传播事件

写法2是经过头节点往下传递事件

因此, 在handler中若是若是要在channelRead方法中传递channelRead事件, 必定要采用写法2的方式向下传递, 或者交给其父类处理, 若是采用1的写法则每次事件传输到这里都会继续从head节点传输, 从而陷入死循环或者发生异常

这里有一点须要注意, 若是用户代码中channelRead方法, 若是没有显示的调用ctx.fireChannelRead(msg)那么事件则不会再往下传播, 则事件会在这里终止, 因此若是咱们写业务代码的时候要考虑有关资源释放的相关操做

若是ctx.fireChannelRead(msg)则事件会继续往下传播, 若是每个handler都向下传播事件, 固然, 根据咱们以前的分析channelRead事件只会在标识为inbound事件的HandlerConetext中传播, 传播到最后, 则最终会调用到tail节点的channelRead方法

咱们跟到tailConext的channelRead方法中:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }

咱们跟进到onUnhandledInboundMessage方法中:

protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg); } finally { //释放资源
 ReferenceCountUtil.release(msg); } }

这里作了释放资源的相关的操做

至此, channelRead事件传输相关罗辑剖析完整, 其实对于inbound事件的传输流程都会遵循这一逻辑, 小伙伴们能够自行剖析其余inbound事件的传输流程

 

上一节: handler的删除

下一节: 传播outbound事件

相关文章
相关标签/搜索