Netty服务端启动流程源码分析
前记
哈喽,自从上篇《Netty之旅二:口口相传的高性能Netty究竟是什么?》后,迟迟两周才开启今天的Netty
源码系列。源码分析的第一篇文章,下一篇我会分享客户端的启动过程源码分析。经过源码的阅读,咱们将会知道,Netty
服务端启动的调用链是很是长的,同时确定也会发现一些新的问题,随着咱们源码阅读的不断深刻,相信这些问题咱们也会一一攻破。java
废话很少说,直接上号!git
1、从EchoServer示例入手
示例从哪里来?任何开源框架都会有本身的示例代码,Netty源码也不例外,如模块netty-example
中就包括了最多见的EchoServer
示例,下面经过这个示例进入服务端启动流程篇章。github
public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // 1. 声明Main-Sub Reactor模式线程池:EventLoopGroup // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 建立 EchoServerHandler 对象 final EchoServerHandler serverHandler = new EchoServerHandler(); try { // 2. 声明服务端启动引导器,并设置相关属性 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // 3. 绑定端口即启动服务端,并同步等待 // Start the server. ChannelFuture f = b.bind(PORT).sync(); // 4. 监听服务端关闭,并阻塞等待 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // 5. 优雅地关闭两个EventLoopGroup线程池 // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
- [代码行1八、19]声明
Main-Sub Reactor
模式线程池:EventLoopGroup
建立两个 EventLoopGroup
对象。其中,bossGroup
用于服务端接受客户端的链接,workerGroup
用于进行客户端的 SocketChannel
的数据读写。算法
(<u>关于EventLoopGroup
不是本文重点因此在后续文章中进行分析</u>)编程
- [代码行23-39]声明服务端启动引导器,并设置相关属性
AbstractBootstrap
是一个帮助类,经过方法链(method chaining
)的方式,提供了一个简单易用的方式来配置启动一个Channel
。io.netty.bootstrap.ServerBootstrap
,实现 AbstractBootstrap
抽象类,用于 Server
的启动器实现类。io.netty.bootstrap.Bootstrap
,实现 AbstractBootstrap
抽象类,用于 Client
的启动器实现类。以下类图所示:bootstrap

(<u>在EchoServer
示例代码中,咱们看到 ServerBootstrap
的 group
、channel
、option
、childHandler
等属性链式设置都放到关于AbstractBootstrap
体系代码中详细介绍。</u>)数组
- [代码行43]绑定端口即启动服务端,并同步等待
先调用 #bind(int port)
方法,绑定端口,后调用 ChannelFuture#sync()
方法,阻塞等待成功。对于bind
操做就是本文要详细介绍的"服务端启动流程"。promise
- [代码行47]监听服务端关闭,并阻塞等待
先调用 #closeFuture()
方法,监听服务器关闭,后调用 ChannelFuture#sync()
方法,阻塞等待成功。 注意,此处不是关闭服务器,而是channel
的监听关闭。服务器
- [代码行5一、52]优雅地关闭两个
EventLoopGroup
线程池
finally
代码块中执行说明服务端将最终关闭,因此调用 EventLoopGroup#shutdownGracefully()
方法,分别关闭两个EventLoopGroup
对象,终止全部线程。架构
2、服务启动过程
在服务启动过程的源码分析以前,这里回顾一下咱们在经过JDK NIO
编程在服务端启动初始的代码:
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
这5行代码标示一个最为熟悉的过程:
- 打开
serverSocketChannel
- 配置非阻塞模式
- 为
channel
的socket
绑定监听端口 - 建立
Selector
- 将
serverSocketChannel
注册到selector
后面等分析完Netty
的启动过程后,会对这些步骤有一个新的认识。在EchoServer
示例中,进入 #bind(int port)
方法,AbstractBootstrap#bind()
其实有多个方法,方便不一样地址参数的传递,实际调用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress)
方法,代码以下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
- [代码行2] :调用
#initAndRegister()
方法,初始化并注册一个Channel
对象。由于注册是异步的过程,因此返回一个ChannelFuture
对象。详细解析,见 「initAndRegister()
」。 - [代码行4-6]]:若发生异常,直接进行返回。
- [代码行9-34]:由于注册是异步的过程,有可能已完成,有可能未完成。因此实现代码分红了【第 10 至 14 行】和【第 15 至 36 行】分别处理已完成和未完成的状况。
- 核心在[第 11 、29行],调用
#doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)
方法,绑定 Channel 的端口,并注册 Channel 到SelectionKey
中。 - 若是异步注册对应的
ChanelFuture
未完成,则调用ChannelFuture#addListener(ChannelFutureListener)
方法,添加监听器,在注册完成后,进行回调执行#doBind0(...)
方法的逻辑。
- 核心在[第 11 、29行],调用
经过doBind
方法能够知道服务端启动流程大体以下几个步骤:
1. 建立Channel
从#doBind(final SocketAddress localAddress)
进入到initAndRegister()
:
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
[代码行4]调用 ChannelFactory#newChannel()
方法,建立Channel
对象。 ChannelFactory
类继承以下:
能够在ChannelFactory
注释看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.
,这里只是包名的调整,对于继承结构不变。netty
默认使用ReflectiveChannelFactory
,咱们能够看到重载方法:
@Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
很明显,正如其名是经过反射机制构造Channel
对象实例的。constructor
是在其构造方法初始化的:this.constructor = clazz.getConstructor();
这个clazz
按理说应该是咱们要建立的Channel
的Class对象。那Class
对象是什么呢?咱们接着看channelFactory
是怎么初始化的。
首先在AbstractBootstrap
找到以下代码:
@Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
调用这个方法的递推向上看到:
public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
这个方法正是在EchoServer
中ServerBootstrap
链式设置时调用.channel(NioServerSocketChannel.class)
的方法。咱们看到,channelClass
就是NioServerSocketChannel.class
,channelFactory
也是以ReflectiveChannelFactory
做为具体实例,而且将NioServerSocketChannel.class
做为构造参数传递初始化的,因此这回答了反射机制构造的是io.netty.channel.socket.nio.NioServerSocketChannel
对象。
继续看NioServerSocketChannel
构造方法逻辑作了什么事情,看以前先给出NioServerSocketChannel
类继承关系:
NioServerSocketChannel
与NioSocketChannel
分别对应服务端和客户端,公共父类都是AbstractNioChannel
和AbstractChannel
,下面介绍建立过程能够参照这个Channel
类继承图。进入NioServerSocketChannel
构造方法:
/** * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
点击newSocket
进去:
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
以上传进来的provider
是DEFAULT_SELECTOR_PROVIDER
即默认的java.nio.channels.spi.SelectorProvider
,[代码行9]就是熟悉的jdk nio
建立ServerSocketChannel
。这样newSocket(DEFAULT_SELECTOR_PROVIDER)
就返回告终果ServerSocketChannel
,回到NioServerSocketChannel()#this()
点进去:
/** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
以上super
表明父类AbstractNioMessageChannel
构造方法,点进去看到:
/** * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int) */ protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }
以上super
表明父类AbstractNioChannel
构造方法,点进去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
以上[代码行3]将ServerSocketChannel
保存到了AbstractNioChannel#ch
成员变量,在上面提到的NioServerSocketChannel
构造方法的[代码行6]javaChannel()
拿到的就是ch
保存的ServerSocketChannel
变量。
以上[代码行6]就是熟悉的jdk nio
编程设置ServerSocketChannel
非阻塞方式。这里还有super
父类构造方法,点击进去看到:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
以上构造方法中:
parent
属性,表明父Channel
对象。对于NioServerSocketChannel
的parent
为null
。id
属性,Channel
编号对象。在构造方法中,经过调用#newId()
方法进行建立。(<u>这里不细展开Problem-1</u>)unsafe
属性,Unsafe
对象。由于Channel
真正的具体操做,是经过调用对应的Unsafe
对象实施。因此须要在构造方法中,经过调用#newUnsafe()
方法进行建立。这里的Unsafe
并非咱们常说的jdk
自带的sun.misc.Unsafe
,而是io.netty.channel.Channel#Unsafe
。(<u>这里不细展开Problem-2</u>)pipeline
属性默认是DefaultChannelPipeline
对象,赋值后在后面为channel绑定端口的时候会用到
经过以上建立channel
源码过程分析,总结的流程时序图以下:
2. 初始化Channel
回到一开始建立Channel
的initAndRegister()
入口方法,在建立Channel
后紧接着init(channel)
进入初始化流程,由于是服务端初始化,因此是ServerBootstrap#init(Channel channel)
,代码以下:
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
-
[代码 3 - 6 行]:
options0()
方法返回的options
保存了用户在EchoServer
中设置自定义的可选项集合,这样ServerBootstrap
将配置的选项集合,设置到了Channel
的可选项集合中。 -
[代码 8 - 15 行]:
attrs0()
方法返回的attrs
保存了用户在EchoServer
中设置自定义的属性集合,这样ServerBootstrap
将配置的属性集合,设置到了Channel
的属性集合中。 -
[代码21-28行]:经过局部变量
currentChildOptions
和currentChildAttrs
保存了用户自定义的childOptions
和childAttrs
,用于[代码43行]ServerBootstrapAcceptor
构造方法。 -
[代码30-47]]:建立
ChannelInitializer
对象,添加到pipeline
中,用于后续初始化ChannelHandler
到pipeline
中,包括用户在EchoServer
配置的LoggingHandler
和建立的建立ServerBootstrapAcceptor
对象。-
[代码行34-37]:添加启动器配置的
LoggingHandler
到pipeline
中。 -
[代码行39-45]:建立
ServerBootstrapAcceptor
对象,添加到pipeline
中。从名字上就能够看出来,ServerBootstrapAcceptor
也是一个ChannelHandler
实现类,专门用于接受客户端的新链接请求,把新的请求扔给某个事件循环器,咱们先不作过多分析。咱们发现是使用EventLoop.execute
执行添加的过程,这是为何呢?一样记录问题(<u>Problem-</u>3) -
须要说明的是
pipeline
在以前介绍Netty核心组件的时候提到是一个包含ChannelHandlerContext
的双向链表,每个context
对于惟一一个ChannelHandler
,这里初始化后,ChannelPipeline
里就是以下一个结构:
-
3. 注册Channel
初始化Channel
一些基本配置和属性完毕后,回到一开始建立Channel
的initAndRegister()
入口方法,在初始化Channel
后紧接着[代码行17] ChannelFuture regFuture = config().group().register(channel);
明显这里是经过EventLoopGroup
进入注册流程(EventLoopGroup
体系将在后续文章讲解)
在EchoServer
中启动器一样经过ServerBootstrap#group()
设置了NioEventLoopGroup
,它继承自MultithreadEventLoopGroup
,因此注册流程会进入MultithreadEventLoopGroup
重载的register(Channel channel)
方法,代码以下:
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }
这里会调用 next()
方法选择出来一个 EventLoop
来注册 Channel
,里面实际上使用的是一个叫作 EventExecutorChooser
的东西来选择,它实际上又有两种实现方式 ——PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
,本质上就是从 EventExecutor
数组中选择一个 EventExecutor
,咱们这里就是 NioEventLoop
,那么,它们有什么区别呢?(<u>Problem-4:在介绍EventLoopGroup
体系的后续文章中将会详细讲解,这里简单地提一下,本质都是按数组长度取余数 ,不过,2 的 N 次方的形式更高效。</u>)
接着,来到 NioEventLoop
的 register(channel)
方法,你会不会问找不到该方法?提示NioEventLoop
继承SingleThreadEventLoop
,因此父类方法:
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
能够看到,先建立了一个叫作 ChannelPromise
的东西,它是 ChannelFuture
的子类。[代码行9]又调回了 Channel
的 Unsafe
的 register ()
方法,这里第一个参数是 this
,也就是 NioEventLoop
,第二个参数是刚建立的 ChannelPromise
。
点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)
方法进去,代码以下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
[代码行15]这行代码是设置 Channel
的 eventLoop
属性。这行前面的代码主要是在校验传入的 eventLoop
参数非空,校验是否有注册过以及校验 Channel
和 eventLoop
类型是否匹配。
[代码1八、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise)
方法中:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
[代码行9]进入 AbstractNioChannel#doRegister()
方法:
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
[代码行5]关键一行代码,将 Java 原生NIO Selector
与 Java 原生 NIO
的 Channel
对象(ServerSocketChannel
) 绑定在一块儿,并将当前 Netty 的Channel
经过 attachment
的形式绑定到 SelectionKey
上:
- 调用
#unwrappedSelector()
方法,返回 Java 原生NIO Selector
对象,并且每一个NioEventLoop
与Selector
惟一一对应。 - 调用
SelectableChannel#register(Selector sel, int ops, Object att)
方法,注册 Java 原生NIO
的Channel
对象到NIO Selector
对象上。
经过以上注册channel源码分析,总结流程的时序图以下:
4. 绑定端口
注册完Channel
最后回到AbstractBootstrap#doBind()
方法,分析 Channel
的端口绑定逻辑。进入doBind0
代码以下:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
- [代码行7]:在前面
Channel
注册成功的条件下,调用EventLoop
执行Channel
的端口绑定逻辑。可是,实际上当前线程已是EventLoop
所在的线程了,为什么还要这样操做呢?答案在【第 5 至 6 行】的英语注释,这里做为一个问题记着(<u>Problem-5</u>)。 - [代码行11]:进入
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
,一样当即异步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE
监听事件。 - [代码行13]:若是绑定端口以前的操做并无成功,天然也就不能进行端口绑定操做了,经过promise记录异常缘由。
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
方法以下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
pipeline
是以前建立channel
的时候建立的DefaultChannelPipeline
,进入该方法:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
[在分析初始化流程的时候最后画一个DefaultChannelPipeline
内部的结构,可以便于分析后面进入DefaultChannelPipeline
一系列bind
方法。]
首先,tail
表明TailContext
,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { //省略部分代码 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
[代码行3]:findContextOutbound
方法里主要是执行ctx = ctx.prev;
那么获得的next
就是绑定LoggingHandler
的context
[代码行6]:进入invokeBind(localAddress, promise)
方法并直接执行LoggingHandler#bind(this, localAddress, promise)
,进入后的方法以下:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND", localAddress)); } ctx.bind(localAddress, promise); }
设置了LoggingHandler
的日志基本级别为默认的INFO后,进行绑定操做的信息打印。接着,继续循环到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法执行ctx = ctx.prev
取出HeadContext
进入到bind方法:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); }
兜兜转转,最终跳出了pipeline
轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
方法,Channel 的端口绑定逻辑。代码以下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //此处有省略... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //此处有省略... }
作实事方法doBind
进入后以下:
@Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
到了此处,服务端的 Java 原生 NIO ServerSocketChannel
终于绑定上了端口。
3、问题概括
- Problem-1: 建立
Channel
流程中AbstractChannel
构造函数中为channel
分配ID的算法如何实现? - Problem-2:
AbstractChannel
内部类AbstractUnsafe
的做用? - Problem-3: 初始化
channel
流程中pipeline
添加ServerBootstrapAcceptor
是经过EventLoop.execute
执行添加的过程,这是为何呢? - Problem-4:注册
channel
流程中PowerOfTwoEventExecutorChooser
和GenericEventExecutorChooser
的区别和优化原理? - Problem-5:绑定端口流程中调用
EventLoop
执行Channel
的端口绑定逻辑。可是,实际上当前线程已是EventLoop
所在的线程了,为什么还要这样操做呢?
小结
经过对Netty服务端启动流程源码分析,咱们发现了在使用NIO
的模式下,服务端启动流程其实就是封装了JDK NIO
编程在服务端启动的流程。只不过对原生JDK NIO
进行了加强和优化,同时从架构设计上简化了服务端流程的编写。
最重要的是感谢彤哥、艿艿和俞超-闪电侠这些大佬前期的分享,可以让更多人学习源码的旅途少走不少弯路,谢谢!
欢迎关注: