Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (一)

目录

源码之下无秘密 ── 作最好的 Netty 源码分析教程java

此文章已同步发送到个人 github

前言

这篇是 Netty 源码分析 的第二篇, 在这篇文章中, 我会为读者详细地分析 Netty 中的 ChannelPipeline 机制.

Channel 与 ChannelPipeline

相信你们都知道了, 在 Netty 中每一个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系以下:

clipboard.png

经过上图咱们能够看到, 一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表. 这个链表的头是 HeadContext, 链表的尾是 TailContext, 而且每一个 ChannelHandlerContext 中又关联着一个 ChannelHandler.
上面的图示给了咱们一个对 ChannelPipeline 的直观认识, 可是实际上 Netty 实现的 Channel 是否真的是这样的呢? 咱们继续用源码说话.

在第一章 Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 中, 咱们已经知道了一个 Channel 的初始化的基本过程, 下面咱们再回顾一下.
下面的代码是 AbstractChannel 构造器:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

AbstractChannel 有一个 pipeline 字段, 在构造器中会初始化它为 DefaultChannelPipeline的实例. 这里的代码就印证了一点: 每一个 Channel 都有一个 ChannelPipeline.
接着咱们跟踪一下 DefaultChannelPipeline 的初始化过程.
首先进入到 DefaultChannelPipeline 构造器中:

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

在 DefaultChannelPipeline 构造器中, 首先将与之关联的 Channel 保存到字段 channel 中, 而后实例化两个 ChannelHandlerContext, 一个是 HeadContext 实例 head, 另外一个是 TailContext 实例 tail. 接着将 head 和 tail 互相指向, 构成一个双向链表.
特别注意到, 咱们在开始的示意图中, head 和 tail 并无包含 ChannelHandler, 这是由于 HeadContext 和 TailContext 继承于 AbstractChannelHandlerContext 的同时也实现了 ChannelHandler 接口了, 所以它们有 Context 和 Handler 的双重属性.

ChannelPipeline 的初始化再探

在第一章的时候, 咱们已经对 ChannelPipeline 的初始化有了一个大体的了解, 不过当时重点毕竟不在 ChannelPipeline 这里, 所以没有深刻地分析它的初始化过程. 那么下面咱们就来看一下具体的 ChannelPipeline 的初始化都作了哪些工做吧.

ChannelPipeline 实例化过程

咱们再来回顾一下, 在实例化一个 Channel 时, 会伴随着一个 ChannelPipeline 的实例化, 而且此 Channel 会与这个 ChannelPipeline 相互关联, 这一点能够经过NioSocketChannel 的父类 AbstractChannel 的构造器予以佐证:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

当实例化一个 Channel(这里以 EchoClient 为例, 那么 Channel 就是 NioSocketChannel), 其 pipeline 字段就是咱们新建立的 DefaultChannelPipeline 对象, 那么咱们就来看一下 DefaultChannelPipeline 的构造方法吧:

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

能够看到, 在 DefaultChannelPipeline 的构造方法中, 将传入的 channel 赋值给字段 this.channel, 接着又实例化了两个特殊的字段: tailhead. 这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键.
再回顾一下 head 和 tail 的类层次结构:

clipboard.png

clipboard.png

从类层次结构图中能够很清楚地看到, head 实现了 ChannelInboundHandler, 而 tail 实现了 ChannelOutboundHandler 接口, 而且它们都实现了 ChannelHandlerContext 接口, 所以能够说 head 和 tail 便是一个 ChannelHandler, 又是一个 ChannelHandlerContext.

接着看一下 HeadContext 的构造器:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

它调用了父类 AbstractChannelHandlerContext 的构造器, 并传入参数 inbound = false, outbound = true.
TailContext 的构造器与 HeadContext 的相反, 它调用了父类 AbstractChannelHandlerContext 的构造器, 并传入参数 inbound = true, outbound = false.
即 header 是一个 outboundHandler, 而 tail 是一个inboundHandler, 关于这一点, 你们要特别注意, 由于在后面的分析中, 咱们会反复用到 inbound 和 outbound 这两个属性.

ChannelInitializer 的添加

上面一小节中, 咱们已经分析了 Channel 的组成, 其中咱们了解到, 最开始的时候 ChannelPipeline 中含有两个 ChannelHandlerContext(同时也是 ChannelHandler), 可是这个 Pipeline并不能实现什么特殊的功能, 由于咱们尚未给它添加自定义的 ChannelHandler.
一般来讲, 咱们在初始化 Bootstrap, 会添加咱们自定义的 ChannelHandler, 就以咱们熟悉的 EchoClient 来举例吧:

Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

上面代码的初始化过程, 相信你们都不陌生. 在调用 handler 时, 传入了 ChannelInitializer 对象, 它提供了一个 initChannel 方法供咱们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢? 下面咱们就来揭开它的神秘面纱.

ChannelInitializer 实现了 ChannelHandler, 那么它是在何时添加到 ChannelPipeline 中的呢? 进行了一番搜索后, 咱们发现它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.
其代码以下:

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());
    ...
}

上面的代码将 handler() 返回的 ChannelHandler 添加到 Pipeline 中, 而 handler() 返回的是handler 其实就是咱们在初始化 Bootstrap 调用 handler 设置的 ChannelInitializer 实例, 所以这里就是将 ChannelInitializer 插入到了 Pipeline 的末端.
此时 Pipeline 的结构以下图所示:

clipboard.png

有朋友可能就有疑惑了, 我明明插入的是一个 ChannelInitializer 实例, 为何在 ChannelPipeline 中的双向链表中的元素倒是一个 ChannelHandlerContext? 为了解答这个问题, 咱们继续在代码中寻找答案吧.
咱们刚才提到, 在 Bootstrap.init 中会调用 p.addLast() 方法, 将 ChannelInitializer 插入到链表末端:

@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name); // 检查此 handler 是否有重复的名字

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}

addLast 有不少重载的方法, 咱们关注这个比较重要的方法就能够了.
上面的 addLast 方法中, 首先检查这个 ChannelHandler 的名字是不是重复的, 若是不重复的话, 则为这个 Handler 建立一个对应的 DefaultChannelHandlerContext 实例, 并与之关联起来(Context 中有一个 handler 属性保存着对应的 Handler 实例). 判断此 Handler 是否重名的方法很简单: Netty 中有一个 name2ctx Map 字段, key 是 handler 的名字, 而 value 则是 handler 自己. 所以经过以下代码就能够判断一个 handler 是否重名了:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

为了添加一个 handler 到 pipeline 中, 必须把此 handler 包装成 ChannelHandlerContext. 所以在上面的代码中咱们能够看到新实例化了一个 newCtx 对象, 并将 handler 做为参数传递到构造方法中. 那么咱们来看一下实例化的 DefaultChannelHandlerContext 到底有什么玄机吧.
首先看它的构造器:

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
    super(pipeline, group, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

DefaultChannelHandlerContext 的构造器中, 调用了两个颇有意思的方法: isInboundisOutbound, 这两个方法是作什么的呢?

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

从源码中能够看到, 当一个 handler 实现了 ChannelInboundHandler 接口, 则 isInbound 返回真; 类似地, 当一个 handler 实现了 ChannelOutboundHandler 接口, 则 isOutbound 就返回真.
而这两个 boolean 变量会传递到父类 AbstractChannelHandlerContext 中, 并初始化父类的两个字段: inboundoutbound.
那么这里的 ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound 与 inbound 字段分别是什么呢? 那就看一下 ChannelInitializer 到底实现了哪一个接口不就好了? 以下是 ChannelInitializer 的类层次结构图:

clipboard.png

能够清楚地看到, ChannelInitializer 仅仅实现了 ChannelInboundHandler 接口, 所以这里实例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false.
不就是 inbound 和 outbound 两个字段嘛, 为何须要这么大费周章地分析一番? 其实这两个字段关系到 pipeline 的事件的流向与分类, 所以是十分关键的, 不过我在这里先卖个关子, 后面咱们再来详细分析这两个字段所起的做用. 在这里, 读者只须要记住, ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound = true, outbound = false 便可.

当建立好 Context 后, 就将这个 Context 插入到 Pipeline 的双向链表中:

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

显然, 这个代码就是典型的双向链表的插入操做了. 当调用了 addLast 方法后, Netty 就会将此 handler 添加到双向链表中 tail 元素以前的位置.

自定义 ChannelHandler 的添加过程

在上一小节中, 咱们已经分析了一个 ChannelInitializer 如何插入到 Pipeline 中的, 接下来就来探讨一下 ChannelInitializer 在哪里被调用, ChannelInitializer 的做用, 以及咱们自定义的 ChannelHandler 是如何插入到 Pipeline 中的.

Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端) 一章的 channel 的注册过程 小节中, 咱们已经分析过 Channel 的注册过程了, 这里咱们再简单地复习一下:

  • 首先在 AbstractBootstrap.initAndRegister中, 经过 group().register(channel), 调用 MultithreadEventLoopGroup.register 方法

  • 在MultithreadEventLoopGroup.register 中, 经过 next() 获取一个可用的 SingleThreadEventLoop, 而后调用它的 register

  • 在 SingleThreadEventLoop.register 中, 经过 channel.unsafe().register(this, promise) 来获取 channel 的 unsafe() 底层操做对象, 而后调用它的 register.

  • 在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel

  • 在 AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法

  • AbstractNioChannel.doRegister 方法经过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 而且将当前 Channel 做为 attachment.

而咱们自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中, 在这个方法中调用了 pipeline.fireChannelRegistered() 方法, 其实现以下:

@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}

上面的代码很简单, 就是调用了 head.fireChannelRegistered() 方法而已.

关于上面代码的 head.fireXXX 的调用形式, 是 Netty 中 Pipeline 传递事件的经常使用方式, 咱们之后会常常看到.

还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例, 而且它没有重写 fireChannelRegistered 方法, 所以 head.fireChannelRegistered 实际上是调用的 AbstractChannelHandlerContext.fireChannelRegistered:

@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
    return this;
}

这个方法的第一句是调用 findContextInbound 获取一个 Context, 那么它返回的 Context 究竟是什么呢? 咱们跟进代码中看一下:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

很显然, 这个代码会从 head 开始遍历 Pipeline 的双向链表, 而后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例. 想起来了没? 咱们在前面分析 ChannelInitializer 时, 花了大量的笔墨来分析了 inbound 和 outbound 属性, 你看如今这里就用上了. 回想一下, ChannelInitializer 实现了 ChannelInboudHandler, 所以它所对应的 ChannelHandlerContext 的 inbound 属性就是 true, 所以这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext. 即:

clipboard.png

当获取到 inbound 的 Context 后, 就调用它的 invokeChannelRegistered 方法:

private void invokeChannelRegistered() {
    try {
        ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

咱们已经强调过了, 每一个 ChannelHandler 都与一个 ChannelHandlerContext 关联, 咱们能够经过 ChannelHandlerContext 获取到对应的 ChannelHandler. 所以很显然了, 这里 handler() 返回的, 其实就是一开始咱们实例化的 ChannelInitializer 对象, 并接着调用了 ChannelInitializer.channelRegistered 方法. 看到这里, 读者朋友是否会以为有点眼熟呢? ChannelInitializer.channelRegistered 这个方法咱们在第一章的时候已经大量地接触了, 可是咱们并无深刻地分析这个方法的调用过程, 那么在这里读者朋友应该对它的调用有了更加深刻的了解了吧.

那么这个方法中又有什么玄机呢? 继续看代码:

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}

initChannel 这个方法咱们很熟悉了吧, 它就是咱们在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:

.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

所以当调用了这个方法后, 咱们自定义的 ChannelHandler 就插入到 Pipeline 了, 此时的 Pipeline 以下图所示:

clipboard.png

当添加了自定义的 ChannelHandler 后, 会删除 ChannelInitializer 这个 ChannelHandler, 即 "ctx.pipeline().remove(this)", 所以最后的 Pipeline 以下:

clipboard.png

好了, 到了这里, 咱们的 自定义 ChannelHandler 的添加过程 也分析的查很少了.

下一小节 Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (二)

本文由 yongshun 发表于我的博客, 采用 署名-相同方式共享 3.0 中国大陆许可协议.
Email: yongshun1228@gmail.com
本文标题为: Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (一)
本文连接为: http://www.javashuo.com/article/p-dzuxyzxk-hk.html

相关文章
相关标签/搜索