认真的 Netty 源码解析(一)

本文又是一篇源码分析文章,其实除了 Doug Lea 的并发包源码,我是真不太爱写源码分析。由于要花很是多的时间,并且不少地方须要反复组织语言。java

本文将介绍 Netty,Java 平台上使用最普遍的 NIO 包,它是对 JDK 中的 NIO 实现的一层封装,让咱们能更方便地开发 NIO 程序。其实,Netty 不只仅是 NIO 吧,可是,基本上你们都冲着 NIO 来的。spring

我的感受国内对于 Netty 的吹嘘是有点过了,主要是不少人靠它吃饭,要么是搞培训的,要么是出书的,巴不得把 Netty 吹上天去,这种现象也是挺很差的,反而使得初学者以为 Netty 是什么高深的技术同样。编程

Netty 的源码不是很简单,由于它比较多,并且各个类之间的关系错综复杂,不少人说它的源码很好,这点我以为通常,真要说好代码,还得 Doug Lea 的并发源码比较漂亮,一行行都是精华,不过它们是不一样类型的,也没什么好对比的。Netty 源码好就好在它的接口使用比较灵活,每每接口好用的框架,源码都不会太简单。数组

本文将立足于源码分析,因此读者须要先掌握 NIO 的基础知识,至少我以前写的 《Java NIO:Buffer、Channel 和 Selector》 中介绍的基础知识要清楚,若是读者已经对 Netty 有些了解,或者使用过,那就更好了。promise

  • 本文只介绍 TCP 相关的内容,Netty 对于其余协议的支持,不在本文的讨论范围内。安全

  • 和并发包的源码分析不同,我不可能一行一行源码说,因此有些异常分支是会直接略过,除非我以为须要介绍。服务器

  • Netty 源码一直在更新,各版本之间有些差别,我是按照 2018-09-06 的最新版本 4.1.25.Final 来进行介绍的。并发

建议初学者在看完本文之后,能够去翻翻《Netty In Action》,网上也能够找到中文文字版的。app

准备

学习源码,一开始确定是准备环境。框架

我喜欢用 maven,也喜欢 Spring Boot,因此我通常先到 https://start.spring.io/ 准备一个最简单的脚手架。

10 秒搞定脚手架,而后就是导入到 Intellij 中,若是用新版本的 Spring Boot,可能还须要等待下载依赖,期间打开 https://mvnrepository.com/ 搜索立刻要用到的 maven 依赖。

Netty 分为好些模块,有 netty-handler、netty-buffer、netty-transport、netty-common 等等,也有一个 netty-all,它包含了全部的模块。

既然咱们是源码分析,那么天然是用一个最简单的。netty-all 不是最好的选择,netty-example 才是:

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-example</artifactId>
<version>4.1.25.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-example</artifactId>
<version>4.1.25.Final</version>
</dependency>

它不只能够解决咱们的依赖,并且 example 里面的示例很是适合咱们学习使用。

Echo 例子

Netty 做为 NIO 的库,天然既能够做为服务端接受请求,也能够做为客户端发起请求。使用 Netty 开发客户端或服务端都是很是简单的,Netty 作了很好的封装,咱们一般只要开发一个或多个 handler 用来处理咱们的自定义逻辑就能够了。

下面,咱们来看一个常常会见到的例子,它叫 Echo,也就是回声,客户端传过去什么值,服务端原样返回什么值。

打开 netty-example 的源码,把 echo 包下面的代码复制出来玩一玩。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

左边是服务端代码,右边是客户端代码。

上面的代码基本就是模板代码,每次使用都是这一个套路,惟一须要咱们开发的部分是 handler(…) 和 childHandler(…) 方法中指定的各个 handler,如 EchoServerHandler 和 EchoClientHandler,固然 Netty 源码也给咱们提供了不少的 handler,好比上面的 LoggingHandler,它就是 Netty 源码中为咱们提供的,须要的时候直接拿过来用就行了。

咱们先来看一下上述代码中涉及到的一些内容:

  • ServerBootstrap 类用于建立服务端实例,Bootstrap 用于建立客户端实例。

  • 两个 EventLoopGroup:bossGroup 和 workerGroup,它们涉及的是 Netty 的线程模型,能够看到服务端有两个 group,而客户端只有一个,它们就是 Netty 中的线程池。

  • Netty 中的 Channel,没有直接使用 Java 原生的 ServerSocketChannel 和 SocketChannel,而是包装了 NioServerSocketChannel 和 NioSocketChannel 与之对应。

    固然,也有对其余协议的支持,如支持 UDP 协议的 NioDatagramChannel,本文只关心 TCP 相关的。

  • 左边 handler(…) 方法指定了一个 handler(LoggingHandler),这个 handler 是给服务端收到新的请求的时候处理用的。右边 handler(...) 方法指定了客户端处理请求过程当中须要使用的 handlers。

    若是你想在 EchoServer 中也指定多个 handler,也能够像右边的 EchoClient 同样使用 ChannelInitializer

  • 左边 childHandler(…) 指定了 childHandler,这边的 handlers 是给新建立的链接用的,咱们知道服务端 ServerSocketChannel 在 accept 一个链接之后,须要建立 SocketChannel 的实例,childHandler(…) 中设置的 handler 就是用于处理新建立的 SocketChannel 的,而不是用来处理 ServerSocketChannel 实例的。

  • pipeline:handler 能够指定多个(须要上面的 ChannelInitializer 类辅助),它们会组成了一个 pipeline,它们其实就相似拦截器的概念,如今只要记住一点,每一个 NioSocketChannel 或 NioServerSocketChannel 实例内部都会有一个 pipeline 实例。pipeline 中还涉及到 handler 的执行顺序。

  • ChannelFuture:这个涉及到 Netty 中的异步编程,和 JDK 中的 Future 接口相似。

对于不了解 Netty 的读者,也不要有什么压力,我会一一介绍它们,本文主要面向新手,我以为比较难理解或比较重要的部分,会花比较大的篇幅来介绍清楚。

上面的源码中没有展现消息发送和消息接收的处理,此部分我会在介绍完上面的这些内容之后再进行介绍。

下面,将分块来介绍这些内容。鉴于读者对 NIO 或 Netty 的了解程度可能良莠不齐,为了照顾初学者,不少地方须要啰嗦一些,因此但愿读者一节一节往下看,对于本身熟悉的内容能够适当看快一些。

Netty 中的 Channel

这节咱们来看看 NioSocketChannel 是怎么和 JDK 底层的 SocketChannel 联系在一块儿的,它们是一对一的关系。NioServerSocketChannel 和 ServerSocketChannel 同理,也是一对一的关系。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

在 Bootstrap(客户端) 和 ServerBootstrap(服务端) 的启动过程当中都会调用 channel(…) 方法:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

下面,咱们来看 channel(…) 方法的源码:

// AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
// AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

咱们能够看到,这个方法只是设置了 channelFactory 为 ReflectiveChannelFactory 的一个实例,而后咱们看下这里的 ReflectiveChannelFactory 究竟是什么:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

newChannel() 方法是 ChannelFactory 接口中的惟一方法,工厂模式你们都很熟悉。咱们能够看到,ReflectiveChannelFactory#newChannel() 方法中使用了反射调用 Channel 的无参构造方法来建立 Channel,咱们只要知道,ChannelFactory 的 newChannel() 方法何时会被调用就能够了。

  • 对于 NioSocketChannel,因为它充当客户端的功能,它的建立时机在 connect(…) 的时候;

  • 对于 NioServerSocketChannel 来讲,它充当服务端功能,它的建立时机在绑定端口 bind(…) 的时候。

接下来,咱们来简单追踪下充当客户端的 Bootstrap 中 NioSocketChannel 的建立过程,看看 NioSocketChannel 是怎么和 JDK 中的 SocketChannel 关联在一块儿的:

// Bootstrap
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
// Bootstrap
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

而后再往里看,到这个方法:

public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
// validate 只是校验一下各个参数是否是正确设置了
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
// validate 只是校验一下各个参数是否是正确设置了
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}

继续:

// 再往里就到这里了
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 咱们要说的部分在这里
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
......
}
// 再往里就到这里了
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 咱们要说的部分在这里
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
......
}

而后,咱们看 initAndRegister() 方法:

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 前面咱们说过,这里会进行 Channel 的实例化
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
...
return regFuture;
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 前面咱们说过,这里会进行 Channel 的实例化
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
...
return regFuture;
}

咱们找到了 channel = channelFactory.newChannel() 这行代码,根据前面说的,这里会调用相应 Channel 的无参构造方法。

而后咱们就能够去看 NioSocketChannel 的构造方法了:

public NioSocketChannel() {
// SelectorProvider 实例用于建立 JDK 的 SocketChannel 实例
this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
// 看这里,newSocket(provider) 方法会建立 JDK 的 SocketChannel
this(newSocket(provider));
}
public NioSocketChannel() {
// SelectorProvider 实例用于建立 JDK 的 SocketChannel 实例
this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
// 看这里,newSocket(provider) 方法会建立 JDK 的 SocketChannel
this(newSocket(provider));
}

咱们能够看到,在调用 newSocket(provider) 的时候,会建立 JDK NIO 的一个 SocketChannel 实例:

private static SocketChannel newSocket(SelectorProvider provider) {
try {
// 建立 SocketChannel 实例
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
// 建立 SocketChannel 实例
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}

NioServerSocketChannel 同理,也很是简单,从 ServerBootstrap#bind(...) 方法一路点进去就清楚了。

因此咱们知道了,NioSocketChannel 在实例化过程当中,会先实例化 JDK 底层的 SocketChannel,NioServerSocketChannel 也同样,会先实例化 ServerSocketChannel 实例:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

说到这里,咱们顺便再继续往里看一下 NioSocketChannel 的构造方法:

public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

刚才咱们看到这里,newSocket(provider) 建立了底层的 SocketChannel 实例,咱们继续往下看构造方法:

public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

上面有两行代码,第二行代码很简单,实例化了内部的 NioSocketChannelConfig 实例,它用于保存 channel 的配置信息,这里没有咱们如今须要关心的内容,直接跳过。

第一行调用父类构造器,除了设置属性外,还设置了 SocketChannel 的非阻塞模式:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 毫无疑问,客户端关心的是 OP_READ 事件,等待读取服务端返回数据
super(parent, ch, SelectionKey.OP_READ);
}

// 而后是到这里
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 咱们看到这里只是保存了 SelectionKey.OP_READ 这个信息,在后面的时候会用到
this.readInterestOp = readInterestOp;
try {
// ******设置 channel 的非阻塞模式******
ch.configureBlocking(false);
} catch (IOException e) {
......
}
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 毫无疑问,客户端关心的是 OP_READ 事件,等待读取服务端返回数据
super(parent, ch, SelectionKey.OP_READ);
}

// 而后是到这里
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 咱们看到这里只是保存了 SelectionKey.OP_READ 这个信息,在后面的时候会用到
this.readInterestOp = readInterestOp;
try {
// ******设置 channel 的非阻塞模式******
ch.configureBlocking(false);
} catch (IOException e) {
......
}
}

NioServerSocketChannel 的构造方法相似,也设置了非阻塞,而后设置服务端关心的 SelectionKey.OP_ACCEPT 事件:

public NioServerSocketChannel(ServerSocketChannel channel) {
// 对于服务端来讲,关心的是 SelectionKey.OP_ACCEPT 事件,等待客户端链接
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 对于服务端来讲,关心的是 SelectionKey.OP_ACCEPT 事件,等待客户端链接
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

这节关于 Channel 的内容咱们先介绍这么多,主要就是实例化了 JDK 层的 SocketChannel 或 ServerSocketChannel,而后设置了非阻塞模式,咱们后面再继续深刻下去。

Netty 中的 Future、Promise

Netty 中很是多的异步调用,因此在介绍更多 NIO 相关的内容以前,咱们来看看它的异步接口是怎么使用的。

前面咱们在介绍 Echo 例子的时候,已经用过了 ChannelFuture 这个接口了:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

争取在看完本节后,读者能搞清楚上面的这几行划线部分是怎么走的。

关于 Future 接口,我想你们应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,经过它来获取提交的任务的执行状态和最终的执行结果,咱们最经常使用它的 isDone() 和 get() 方法。

下面是 JDK 中的 Future 接口 java.util.concurrent.Future:

public interface Future<V> {
// 取消该任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已取消
boolean isCancelled();
// 任务是否已完成
boolean isDone();
// 阻塞获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 带超时参数的获取任务执行结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Future<V> {
// 取消该任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已取消
boolean isCancelled();
// 任务是否已完成
boolean isDone();
// 阻塞获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 带超时参数的获取任务执行结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Netty 中的 Future 接口(同名)继承了 JDK 中的 Future 接口,而后添加了一些方法:

// io.netty.util.concurrent.Future

public interface Future<V> extends java.util.concurrent.Future<V> {

// 是否成功
boolean isSuccess();

// 是否可取消
boolean isCancellable();

// 若是任务执行失败,这个方法返回异常信息
Throwable cause();

// 添加 Listener 来进行回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 阻塞等待任务结束,若是任务失败,将“致使失败的异常”从新抛出来
Future<V> sync() throws InterruptedException;
// 不响应中断的 sync(),这个你们应该都很熟了
Future<V> syncUninterruptibly();

// 阻塞等待任务结束,和 sync() 功能是同样的,不过若是任务失败,它不会抛出执行过程当中的异常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);

// 获取执行结果,不阻塞。咱们都知道 java.util.concurrent.Future 中的 get() 是阻塞的
V getNow();

// 取消任务执行,若是取消成功,任务会由于 CancellationException 异常而致使失败
// 也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。
// mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能中止该任务的执行),
// 彷佛 Netty 中 Future 接口的各个实现类,都没有使用这个参数
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
public interface Future<V> extends java.util.concurrent.Future<V> {

// 是否成功
boolean isSuccess();

// 是否可取消
boolean isCancellable();

// 若是任务执行失败,这个方法返回异常信息
Throwable cause();

// 添加 Listener 来进行回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 阻塞等待任务结束,若是任务失败,将“致使失败的异常”从新抛出来
Future<V> sync() throws InterruptedException;
// 不响应中断的 sync(),这个你们应该都很熟了
Future<V> syncUninterruptibly();

// 阻塞等待任务结束,和 sync() 功能是同样的,不过若是任务失败,它不会抛出执行过程当中的异常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);

// 获取执行结果,不阻塞。咱们都知道 java.util.concurrent.Future 中的 get() 是阻塞的
V getNow();

// 取消任务执行,若是取消成功,任务会由于 CancellationException 异常而致使失败
// 也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。
// mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能中止该任务的执行),
// 彷佛 Netty 中 Future 接口的各个实现类,都没有使用这个参数
@Override
boolean cancel(boolean mayInterruptIfRunning);
}

看完上面的 Netty 的 Future 接口,咱们能够发现,它加了 sync() 和 await() 用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 们就能够了,那么咱们就不必定要主动调用 isDone() 来获取状态,或经过 get() 阻塞方法来获取值。

因此它其实有两种使用范式

顺便说下 sync() 和 await() 的区别:sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,若是失败,从新将致使失败的异常抛出来。也就是说,若是使用 await(),任务抛出异常后,await() 方法会返回,可是不会抛出异常,而 sync() 方法返回的同时会抛出异常。

咱们也能够看到,Future 接口没有和 IO 操做关联在一块儿,仍是比较纯净的接口。

接下来,咱们来看 Future 接口的子接口 ChannelFuture,这个接口用得最多,它将和 IO 操做中的 Channel 关联在一块儿了,用于异步处理 Channel 中的事件。

public interface ChannelFuture extends Future<Void> {

// ChannelFuture 关联的 Channel
Channel channel();

// 覆写如下几个方法,使得它们返回值为 ChannelFuture 类型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();

@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();

// 用来标记该 future 是 void 的,
// 这样就不容许使用 addListener(...), sync(), await() 以及它们的几个重载方法
boolean isVoid();
}
public interface ChannelFuture extends Future<Void> {

// ChannelFuture 关联的 Channel
Channel channel();

// 覆写如下几个方法,使得它们返回值为 ChannelFuture 类型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();

@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();

// 用来标记该 future 是 void 的,
// 这样就不容许使用 addListener(...), sync(), await() 以及它们的几个重载方法
boolean isVoid();
}

咱们看到,ChannelFuture 接口相对于 Future 接口,除了将 channel 关联进来,没有增长什么东西。还有个 isVoid() 方法算是不那么重要的存在吧。其余几个都是方法覆写,为了让返回值类型变为 ChannelFuture,而不是原来的 Future。

这里有点跳,咱们来介绍下 Promise 接口,它和 ChannelFuture 接口无关,而是和前面的 Future 接口相关,Promise 这个接口很是重要。

Promise 接口和 ChannelFuture 同样,也继承了 Netty 的 Future 接口,而后加了一些 Promise 的内容:

public interface Promise<V> extends Future<V> {

// 标记该 future 成功及设置其执行结果,而且会通知全部的 listeners。
// 若是该操做失败,将抛出异常(失败指的是该 future 已经有告终果了,成功的结果,或者失败的结果)
Promise<V> setSuccess(V result);

// 和 setSuccess 方法同样,只不过若是失败,它不抛异常,返回 false
boolean trySuccess(V result);

// 标记该 future 失败,及其失败缘由。
// 若是失败,将抛出异常(失败指的是已经有告终果了)
Promise<V> setFailure(Throwable cause);

// 标记该 future 失败,及其失败缘由。
// 若是已经有结果,返回 false,不抛出异常
boolean tryFailure(Throwable cause);

// 标记该 future 不能够被取消
boolean setUncancellable();

// 这里和 ChannelFuture 同样,对这几个方法进行覆写,目的是为了返回 Promise 类型的实例
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();

@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
public interface Promise<V> extends Future<V> {

// 标记该 future 成功及设置其执行结果,而且会通知全部的 listeners。
// 若是该操做失败,将抛出异常(失败指的是该 future 已经有告终果了,成功的结果,或者失败的结果)
Promise<V> setSuccess(V result);

// 和 setSuccess 方法同样,只不过若是失败,它不抛异常,返回 false
boolean trySuccess(V result);

// 标记该 future 失败,及其失败缘由。
// 若是失败,将抛出异常(失败指的是已经有告终果了)
Promise<V> setFailure(Throwable cause);

// 标记该 future 失败,及其失败缘由。
// 若是已经有结果,返回 false,不抛出异常
boolean tryFailure(Throwable cause);

// 标记该 future 不能够被取消
boolean setUncancellable();

// 这里和 ChannelFuture 同样,对这几个方法进行覆写,目的是为了返回 Promise 类型的实例
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();

@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}

可能有些读者对 Promise 的概念不是很熟悉,这里简单说两句。

我以为只要明白一点,Promise 实例内部是一个任务,任务的执行每每是异步的,一般是一个线程池来处理任务。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 未来会被某个执行任务的线程在执行完成之后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数(固然,回调的具体内容不必定要由执行任务的线程本身来执行,它能够建立新的线程来执行,也能够将回调任务提交到某个线程池来执行)。并且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。

因此这里就有两种编程方式,一种是用 await(),等 await() 方法返回后,获得 promise 的执行结果,而后处理它;另外一种就是提供 Listener 实例,咱们不太关心任务何时会执行完,只要它执行完了之后会去执行 listener 中的处理方法就行。

接下来,咱们再来看下 ChannelPromise,它继承了前面介绍的 ChannelFuture 和 Promise 接口。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

ChannelPromise 接口在 Netty 中使用得比较多,由于它综合了 ChannelFuture 和 Promise 两个接口:

/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

// 覆写 ChannelFuture 中的 channel() 方法,其实这个方法一点没变
@Override
Channel channel();

// 下面几个方法是覆写 Promise 中的接口,为了返回值类型是 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);

// 到这里你们应该都熟悉了,下面几个方法的覆写也是为了获得 ChannelPromise 类型的实例
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();

/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
// 咱们忽略这个方法吧。
ChannelPromise unvoid();
}
/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

// 覆写 ChannelFuture 中的 channel() 方法,其实这个方法一点没变
@Override
Channel channel();

// 下面几个方法是覆写 Promise 中的接口,为了返回值类型是 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);

// 到这里你们应该都熟悉了,下面几个方法的覆写也是为了获得 ChannelPromise 类型的实例
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();

/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
// 咱们忽略这个方法吧。
ChannelPromise unvoid();
}

咱们能够看到,它综合了 ChannelFuture 和 Promise 中的方法,只不过经过覆写将返回值都变为 ChannelPromise 了而已,没有增长什么新的功能。

小结一下,咱们上面介绍了几个接口,Future 以及它的子接口 ChannelFuture 和 Promise,而后是 ChannelPromise 接口同时继承了 ChannelFuture 和 Promise。

我把这几个接口的主要方法列在一块儿,这样你们看得清晰些:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

接下来,咱们须要来一个实现类,这样才能比较直观地看出它们是怎么使用的,由于上面的这些都是接口定义,具体还得看实现类是怎么工做的。

下面,咱们来介绍下 DefaultPromise 这个实现类,这个类很经常使用,它的源码也不短,咱们先介绍几个关键的内容,而后介绍一个示例使用。

首先,咱们看下它有哪些属性:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 保存执行结果
private volatile Object result;
// 执行任务的线程池,promise 持有 executor 的引用,这个其实有点奇怪了
// 由于“任务”其实不必知道本身在哪里被执行的
private final EventExecutor executor;
// 监听者,回调函数,任务结束后(正常或异常结束)执行
private Object listeners;

// 等待这个 promise 的线程数(调用sync()/await()进行等待的线程数量)
private short waiters;

// 是否正在唤醒等待线程,用于防止重复执行唤醒,否则会重复执行 listeners 的回调方法
private boolean notifyingListeners;
......
}
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 保存执行结果
private volatile Object result;
// 执行任务的线程池,promise 持有 executor 的引用,这个其实有点奇怪了
// 由于“任务”其实不必知道本身在哪里被执行的
private final EventExecutor executor;
// 监听者,回调函数,任务结束后(正常或异常结束)执行
private Object listeners;

// 等待这个 promise 的线程数(调用sync()/await()进行等待的线程数量)
private short waiters;

// 是否正在唤醒等待线程,用于防止重复执行唤醒,否则会重复执行 listeners 的回调方法
private boolean notifyingListeners;
......
}

能够看出,此类实现了 Promise,可是没有实现 ChannelFuture,因此它和 Channel 联系不起来。

别急,咱们后面会碰到另外一个类 DefaultChannelPromise 的使用,这个类是综合了 ChannelFuture 和 Promise 的,可是它的实现其实大部分都是继承自这里的 DefaultPromise 类的。

说完上面的属性之后,你们能够看下 setSuccess(V result) 、trySuccess(V result) 和 setFailure(Throwable cause) 、 tryFailure(Throwable cause) 这几个方法:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

看出 setSuccess(result) 和 trySuccess(result) 的区别了吗?

上面几个方法都很是简单,先设置好值,而后执行监听者们的回调方法。notifyListeners() 方法感兴趣的读者也能够看一看,不过它还涉及到 Netty 线程池的一些内容,咱们尚未介绍到线程池,这里就不展开了。上面的代码,在 setSuccess0 或 setFailure0 方法中都会唤醒阻塞在 sync() 或 await() 的线程

另外,就是能够看下 sync() 和 await() 的区别,其余的我以为随便看看就行了。

@Override
public Promise<V> sync() throws InterruptedException {
await();
// 若是任务是失败的,从新抛出相应的异常
rethrowIfFailed();
return this;
}
@Override
public Promise<V> sync() throws InterruptedException {
await();
// 若是任务是失败的,从新抛出相应的异常
rethrowIfFailed();
return this;
}

接下来,咱们来写个实例代码吧:

 public static void main(String[] args) {

// 构造线程池
EventExecutor executor = new DefaultEventExecutor();

// 建立 DefaultPromise 实例
Promise promise = new DefaultPromise(executor);

// 下面给这个 promise 添加两个 listener
promise.addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
System.out.println("任务结束,结果:" + future.get());
} else {
System.out.println("任务失败,异常:" + future.cause());
}
}
}).addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
System.out.println("任务结束,balabala...");
}
});

// 提交任务到线程池,五秒后执行结束,设置执行 promise 的结果
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
// 设置 promise 的结果
// promise.setFailure(new RuntimeException());
promise.setSuccess(123456);
}
});

// main 线程阻塞等待执行结果
try {
promise.sync();
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {

// 构造线程池
EventExecutor executor = new DefaultEventExecutor();

// 建立 DefaultPromise 实例
Promise promise = new DefaultPromise(executor);

// 下面给这个 promise 添加两个 listener
promise.addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
System.out.println("任务结束,结果:" + future.get());
} else {
System.out.println("任务失败,异常:" + future.cause());
}
}
}).addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
System.out.println("任务结束,balabala...");
}
});

// 提交任务到线程池,五秒后执行结束,设置执行 promise 的结果
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
// 设置 promise 的结果
// promise.setFailure(new RuntimeException());
promise.setSuccess(123456);
}
});

// main 线程阻塞等待执行结果
try {
promise.sync();
} catch (InterruptedException e) {
}
}

运行代码,两个 listener 将在 5 秒后将输出:

任务结束,结果:123456
任务结束,balabala...
任务结束,结果:123456
任务结束,balabala...

读者这里能够试一下 sync() 和 await() 的区别,在任务中调用 promise.setFailure(new RuntimeException()) 试试看。

上面的代码中,你们可能会对线程池 executor 和 promise 之间的关系感到有点迷惑。读者应该也要清楚,具体的任务不必定就要在这个 executor 中被执行。任务结束之后,须要调用 promise.setSuccess(result) 做为通知。

一般来讲,promise 表明的 future 是不须要和线程池搅在一块儿的,future 只关心任务是否结束以及任务的执行结果,至因而哪一个线程或哪一个线程池执行的任务,future 实际上是不关心的。

不过 Netty 毕竟不是要建立一个通用的线程池实现,而是和它要处理的 IO 息息相关的,因此咱们只不过要理解它就行了。

这节就说这么多吧,咱们回过头来再看一下这张图,看看你们是否是看懂了这节内容:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

咱们就说说上图左边的部分吧,虽然咱们还不知道 bind() 操做中具体会作什么工做,可是咱们应该能够猜出一二。

显然,main 线程调用 b.bind(port) 这个方法会返回一个 ChannelFuture,bind() 是一个异步方法,当某个执行线程执行了真正的绑定操做后,那个执行线程必定会标记这个 future 为成功(咱们假定 bind 会成功),而后这里的 sync() 方法(main 线程)就会返回了。

若是 bind(port) 失败,咱们知道,sync() 方法会将异常抛出来,而后就会执行到 finally 块了。

一旦绑定端口 bind 成功,进入下面一行,f.channel() 方法会返回该 future 关联的 channel。

channel.closeFuture() 也会返回一个 ChannelFuture,而后调用了 sync() 方法,这个 sync() 方法返回的条件是:有其余的线程关闭了 NioServerSocketChannel,每每是由于须要停掉服务了,而后那个线程会设置 future 的状态( setSuccess(result) 或 setFailure(cause) ),这个 sync() 方法才会返回。

这节就到这里,但愿你们对 Netty 中的异步编程有些了解,后续碰到源码的时候能知道是怎么使用的了。

ChannelPipeline,和 Inbound、Outbound

我想不少读者应该或多或少都有 Netty 中 pipeline 的概念。前面咱们说了,使用 Netty 的时候,咱们一般就只要写一些自定义的 handler 就能够了,咱们定义的这些 handler 会组成一个 pipeline,用于处理 IO 事件,这个和咱们平时接触的 Filter 或 Interceptor 表达的差很少是一个意思。

每一个 Channel 内部都有一个 pipeline,pipeline 由多个 handler 组成,handler 之间的顺序是很重要的,由于 IO 事件将按照顺序顺次通过 pipeline 上的 handler,这样每一个 handler 能够专一于作一点点小事,由多个 handler 组合来完成一些复杂的逻辑。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

从图中,咱们知道这是一个双向链表。

首先,咱们看两个重要的概念:Inbound 和 Outbound。在 Netty 中,IO 事件被分为 Inbound 事件和 Outbound 事件。

Outbound 的 out 指的是 出去,有哪些 IO 事件属于此类呢?好比 connect、write、flush 这些 IO 操做是往外部方向进行的,它们就属于 Outbound 事件。

其余的,诸如 accept、read 这种就属于 Inbound 事件。

好比客户端在发起请求的时候,须要 1️⃣connect 到服务器,而后 2️⃣write 数据传到服务器,再而后 3️⃣read 服务器返回的数据,前面的 connect 和 write 就是 out 事件,后面的 read 就是 in 事件。

好比不少初学者看不懂下面的这段代码,这段代码用于服务端的 childHandler 中:

1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());
1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());

初学者确定都纳闷,觉得这个顺序写错了,应该是先 decode 客户端过来的数据,而后用 BizHandler 处理业务逻辑,最后再 encode 数据而后返回给客户端,因此添加的顺序应该是 1 -> 3 -> 2 才对。

其实这里的三个 handler 是分组的,分为 Inbound(1 和 3) 和 Outbound(2):

1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());
1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());
  • 客户端链接进来的时候,读取(read)客户端请求数据的操做是 Inbound 的,因此会先使用 1,而后是 3 对处理进行处理;

  • 处理完数据后,返回给客户端数据的 write 操做是 Outbound 的,此时使用的是 2。

因此虽然添加顺序有点怪,可是执行顺序实际上是按照 1 -> 3 -> 2 进行的。

若是咱们在上面的基础上,加上下面的第四行,这是一个 OutboundHandler:

4. pipeline.addLast(new OutboundHandlerA());4. pipeline.addLast(new OutboundHandlerA());

那么执行顺序是否是就是 1 -> 3 -> 2 -> 4 呢?答案是:不是的。

对于 Inbound 操做,按照添加顺序执行每一个 Inbound 类型的 handler;而对于 Outbound 操做,是反着来的,从后往前,顺次执行 Outbound 类型的 handler。

因此,上面的顺序应该是先 1 后 3,它们是 Inbound 的,而后是 4,最后才是 2,它们两个是 Outbound 的。说实话,我真不喜欢这种组织方式。

到这里,我想你们应该都知道 Inbound 和 Outbound 了吧?下面咱们来介绍它们的接口使用。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

定义处理 Inbound 事件的 handler 须要实现 ChannelInboundHandler,定义处理 Outbound 事件的 handler 须要实现 ChannelOutboundHandler。最下面的三个类,是 Netty 提供的适配器,特别的,若是咱们但愿定义一个 handler 能同时处理 Inbound 和 Outbound 事件,能够经过继承中间的 ChannelDuplexHandler 的方式,好比 LoggingHandler 这种既能够用来处理 Inbound 也能够用来处理 Outbound 事件的 handler。

有了 Inbound 和 Outbound 的概念之后,咱们来开始介绍 Pipeline 的源码。

咱们说过,一个 Channel 关联一个 pipeline,NioSocketChannel 和 NioServerSocketChannel 在执行构造方法的时候,都会走到它们的父类 AbstractChannel 的构造方法中:

protected AbstractChannel(Channel parent) {
this.parent = parent;
// 给每一个 channel 分配一个惟一 id
id = newId();
// 每一个 channel 内部须要一个 Unsafe 的实例
unsafe = newUnsafe();
// 每一个 channel 内部都会建立一个 pipeline
pipeline = newChannelPipeline();
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 给每一个 channel 分配一个惟一 id
id = newId();
// 每一个 channel 内部须要一个 Unsafe 的实例
unsafe = newUnsafe();
// 每一个 channel 内部都会建立一个 pipeline
pipeline = newChannelPipeline();
}

上面的三行代码中,id 比较不重要,Netty 中的 Unsafe 实例其实挺重要的,这里简单介绍一下。

在 JDK 的源码中,sun.misc.Unsafe 类提供了一些底层操做的能力,它设计出来是给 JDK 中的源码使用的,好比 AQS、ConcurrentHashMap 等,咱们在以前的并发包的源码分析中也看到了不少它们使用 Unsafe 的场景,这个 Unsafe 类不是给咱们的代码使用的,是给 JDK 源码使用的(须要的话,咱们也是能够获取它的实例的)。

Unsafe 类的构造方法是 private 的,可是它提供了 getUnsafe() 这个静态方法:

Unsafe unsafe = Unsafe.getUnsafe();Unsafe unsafe = Unsafe.getUnsafe();

你们能够试一下,上面这行代码编译没有问题,可是执行的时候会抛 java.lang.SecurityException 异常,由于它就不是给咱们的代码用的。

可是若是你就是想获取 Unsafe 的实例,能够经过下面这个代码获取到:

Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);

Netty 中的 Unsafe 也是一样的意思,它封装了 Netty 中会使用到的 JDK 提供的 NIO 接口,好比将 channel 注册到 selector 上,好比 bind 操做,好比 connect 操做等,这些操做都是稍微偏底层一些。Netty 一样也是不但愿咱们的业务代码使用 Unsafe 的实例,它是提供给 Netty 中的源码使用的。

不过,对于咱们源码分析来讲,咱们仍是会有不少时候须要分析 Unsafe 中的源码的

关于 Unsafe,咱们后面用到了再说,这里只要知道,它封装了大部分须要访问 JDK 的 NIO 接口的操做就行了。这里咱们继续将焦点放在实例化 pipeline 上:

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

这里开始调用 DefaultChannelPipeline 的构造方法,并把当前 channel 的引用传入:

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

这里实例化了 tail 和 head 这两个 handler。tail 实现了 ChannelInboundHandler 接口,而 head 实现了 ChannelOutboundHandler 和 ChannelInboundHandler 两个接口,而且最后两行代码将 tail 和 head 链接起来:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

注意,在不一样的版本中,源码也略有差别,head 不必定是 in + out,你们知道这点就行了。

还有,从上面的 head 和 tail 咱们也能够看到,其实 pipeline 中的每一个元素是 ChannelHandlerContext 的实例,而不是 ChannelHandler 的实例,context 包装了一下 handler,可是,后面咱们都会用 handler 来描述一个 pipeline 上的节点,而不是使用 context,但愿读者知道这一点。

这里只是构造了 pipeline,而且添加了两个固定的 handler 到其中(head + tail),还不涉及到自定义的 handler 代码执行。咱们回过头来看下面这段代码:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

咱们说过 childHandler 中指定的 handler 不是给 NioServerSocketChannel 使用的,是给 NioSocketChannel 使用的,因此这里咱们不看它。

这里调用 handler(…) 方法指定了一个 LoggingHandler 的实例,而后咱们再进去下面的 bind(…) 方法中看看这个 LoggingHandler 实例是怎么进入到咱们以前构造的 pipeline 内的。

顺着 bind() 一直往前走,bind() -> doBind() -> initAndRegister():

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 构造 channel 实例,同时会构造 pipeline 实例,
// 如今 pipeline 中有 head 和 tail 两个 handler 了
channel = channelFactory.newChannel();
// 2. 看这里
init(channel);
} catch (Throwable t) {
......
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 构造 channel 实例,同时会构造 pipeline 实例,
// 如今 pipeline 中有 head 和 tail 两个 handler 了
channel = channelFactory.newChannel();
// 2. 看这里
init(channel);
} catch (Throwable t) {
......
}

上面的两行代码,第一行实现了构造 channel 和 channel 内部的 pipeline,咱们来看第二行 init 代码:

// ServerBootstrap:

@Override
void init(Channel channel) throws Exception {
......
// 拿到刚刚建立的 channel 内部的 pipeline 实例
ChannelPipeline p = channel.pipeline();
...
// 开始往 pipeline 中添加一个 handler,这个 handler 是 ChannelInitializer 的实例
p.addLast(new ChannelInitializer<Channel>() {

// 咱们之后会看到,下面这个 initChannel 方法什么时候会被调用
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 这个方法返回咱们最开始指定的 LoggingHandler 实例
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 LoggingHandler
pipeline.addLast(handler);
}

// 先不用管这里的 eventLoop
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一个 handler 到 pipeline 中:ServerBootstrapAcceptor
// 从名字能够看到,这个 handler 的目的是用于接收客户端请求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
@Override
void init(Channel channel) throws Exception {
......
// 拿到刚刚建立的 channel 内部的 pipeline 实例
ChannelPipeline p = channel.pipeline();
...
// 开始往 pipeline 中添加一个 handler,这个 handler 是 ChannelInitializer 的实例
p.addLast(new ChannelInitializer<Channel>() {

// 咱们之后会看到,下面这个 initChannel 方法什么时候会被调用
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 这个方法返回咱们最开始指定的 LoggingHandler 实例
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 LoggingHandler
pipeline.addLast(handler);
}

// 先不用管这里的 eventLoop
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一个 handler 到 pipeline 中:ServerBootstrapAcceptor
// 从名字能够看到,这个 handler 的目的是用于接收客户端请求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

这里涉及到 pipeline 中的辅助类 ChannelInitializer,咱们看到,它自己是一个 handler(Inbound 类型),可是它的做用和普通 handler 有点不同,它纯碎是用来辅助将其余的 handler 加入到 pipeline 中的。

你们能够稍微看一下 ChannelInitializer 的 initChannel 方法,有个简单的认识就好,此时的 pipeline 应该是这样的:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

ChannelInitializer 的 initChannel(channel) 方法被调用的时候,会往 pipeline 中添加咱们最开始指定的 LoggingHandler 和添加一个 ServerBootstrapAcceptor。可是咱们如今还不知道这个 initChannel 方法什么时候会被调用。

上面咱们说的是做为服务端的 NioServerSocketChannel 的 pipeline,NioSocketChannel 也是差很少的,咱们能够看一下 Bootstrap 类的 init(channel) 方法:

void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
...
}
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
...
}

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

它和服务端 ServerBootstrap 要添加 ServerBootstrapAcceptor 不同,它只须要将 EchoClient 类中的 ChannelInitializer 实例加进来就能够了,它的 ChannelInitializer 中添加了两个 handler,LoggingHandler 和 EchoClientHandler:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

很显然,咱们须要的是像 LoggingHandler 和 EchoClientHandler 这样的 handler,可是,它们如今还不在 pipeline 中,那么它们何时会真正进入到 pipeline 中呢?之后咱们再揭晓。

还有,为何 Server 端咱们指定的是一个 handler 实例,而 Client 指定的是一个 ChannelInitializer 实例?其实它们是能够随意搭配使用的,你甚至能够在 ChannelInitializer 实例中添加 ChannelInitializer 的实例。

很是抱歉,这里又要断了,下面要先介绍线程池了,你们要记住 pipeline 如今的样子,head + channelInitializer + tail。

本节没有介绍 handler 的向后传播,就是一个 handler 处理完了之后,怎么传递给下一个 handler 来处理?好比咱们熟悉的 JavaEE 中的 Filter 是采用在一个 Filter 实例中调用 chain.doFilter(request, response) 来传递给下一个 Filter 这种方式的。

咱们用下面这张图结束本节。下图展现了传播的方法,但我实际上是更想让你们看一下,哪些事件是 Inbound 类型的,哪些是 Outbound 类型的:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

Outbound 类型的几个事件你们应该比较好认,注意 bind 也是 Outbound 类型的。

Netty 中的线程池 EventLoopGroup

接下来,咱们来分析 Netty 中的线程池。Netty 中的线程池比较很差理解,由于它的类比较多,并且它们之间的关系错综复杂。看下图,感觉下 NioEventLoop 类和 NioEventLoopGroup 类的继承结构:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

这张图我按照继承关系整理而来,你们仔细看一下就会发现,涉及到的类确实挺多的。本节来给你们理理清楚这部份内容。

首先,咱们说的 Netty 的线程池,指的就是 NioEventLoopGroup 的实例;线程池中的单个线程,指的是右边 NioEventLoop 的实例。

咱们第一节介绍的 Echo 例子,客户端和服务端的启动代码中,最开始咱们老是先实例化 NioEventLoopGroup:

// EchoClient 代码最开始:
EventLoopGroup group = new NioEventLoopGroup();

// EchoServer 代码最开始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// EchoClient 代码最开始:
EventLoopGroup group = new NioEventLoopGroup();

// EchoServer 代码最开始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

下面,咱们就从 NioEventLoopGroup 的源码开始进行分析。

咱们打开 NioEventLoopGroup 的源码,能够看到,NioEventLoopGroup 有多个构造方法用于参数设置,最简单地,咱们采用无参构造函数,或仅仅设置线程数量就能够了,其余的参数采用默认值。

好比上面的代码中,咱们只在实例化 bossGroup 的时候指定了参数,表明该线程池须要一个线程。

public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

...

// 参数最全的构造方法
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类的构造方法
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

...

// 参数最全的构造方法
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类的构造方法
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}

咱们来稍微看一下构造方法中的各个参数:

  • nThreads:这个最简单,就是线程池中的线程数,也就是 NioEventLoop 的实例数量。

  • executor:咱们知道,咱们自己就是要构造一个线程池(Executor),为何这里传一个 executor 实例呢?它其实不是给线程池用的,而是给 NioEventLoop 用的,之后再说。

  • chooserFactory:当咱们提交一个任务到线程池的时候,线程池须要选择(choose)其中的一个线程来执行这个任务,这个就是用来实现选择策略的。

  • selectorProvider:这个简单,咱们须要经过它来实例化 JDK 的 Selector,能够看到每一个线程池都持有一个 selectorProvider 实例。

  • selectStrategyFactory:这个涉及到的是线程池中线程的工做流程,在介绍 NioEventLoop 的时候会说。

  • rejectedExecutionHandler:这个也是线程池的好朋友了,用于处理线程池中没有可用的线程来执行任务的状况。在 Netty 中稍微有一点点不同,这个是给 NioEventLoop 实例用的,之后咱们再详细介绍。

这里介绍这些参数是但愿你们有个印象而已,你们发现没有,在构造 NioEventLoopGroup 实例时的好几个参数,都是用来构造 NioEventLoop 用的。

下面,咱们从 NioEventLoopGroup 的无参构造方法开始,跟着源码走:

public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup() {
this(0);
}

而后一步步走下去,到这个构造方法:

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {

super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {

super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

你们本身要去跟一下源码,这样才知道中间设置了哪些默认值,下面这几个参数都被设置了默认值:

  • selectorProvider = SelectorProvider.provider()

    这个没什么好说的,调用了 JDK 提供的方法

  • selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE

    这个涉及到的是线程在作 select 操做和执行任务过程当中的策略选择问题,在介绍 NioEventLoop 的时候会用到。

  • rejectedExecutionHandler = RejectedExecutionHandlers.reject()

    你们进去看一下 reject() 方法,也就是说,Netty 选择的默认拒绝策略是:抛出异常

跟着源码走,咱们会来到父类 MultithreadEventLoopGroup 的构造方法中:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

这里咱们发现,若是采用无参构造函数,那么到这里的时候,默认地 nThreads 会被设置为 CPU 核心数 *2。你们能够看下 DEFAULT_EVENT_LOOP_THREADS 的默认值,以及 static 代码块的设值逻辑。

咱们继续往下走:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}

到这一步的时候,new ThreadPerTaskExecutor(threadFactory) 会构造一个 executor。

咱们如今还不知道这个 executor 怎么用。这里咱们先看下它的源码:

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
// 为每一个任务新建一个线程
threadFactory.newThread(command).start();
}
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
// 为每一个任务新建一个线程
threadFactory.newThread(command).start();
}
}

Executor 做为线程池的最顶层接口, 咱们知道,它只有一个 execute(runnable) 方法,从上面咱们能够看到,实现类 ThreadPerTaskExecutor 的逻辑就是每来一个任务,新建一个线程。

咱们先记住这个,前面也说了,它是给 NioEventLoop 用的,不是给 NioEventLoopGroup 用的。

上一步设置完了 executor,咱们继续往下看:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

这一步设置了 chooserFactory,用来实现从线程池中选择一个线程的选择策略。

ChooserFactory 的逻辑比较简单,咱们看下 DefaultEventExecutorChooserFactory 的实现:

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

这里设置的策略也很简单:

一、若是线程池的线程数量是 2^n,采用下面的方式会高效一些:

@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}

二、若是不是,用取模的方式:

@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

走了这么久,咱们终于到了一个干实事的构造方法中了:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

// executor 若是是 null,作一次和前面同样的默认设置。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

// 这里的 children 数组很是重要,它就是线程池中的线程数组,这么说不太严谨,可是就大概这个意思
children = new EventExecutor[nThreads];

// 下面这个 for 循环将实例化 children 数组中的每个元素
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 实例化!!!!!!
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 若是有一个 child 实例化失败,那么 success 就会为 false,而后进入下面的失败处理逻辑
if (!success) {
// 把已经成功实例化的“线程” shutdown,shutdown 是异步操做
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

// 等待这些线程成功 shutdown
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// 把中断状态设置回去,交给关心的线程来处理.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// ================================================
// === 到这里,就是表明上面的实例化全部线程已经成功结束 ===
// ================================================

// 经过以前设置的 chooserFactory 来实例化 Chooser,把线程池数组传进去,
// 这就没必要再说了吧,实现线程选择策略
chooser = chooserFactory.newChooser(children);

// 设置一个 Listener 用来监听该线程池的 termination 事件
// 下面的代码逻辑是:给池中每个线程都设置这个 listener,当监听到全部线程都 terminate 之后,这个线程池就算真正的 terminate 了。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

// 设置 readonlyChildren,它是只读集合,之后用到再说
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

// executor 若是是 null,作一次和前面同样的默认设置。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

// 这里的 children 数组很是重要,它就是线程池中的线程数组,这么说不太严谨,可是就大概这个意思
children = new EventExecutor[nThreads];

// 下面这个 for 循环将实例化 children 数组中的每个元素
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 实例化!!!!!!
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 若是有一个 child 实例化失败,那么 success 就会为 false,而后进入下面的失败处理逻辑
if (!success) {
// 把已经成功实例化的“线程” shutdown,shutdown 是异步操做
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

// 等待这些线程成功 shutdown
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// 把中断状态设置回去,交给关心的线程来处理.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// ================================================
// === 到这里,就是表明上面的实例化全部线程已经成功结束 ===
// ================================================

// 经过以前设置的 chooserFactory 来实例化 Chooser,把线程池数组传进去,
// 这就没必要再说了吧,实现线程选择策略
chooser = chooserFactory.newChooser(children);

// 设置一个 Listener 用来监听该线程池的 termination 事件
// 下面的代码逻辑是:给池中每个线程都设置这个 listener,当监听到全部线程都 terminate 之后,这个线程池就算真正的 terminate 了。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

// 设置 readonlyChildren,它是只读集合,之后用到再说
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

上面的代码很是简单吧,没有什么须要特别说的,接下来,咱们来看看 newChild() 这个方法,这个方法很是重要,它将建立线程池中的线程。

我上面已经用过不少次"线程"这个词了,它可不是 Thread 的意思,而是指池中的个体,后面咱们会看到每一个"线程"在何时会真正建立 Thread 实例。反正每一个 NioEventLoop 实例内部都会有一个本身的 Thread 实例,因此把这两个概念混在一块儿也无所谓吧。

newChild(…) 方法在 NioEventLoopGroup 中覆写了,上面说的"线程"其实就是 NioEventLoop:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

它调用了 NioEventLoop 的构造方法:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类构造器
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 开启 NIO 中最重要的组件:Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类构造器
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 开启 NIO 中最重要的组件:Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

咱们先粗略观察一下,而后再往下看:

  • 在 Netty 中,NioEventLoopGroup 表明线程池,NioEventLoop 就是其中的线程。

  • 线程池 NioEventLoopGroup 是池中的线程 NioEventLoop 的 parent,从上面的代码中的取名能够看出。

  • 每一个 NioEventLoop 都有本身的 Selector,上面的代码也反应了这一点,这和 Tomcat 中的 NIO 模型有点区别。

  • executor、selectStrategy 和 rejectedExecutionHandler 从 NioEventLoopGroup 中一路传到了 NioEventLoop 中。

这个时候,咱们来看一下 NioEventLoop 类的属性都有哪些,咱们先忽略它继承自父类的属性,单单看它本身的:

private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

private final AtomicBoolean wakenUp = new AtomicBoolean();

private final SelectStrategy selectStrategy;

private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

private final AtomicBoolean wakenUp = new AtomicBoolean();

private final SelectStrategy selectStrategy;

private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;

结合它的构造方法咱们来总结一下:

  • provider:它由 NioEventLoopGroup 传进来,前面咱们说了一个线程池有一个 selectorProvider,用于建立 Selector 实例

  • selector:虽然咱们还没看建立 selector 的代码,但咱们已经知道,在 Netty 中 Selector 是跟着线程池中的线程走的。也就是说,并不是一个线程池一个 Selector 实例,而是线程池中每个线程都有一个 Selector 实例。

  • selectStrategy:select 操做的策略,这个不急。

  • ioRatio:这是 IO 任务的执行时间比例,由于每一个线程既有 IO 任务执行,也有非 IO 任务须要执行,因此该参数为了保证有足够时间是给 IO 的。这里也不须要急着去理解什么 IO 任务、什么非 IO 任务。

而后咱们继续走它的构造方法,咱们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop。

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

// 咱们能够直接忽略这个东西,之后咱们也不会再介绍它
tailTasks = newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

// 咱们能够直接忽略这个东西,之后咱们也不会再介绍它
tailTasks = newTaskQueue(maxPendingTasks);
}

SingleThreadEventLoop 这个名字很诡异有没有?而后它的构造方法又调用了父类 SingleThreadEventExecutor 的构造方法:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// taskQueue,这个东西很重要,提交给 NioEventLoop 的任务都会进入到这个 taskQueue 中等待被执行
// 这个 queue 的默认容量是 16
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// taskQueue,这个东西很重要,提交给 NioEventLoop 的任务都会进入到这个 taskQueue 中等待被执行
// 这个 queue 的默认容量是 16
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

到这里就更加诡异了,NioEventLoop 的父类是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父类是 SingleThreadEventExecutor,它的名字告诉咱们,它是一个 Executor,是一个线程池,并且是 Single Thread 单线程的。

也就是说,线程池 NioEventLoopGroup 中的每个线程 NioEventLoop 也能够当作一个线程池来用,只不过它只有一个线程。这种设计虽然看上去很巧妙,不过有点反人类的样子。

上面这个构造函数比较简单:

  • 设置了 parent,也就是以前建立的线程池 NioEventLoopGroup 实例

  • executor:它是咱们以前实例化的 ThreadPerTaskExecutor,咱们说过,这个东西在线程池中没有用,它是给 NioEventLoop 用的,立刻咱们就要看到它了。提早透露一下,它用来开启 NioEventLoop 中的线程(Thread 实例)。

  • taskQueue:这算是该构造方法中新的东西,它是任务队列。咱们前面说过,NioEventLoop 须要负责 IO 事件和非 IO 事件,一般它都在执行 selector 的 select 方法或者正在处理 selectedKeys,若是咱们要 submit 一个任务给它,任务就会被放到 taskQueue 中,等它来轮询。该队列是线程安全的 LinkedBlockingQueue,默认容量为 16。

  • rejectedExecutionHandler:taskQueue 的默认容量是 16,因此,若是 submit 的任务堆积了到了 16,再往里面提交任务会触发 rejectedExecutionHandler 的执行策略。

    还记得默认策略吗:抛出RejectedExecutionException 异常。

    在 NioEventLoopGroup 的默认构造中,它的实现是这样的:

     private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
    throw new RejectedExecutionException();
    }
    };
    private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
    throw new RejectedExecutionException();
    }
    };

而后,咱们再回到 NioEventLoop 的构造方法:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 咱们刚刚说完了这个
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 建立 selector 实例
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;

selectStrategy = strategy;
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 咱们刚刚说完了这个
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 建立 selector 实例
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;

selectStrategy = strategy;
}

能够看到,最重要的方法其实就是 openSelector() 方法,它将建立 NIO 中最重要的一个组件 Selector。在这个方法中,Netty 也作了一些优化,这部分咱们就不去分析它了。

到这里,咱们的线程池 NioEventLoopGroup 建立完成了,而且实例化了池中的全部 NioEventLoop 实例。

同时,你们应该已经看到,上面并无真正建立 NioEventLoop 中的线程(没有建立 Thread 实例)。

提早透露一下,建立线程的时机在第一个任务提交过来的时候,那么第一个任务是什么呢?是咱们立刻要说的 channel 的 register 操做。

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

相关文章
相关标签/搜索