Netty 源码分析(三):服务器端的初始化和注册过程

1. 简介

接下来咱们会经过使用 Netty 去实现 NIO TCP 服务器的这个场景来解析 Netty 的源代码,深刻了解 Netty 的设计。java

使用 Netty 来实现一个 TCP 服务器,咱们大体要作如下事情:git

  1. 建立 ServerSocketChannel、Channel、ChannelHandler 等一系列对象。这里的 Channel 将包含 ServerSocketChannel 和一系列 ChannelHandler。
  2. 将 ServerSocketChannel 注册到 Selector 多路复用器上
  3. 启动 EventLoop,并将 Channel 注册到其中
  4. 为 ServerSocketChannel 绑定端口
  5. 接受客户端链接,并将 SocketChannel 注册到 Selector 和 EventLoop 中
  6. 处理读写事件 ...

2. 源码解析

咱们先来了解 Server Channel 的初始化和注册的过程。初始化是来构建 Netty 自己的各类组件,应用用户的设置参数。注册的主要工做最终是将 SelectableChannel 注册到多路复用器 Selector。这一过程在全部基于 Java NIO 的项目里都是相似的。github

服务端的整个构建过程是从 ServerBootstrap 开始的:编程

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.DEBUG))
    .childHandler(channelInitializer)
    .bind(portNum).sync().channel().closeFuture().sync();

接下来咱们从 AbstractBootstrapbind(int port) 方法开始了解 Netty 的源码。设计模式

2.1 类:AbstractBootstrap,方法:ChannelFuture doBind(final SocketAddress localAddress)

bind(int port) 的实际工做绝大部分是在 AbstractBootstrapChannelFuture doBind(final SocketAddress localAddress) 实现的,咱们来看其源码:promise

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
 
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

doBind(SocketAddress localAddress) 方法主要会作三件事:服务器

  1. ServerChannel 初始化
  2. 注册 ServerChannel 到 Selector
  3. 将 ServerChannel bind 到本地端口上

最后一步 bind 须要在 register 完成以后再执行,可是由于这些动做均可能发生在不一样的线程上,因此 bind 的动做是经过回调的方式实现的,具体细节后面再介绍。本篇将先介绍前两个操做。网络

由于 AbstractBootstrap 类的 initAndRegister() 方法是接下来第一个被调用的方法,因此咱们接下来看它的源码。架构

2.2 类:AbstractBootstrap,方法:ChannelFuture initAndRegister()

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
 
    ChannelFuture regFuture = group().register(channel); // 见 2.5
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
 
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
 
    return regFuture;
}

这个方法主要作了下面几件事:并发

  • Channel channel = channelFactory().newChannel(); 此处经过 channelFactory() 方法获得的 ChannelFactory 实现类是 ReflectiveChannelFactory。(这个类是默认实现,AbstractBootstrap 还提供了方法用来设置其它 ChannelFactory 的实现)这个类将经过 channel(NioServerSocketChannel.class) 提供的类反射建立出 NioServerSocketChannel。而在 NioServerSocketChannel 的构造函数里,一个 java.nio.channels.ServerSocketChannel 将被构建出来。

  • 接下来,在 NioServerSocketChannel 的父类 AbstractNioChannel 的构造函数中,ServerSocketChannel 被设置成非阻塞模式 ch.configureBlocking(false);

  • AbstractNioChannel 父类 AbstractChannel 的构造函数里,有两个重要的对象被构造出来

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

在这里 unsafepipeline 被构建出来。构造出 unsafenewUnsafe() 方法是在子类中实现,在本例中 NioMessageUnsafe(还有一个相似的 NioByteUnsafe,其为 NioSocketChannel 提供了具体的 IO 实现),它包含了不少具体的方法实现。而 DefaultChannelPipeline 则是另外一个重要的类,接下来咱们来看看它的构造函数。

2.3 类 DefaultChannelPipeline 的构造函数

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
       throw new NullPointerException("channel");
    }
    this.channel = channel;
 
    tail = new TailContext(this);
    head = new HeadContext(this);
 
    head.next = tail;
    tail.prev = head;
}

DefaultChannelPipeline 构造函数中作了一件很重要的事,就是构造了 ChannelHandler 链。TailContextHeadContext 都继承了 AbstractChannelHandlerContext。另外,TailContext 实现了 ChannelInboundHandlerHeadContext 实现了 ChannelOutboundHandler(Netty 5 在此处有变化,其再也不区分 ChannelInboundHandlerChannelOutboundHandler)。

上面这些就是 final Channel channel = channelFactory().newChannel(); 所作的事情。接下来咱们来了解 init(Channel channel) 方法所作的事情。在这里,void init(Channel channel) 方法由 AbstractBootstrap 的子类 ServerBootstrap 实现

2.4 类:ServerBootstrap,方法:void init(Channel channel)

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
 
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
 
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }
 
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
 
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

这个方法主要作了两件事:

  1. NioServerSocketChannel 设置 options 和 attrs
  2. 为 pipeline 添加一个 Inbound 处理器 ChannelInitializer。这个 ChannelInitializer 会在 ChannelRegistered 事件发生时将 ServerBootstrapAcceptor 添加到 pipeline 上。在后面 ServerBootstrapAcceptor 将被用来接收客户端的链接,咱们会在后续文章中介绍。DefaultChannelPipelineaddLast 方法会将新的 ChannelHandler 添加到 tail 以前,其它全部 ChannelHandler 以后。

在作完初始化工做以后,就要开始注册的工做了。接下来来看 group().register(channel) 的实现。其中 group() 方法将会返回咱们在 b.group(bossGroup, workerGroup) 中设定的 bossGroup 这里不作过多介绍了。接下来看 register(channel) 方法。

2.5 类:AbstractChannel.AbstractUnsafe,方法:void register(EventLoop eventLoop, ChannelPromise promise)

由于咱们实现的是一个 NIO server,因此此处 EventLoop 使用的实现类是 NioEventLoopNioEventLoopregister(Channel) 方法是继承自 SingleThreadEventLoop。而 SingleThreadEventLoop 则经过 channel.unsafe().register(this, promise) 方法将注册工做代理给了 Channel.Unsafe 来实现。此处 Unsafe 的实现类是 AbstractChannel.AbstractUnsafe(多么一致的命名)。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
 
    AbstractChannel.this.eventLoop = eventLoop;
 
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
 
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

先解释一下 eventLoop.inEventLoop(),这个判断在不少地方都能看见。这个方法是用来判断当前的线程是不是 EventLoop 的任务执行线程。若是是,那就不用在添加任务,直接执行就能够了,不然须要将任务添加到 EventLoop 中。在本例中,很明显,执行过程将走到 else 分支中。

注册工做主要是在 doRegister() 方法中实现的,这个方法是定义在 AbstractChannel 中的一个抽象方法。在本例中,这个方法由 AbstractNioChannel 实现的。

2.6 类:AbstractNioChannel,方法:doRegister()

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

在这个方法中,selectionKey = javaChannel().register(eventLoop().selector, 0, this); 经过调用 JDK 方法,将 SelectableChannel 注册到 Selector 上。注意一个细节,由于同一个 Channel 和 Selector 能够对应一个 SelectionKey,因此若是另一个相应的 SelectionKey 的 cancel 方法被执行以后,会致使 SelectableChannelregister 方法抛出 CancelledKeyException。因此这里经过 selectNow() 方法清除取消状态以后,从新 register。循环的缘由就像是注释所描述的同样,Netty 团队也不清楚,这难道是 JDK 的 bug 吗?

上面这几个过程就实现了 Server Channel 的初始化和注册工做。

3. 总结

3.1 设计模式

从这部分代码,咱们能够看到不少设计模式的使用

Builder 模式

在使用 ServerBootstrap 构建服务端的时候,Netty 应用了 Builder 模式。虽不是典型应用,但也起到了是代码简洁易懂的目的。

工厂方法模式

Channel channel = channelFactory().newChannel(); 使用了工厂方法模式。ServerBootstrap 提供了方法设置工厂类,同时也提供了默认实现。经过工厂方法模式建立 Channel,实现了良好的可扩展性。

模板方法模式

AbstractBootstrapvoid init(Channel channel) 就是一个模板方法。初始化和注册工做的主要实现方法是 AbstractBootstrapinitAndRegister,这个方法调用模板方法 init(Channel)。而模板方法 init(Channel) 则交由子类作具体的实现。

一样,AbstractChanneldoRegister 方法也是一个模板方法模式的例子。

3.2 设计思想

Netty 用上面复杂的代码实现了并不复杂的功能。其背后反映处的思想就是做为一个通用的高性能网络 IO 框架,Netty 必须设计出一个高性能、高扩展性的基础架构,而后再在这个架构之上实现各类功能。Netty 的执行核心是 EventLoopGroup 及其实现类。绝大部分 IO 操做和非 IO 操做都是交由 EventLoopGroup 来执行。这是 Netty 能被用来实现高并发服务的缘由之一。因此本文所涉及的操做,因此并不复杂,可是其中的一些操做,例如注册工做,也是须要交由 EventLoopGroup 异步执行。这虽然由于异步的方式,提升了系统的执行效率,但事也未任务直接的协调制造了困难,这一点在后续的介绍中会看的很清楚。

若是说 EventLoopGroup 是执行调度的核心,那 Channel 就是实现具体操做的实现核心。由于网络编程十分复杂,有各类复杂的协议,也有复杂的底层操做系统实现,因此 Channel 相应的实现类也是种类繁多。这其实并无增长复杂度,而是将各类复杂功能各归其主,将实现细节梳理清晰。

4. 接下来

接下来的文章将会解析服务端端口绑定的实现。

相关文章
相关标签/搜索