源码之下无秘密 ── 作最好的 Netty 源码分析教程java
Netty 源码分析之 番外篇 Java NIO 的前生今世git
Java NIO 的前生今世 之一 简介github
Java NIO 的前生今世 之二 NIO Channel 小结segmentfault
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头oop
此文章已同步发送到个人 github 上
这篇是 Netty 源码分析 的第二篇, 在这篇文章中, 我会为读者详细地分析 Netty 中的 ChannelPipeline 机制.
相信你们都知道了, 在 Netty 中每一个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系以下:
经过上图咱们能够看到, 一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表. 这个链表的头是 HeadContext, 链表的尾是 TailContext, 而且每一个 ChannelHandlerContext 中又关联着一个 ChannelHandler.
上面的图示给了咱们一个对 ChannelPipeline 的直观认识, 可是实际上 Netty 实现的 Channel 是否真的是这样的呢? 咱们继续用源码说话.
在第一章 Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 中, 咱们已经知道了一个 Channel 的初始化的基本过程, 下面咱们再回顾一下.
下面的代码是 AbstractChannel 构造器:
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
AbstractChannel 有一个 pipeline 字段, 在构造器中会初始化它为 DefaultChannelPipeline的实例. 这里的代码就印证了一点: 每一个 Channel 都有一个 ChannelPipeline
.
接着咱们跟踪一下 DefaultChannelPipeline 的初始化过程.
首先进入到 DefaultChannelPipeline 构造器中:
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
在 DefaultChannelPipeline 构造器中, 首先将与之关联的 Channel 保存到字段 channel 中, 而后实例化两个 ChannelHandlerContext, 一个是 HeadContext 实例 head, 另外一个是 TailContext 实例 tail. 接着将 head 和 tail 互相指向, 构成一个双向链表.特别注意到, 咱们在开始的示意图中, head 和 tail 并无包含 ChannelHandler
, 这是由于 HeadContext 和 TailContext 继承于 AbstractChannelHandlerContext 的同时也实现了 ChannelHandler 接口了, 所以它们有 Context 和 Handler 的双重属性.
在第一章的时候, 咱们已经对 ChannelPipeline 的初始化有了一个大体的了解, 不过当时重点毕竟不在 ChannelPipeline 这里, 所以没有深刻地分析它的初始化过程. 那么下面咱们就来看一下具体的 ChannelPipeline 的初始化都作了哪些工做吧.
咱们再来回顾一下, 在实例化一个 Channel 时, 会伴随着一个 ChannelPipeline 的实例化, 而且此 Channel 会与这个 ChannelPipeline 相互关联, 这一点能够经过NioSocketChannel 的父类 AbstractChannel 的构造器予以佐证:
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
当实例化一个 Channel(这里以 EchoClient 为例, 那么 Channel 就是 NioSocketChannel), 其 pipeline 字段就是咱们新建立的 DefaultChannelPipeline 对象, 那么咱们就来看一下 DefaultChannelPipeline 的构造方法吧:
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
能够看到, 在 DefaultChannelPipeline 的构造方法中, 将传入的 channel 赋值给字段 this.channel, 接着又实例化了两个特殊的字段: tail 与 head. 这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键.
再回顾一下 head 和 tail 的类层次结构:
从类层次结构图中能够很清楚地看到, head 实现了 ChannelInboundHandler, 而 tail 实现了 ChannelOutboundHandler 接口, 而且它们都实现了 ChannelHandlerContext 接口, 所以能够说 head 和 tail 便是一个 ChannelHandler, 又是一个 ChannelHandlerContext.
接着看一下 HeadContext 的构造器:
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); }
它调用了父类 AbstractChannelHandlerContext 的构造器, 并传入参数 inbound = false, outbound = true.
TailContext 的构造器与 HeadContext 的相反, 它调用了父类 AbstractChannelHandlerContext 的构造器, 并传入参数 inbound = true, outbound = false.
即 header 是一个 outboundHandler, 而 tail 是一个inboundHandler, 关于这一点, 你们要特别注意, 由于在后面的分析中, 咱们会反复用到 inbound 和 outbound 这两个属性.
上面一小节中, 咱们已经分析了 Channel 的组成, 其中咱们了解到, 最开始的时候 ChannelPipeline 中含有两个 ChannelHandlerContext(同时也是 ChannelHandler), 可是这个 Pipeline并不能实现什么特殊的功能, 由于咱们尚未给它添加自定义的 ChannelHandler.
一般来讲, 咱们在初始化 Bootstrap, 会添加咱们自定义的 ChannelHandler, 就以咱们熟悉的 EchoClient 来举例吧:
Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoClientHandler()); } });
上面代码的初始化过程, 相信你们都不陌生. 在调用 handler 时, 传入了 ChannelInitializer 对象, 它提供了一个 initChannel 方法供咱们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢? 下面咱们就来揭开它的神秘面纱.
ChannelInitializer 实现了 ChannelHandler, 那么它是在何时添加到 ChannelPipeline 中的呢? 进行了一番搜索后, 咱们发现它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.
其代码以下:
@Override @SuppressWarnings("unchecked") void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler()); ... }
上面的代码将 handler() 返回的 ChannelHandler 添加到 Pipeline 中, 而 handler() 返回的是handler 其实就是咱们在初始化 Bootstrap 调用 handler 设置的 ChannelInitializer 实例, 所以这里就是将 ChannelInitializer 插入到了 Pipeline 的末端.
此时 Pipeline 的结构以下图所示:
有朋友可能就有疑惑了, 我明明插入的是一个 ChannelInitializer 实例, 为何在 ChannelPipeline 中的双向链表中的元素倒是一个 ChannelHandlerContext? 为了解答这个问题, 咱们继续在代码中寻找答案吧.
咱们刚才提到, 在 Bootstrap.init 中会调用 p.addLast() 方法, 将 ChannelInitializer 插入到链表末端:
@Override public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); // 检查此 handler 是否有重复的名字 AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addLast0(name, newCtx); } return this; }
addLast 有不少重载的方法, 咱们关注这个比较重要的方法就能够了.
上面的 addLast 方法中, 首先检查这个 ChannelHandler 的名字是不是重复的, 若是不重复的话, 则为这个 Handler 建立一个对应的 DefaultChannelHandlerContext 实例, 并与之关联起来(Context 中有一个 handler 属性保存着对应的 Handler 实例). 判断此 Handler 是否重名的方法很简单: Netty 中有一个 name2ctx Map 字段, key 是 handler 的名字, 而 value 则是 handler 自己. 所以经过以下代码就能够判断一个 handler 是否重名了:
private void checkDuplicateName(String name) { if (name2ctx.containsKey(name)) { throw new IllegalArgumentException("Duplicate handler name: " + name); } }
为了添加一个 handler 到 pipeline 中, 必须把此 handler 包装成 ChannelHandlerContext. 所以在上面的代码中咱们能够看到新实例化了一个 newCtx 对象, 并将 handler 做为参数传递到构造方法中. 那么咱们来看一下实例化的 DefaultChannelHandlerContext 到底有什么玄机吧.
首先看它的构造器:
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) { super(pipeline, group, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; }
DefaultChannelHandlerContext 的构造器中, 调用了两个颇有意思的方法: isInbound 与 isOutbound, 这两个方法是作什么的呢?
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
从源码中能够看到, 当一个 handler 实现了 ChannelInboundHandler 接口, 则 isInbound 返回真; 类似地, 当一个 handler 实现了 ChannelOutboundHandler 接口, 则 isOutbound 就返回真.
而这两个 boolean 变量会传递到父类 AbstractChannelHandlerContext 中, 并初始化父类的两个字段: inbound 与 outbound.
那么这里的 ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound 与 inbound 字段分别是什么呢? 那就看一下 ChannelInitializer 到底实现了哪一个接口不就好了? 以下是 ChannelInitializer 的类层次结构图:
能够清楚地看到, ChannelInitializer 仅仅实现了 ChannelInboundHandler 接口, 所以这里实例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false.
不就是 inbound 和 outbound 两个字段嘛, 为何须要这么大费周章地分析一番? 其实这两个字段关系到 pipeline 的事件的流向与分类, 所以是十分关键的, 不过我在这里先卖个关子, 后面咱们再来详细分析这两个字段所起的做用. 在这里, 读者只须要记住, ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound = true, outbound = false 便可.
当建立好 Context 后, 就将这个 Context 插入到 Pipeline 的双向链表中:
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; name2ctx.put(name, newCtx); callHandlerAdded(newCtx); }
显然, 这个代码就是典型的双向链表的插入操做了. 当调用了 addLast 方法后, Netty 就会将此 handler 添加到双向链表中 tail 元素以前的位置.
在上一小节中, 咱们已经分析了一个 ChannelInitializer 如何插入到 Pipeline 中的, 接下来就来探讨一下 ChannelInitializer 在哪里被调用, ChannelInitializer 的做用, 以及咱们自定义的 ChannelHandler 是如何插入到 Pipeline 中的.
在 Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端) 一章的 channel 的注册过程 小节中, 咱们已经分析过 Channel 的注册过程了, 这里咱们再简单地复习一下:
首先在 AbstractBootstrap.initAndRegister中, 经过 group().register(channel), 调用 MultithreadEventLoopGroup.register 方法
在MultithreadEventLoopGroup.register 中, 经过 next() 获取一个可用的 SingleThreadEventLoop, 而后调用它的 register
在 SingleThreadEventLoop.register 中, 经过 channel.unsafe().register(this, promise) 来获取 channel 的 unsafe() 底层操做对象, 而后调用它的 register.
在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel
在 AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法
AbstractNioChannel.doRegister 方法经过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 而且将当前 Channel 做为 attachment.
而咱们自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中, 在这个方法中调用了 pipeline.fireChannelRegistered() 方法, 其实现以下:
@Override public ChannelPipeline fireChannelRegistered() { head.fireChannelRegistered(); return this; }
上面的代码很简单, 就是调用了 head.fireChannelRegistered() 方法而已.
关于上面代码的 head.fireXXX 的调用形式, 是 Netty 中 Pipeline 传递事件的经常使用方式, 咱们之后会常常看到.
还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例, 而且它没有重写 fireChannelRegistered 方法, 所以 head.fireChannelRegistered 实际上是调用的 AbstractChannelHandlerContext.fireChannelRegistered:
@Override public ChannelHandlerContext fireChannelRegistered() { final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRegistered(); } }); } return this; }
这个方法的第一句是调用 findContextInbound 获取一个 Context, 那么它返回的 Context 究竟是什么呢? 咱们跟进代码中看一下:
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
很显然, 这个代码会从 head 开始遍历 Pipeline 的双向链表, 而后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例. 想起来了没? 咱们在前面分析 ChannelInitializer 时, 花了大量的笔墨来分析了 inbound 和 outbound 属性, 你看如今这里就用上了. 回想一下, ChannelInitializer 实现了 ChannelInboudHandler, 所以它所对应的 ChannelHandlerContext 的 inbound 属性就是 true, 所以这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext. 即:
当获取到 inbound 的 Context 后, 就调用它的 invokeChannelRegistered 方法:
private void invokeChannelRegistered() { try { ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } }
咱们已经强调过了, 每一个 ChannelHandler 都与一个 ChannelHandlerContext 关联, 咱们能够经过 ChannelHandlerContext 获取到对应的 ChannelHandler. 所以很显然了, 这里 handler() 返回的, 其实就是一开始咱们实例化的 ChannelInitializer 对象, 并接着调用了 ChannelInitializer.channelRegistered 方法. 看到这里, 读者朋友是否会以为有点眼熟呢? ChannelInitializer.channelRegistered 这个方法咱们在第一章的时候已经大量地接触了, 可是咱们并无深刻地分析这个方法的调用过程, 那么在这里读者朋友应该对它的调用有了更加深刻的了解了吧.
那么这个方法中又有什么玄机呢? 继续看代码:
@Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { initChannel((C) ctx.channel()); ctx.pipeline().remove(this); ctx.fireChannelRegistered(); }
initChannel 这个方法咱们很熟悉了吧, 它就是咱们在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:
.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoClientHandler()); } });
所以当调用了这个方法后, 咱们自定义的 ChannelHandler 就插入到 Pipeline 了, 此时的 Pipeline 以下图所示:
当添加了自定义的 ChannelHandler 后, 会删除 ChannelInitializer 这个 ChannelHandler, 即 "ctx.pipeline().remove(this)", 所以最后的 Pipeline 以下:
好了, 到了这里, 咱们的 自定义 ChannelHandler 的添加过程 也分析的查很少了.
下一小节 Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (二)
本文由 yongshun 发表于我的博客, 采用 署名-相同方式共享 3.0 中国大陆许可协议.
Email: yongshun1228@gmail.com
本文标题为: Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (一)
本文连接为: http://www.javashuo.com/article/p-dzuxyzxk-hk.html