扫描下方二维码或者微信搜索公众号
菜鸟飞呀飞
,便可关注微信公众号,阅读更多Spring源码分析
和Java并发编程
文章。java
当 netty 的服务端启动之后,就能够开始接收客户端的链接了。那么在 netty 中,服务端是如何来进行新链接的建立的呢?在开始进行源码阅读以前,能够先思考如下三个问题。编程
在上一篇文章Netty 源码分析系列之 NioEventLoop 的执行流程中,分析了 NioEventLoop 线程在启动后,会不停地去循环处理网络 IO 事件、普通任务和定时任务。在处理网络 IO 事件时,当轮询到 IO 事件类型为 OP_ACCEPT 时(以下代码所示),就表示有新客户端来链接服务端了,也就是检测到了新链接。这个时候,服务端 channel 就会进行新链接的读取。promise
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
复制代码
能够看到,当是 OP_ACCEPT 事件时,就会调用unsafe.read() 方法来进行新链接的接入。此时 unsafe 对象是 NioMessageUnsafe 类型的实例,为何呢?由于只有服务端 channel 才会对 OP_ACCEPT 事件感兴趣,而服务端 channel 中 unsafe 属性保存的是 NioMessageUnsafe 类型的实例。微信
read()方法的源码很长,但它主要干了两件事,第一:调用 doReadMessages()方法来读取链接;第二:将读取到的链接经过服务端 channel 中的 pipeline 来进行传播,最终执行每个 handler 中的 channelRead()方法。网络
服务端 channel 在监听到 OP_ACCEPT 事件后,会为新链接建立一个客户端 channel,后面数据的读写均是经过这个客户端 channel 来进行的。而这个客户端 channel 是经过 doReadMessages()方法来建立的,该方法是定义在 NioServerSocketChannel 中的,下面是其源码。数据结构
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 将原生的客户端channel包装成netty中的客户端channel:NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// 异常日志打印等...
}
return 0;
}
复制代码
在该方法中,首先会经过 javaChannel()获取到 JDK 原生的服务端 channel,即 ServerSocketChannel,这个原生的服务端 channel 是被保存在 NioServerSocketChannel 的ch属性中,在初始化 NioServerSocketChannel 时会对ch属性赋值(能够参考这篇文章:Netty 源码分析系列之服务端 Channel 初始化)。建立完 JDK 原生的服务端 channel 后,会经过 SocketUtils 这个工具类来建立一个 JDK 原生的客户端 channel,即 SocketChannel。SocketUtils 这个工具类的底层实现,实际上就是调用 JDK 原生的 API,即 ServerSocketChannel.accept()。多线程
在建立完原生的 SocketChannel 后,netty 须要将其包装成 netty 中定义的服务端 channel 类型,即:NioSocketChannel。如何包装的呢?经过 new 关键字调用 NioSocketChannel 的构造方法来进行包装。在构造方法中,作了不少初始化工做。跟踪源码,发现会调用到 AbstractNioChannel 类的以下构造方法。并发
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 此时的parent = NioServerSocketChannel,ch = SocketChannel(JDK原生的客户端channel),readInterestOp = OP_READ
super(parent);
// 保存channel和感兴趣的事件
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 设置为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
// 异常处理...
}
}
复制代码
在该构造方法中,首先会保存原生的客户端 channel 和客户端 channel 感兴趣的事件,而后将客户端 channel 的阻塞模式设置为 false,表示不阻塞(在 NIO 网络编程中,这一步是必须的,不然启动会报错)。同时还会调用父类构造方法,父类就是 AbstractChannel。AbstractChannel 类的构造方法源码以下。app
protected AbstractChannel(Channel parent) {
// parent的值为NioServerSocketChannel
this.parent = parent;
id = newId();
// 对于客户端channel而言,建立的unsafe是NioSocketChannelUnsafe
unsafe = newUnsafe();
// DefaultChannelPipeline
pipeline = newChannelPipeline();
}
复制代码
在该构造方法中,对于客户端 channel 而言,parent 的值为 NioServerSocketChannel,也就是 netty 服务端启动时建立的服务端 channel。而后建立的 unsafe 是 NioSocketChannelUnsafe,最后会为客户端 channel 建立一个默认的 pipeline,此时 pipeline 的结构以下。(若是看过前几篇文章,可能会发现,服务端 channel 在建立时也会调用到该构造方法)异步
最终还会为 NioSocketChannel 建立一个 NioSocketChannelConfig 对象,这个对象是用来保存用户为客户端 channel 设置的一些 TCP 配置和属性,在建立这个 config 对象时,会将 TCP 的 TCP_NODELAY 参数设置为 true。TCP 在默认状况下,会将小的数据包积攒成大的数据包之后才发出去,而 netty 为了及时地 i 将较小的数据报发送出去,所以将 TCP_NODELAY 参数设置为 true,表示不延迟发送。
至此,新链接对应的客户端 channel 就建立完成了,后面网络数据的读写,都是基于这个 NioSocketChannel 来进行的。
当客户端的 channel 建立完成后,在 read()方法中,就会经过 pipeline.fireChannelRead(socketChannel)这一行代码,将客户端 channel 经过 pipeline 进行传播,依次执行 pipeline 中每个 handler 的 channelRead()方法。(注意,这儿的 pipeline 是服务端 channel 中保存的 pipeline,在建立客户端 channel 时,也会为每一个新建的客户端 channel 建立一个 pipeline,这里千万不要搞混了)
在服务端启动的时候,服务端 channel 中 pipeline 的结构图以下(详细解释能够参考这三篇文章: Netty 源码分析系列之服务端 Channel 初始化 、Netty 源码分析系列之服务端 Channel 注册和 Netty 源码分析系列之服务端 Channel 的端口绑定)。
该 pipeline 中,对于 head 和 tail 而言,它俩的 channelRead()方法没作什么实际意义的工做,直接是向下一个节点传播了,这里重要的是 ServerBootstrapAcceptor 节点的 channelRead()方法。该方法的源码以下。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 向客户端的channel中添加用户自定义的childHandler
child.pipeline().addLast(childHandler);
// 保存用户为客户端channel配置的属性
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 将客户端channel注册到工做线程池,即从workerGroup中选择出一个NioEventloop,再将客户端channel绑定到NioEventLoop上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
复制代码
在该方法中,首先向客户端 channel 的 pipeline 中的节点中添加了一个 childHandler,这个 childHandler 是用户本身定义的,什么意思呢?以下图所示,用户经过 childHandler()方法自定义了一个 ChannelInitializer 类型的 childHandler,这个此时就会向客户端 channel 的 pipeline 中的节点中添加该 childHandler(这个地方很重要,后面会用到)。而后经过 setChannelOptions 保存用户为客户端 channel 配置的 TCP 参数和属性。
最重要的一步在 childGroup.register(child),这一行代码会将客户端 channel 注册到 workerGroup 线程池中的某一个 NioEventLoop 上。(在服务端端口绑定的过程当中,也是相似于调用 NioEventLoopGroup 的 register()方法,将服务端 channel 注册到 bossGroup 中的某一个 NioEventLoop 中)。
此时的 childGroup 是 workerGroup(Reactor 主从多线程线程模型中的从线程池),调用 register()方法时,会调用到以下方法。
public ChannelFuture register(Channel channel) {
// next()方法会从NioEventLoop中选择出一个NioEventLoop
return next().register(channel);
}
复制代码
next()方法会从 NioEventLoop 中选择出一个 NioEventLoop(关于 next()方法的详细介绍请参考: Netty 源码分析系列之 NioEventLoop 的建立与启动),因为 NioEventLoop 继承了 SingleThreadEventLoop,因此这儿最后调用的是 SingleThreadEventLoop 中的以下的 register()方法。
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
/** * 对于客户端channel而言 * promise是DefaultChannelPromise * promise.channel()获取到的是NioSocketChannel * promise.channel().unsafe()获得的是NioSocketChannelUnsafe * 因为NioSocketChannelUnsafe继承了AbstractUnsafe,因此当调用unsafe.register()时,会调用到AbstractUnsafe类的register()方法 */
// this为NioEventLoop
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
这里的 unsafe()获取到的是 NioSocketChannelUnsafe 对象,因为 NioSocketChannelUnsafe 继承了 AbstractUnsafe,因此当调用 unsafe.register()时,会调用到 AbstractUnsafe 类的 register()方法。该方法精简后的源码以下。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略部分代码....
// 对客户端channel而言,这一步是给NioSocketChannel的eventLoop属性赋值
AbstractChannel.this.eventLoop = eventLoop;
// 判断是同步执行register0(),仍是异步执行register0()
if (eventLoop.inEventLoop()) {
// 同步执行
register0(promise);
} else {
try {
// 提交到NioEventLoop线程中,异步执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 省略部分代码
}
}
}
复制代码
实际上,服务端 channel 注册到 NioEventLoop 上时,也是调用的到了该方法(能够参考这篇文章: Netty 源码分析系列之服务端 Channel 注册)。
对于客户端而言,在该方法中,经过以下一行代码,就将客户端 channel 与一个 NioEventLoop 进行了绑定,这就回答了文章开头的第三个问题。
AbstractChannel.this.eventLoop = eventLoop;
复制代码
接着会判断当前线程是否等于传入的 eventLoop 中保存的线程,这里确定不是。为何呢?由于当前线程是 bossGroup 线程组中的线程,而 eventLoop 是 workerGroup 线程组中的线程,因此这里会返回 false,那么就会异步执行 register0()方法。register0()方法的源码以下。
private void register0(ChannelPromise promise) {
try {
// 省略部分代码...
boolean firstRegistration = neverRegistered;
/** * 对于客户端的channel而言,doRegister()方法作的事情就是将服务端Channel注册到多路复用器上 */
doRegister();
neverRegistered = false;
registered = true;
//会执行handlerAdded方法
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//经过在pipeline传播来执行每一个ChannelHandler的channelRegistered()方法
pipeline.fireChannelRegistered();
// 若是客户端channel已经激活,就执行下面逻辑。
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// 省略部分代码...
}
}
复制代码
在 register0()方法中,有三步重要的逻辑,第一:doRegister();第二:pipeline.invokeHandlerAddedIfNeeded();第三:pipeline.fireChannelRegistered()。下面分别来看看这三步都干了哪些事情。
doRegister()就是真正将客户端 channel 注册到多路复用器上的一步。doRegister()调用的是 AbstractNioChannel 类中的 doRegister()方法,删减后源码以下。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
// 异常处理......
}
}
}
复制代码
其中 javaChannel()获取的就是 JDK 中原生的 SocketChannel。
eventLoop().unwrappedSelector()
获取的是 JDK 中原生的多路复用器 Selector(底层的数据结构被替换了)。(EventLoop 中的 unwrappedSelector 属性是在建立 NioEventLoop 时,初始化的,底层的数据结构也是这个时候被替换的)
因此javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
这一行代码,实际上就是调用 JDK 原生 SocketChannel 的register(selector,ops,attr)
方法,而后将客户端 Channel 注册到了多路复用器 Selector 上。
注意这里在调 JDK 原生的 register()方法时,第三个参数传入的是 this,此时 this 表明的就是当前的 NioSocketChannel 对象。将 this 做为一个 attachment 保存到多路复用器 Selector 上,这样作的好处就是,后面能够经过多路复用器 Selector 获取到客户端的 channel。第二个参数传入的是 0,表示此时将客户端 channel 注册到多路复用器上,客户端 chennel 感兴趣的事件标识符是 0,即此时对任何事件都不感兴趣(在后面才会将感兴趣的事件设置为 OP_READ)。
当 doRegister()方法执行完之后,就会执行第二步:pipeline.invokeHandlerAddedIfNeeded(),这一步作的事情就是回调 pipeline 中 handler 的 handlerAdded()方法。
往下执行,代码会执行到 pipeline.fireChannelRegistered(),也就是前面咱们提到的第三步。这一步作的事情就是传播 Channel 注册事件,如何传播呢?就是沿着 pipeline 中的头结点这个 handler 开始,日后依次执行每一个 handler 的 channelRegistered()方法。
在前面咱们提到过,会向客户端 channel 的 pipeline 中添加一个 ChannelInitializer 类型的匿名类,所以在传播执行 channelRegistered()方法的时候,就会执行到该匿名类的 channelRegistered()方法,从而最终会执行该匿名类中重写的 initChannel(channel)方法,即以下图所示的代码。关因而如何调用到 initChannel(channel)方法中的,能够参考这篇文章:Netty 源码分析系列之服务端 Channel 注册,里面进行了很详细的分析。不过读源码最佳方式仍是亲自动手,Debug 调试一下你也许会体会更深,更容易理解。
再次回到 register0()方法中,最后会判断 isActive()是否为 true,此时因为客户端 channel 已经注册到多路复用器上了,所以会返回 true,并且因为此时客户端 channel 是第一次注册,因此会 pipeline.fireChannelActive()这一行代码,也就是又会经过客户端 channel 的 pipeline 向下传播执行全部 handler 的 channelActive()方法,最终会调用到 AbstractChannel 的 doBeginRead()方法(这一步的调用过程很复杂,建议直接 DEBUG)。doBeginRead 方法的源码以下。
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
/** * 在客户端channel注册到多路复用器上时,将selectionKey的interestOps属性设置为了0 * selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); */
final int interestOps = selectionKey.interestOps();
/** * readInterestOp属性的值,是在NioSocketChannel的构造器中,被设置为SelectionKey.OP_READ */
if ((interestOps & readInterestOp) == 0) {
// 对于客户端channel而言,interestOps | readInterestOp运算的结果为OP_READ
// 因此最终selectionKey感兴趣的事件为OP_READ事件,至此,客户端channel终于能够开始接收客户端的连接了。
selectionKey.interestOps(interestOps | readInterestOp);
}
}
复制代码
至此,客户端 channel 感兴趣的就变成了 OP_READ 事件,那么接下来就能够进行数据的读写了。
本文主要分析了当一个新链接进来后,netty 服务端是如何为这个新链接建立客户端 channel 的,又是如何将其绑定到 NioEventLoop 线程中的。客户端 channel 注册过程与服务端 channel 的注册过程很是类似,调用过程几乎同样,因此建议先阅读这篇文章Netty 源码分析系列之服务端 Channel 注册。