ChannelPipeline 和 ChannelHandler

ChannelHandler

Channel

Channel 概念与 java.nio.channel 概念一致, 用以链接IO设备 (socket, 文件等) 的纽带. 例如将网络的读、写, 客户端发起链接, 主动关闭链接, 链路关闭, 获取通讯双方的网络地址等.java

Channel 的 IO 类型主要有两种: 非阻塞IO (NIO) 以及阻塞IO(OIO).promise

数据传输类型有两种: 按事件消息传递 (Message) 以及按字节传递 (Byte).服务器

适用方类型也有两种: 服务器(ServerSocket) 以及客户端(Socket). 还有一些根据传输协议而制定的的Channel, 如: UDT、SCTP等.网络

Netty 按照类型逐层设计相应的类. 最底层的为抽象类 AbstractChannel, 再以此根据IO类型、数据传输类型、适用方类型实现. 类图能够一目了然, 以下图所示:异步

clipboard.png

Channel 状态

clipboard.png

channelRegistered 状态socket

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;

从注释里面能够看到是在 Channel 绑定到 Eventloop 上面的时候调用的.ide

不论是 Server 仍是 Client, 绑定到 Eventloop 的时候, 最终都是调用 Abstract.initAndRegister() 这个方法上(Server是在 AbstractBootstrap.doBind() 的时候调用的, Client 是在 Bootstrap.doConnect() 的时候调用的).oop

initAndRegister() 方法定义以下:布局

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 把channel绑定到Eventloop对象上面去
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

继续跟踪下去会定位到 AbstractChannel.AbstractUnsafe.register0() 方法上.性能

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 作实际的绑定动做。把Channel感兴趣的事件注册到Eventloop.selector上面.具体实如今Abstract.doRegister()方法内
                doRegister();
                neverRegistered = false;
                registered = true;

                // 经过pipeline的传播机制,触发handlerAdded事件
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                
                // 经过pipeline的传播机制,触发channelRegistered事件
                pipeline.fireChannelRegistered();

                // 尚未绑定,因此这里的 isActive() 返回false.
                if (isActive()) {
                    if (firstRegistration) {
                        // 若是当前链路已经激活,则调用channelActive()方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

从上面的代码也能够看出, 在调用完 pipeline.fireChannelRegistered() 以后, 紧接着会调用 isActive() 判断当前链路是否激活, 若是激活了则会调用 pipeline.fireChannelActive() 方法.

这个时候, 对于 Client 和 Server 都尚未激活, 因此, 这个时候不论是 Server 仍是 Client 都不会调用 pipeline.fireChanenlActive() 方法.

channelActive 状态

从启动器的 bind() 接口开始, 往下调用 doBind() 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化及注册
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 调用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}

doBind 方法又会调用 doBind0() 方法, 在 doBind0() 方法中会经过 EventLoop 去执行 channelbind()任务.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 调用channel.bind接口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0() 方法往下会调用到 pipeline.bind(localAddress, promise); 方法, 经过 pipeline 的传播机制, 最终会调用到 AbstractChannel.AbstractUnsafe.bind() 方法, 这个方法主要作两件事情:

  • 调用 doBind(): 调用底层JDK API进行 Channel 的端口绑定.
  • 调用 pipeline.fireChannelActive().
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive 在绑定成功前为 false
    boolean wasActive = isActive();
    try {
        // 调用doBind()调用JDK底层API进行端口绑定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // 完成绑定以后,isActive() 返回true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 触发channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

也就是说当有新客户端链接的时候, 会变成活动状态.

channelInactive 状态

fireChannelnactive() 方法在两个地方会被调用: Channel.close()Channel.disconnect().

在调用前会先确认状态是从 Active--->Inactive.

channelUnregistered 状态

fireChannelUnregistered() 方法是在 ChannelEventloop 中解除注册的时候被调用的. Channel.close() 的时候被触发执行.

ChannelHandler 的生命周期

handlerAdded(): 添加到 ChannelPipeline 时调用.
handlerRemoved(): 从 ChannelPipeline 中移除时调用.
exceptionCaught(): 处理过程当中在 ChannelPipeline 中有错误产生时调用.

处理 I/O 事件或截获 I/O 操做, 并将其转发到 ChannelPipeline 中的下一个处理程序. ChannelHandler 自己不提供许多方法, 但一般必须实现其子类型之一:

  • ChannelInboundHandler: 处理入站数据以及各类状态变化.
  • ChannelOutboundHandler: 处理出站数据而且容许拦截全部的操做.

ChannelInboundHandler 接口

channelRegistered(): 当 Channel 已经注册到它的 EventLoop 而且可以处理 I/O 时被调用.
channelUnregistered(): 当 Channel 从他的 EventLoop 注销而且没法处理任何 I/O 时被调用.
channelActive(): 当 Channel 处于活动状态时被调用.
channelInactive(): 当 Channel 离开活动状态而且再也不链接远程节点时被调用.
channelRead(): 当从 Channel 读取数据时被调用.
channelReadComplete(): 当 Channel 上的一个读操做完成时被调用. 当全部可读字节都从 Channel 中读取以后, 将会调用该回调方法.

ChannelOutboundHandler 接口

出站操做和数据将由 ChannelOutboundHandler 处理. 它的方法将被 Channel ChannelPipeline 以及 ChannelHandlerContext 调用.

ChannelOutboundHandler 的一个强大的功能是能够按需推迟操做或事件, 这使得能够经过一些复杂的方法来处理请求. 例如, 若是到远程节点的写入被暂停, 那么你能够推迟刷新操做并在稍后继续.

connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise): 当请求将 Channel 链接到远程节点时被调用.
disconnect(ChannelHandlerContext ctx, ChannelPromise promise): 当请求将 Channel 从远程节点断开时被调用.
deregister(ChannelHandlerContext ctx, ChannelPromise promise): 当请求将 Channel 从它的 EventLoop 注销时被调用.
read(ChannelHandlerContext ctx): 当请求从 Channel 读取更多的数据时被调用.
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise): 当请求经过 Channel 将数据写到远程节点时被调用.
flush(ChannelHandlerContext ctx): 当请求从 Channel 将入队数据冲刷到远程节点时被调用.

ChannelPromise 和 ChannelFuture

ChannelFuture 表示 Channel 中异步I/O操做的结果, 在 netty 中全部的 I/O 操做都是异步的, I/O 的调用会直接返回, 能够经过 ChannelFuture 来获取 I/O 操做的结果或者状态信息.

当 I/O 操做开始时, 将建立一个新对象. 新的对象是未完成的-它既没有成功, 也没有失败, 也没有被取消, 由于 I/O 操做尚未完成.

若是 I/O 操做已成功完成(失败或取消), 则对象将标记为已完成, 其中包含更具体的信息, 例如故障缘由.

请注意, 即便失败和取消属于已完成状态.

ChannelPromiseChannelFuture 的一个子接口, 其定义了一些可写的方法, 如 setSuccess()setFailure(), 从而使 ChannelFuture 不可变.

clipboard.png

优先使用addListener(GenericFutureListener),而非await()

当作了一个 I/O 操做并有任何后续任务的时候, 推荐优先使用 addListener(GenericFutureListener) 的方式来得到通知, 而非 await()

addListener(GenericFutureListener) 是非阻塞的. 它会把特定的 ChannelFutureListener 添加到 ChannelFuture 中, 而后 I/O 线程会在 I/O 操做相关的 future 完成的时候通知监听器.

ChannelFutureListener 会利于最佳的性能和资源的利用, 由于它一点阻塞都没有. 并且不会形成死锁.

ChannelHandler 适配器

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 这两个适配器类分别提供了
ChannelInboundHandlerChannelOutboundHandler 的基本实现, 它们继承了共同的父接口
ChannelHandler 的方法, 扩展了抽象类 ChannelHandlerAdapter.

clipboard.png

ChannelHandlerAdapter 还提供了实用方法 isSharable().

若是其对应的实现被标注为 Sharable, 那么这个方法将返回 true, 表示它能够被添加到多个 ChannelPipeline 中.

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法, 从而将事件转发到了 ChannelPipeline 中的 ChannelHandler 中.

ChannelPipeline 接口

ChannelPipeline 将多个 ChannelHandler 连接在一块儿来让事件在其中传播处理. 一个 ChannelPipeline 中可能不只有入站处理器, 还有出站处理器, 入站处理器只会处理入站的事件, 而出站处理器只会处理出站的数据.

每个新建立的 Channel 都将会分配一个新的 ChannelPipeline, 不能附加另外一个 ChannelPipeline, 也不能分离当前的.

经过调用 ChannelHandlerContext 实现, 它将被转发给同一个超类型的下一个 ChannelHandler.

clipboard.png

从事件途径 ChannelPilpeline 的角度来看, ChannelPipeline 的头部和尾端取决于该事件是入站的仍是出站的.

而 Netty 老是将 ChannelPilpeline 的入站口 (左侧) 做为头部, 将出站口 (右侧) 做为尾端.

当经过调用 ChannelPilpeline.add*() 方法将入站处理器和出站处理器混合添加到 ChannelPilpeline 以后, 每个 ChannelHandler 从头部到尾端的顺序就是咱们添加的顺序.

ChannelPilpeline 传播事件时, 它会测试 ChannelPilpeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配. 若是不匹配, ChannelPilpeline 将跳过该 ChannelHandler 并前进到下一个, 直到它找到和该事件指望的方向相匹配的为止.

修改 ChannelPipeline

这里指修改 ChannelPipeline 中的 ChannelHandler 的编排.

经过调用 ChannelPipeline 上的相关方法, ChannelHandler 能够添加, 删除或者替换其余的 ChannelHandler, 从而实时地修改 ChannelPipeline 的布局.

addFirst  // 将 ChannelHandler 插入第一个位置
addBefore // 在某个 ChannelHandler 以前添加一个
addAfter  // 在某个 ChannelHandler 以后添加一个
addLast   // 将 ChannelHandler 插入最后一个位置
remove    // 移除某个 ChannelHandler
replace   // 将某个 ChannelHandler 替换成指定 ChannelHandler

ChannelHandlerContext 接口

ChannelHandlerContext 表明了 ChanelHandlerChannelPipeline 之间的关联, 每当有 ChanelHandler 添加到 ChannelPipeline 中, 都会建立 ChannelHandlerContext.

ChannelHandlerContext 的主要功能是管理它所关联的 ChannelPipeline 和同一个 ChannelPipeline 中的其余 ChanelHandler 之间的交互.

ChannelHandlerContext 有不少的方法, 其中一些方法也存在于 ChannelChannelPipeline 上, 可是有一点重要的不一样.

若是调用 ChannelChannelPipeline 上的这些方法将沿着 ChannelPipeline 进行传播(从头或尾开始).
而调用位于 ChannelHandlerContext 上的相同方法, 则将从当前所关联的 ChannelHandler 开始, 而且只会传播给位于该 ChannelPipeline 中的下一个可以处理该事件的 ChannelHandler.

这样作能够减小 ChannelHandler 的调用开销.

使用 ChannelHandlerContext

clipboard.png

上图为 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之间的关系.

相关文章
相关标签/搜索