Channel 概念与 java.nio.channel 概念一致, 用以链接IO设备 (socket, 文件等) 的纽带. 例如将网络的读、写, 客户端发起链接, 主动关闭链接, 链路关闭, 获取通讯双方的网络地址等.java
Channel 的 IO 类型主要有两种: 非阻塞IO (NIO) 以及阻塞IO(OIO).promise
数据传输类型有两种: 按事件消息传递 (Message) 以及按字节传递 (Byte).服务器
适用方类型也有两种: 服务器(ServerSocket) 以及客户端(Socket). 还有一些根据传输协议而制定的的Channel, 如: UDT、SCTP等.网络
Netty 按照类型逐层设计相应的类. 最底层的为抽象类 AbstractChannel
, 再以此根据IO类型、数据传输类型、适用方类型实现. 类图能够一目了然, 以下图所示:异步
channelRegistered 状态socket
/** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */ void channelRegistered(ChannelHandlerContext ctx) throws Exception;
从注释里面能够看到是在 Channel
绑定到 Eventloop
上面的时候调用的.ide
不论是 Server 仍是 Client, 绑定到 Eventloop
的时候, 最终都是调用 Abstract.initAndRegister()
这个方法上(Server是在 AbstractBootstrap.doBind()
的时候调用的, Client 是在 Bootstrap.doConnect()
的时候调用的).oop
initAndRegister()
方法定义以下:布局
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // 把channel绑定到Eventloop对象上面去 ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
继续跟踪下去会定位到 AbstractChannel.AbstractUnsafe.register0()
方法上.性能
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 作实际的绑定动做。把Channel感兴趣的事件注册到Eventloop.selector上面.具体实如今Abstract.doRegister()方法内 doRegister(); neverRegistered = false; registered = true; // 经过pipeline的传播机制,触发handlerAdded事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); // 经过pipeline的传播机制,触发channelRegistered事件 pipeline.fireChannelRegistered(); // 尚未绑定,因此这里的 isActive() 返回false. if (isActive()) { if (firstRegistration) { // 若是当前链路已经激活,则调用channelActive()方法 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
从上面的代码也能够看出, 在调用完 pipeline.fireChannelRegistered()
以后, 紧接着会调用 isActive()
判断当前链路是否激活, 若是激活了则会调用 pipeline.fireChannelActive()
方法.
这个时候, 对于 Client 和 Server 都尚未激活, 因此, 这个时候不论是 Server 仍是 Client 都不会调用 pipeline.fireChanenlActive()
方法.
channelActive 状态
从启动器的 bind()
接口开始, 往下调用 doBind()
方法:
private ChannelFuture doBind(final SocketAddress localAddress) { // 初始化及注册 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); // 调用 doBind0 doBind0(regFuture, channel, localAddress, promise); return promise; } else { .... } }
doBind
方法又会调用 doBind0()
方法, 在 doBind0()
方法中会经过 EventLoop
去执行 channel
的 bind()
任务.
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 调用channel.bind接口 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
doBind0()
方法往下会调用到 pipeline.bind(localAddress, promise)
; 方法, 经过 pipeline
的传播机制, 最终会调用到 AbstractChannel.AbstractUnsafe.bind()
方法, 这个方法主要作两件事情:
doBind()
: 调用底层JDK API进行 Channel 的端口绑定.pipeline.fireChannelActive()
.@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { .... // wasActive 在绑定成功前为 false boolean wasActive = isActive(); try { // 调用doBind()调用JDK底层API进行端口绑定 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 完成绑定以后,isActive() 返回true if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { // 触发channelActive事件 pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
也就是说当有新客户端链接的时候, 会变成活动状态.
channelInactive 状态
fireChannelnactive()
方法在两个地方会被调用: Channel.close()
和 Channel.disconnect()
.
在调用前会先确认状态是从 Active
--->Inactive
.
channelUnregistered 状态
fireChannelUnregistered()
方法是在 Channel
从 Eventloop
中解除注册的时候被调用的. Channel.close()
的时候被触发执行.
handlerAdded()
: 添加到 ChannelPipeline
时调用.handlerRemoved()
: 从 ChannelPipeline
中移除时调用.exceptionCaught()
: 处理过程当中在 ChannelPipeline
中有错误产生时调用.
处理 I/O 事件或截获 I/O 操做, 并将其转发到 ChannelPipeline
中的下一个处理程序. ChannelHandler
自己不提供许多方法, 但一般必须实现其子类型之一:
ChannelInboundHandler
: 处理入站数据以及各类状态变化.ChannelOutboundHandler
: 处理出站数据而且容许拦截全部的操做.channelRegistered()
: 当 Channel 已经注册到它的 EventLoop 而且可以处理 I/O 时被调用.channelUnregistered()
: 当 Channel 从他的 EventLoop 注销而且没法处理任何 I/O 时被调用.channelActive()
: 当 Channel 处于活动状态时被调用.channelInactive()
: 当 Channel 离开活动状态而且再也不链接远程节点时被调用.channelRead()
: 当从 Channel 读取数据时被调用.channelReadComplete()
: 当 Channel 上的一个读操做完成时被调用. 当全部可读字节都从 Channel 中读取以后, 将会调用该回调方法.
出站操做和数据将由 ChannelOutboundHandler 处理. 它的方法将被 Channel
ChannelPipeline
以及 ChannelHandlerContext
调用.
ChannelOutboundHandler
的一个强大的功能是能够按需推迟操做或事件, 这使得能够经过一些复杂的方法来处理请求. 例如, 若是到远程节点的写入被暂停, 那么你能够推迟刷新操做并在稍后继续.
connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)
: 当请求将 Channel 链接到远程节点时被调用.disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
: 当请求将 Channel 从远程节点断开时被调用.deregister(ChannelHandlerContext ctx, ChannelPromise promise)
: 当请求将 Channel 从它的 EventLoop 注销时被调用.read(ChannelHandlerContext ctx)
: 当请求从 Channel 读取更多的数据时被调用.write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
: 当请求经过 Channel 将数据写到远程节点时被调用.flush(ChannelHandlerContext ctx)
: 当请求从 Channel 将入队数据冲刷到远程节点时被调用.
ChannelPromise 和 ChannelFuture
ChannelFuture
表示 Channel
中异步I/O操做的结果, 在 netty 中全部的 I/O 操做都是异步的, I/O 的调用会直接返回, 能够经过 ChannelFuture
来获取 I/O 操做的结果或者状态信息.
当 I/O 操做开始时, 将建立一个新对象. 新的对象是未完成的-它既没有成功, 也没有失败, 也没有被取消, 由于 I/O 操做尚未完成.
若是 I/O 操做已成功完成(失败或取消), 则对象将标记为已完成, 其中包含更具体的信息, 例如故障缘由.
请注意, 即便失败和取消属于已完成状态.
ChannelPromise
是 ChannelFuture
的一个子接口, 其定义了一些可写的方法, 如 setSuccess()
和 setFailure()
, 从而使 ChannelFuture
不可变.
优先使用addListener(GenericFutureListener),而非await()
当作了一个 I/O 操做并有任何后续任务的时候, 推荐优先使用 addListener(GenericFutureListener)
的方式来得到通知, 而非 await()
addListener(GenericFutureListener)
是非阻塞的. 它会把特定的 ChannelFutureListener
添加到 ChannelFuture
中, 而后 I/O 线程会在 I/O 操做相关的 future 完成的时候通知监听器.
ChannelFutureListener
会利于最佳的性能和资源的利用, 由于它一点阻塞都没有. 并且不会形成死锁.
ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
这两个适配器类分别提供了 ChannelInboundHandler
和 ChannelOutboundHandler
的基本实现, 它们继承了共同的父接口 ChannelHandler
的方法, 扩展了抽象类 ChannelHandlerAdapter
.
ChannelHandlerAdapter
还提供了实用方法 isSharable()
.
若是其对应的实现被标注为 Sharable
, 那么这个方法将返回 true
, 表示它能够被添加到多个 ChannelPipeline
中.
ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
中所提供的方法体调用了其相关联的 ChannelHandlerContext
上的等效方法, 从而将事件转发到了 ChannelPipeline
中的 ChannelHandler
中.
ChannelPipeline
将多个 ChannelHandler
连接在一块儿来让事件在其中传播处理. 一个 ChannelPipeline
中可能不只有入站处理器, 还有出站处理器, 入站处理器只会处理入站的事件, 而出站处理器只会处理出站的数据.
每个新建立的 Channel
都将会分配一个新的 ChannelPipeline
, 不能附加另外一个 ChannelPipeline
, 也不能分离当前的.
经过调用 ChannelHandlerContext
实现, 它将被转发给同一个超类型的下一个 ChannelHandler
.
从事件途径 ChannelPilpeline
的角度来看, ChannelPipeline
的头部和尾端取决于该事件是入站的仍是出站的.
而 Netty 老是将 ChannelPilpeline
的入站口 (左侧) 做为头部, 将出站口 (右侧) 做为尾端.
当经过调用 ChannelPilpeline.add*()
方法将入站处理器和出站处理器混合添加到 ChannelPilpeline
以后, 每个 ChannelHandler
从头部到尾端的顺序就是咱们添加的顺序.
在 ChannelPilpeline
传播事件时, 它会测试 ChannelPilpeline
中的下一个 ChannelHandler
的类型是否和事件的运动方向相匹配. 若是不匹配, ChannelPilpeline
将跳过该 ChannelHandler
并前进到下一个, 直到它找到和该事件指望的方向相匹配的为止.
修改 ChannelPipeline
这里指修改 ChannelPipeline
中的 ChannelHandler
的编排.
经过调用 ChannelPipeline
上的相关方法, ChannelHandler
能够添加, 删除或者替换其余的 ChannelHandler
, 从而实时地修改 ChannelPipeline
的布局.
addFirst // 将 ChannelHandler 插入第一个位置 addBefore // 在某个 ChannelHandler 以前添加一个 addAfter // 在某个 ChannelHandler 以后添加一个 addLast // 将 ChannelHandler 插入最后一个位置 remove // 移除某个 ChannelHandler replace // 将某个 ChannelHandler 替换成指定 ChannelHandler
ChannelHandlerContext
表明了 ChanelHandler
和 ChannelPipeline
之间的关联, 每当有 ChanelHandler
添加到 ChannelPipeline
中, 都会建立 ChannelHandlerContext
.
ChannelHandlerContext
的主要功能是管理它所关联的 ChannelPipeline
和同一个 ChannelPipeline
中的其余 ChanelHandler
之间的交互.
ChannelHandlerContext
有不少的方法, 其中一些方法也存在于 Channel
和 ChannelPipeline
上, 可是有一点重要的不一样.
若是调用 Channel
和 ChannelPipeline
上的这些方法将沿着 ChannelPipeline
进行传播(从头或尾开始).
而调用位于 ChannelHandlerContext
上的相同方法, 则将从当前所关联的 ChannelHandler
开始, 而且只会传播给位于该 ChannelPipeline
中的下一个可以处理该事件的 ChannelHandler
.
这样作能够减小 ChannelHandler
的调用开销.
上图为 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之间的关系.