认真的 Netty 源码解析(二)

Channel 的 register 操做

通过前面的铺垫,咱们已经具有必定的基础了,咱们开始来把前面学到的内容揉在一块儿。这节,咱们会介绍 register 操做,这一步实际上是很是关键的,对于咱们源码分析很是重要。java

register

咱们从 EchoClient 中的 connect() 方法出发,或者 EchoServer 的 bind(port) 方法出发,都会走到 initAndRegister() 这个方法:promise

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1
channel = channelFactory.newChannel();
// 2 对于 Bootstrap 和 ServerBootstrap,这里面有些不同
init(channel);
} catch (Throwable t) {
...
}
// 3 咱们这里要说的是这行
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

initAndRegister() 这个方法咱们已经接触过两次了,前面介绍了 1️⃣ Channel 的实例化,实例化过程当中,会执行 Channel 内部 Unsafe 和 Pipeline 的实例化,以及在上面 2️⃣ init(channel) 方法中,会往 pipeline 中添加 handler(pipeline 此时是 head+channelnitializer+tail)。app

咱们这节终于要揭秘 ChannelInitializer 中的 initChannel 方法了~异步

如今,咱们继续往下走,看看 3️⃣ register 这一步:ide

ChannelFuture regFuture = config().group().register(channel);

咱们说了,register 这一步是很是关键的,它发生在 channel 实例化之后,你们回忆一下当前 channel 中的一些状况:oop

实例化了 JDK 底层的 Channel,设置了非阻塞,实例化了 Unsafe,实例化了 Pipeline,同时往 pipeline 中添加了 head、tail 以及一个 ChannelInitializer 实例。源码分析

上面的 config().group() 方法会返回前面实例化的 NioEventLoopGroup 的实例,而后调用其 register(channel) 方法:this

// MultithreadEventLoopGroup线程

@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

next() 方法很简单,就是选择线程池中的一个线程(还记得 chooserFactory 吗),也就是选择一个 NioEventLoop 实例,这个时候咱们就进入到 NioEventLoop 了。rest

NioEventLoop 的 register(channel) 方法实如今它的父类 SingleThreadEventLoop 中:

@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

上面的代码实例化了一个 Promise,将当前 channel 带了进去:

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// promise 关联了 channel,channel 持有 Unsafe 实例,register 操做就封装在 Unsafe 中
promise.channel().unsafe().register(this, promise);
return promise;
}

拿到 channel 中关联的 Unsafe 实例,而后调用它的 register 方法:

咱们说过,Unsafe 专门用来封装底层实现,固然这里也没那么“底层”

// AbstractChannel#AbstractUnsafe

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 将这个 eventLoop 实例设置给这个 channel,今后这个 channel 就是有 eventLoop 的了
// 我以为这一步其实挺关键的,由于后续该 channel 中的全部异步操做,都要提交给这个 eventLoop 来执行
AbstractChannel.this.eventLoop = eventLoop;

// 若是发起 register 动做的线程就是 eventLoop 实例中的线程,那么直接调用 register0(promise)
// 对于咱们来讲,它不会进入到这个分支,
// 之因此有这个分支,是由于咱们是能够 unregister,而后再 register 的,后面再仔细看
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 不然,提交任务给 eventLoop,eventLoop 中的线程会负责调用 register0(promise)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}

到这里,咱们要明白,NioEventLoop 中是尚未实例化 Thread 实例的。

这几步涉及到了好几个类:NioEventLoop、Promise、Channel、Unsafe 等,你们要仔细理清楚它们的关系。

对于咱们前面过来的 register 操做,其实提交到 eventLoop 之后,就直接返回 promise 实例了,剩下的register0 是异步操做,它由 NioEventLoop 实例来完成。

咱们这边先不继续往里分析 register0(promise) 方法,先把前面欠下的 NioEventLoop 中的线程介绍清楚,而后再回来介绍这个 register0 方法。

Channel 实例一旦 register 到了 NioEventLoopGroup 实例中的某个 NioEventLoop 实例,那么后续该 Channel 的全部操做,都是由该 NioEventLoop 实例来完成的。

这个也很是简单,由于 Selector 实例是在 NioEventLoop 实例中的,Channel 实例一旦注册到某个 Selector 实例中,固然也只能在这个实例中处理 NIO 事件。

NioEventLoop 工做流程

前面,咱们在分析线程池的实例化的时候说过,NioEventLoop 中并无启动 Java 线程。这里咱们来仔细分析下在 register 过程当中调用的 eventLoop.execute(runnable) 这个方法,这个代码在父类 SingleThreadEventExecutor 中:

@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判断添加任务的线程是否就是当前 EventLoop 中的线程
boolean inEventLoop = inEventLoop();

// 添加任务到以前介绍的 taskQueue 中,
// 若是 taskQueue 满了(默认大小 16),根据咱们以前说的,默认的策略是抛出异常
addTask(task);

if (!inEventLoop) {
// 若是不是 NioEventLoop 内部线程提交的 task,那么判断下线程是否已经启动,没有的话,就启动线程
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

原来启动 NioEventLoop 中的线程的方法在这里。

另外,上节咱们说的 register 操做进到了 taskQueue 中,因此它实际上是被归类到了非 IO 操做的范畴。

下面是 startThread 的源码,判断线程是否已经启动来决定是否要进行启动操做:

private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}

咱们按照前面的思路,根据线程没有启动的状况,来看看 doStartThread() 方法:

private void doStartThread() {
assert thread == null;
// 这里的 executor 你们是否是有点熟悉的感受,它就是一开始咱们实例化 NioEventLoop 的时候传进来的 ThreadPerTaskExecutor 的实例。它是每次来一个任务,建立一个线程的那种 executor。
// 一旦咱们调用它的 execute 方法,它就会建立一个新的线程,因此这里终于会建立 Thread 实例
executor.execute(new Runnable() {
@Override
public void run() {
// 看这里,将 “executor” 中建立的这个线程设置为 NioEventLoop 的线程!!!
thread = Thread.currentThread();

if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
// 执行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中实现了
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ... 咱们直接忽略掉这里的代码
}
}
});
}

上面线程启动之后,会执行 NioEventLoop 中的 run() 方法,这是一个很是重要的方法,这个方法确定是没那么容易结束的,必然是像 JDK 线程池的 Worker 那样,不断地循环获取新的任务的。它须要不断地作 select 操做和轮询 taskQueue 这个队列。

咱们先来简单地看一下它的源码,这里先不作深刻地介绍:

@Override
protected void run() {
// 代码嵌套在 for 循环中
for (;;) {
try {
// selectStrategy 终于要派上用场了
// 它有两个值,一个是 CONTINUE 一个是 SELECT
// 针对这块代码,咱们分析一下。
// 1. 若是 taskQueue 不为空,也就是 hasTasks() 返回 true,
// 那么执行一次 selectNow(),该方法不会阻塞
// 2. 若是 hasTasks() 返回 false,那么执行 SelectStrategy.SELECT 分支,
// 进行 select(...),这块是带阻塞的
// 这个很好理解,就是按照是否有任务在排队来决定是否能够进行阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 若是 !hasTasks(),那么进到这个 select 分支,这里 select 带阻塞的
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}


cancelledKeys = 0;
needsToSelectAgain = false;
// 默认地,ioRatio 的值是 50
final int ioRatio = this.ioRatio;

if (ioRatio == 100) {
// 若是 ioRatio 设置为 100,那么先执行 IO 操做,而后在 finally 块中执行 taskQueue 中的任务
try {
// 1. 执行 IO 操做。由于前面 select 之后,可能有些 channel 是须要处理的。
processSelectedKeys();
} finally {
// 2. 执行非 IO 任务,也就是 taskQueue 中的任务
runAllTasks();
}
} else {
// 若是 ioRatio 不是 100,那么根据 IO 操做耗时,限制非 IO 操做耗时
final long ioStartTime = System.nanoTime();
try {
// 执行 IO 操做
processSelectedKeys();
} finally {
// 根据 IO 操做消耗的时间,计算执行非 IO 操做(runAllTasks)能够用多少时间.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

上面这段代码是 NioEventLoop 的核心,这里介绍两点:

  1. 首先,会根据 hasTasks() 的结果来决定是执行 selectNow() 仍是 select(oldWakenUp),这个应该好理解。若是有任务正在等待,那么应该使用无阻塞的 selectNow(),若是没有任务在等待,那么就可使用带阻塞的 select 操做。

  2. ioRatio 控制 IO 操做所占的时间比重:

  • 若是设置为 100%,那么先执行 IO 操做,而后再执行任务队列中的任务。

  • 若是不是 100%,那么先执行 IO 操做,而后执行 taskQueue 中的任务,可是须要控制执行任务的总时间。也就是说,非 IO 操做能够占用的时间,经过 ioRatio 以及此次 IO 操做耗时计算得出。

咱们这里先不要去关心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的细节,只要先理解它们分别作什么事情就能够了。

回过神来,咱们前面在 register 的时候提交了 register 任务给 NioEventLoop,这是 NioEventLoop 接收到的第一个任务,因此这里会实例化 Thread 而且启动,而后进入到 NioEventLoop 中的 run 方法。

继续 register

咱们回到前面的 register0(promise) 方法,咱们知道,这个 register 任务进入到了 NioEventLoop 的 taskQueue 中,而后会启动 NioEventLoop 中的线程,该线程会轮询这个 taskQueue,而后执行这个 register 任务。

注意,此时执行该方法的是 eventLoop 中的线程:

// AbstractChannel

private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
// *** 进行 JDK 底层的操做:Channel 注册到 Selector 上 ***
doRegister();

neverRegistered = false;
registered = true;
// 到这里,就算是 registered 了

// 这一步也很关键,由于这涉及到了 ChannelInitializer 的 init(channel)
// 咱们以前说过,init 方法会将 ChannelInitializer 内部添加的 handlers 添加到 pipeline 中
pipeline.invokeHandlerAddedIfNeeded();

// 设置当前 promise 的状态为 success
// 由于当前 register 方法是在 eventLoop 中的线程中执行的,须要通知提交 register 操做的线程
safeSetSuccess(promise);

// 当前的 register 操做已经成功,该事件应该被 pipeline 上
// 全部关心 register 事件的 handler 感知到,往 pipeline 中扔一个事件
pipeline.fireChannelRegistered();

// 这里 active 指的是 channel 已经打开
if (isActive()) {
// 若是该 channel 是第一次执行 register,那么 fire ChannelActive 事件
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 该 channel 以前已经 register 过了,
// 这里让该 channel 立马去监听通道中的 OP_READ 事件
beginRead();
}
}
} catch (Throwable t) {
...
}
}

咱们先说掉上面的 doRegister() 方法,而后再说 pipeline。

@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 附 JDK 中 Channel 的 register 方法:
// public final SelectionKey register(Selector sel, int ops, Object att) {...}
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}

咱们能够看到,这里作了 JDK 底层的 register 操做,将 SocketChannel(或 ServerSocketChannel) 注册到 Selector 中,而且能够看到,这里的监听集合设置为了 0,也就是什么都不监听。

固然,也就意味着,后续必定有某个地方会须要修改这个 selectionKey 的监听集合,否则啥都干不了

咱们重点来讲说 pipeline 操做,咱们以前在介绍 NioSocketChannel 的 pipeline 的时候介绍到,咱们的 pipeline 如今长这个样子:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

如今,咱们将看到这里会把 LoggingHandler 和 EchoClientHandler 添加到 pipeline。

咱们继续看代码,register 成功之后,执行了如下操做:

pipeline.invokeHandlerAddedIfNeeded();

你们能够跟踪一下,这一步会执行到 pipeline 中 ChannelInitializer 实例的 handlerAdded 方法,在这里会执行它的 init(context) 方法:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}

而后咱们看下 initChannel(ctx),这里终于来了咱们以前介绍过的 init(channel) 方法:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
// 1. 将把咱们自定义的 handlers 添加到 pipeline 中
initChannel((C) ctx.channel());
} catch (Throwable cause) {
...
} finally {
// 2. 将 ChannelInitializer 实例从 pipeline 中删除
remove(ctx);
}
return true;
}
return false;
}

咱们前面也说过,ChannelInitializer 的 init(channel) 被执行之后,那么其内部添加的 handlers 会进入到 pipeline 中,而后上面的 finally 块中将 ChannelInitializer 的实例从 pipeline 中删除,那么此时 pipeline 就算创建起来了,以下图:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

其实这里还有个问题,若是咱们在 ChannelInitializer 中添加的是一个 ChannelInitializer 实例呢?你们能够考虑下这个状况。

pipeline 创建了之后,而后咱们继续往下走,会执行到这一句:

pipeline.fireChannelRegistered();

咱们只要摸清楚了 fireChannelRegistered() 方法,之后碰到其余像 fireChannelActive()、fireXxx() 等就知道怎么回事了,它们都是相似的。咱们来看看这句代码会发生什么:

// DefaultChannelPipeline

@Override
public final ChannelPipeline fireChannelRegistered() {
// 注意这里的传参是 head
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}

也就是说,咱们往 pipeline 中扔了一个 channelRegistered 事件,这里的 register 属于 Inbound 事件,pipeline 接下来要作的就是执行 pipeline 中的 Inbound 类型的 handlers 中的 channelRegistered() 方法。

从上面的代码,咱们能够看出,往 pipeline 中扔出 channelRegistered 事件之后,第一个处理的 handler 是 head。

接下来,咱们仍是跟着代码走,此时咱们来到了 pipeline 的第一个节点 head 的处理中:

// AbstractChannelHandlerContext

// next 此时是 head
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {

EventExecutor executor = next.executor();
// 执行 head 的 invokeChannelRegistered()
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}

也就是说,这里会先执行 head.invokeChannelRegistered() 方法,并且是放到 NioEventLoop 中的 taskQueue 中执行的:

// AbstractChannelHandlerContext

private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// handler() 方法此时会返回 head
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}

咱们去看 head 的 channelRegistered 方法:

// HeadContext

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// 1. 这一步是 head 对于 channelRegistered 事件的处理。没有咱们要关心的
invokeHandlerAddedIfNeeded();
// 2. 向后传播 Inbound 事件
ctx.fireChannelRegistered();
}

而后 head 会执行 fireChannelRegister() 方法:

// AbstractChannelHandlerContext

@Override
public ChannelHandlerContext fireChannelRegistered() {
// 这里很关键
// findContextInbound() 方法会沿着 pipeline 找到下一个 Inbound 类型的 handler
invokeChannelRegistered(findContextInbound());
return this;
}

注意:pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline 中,pipeline 中的 handlers 准备处理该事件。而 context.fireChannelRegistered() 是一个 handler 处理完了之后,向后传播给下一个 handler。

它们两个的方法名字是同样的,可是来自于不一样的类。

findContextInbound() 将找到下一个 Inbound 类型的 handler,而后又是重复上面的几个方法。

我以为上面这块代码不必太纠结,总之就是从 head 中开始,依次往下寻找全部 Inbound handler,执行其 channelRegistered(ctx) 操做。

说了这么多,咱们的 register 操做算是真正完成了。

下面,咱们回到 initAndRegister 这个方法:

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}

// 咱们上面说完了这行
ChannelFuture regFuture = config().group().register(channel);

// 若是在 register 的过程当中,发生了错误
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// 源码中说得很清楚,若是到这里,说明后续能够进行 connect() 或 bind() 了,由于两种状况:
// 1. 若是 register 动做是在 eventLoop 中发起的,那么到这里的时候,register 必定已经完成
// 2. 若是 register 任务已经提交到 eventLoop 中,也就是进到了 eventLoop 中的 taskQueue 中,
// 因为后续的 connect 或 bind 也会进入到同一个 eventLoop 的 queue 中,因此必定是会先 register 成功,才会执行 connect 或 bind
return regFuture;
}

咱们要知道,不论是服务端的 NioServerSocketChannel 仍是客户端的 NioSocketChannel,在 bind 或 connect 时,都会先进入 initAndRegister 这个方法,因此咱们上面说的那些,对于二者都是通用的。

你们要记住,register 操做是很是重要的,要知道这一步大概作了哪些事情,register 操做之后,将进入到 bind 或 connect 操做中。

connect 过程和 bind 过程分析

上面咱们介绍的 register 操做很是关键,它创建起来了不少的东西,它是 Netty 中 NioSocketChannel 和 NioServerSocketChannel 开始工做的起点。

这一节,咱们来讲说 register 以后的 connect 操做和 bind 操做。这节很是简单。

connect 过程分析

对于客户端 NioSocketChannel 来讲,前面 register 完成之后,就要开始 connect 了,这一步将链接到服务端。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 这里完成了 register 操做
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();

// 这里咱们不去纠结 register 操做是否 isDone()
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 看这里
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
....
}
}

这里你们本身一路点进去,我就不浪费篇幅了。最后,咱们会来到 AbstractChannel 的 connect 方法:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}

咱们看到,connect 操做是交给 pipeline 来执行的。进入 pipeline 中,咱们会发现,connect 这种 Outbound 类型的操做,是从 pipeline 的 tail 开始的:

前面咱们介绍的 register 操做是 Inbound 的,是从 head 开始的

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}

接下来就是 pipeline 的操做了,从 tail 开始,执行 pipeline 上的 Outbound 类型的 handlers 的 connect(...) 方法,那么真正的底层的 connect 的操做发生在哪里呢?还记得咱们的 pipeline 的图吗?

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

从 tail 开始往前找 out 类型的 handlers,每通过一个 handler,都执行里面的 connect() 方法,最后会到 head 中,由于 head 也是 Outbound 类型的,咱们须要的 connect 操做就在 head 中,它会负责调用 unsafe 中提供的 connect 方法:

// HeadContext
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}

接下来,咱们来看一看 connect 在 unsafe 类中所谓的底层操做:

// AbstractNioChannel.AbstractNioUnsafe
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
......

boolean wasActive = isActive();
// 你们本身点进去看 doConnect 方法
// 这一步会作 JDK 底层的 SocketChannel connect,而后设置 interestOps 为 SelectionKey.OP_CONNECT
// 返回值表明是否已经链接成功
if (doConnect(remoteAddress, localAddress)) {
// 处理链接成功的状况
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// 下面这块代码,在处理链接超时的状况,代码很简单
// 这里用到了 NioEventLoop 的定时任务的功能,这个咱们以前一直都没有介绍过,由于我以为也不过重要
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}

若是上面的 doConnect 方法返回 false,那么后续是怎么处理的呢?

在上一节介绍的 register 操做中,channel 已经 register 到了 selector 上,只不过将 interestOps 设置为了 0,也就是什么都不监听。

而在上面的 doConnect 方法中,咱们看到它在调用底层的 connect 方法后,会设置 interestOps 为 SelectionKey.OP_CONNECT。

剩下的就是 NioEventLoop 的事情了,还记得 NioEventLoop 的 run() 方法吗?也就是说这里的 connect 成功之后,这个 TCP 链接就创建起来了,后续的操做会在 NioEventLoop.run() 方法中被 processSelectedKeys() 方法处理掉。

bind 过程分析

说完 connect 过程,咱们再来简单看下 bind 过程:

private ChannelFuture doBind(final SocketAddress localAddress) {
// **前面说的 initAndRegister**
final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// register 动做已经完成,那么执行 bind 操做
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
......
}
}

而后一直往里看,会看到,bind 操做也是要由 pipeline 来完成的:

// AbstractChannel

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

bind 操做和 connect 同样,都是 Outbound 类型的,因此都是 tail 开始:

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}

最后的 bind 操做又到了 head 中,由 head 来调用 unsafe 提供的 bind 方法:

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

感兴趣的读者本身去看一下 unsafe 中的 bind 方法,很是简单,bind 操做也不是什么异步方法,咱们就介绍到这里了。

本节很是简单,就是想和你们介绍下 Netty 中各类操做的套路。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

相关文章
相关标签/搜索