In the first article, I used the creation of the Netty client in Spark as an entry point and analyzed the parameter setup and startup process of Netty's client bootstrap class, Bootstrap. Obviously, there is another important part we have not explored yet: the initialization and startup of the server side. So in this section we will analyze, at the source-code level, the startup process of Netty's server-side bootstrap class, ServerBootstrap.
As before, we use Spark's usage of Netty as the entry point for the source analysis. Let's first look at the code in Spark's NettyRpc module that creates the Netty server bootstrap:
The constructor of TransportServer calls the init method, and it is inside init that the ServerBootstrap instance is created, initialized, and started.
The method does three main things: it creates the boss and worker EventLoopGroups together with the pooled buffer allocator, it creates and configures the ServerBootstrap (channel class, options, and the child handler), and finally it binds to the given address and waits for the bind to complete.
Clearly, then, the entry point for starting the ServerBootstrap is the bind method.
// Initialize the Netty server
private void init(String hostToBind, int portToBind) {

  // IO mode: either NIO or EPOLL
  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  // Create the bossGroup and workerGroup, i.e. the parent (boss) and child (worker) event loop groups
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;

  // Pooled buffer allocator, for both heap and direct memory
  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  // Create the Netty server bootstrap object and set its parameters
  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  // Metrics object for memory usage
  this.metrics = new NettyMemoryMetrics(
    allocator, conf.getModuleName() + "-server", conf);

  // Length of the queue of pending connections
  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  // Socket receive buffer size
  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  // Socket send buffer size
  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  // Handler for the child channels
  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
    new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
  // Bind to the IP address and port
  channelFuture = bootstrap.bind(address);
  // Wait synchronously until the bind succeeds
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}
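Stripped of the Spark-specific plumbing, the same pattern boils down to a few lines. The following is a minimal, self-contained sketch (the inline echo handler and port 8080 are made up purely for illustration) that follows the same steps: create the event loop groups, configure the ServerBootstrap, set a child handler, and bind:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class MinimalServer {
    public static void main(String[] args) throws Exception {
        // The boss group accepts connections, the worker group handles their IO
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // Echo every received buffer straight back to the peer
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ctx.writeAndFlush(msg);
                            }
                        });
                    }
                });

            // Bind and wait for the bind to complete, just like Spark's init does
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}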
Returning to the bind call: the validation it performs mainly checks that group and channelFactory are non-null.
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
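Paraphrasing AbstractBootstrap#validate() from Netty 4.1 (the ServerBootstrap override additionally checks that a childHandler has been set; exact messages may differ across versions), the check boils down to:

@SuppressWarnings("unchecked")
public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return (B) this;
}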
We mentioned this method before, when analyzing the startup process of Bootstrap. Its main job is to create and initialize a channel via initAndRegister, register it with an EventLoop, and then bind the channel to the given address, either directly if the registration has already completed, or through a listener callback otherwise.
Previously we analyzed the construction of NioSocketChannel and how Bootstrap initializes its channel; in this section we will analyze the construction of NioServerSocketChannel and the implementation of ServerBootstrap's init method.
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Create a channel and do some initialization work on it
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // Bind this channel to the given address
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // If registration has not completed yet, proceed asynchronously by adding a callback
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
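The initAndRegister call at the top is where the NioServerSocketChannel is created and registered. Trimmed down (error handling elided, and paraphrased from Netty 4.1, so details may differ slightly across versions), it looks roughly like this:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // The channelFactory reflectively creates the channel class configured via
        // channel(...), i.e. a NioServerSocketChannel here
        channel = channelFactory.newChannel();
        // ServerBootstrap#init: apply options/attributes and plant the ServerBootstrapAcceptor
        init(channel);
    } catch (Throwable t) {
        // error handling elided: close the channel forcibly and return an already-failed promise
    }

    // Register the new channel with an EventLoop of the parent (boss) group
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // registration failed: close the channel
    }
    return regFuture;
}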
Here, a ServerSocketChannel is created by calling the JDK API.
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
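The newSocket helper used above is where that JDK call happens; paraphrased from NioServerSocketChannel in Netty 4.1:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        // Delegate to the JDK: SelectorProvider#openServerSocketChannel
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}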
Similar to NioSocketChannelConfig, NioServerSocketChannelConfig is also a facade: it wraps the parameter-related interface of NioServerSocketChannel.
In addition, notice that the initial interest op of NioServerSocketChannel is specified here as ACCEPT, i.e. by default it listens for connection-establishment requests.
For NioSocketChannel, the initial interest op is the read event.
So the main difference from the construction of NioSocketChannel is the initial interest op.
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
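For comparison, the client-side NioSocketChannel eventually reaches the constructor of AbstractNioByteChannel, which passes OP_READ instead (paraphrased from Netty 4.1):

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    // Client-side channels register interest in read events by default
    super(parent, ch, SelectionKey.OP_READ);
}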
The constructor first calls the parent constructor, and the call chain eventually reaches the constructor of AbstractNioChannel. We covered this process in detail when analyzing the initialization of NioSocketChannel; essentially it creates the channel's internal Unsafe object and its ChannelPipeline.
Having analyzed the construction of the channel, let's look at how ServerBootstrap initializes the channel object in its init method.
As the code below shows, init plants a special handler, ServerBootstrapAcceptor, into the pipeline, so obviously what we will have to look at next is how this handler handles read events.
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    // Set the options
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // Set the attributes
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    // The group, handler, options and attributes for the child channels
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // Add a handler
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // Normally users do not need to set a handler on the ServerBootstrap itself
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // A key handler is added here, and as a side effect this also starts
            // the thread of the corresponding EventLoop
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler,
                            currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
Before analyzing ServerBootstrapAcceptor, let's first recall how the NioEventLoop loop handles accept events. Here is a short excerpt of that code:
// Handle read and accept events
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}
As we can see, accept events are handled just like read events: the read method of NioUnsafe is called.
Since NioServerSocketChannel extends AbstractNioMessageChannel, and the implementation of read also lives in AbstractNioMessageChannel, the code we need to look at is shown below.
From the earlier analysis of ChannelPipeline we know that read events propagate from the head node towards the tail node. As mentioned above, ServerBootstrap's init method adds a ServerBootstrapAcceptor handler to the pipeline of the initial NioServerSocketChannel, so obviously the next thing to analyze is how ServerBootstrapAcceptor handles the read event.
public void read() {
    // Assert that this code runs in the EventLoop's thread
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // What is read here are the channels of newly established connections:
                // the JDK SocketChannel is wrapped into a Netty NioSocketChannel
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // Each accepted channel is treated as a message, and a read event is fired
            // for it in the ChannelPipeline
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // Finally, fire a read-complete event
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
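The doReadMessages called in the loop above is implemented by NioServerSocketChannel. A slightly simplified version (paraphrased from Netty 4.1, where the accept actually goes through a SocketUtils helper) shows how the JDK SocketChannel gets wrapped:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // accept() on the underlying JDK ServerSocketChannel; null if no connection is pending
    SocketChannel ch = javaChannel().accept();
    try {
        if (ch != null) {
            // Wrap the JDK channel in a NioSocketChannel and hand it out as a "message"
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}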
The logic of the code is fairly simple, thanks to the groundwork laid earlier: when ServerBootstrap's init method initialized the original server channel, it passed the user-configured child channel options, attributes, child handler and child group to ServerBootstrapAcceptor as constructor arguments, so here they can simply be used directly.
In fact, the initialization and registration of the child channel here is basically the same as how Bootstrap initializes a newly created channel. The difference is that in Bootstrap the channel is an object constructed by reflection inside initAndRegister when user code calls connect, whereas on the server side the ServerSocketChannel listens for accept events: when a new connection request arrives, a SocketChannel is created automatically (by the JDK implementation), and NioServerSocketChannel wraps it into a NioSocketChannel and passes it on to the handlers as a message. So the creation of the child channels of a ServerSocketChannel is handled by the underlying JDK library.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // The cast here is safe; this is guaranteed by the concrete subclasses of
    // AbstractNioMessageChannel, whose read implementations make sure that what
    // they read and eventually hand over is a Channel
    final Channel child = (Channel) msg;

    // Add the handler to the child channel
    child.pipeline().addLast(childHandler);

    // Set the options of the child channel
    setChannelOptions(child, childOptions, logger);

    // Set the attributes of the child channel
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // Register the child channel with the child group
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
Back in the doBind method: once the channel has been constructed, initialized and registered, the next step is to bind this server channel to an address so that it can accept connection requests from clients.
As the code shows, the binding is done by calling the channel's bind method.
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // channel.bind is called to carry out the actual binding
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
The bind operation propagates from the tail node towards the head, so let's go straight to HeadContext's implementation of bind.
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
Finally, HeadContext's bind calls the bind method of unsafe.
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
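The unsafe's bind eventually reaches the channel's doBind, and for NioServerSocketChannel that is where the JDK socket is actually bound. A paraphrased version (based on Netty 4.1; details differ slightly across versions):

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // Java 7+: bind directly on the ServerSocketChannel
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        // Older JDKs: bind through the underlying ServerSocket
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}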
After this, several more events are fired. Each of them is triggered through the corresponding method on the channel and then propagated through the ChannelPipeline, and in the end most of them are handled by HeadContext. The code is rather tedious and much of it follows the same calling pattern, so instead of pasting it all I will just sketch the remaining logic: once the bind succeeds, a channelActive event is fired on the pipeline; HeadContext handles it and, since auto-read is enabled by default, triggers a read on the channel, which travels through the pipeline back to HeadContext and ends up in the unsafe's beginRead, i.e. the doBeginRead method of AbstractNioChannel shown below.
As the code shows, the JDK API is eventually called to add the interest op to the SelectionKey. From the earlier analysis we know that for NioSocketChannel the read interest op is SelectionKey.OP_READ, i.e. the read event;
whereas for NioServerSocketChannel, as we saw when analyzing its constructor, the interest op is SelectionKey.OP_ACCEPT, i.e. the connection-establishment event.
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    // Add the read interest op to the SelectionKey's interest set,
    // so that the underlying JDK selector will watch for events of that type
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
With that, we have finished analyzing the main functional code of ServerBootstrap, which covers three main aspects: