接下来咱们会经过使用 Netty 去实现 NIO TCP 服务器的这个场景来解析 Netty 的源代码,深刻了解 Netty 的设计。java
使用 Netty 来实现一个 TCP 服务器,咱们大体要作如下事情:git
咱们先来了解 Server Channel 的初始化和注册的过程。初始化是来构建 Netty 自己的各类组件,应用用户的设置参数。注册的主要工做最终是将 SelectableChannel
注册到多路复用器 Selector
。这一过程在全部基于 Java NIO 的项目里都是相似的。github
服务端的整个构建过程是从 ServerBootstrap
开始的:编程
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(channelInitializer) .bind(portNum).sync().channel().closeFuture().sync();
接下来咱们从 AbstractBootstrap
的 bind(int port)
方法开始了解 Netty 的源码。设计模式
AbstractBootstrap
,方法:ChannelFuture doBind(final SocketAddress localAddress)
bind(int port)
的实际工做绝大部分是在 AbstractBootstrap
的 ChannelFuture doBind(final SocketAddress localAddress)
实现的,咱们来看其源码:promise
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
doBind(SocketAddress localAddress)
方法主要会作三件事:服务器
Selector
最后一步 bind 须要在 register 完成以后再执行,可是由于这些动做均可能发生在不一样的线程上,因此 bind 的动做是经过回调的方式实现的,具体细节后面再介绍。本篇将先介绍前两个操做。网络
由于 AbstractBootstrap
类的 initAndRegister()
方法是接下来第一个被调用的方法,因此咱们接下来看它的源码。架构
AbstractBootstrap
,方法:ChannelFuture initAndRegister()
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); // 见 2.5 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
这个方法主要作了下面几件事:并发
Channel channel = channelFactory().newChannel();
此处经过 channelFactory()
方法获得的 ChannelFactory
实现类是 ReflectiveChannelFactory
。(这个类是默认实现,AbstractBootstrap
还提供了方法用来设置其它 ChannelFactory
的实现)这个类将经过 channel(NioServerSocketChannel.class)
提供的类反射建立出 NioServerSocketChannel
。而在 NioServerSocketChannel
的构造函数里,一个 java.nio.channels.ServerSocketChannel
将被构建出来。
接下来,在 NioServerSocketChannel
的父类 AbstractNioChannel
的构造函数中,ServerSocketChannel
被设置成非阻塞模式 ch.configureBlocking(false);
在 AbstractNioChannel
父类 AbstractChannel
的构造函数里,有两个重要的对象被构造出来
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
在这里 unsafe
和 pipeline
被构建出来。构造出 unsafe
的 newUnsafe()
方法是在子类中实现,在本例中 NioMessageUnsafe
(还有一个相似的 NioByteUnsafe
,其为 NioSocketChannel
提供了具体的 IO 实现),它包含了不少具体的方法实现。而 DefaultChannelPipeline
则是另外一个重要的类,接下来咱们来看看它的构造函数。
DefaultChannelPipeline
的构造函数public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
DefaultChannelPipeline
构造函数中作了一件很重要的事,就是构造了 ChannelHandler
链。TailContext
和 HeadContext
都继承了 AbstractChannelHandlerContext
。另外,TailContext
实现了 ChannelInboundHandler
,HeadContext
实现了 ChannelOutboundHandler
(Netty 5 在此处有变化,其再也不区分 ChannelInboundHandler
和 ChannelOutboundHandler
)。
上面这些就是 final Channel channel = channelFactory().newChannel();
所作的事情。接下来咱们来了解 init(Channel channel)
方法所作的事情。在这里,void init(Channel channel)
方法由 AbstractBootstrap
的子类 ServerBootstrap
实现
ServerBootstrap
,方法:void init(Channel channel)
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
这个方法主要作了两件事:
NioServerSocketChannel
设置 options 和 attrsChannelInitializer
。这个 ChannelInitializer
会在 ChannelRegistered 事件发生时将 ServerBootstrapAcceptor
添加到 pipeline 上。在后面 ServerBootstrapAcceptor
将被用来接收客户端的链接,咱们会在后续文章中介绍。DefaultChannelPipeline
的 addLast
方法会将新的 ChannelHandler
添加到 tail 以前,其它全部 ChannelHandler
以后。在作完初始化工做以后,就要开始注册的工做了。接下来来看 group().register(channel)
的实现。其中 group()
方法将会返回咱们在 b.group(bossGroup, workerGroup)
中设定的 bossGroup
这里不作过多介绍了。接下来看 register(channel)
方法。
AbstractChannel.AbstractUnsafe
,方法:void register(EventLoop eventLoop, ChannelPromise promise)
由于咱们实现的是一个 NIO server,因此此处 EventLoop
使用的实现类是 NioEventLoop
。NioEventLoop
的 register(Channel)
方法是继承自 SingleThreadEventLoop
。而 SingleThreadEventLoop
则经过 channel.unsafe().register(this, promise)
方法将注册工做代理给了 Channel.Unsafe
来实现。此处 Unsafe
的实现类是 AbstractChannel.AbstractUnsafe
(多么一致的命名)。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
先解释一下 eventLoop.inEventLoop()
,这个判断在不少地方都能看见。这个方法是用来判断当前的线程是不是 EventLoop
的任务执行线程。若是是,那就不用在添加任务,直接执行就能够了,不然须要将任务添加到 EventLoop
中。在本例中,很明显,执行过程将走到 else 分支中。
注册工做主要是在 doRegister()
方法中实现的,这个方法是定义在 AbstractChannel
中的一个抽象方法。在本例中,这个方法由 AbstractNioChannel
实现的。
AbstractNioChannel
,方法:doRegister()
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
在这个方法中,selectionKey = javaChannel().register(eventLoop().selector, 0, this);
经过调用 JDK 方法,将 SelectableChannel
注册到 Selector
上。注意一个细节,由于同一个 Channel 和 Selector 能够对应一个 SelectionKey,因此若是另一个相应的 SelectionKey 的 cancel
方法被执行以后,会致使 SelectableChannel
的 register
方法抛出 CancelledKeyException
。因此这里经过 selectNow()
方法清除取消状态以后,从新 register。循环的缘由就像是注释所描述的同样,Netty 团队也不清楚,这难道是 JDK 的 bug 吗?
上面这几个过程就实现了 Server Channel 的初始化和注册工做。
从这部分代码,咱们能够看到不少设计模式的使用
在使用 ServerBootstrap
构建服务端的时候,Netty 应用了 Builder 模式。虽不是典型应用,但也起到了是代码简洁易懂的目的。
Channel channel = channelFactory().newChannel();
使用了工厂方法模式。ServerBootstrap
提供了方法设置工厂类,同时也提供了默认实现。经过工厂方法模式建立 Channel
,实现了良好的可扩展性。
AbstractBootstrap
的 void init(Channel channel)
就是一个模板方法。初始化和注册工做的主要实现方法是 AbstractBootstrap
的 initAndRegister
,这个方法调用模板方法 init(Channel)
。而模板方法 init(Channel)
则交由子类作具体的实现。
一样,AbstractChannel
的 doRegister
方法也是一个模板方法模式的例子。
Netty 用上面复杂的代码实现了并不复杂的功能。其背后反映处的思想就是做为一个通用的高性能网络 IO 框架,Netty 必须设计出一个高性能、高扩展性的基础架构,而后再在这个架构之上实现各类功能。Netty 的执行核心是 EventLoopGroup
及其实现类。绝大部分 IO 操做和非 IO 操做都是交由 EventLoopGroup
来执行。这是 Netty 能被用来实现高并发服务的缘由之一。因此本文所涉及的操做,因此并不复杂,可是其中的一些操做,例如注册工做,也是须要交由 EventLoopGroup
异步执行。这虽然由于异步的方式,提升了系统的执行效率,但事也未任务直接的协调制造了困难,这一点在后续的介绍中会看的很清楚。
若是说 EventLoopGroup
是执行调度的核心,那 Channel
就是实现具体操做的实现核心。由于网络编程十分复杂,有各类复杂的协议,也有复杂的底层操做系统实现,因此 Channel
相应的实现类也是种类繁多。这其实并无增长复杂度,而是将各类复杂功能各归其主,将实现细节梳理清晰。
接下来的文章将会解析服务端端口绑定的实现。