Netty Source Code (1): Server Startup

Netty series index (http://www.javashuo.com/article/p-hskusway-em.html)

Figure: ServerBootstrap class diagram

The sequence of calls that builds a ServerBootstrap is shown below:

Figure: ServerBootstrap creation sequence diagram

1. Parameter Configuration

(1) Binding the Reactor thread pools

A server typically creates two thread pools: bossGroup accepts connections, and childGroup handles network I/O and system tasks.

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    // omitted...
    this.childGroup = childGroup;
    return this;
}

// bossGroup is set by the parent class AbstractBootstrap
public B group(EventLoopGroup group) {
    // omitted...
    this.group = group;
    return self();
}

(2) Binding the Channel

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

// bind() later uses reflection to create the corresponding channel
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    // omitted...
    this.channelFactory = channelFactory;
    return self();
}
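
The reflective factory idea can be sketched with the JDK alone. The class below is a hypothetical stand-in, not Netty's ReflectiveChannelFactory: it assumes the target type has an accessible no-argument constructor, looks that constructor up once, and creates a new instance on every call. The names ReflectiveFactoryDemo, ReflectiveFactory, and newInstance are made up for illustration.

```java
import java.lang.reflect.Constructor;

class ReflectiveFactoryDemo {
    // Hypothetical stand-in for a reflective channel factory (not the Netty class).
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> type) {
            if (type == null) {
                throw new NullPointerException("type");
            }
            try {
                // Resolve the no-argument constructor once, when the factory is built.
                this.constructor = type.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(type.getName() + " has no no-arg constructor", e);
            }
        }

        // Every call creates a fresh instance through reflection.
        T newInstance() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create " + constructor.getDeclaringClass(), e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        System.out.println(first.getClass().getName());
        System.out.println("distinct instances: " + (first != second));
    }
}
```

Storing only the factory at configuration time is what lets the actual object be created later, when bind is called.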

(3) Binding the ChannelHandler

// Used by NioServerSocketChannel to handle client connections
.handler(new LoggerHandler())
// Used by NioSocketChannel to handle network I/O
.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ServerHandler());
    }
})

The difference between these two handlers is shown in the figure below:

Figure: NioServerSocketChannel handler model

2. Starting the Service

Next, let's look at how ServerBootstrap binds the port.

Figure: core bind flow

  • doBind()
    initAndRegister()
  • doBind0()
  • initAndRegister()
    init()
  • beginRead()

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Create the Channel and register it with a NioEventLoop
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // 2. If the channel is already registered with the NioEventLoop, bind the port and start the service
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 3. If registration has not completed yet, attach a ChannelFutureListener
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registering the channel with the NioEventLoop failed
                    promise.setFailure(cause);
                } else {
                    // Registration succeeded, so bind the port
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

doBind does two main things: it initializes the Channel and registers it with a NioEventLoop, and then it binds the port to start the service.
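
The "bind now if registration is done, otherwise bind from a listener" pattern can be illustrated without Netty. The sketch below uses the JDK's CompletableFuture in place of ChannelFuture and ChannelPromise; doBind, bind, and the trace list are hypothetical names for this example only. As a simplification, an already failed registration is routed through the listener branch rather than returned early.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class BindAfterRegisterDemo {
    // Hypothetical model of doBind: the returned future completes once "bind" has run.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port, List<String> trace) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already finished successfully: bind right away.
            trace.add("direct");
            promise.complete(bind(port));
        } else {
            // Registration is pending (or failed): decide when it completes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    trace.add("listener");
                    promise.complete(bind(port));
                }
            });
        }
        return promise;
    }

    // Stand-in for the real port binding.
    static String bind(int port) {
        return "bound:" + port;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        CompletableFuture<Void> alreadyRegistered = CompletableFuture.completedFuture(null);
        System.out.println(doBind(alreadyRegistered, 8080, trace).join());

        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> later = doBind(pending, 8081, trace);
        System.out.println("done before registration: " + later.isDone());
        pending.complete(null); // registration finishes, the listener binds
        System.out.println(later.join());
        System.out.println(trace);
    }
}
```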

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. Create the NioServerSocketChannel
        channel = channelFactory.newChannel();
        // 2. Configure the channel's parameters
        init(channel);
    } catch (Throwable t) {
        // omitted...
    }

    // 3. Register the channel with a NioEventLoop
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    } 
    return regFuture;
}

2.1 NioServerSocketChannel

channelFactory.newChannel() actually uses reflection to call the no-argument constructor of NioServerSocketChannel.

// Create the underlying NIO ServerSocketChannel object
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// NioServerSocketChannel needs to register for OP_ACCEPT events
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// Switch to non-blocking mode and record the event of interest
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}

// Creating a channel also creates its pipeline
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

Why is the ServerSocketChannel created with provider.openServerSocketChannel()?

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    // ServerSocketChannel.open() costs about 1% in performance when creating 5000 connections per second
    // https://github.com/netty/netty/issues/2308
    return provider.openServerSocketChannel();
}
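
The same steps can be tried with plain JDK NIO. This is a minimal sketch, not Netty code: it resolves the SelectorProvider once into a constant, opens the channel through it, switches to non-blocking mode, and registers interest in OP_ACCEPT. The class name ProviderOpenDemo and the helper openAndRegister are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

class ProviderOpenDemo {
    // Look the provider up once and reuse it for every channel that is opened.
    static final SelectorProvider PROVIDER = SelectorProvider.provider();

    // Opens a server channel through the cached provider, registers it for
    // OP_ACCEPT, and returns the interest set recorded on the selection key.
    static int openAndRegister() {
        try (Selector selector = PROVIDER.openSelector();
             ServerSocketChannel ch = PROVIDER.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a selector.
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int ops = openAndRegister();
        System.out.println("interested in OP_ACCEPT: " + ((ops & SelectionKey.OP_ACCEPT) != 0));
    }
}
```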

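2.2 Initialization

The init method configures the channel's socket options and attributes, and sets up the corresponding handlers.

void init(Channel channel) throws Exception {
    // 1. Configure the socket options and the attributes of the NioServerSocketChannel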
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            // 2. Configure the handler of the NioServerSocketChannel
            if (handler != null) {
                pipeline.addLast(handler);
            }

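            // 3. Add the ServerBootstrapAcceptor, which accepts clients, to the pipeline
            //    Note: when init() is called the channel is not yet registered with an eventLoop,
            //    so the acceptor is added later, from inside initChannel and through the event loop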
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

Figure: execution of ChannelInitializer

2.3 Registration

config().group().register(channel)

For details on how NioServerSocketChannel is registered with the NioEventLoopGroup, see: http://www.javashuo.com/article/p-yrhrzohe-dx.html

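3. Accepting Client Connections

Once NioServerSocketChannel is registered with an eventLoop, the NioEventLoop thread starts and handles the network I/O events of that channel.

Figure: client connection flow

3.1 Starting the NioEventLoop

// NioEventLoop main loop
protected void run() {
    // omitted...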
    processSelectedKeys();
}

// Netty optimizes the selected-key set of the NIO Selector; processSelectedKeysPlain is the fallback when that optimization is not in effect
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

processSelectedKeysPlain handles the channels registered on the selector.

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            // Handle network I/O
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // User-defined task
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }
        // omitted...
    }
}

// Handles the OP_CONNECT, OP_WRITE, OP_READ, and OP_ACCEPT events in turn
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // omitted...
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    // OP_CONNECT
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }

    // OP_WRITE
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }

    // OP_READ, OP_ACCEPT
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}

As the code shows, a channel's network I/O is actually handled by its unsafe object. For NioServerSocketChannel, that object is created by newUnsafe(), which its parent class AbstractNioMessageChannel implements.
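
The readiness checks in processSelectedKey are plain bit-mask tests on the value returned by readyOps(). The sketch below reproduces only those tests, using the real SelectionKey constants; the action names it returns are labels for illustration, and ReadyOpsDemo, dispatch, and clear are hypothetical helpers.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

class ReadyOpsDemo {
    // Returns, in order, the actions the three checks would trigger for a readyOps value.
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 is also treated as readable.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    // Clears one bit from an interest set, as done for OP_CONNECT once the connect is ready.
    static int clear(int ops, int bit) {
        return ops & ~bit;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(SelectionKey.OP_ACCEPT));
        System.out.println(dispatch(SelectionKey.OP_WRITE | SelectionKey.OP_READ));
        System.out.println(clear(SelectionKey.OP_CONNECT | SelectionKey.OP_READ, SelectionKey.OP_CONNECT));
    }
}
```

Because OP_ACCEPT and OP_READ go through the same branch, a server channel and a client channel differ only in which unsafe implementation their read() call reaches.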

3.2 Accepting Clients

// AbstractNioMessageChannel.NioMessageUnsafe#read
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        do {
            // 1. Accept the client's NioSocketChannel
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }

            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());
    } catch (Throwable t) {
        exception = t;
    }

    // 2. Fire the pipeline's fireChannelRead, fireChannelReadComplete, and fireExceptionCaught
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    if (exception != null) {
        closed = closeOnReadError(exception);
        pipeline.fireExceptionCaught(exception);
    }
}

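The read method of NioMessageUnsafe does two things: it accepts the client's NioSocketChannel, and it fires the pipeline's fireChannelRead event to complete the channel's initialization, firing fireExceptionCaught if an exception occurred. The actual accepting of the client connection is delegated to the doReadMessages method of the subclass NioServerSocketChannel.

// Call the underlying NIO API to accept the client connection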
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    return 0;
}

At this point NioServerSocketChannel has accepted the incoming NioSocketChannel, but the channel is not yet initialized: its handlers are not bound and its parameters are not configured.
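
The return values 1 and 0 follow from how a non-blocking accept behaves in the JDK: accept() returns null when no connection is pending. The sketch below shows this with plain NIO on the loopback interface. It is not Netty code; AcceptDemo and run are hypothetical names, and the example assumes a loopback connection is permitted in the environment where it runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class AcceptDemo {
    // Same shape as doReadMessages: 1 if a connection was accepted, otherwise 0.
    static int doReadMessages(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    // Returns {result before any client connects, result after one client connects}.
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            List<SocketChannel> buf = new ArrayList<>();
            int before = doReadMessages(server, buf);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000); // wait until the pending connection is reported
                int after = doReadMessages(server, buf);
                for (SocketChannel accepted : buf) {
                    accepted.close();
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("before client: " + result[0] + ", after client: " + result[1]);
    }
}
```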

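3.3 Initializing NioSocketChannel

As mentioned above, NioServerSocketChannel binds a ServerBootstrapAcceptor during initialization, and this handler completes the initialization of the child channel. The pipeline of NioServerSocketChannel is shown in the figure below:

Figure: NioServerSocketChannel pipeline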

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // 1. Bind the handler and related configuration to the NioSocketChannel
    child.pipeline().addLast(childHandler);

    // 2. Configure the socket's TCP options and attributes
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    
    // 3. Register the NioSocketChannel with an eventLoop
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
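
The three steps in channelRead can be modelled with a small toy that uses no Netty types. Everything here is hypothetical: Child stands in for the accepted channel, strings stand in for handlers and event loops, and the round-robin choice of a worker loop is an assumption about how the child group hands out loops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AcceptorModelDemo {
    // Toy stand-in for an accepted client channel.
    static final class Child {
        final List<String> pipeline = new ArrayList<>();
        final Map<String, Object> options = new LinkedHashMap<>();
        String eventLoop; // stays null until the child is registered
    }

    // Toy stand-in for the acceptor handler in the server channel's pipeline.
    static final class Acceptor {
        private final String childHandler;
        private final Map<String, Object> childOptions;
        private final List<String> childLoops;
        private int next;

        Acceptor(String childHandler, Map<String, Object> childOptions, List<String> childLoops) {
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childLoops = childLoops;
        }

        void channelRead(Child child) {
            child.pipeline.add(childHandler);      // 1. bind the child handler
            child.options.putAll(childOptions);    // 2. apply the child options
            // 3. register with a worker loop (round-robin is assumed here)
            child.eventLoop = childLoops.get(next++ % childLoops.size());
        }
    }

    public static void main(String[] args) {
        Acceptor acceptor = new Acceptor(
                "ServerHandler",
                Collections.<String, Object>singletonMap("TCP_NODELAY", Boolean.TRUE),
                Arrays.asList("worker-0", "worker-1"));
        for (int i = 0; i < 3; i++) {
            Child child = new Child();
            acceptor.channelRead(child);
            System.out.println(child.pipeline + " " + child.options + " " + child.eventLoop);
        }
    }
}
```

The point of the model is the division of labour: the server channel's pipeline only ever sees newly accepted children, and each child ends up with its own pipeline, options, and worker loop.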

Write down a little every day, with care. The content may not matter, but the habit does!
