上节对Netty的作了简单介绍,这节分析下Netty启动流程,后面的源码分析都以Netty4.0.32版本为例,如下面启动代码为例子java
public class TimeServer { public void bind(int port) throws Exception { // 1.配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //2.配置参数 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingServerHandler()) .childHandler(new ChildChannelHandler()); // 3.绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //4. 等待服务端关闭 f.channel().closeFuture().sync(); } finally { //5.优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
EventLoopGroup是Netty的线程池,为Acceptor Selector和IO Selector事件提供线程支持,EventLoopGroup在初始化的时候会建立相应配置数量的EventLoop来提供单个线程,EventLoop功能比较复杂,既是Select的轮询事件线程也是其余IO事件处理的线程。使用时调用EventLoopGroup#next()方法来获取EventLoop。Netty的线程比较复杂后面章节会详细说明。promise
NioServerSocketChannel提供了JDK的Channel和selector的绑定功能,再绑定端口前会调用这个方法绑定操做。同时NioServerSocketChannel有个Unsafe内部类在初始化NioServerSocketChannel时会建立Unsafe做为他的成员变量。Unsafe提供了JDK层的io操做包括读、写、绑定端口等根据继承类不一样的不一样操做。异步
ChannelHandler是Netty的核心,它的继承包含了2个重要的处理器ChannelOutboundHandler和ChannelInboundHandler。ChannelOutboundHandler是写出IO事件的处理器。ChannelInboundHandler是写入IO事件的处理器同时也包含了多个触发器,包括链接断开触发器,异常触发器,注册完成触发器,取消注册触发器等,因为ChannelHandler比较复杂后面章节会详细说明。socket
ChannelFuture是Netty的异步事件等待器,Netty利用其来实现全部IO事件的异步化,ChannelFuture能够注册多个事件完成监听器,异步事件会在完成后回调监听器。ide
ServerBootstrap是Netty配置及启动的入口,提供线程池、链接参数、线程池以及处理器的配置,同时提供了绑定端口的方法。oop
咱们以b.bind(port)这行代码做为开始流程,时序图以下: 源码分析
ServerBootstrap:this
EventLoopGroup:线程
EventLoop:3d
Channel:
ChannelPipeline:
HeadContext:
ServerBootstrap类的线程池、链接参数、线程池以及处理器的配置其实就是成员变量的复制这里就是很少讲,咱们直接从b.bind()开始看。
b.bind()的方法直接将端口包装成类SocketAddress交给doBind()处理。doBind()源码以下
private ChannelFuture doBind(final SocketAddress localAddress) { //初始化Channel并注册Selector final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //绑定监听端口并开始监听 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
这个方法主要作了2件事
注册Selector是默认是异步进行的这里Netty作了一个处理,若是注册Selector已完成则同步调用doBind0()不然注册监听器等待Selector注册完成后调用doBind0(),这样保证整个doBind()方法的异步性。
接下来咱们看下initAndRegister()的实现,源码以下:
final ChannelFuture initAndRegister() { //初始化channel final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //注册Selector ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
initAndRegister()方法也是作了2件事
channelFactory().newChannel()是建立你配置的Channel这里源码配置了NioServerSocketChannel,而init()的具体实现则是在NioServerSocketChannel中,init()源码以下:
void init(Channel channel) throws Exception { ... ChannelPipeline p = channel.pipeline(); ... //将从线程组,从处理器,从配置,从属性封装成ServerBootstrapAcceptor处理器,共后续IOSelector使用。 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
在上节中咱们讲了Neety服务端线程模式能够是主从模型,因此它的配置也分主从配置,init()主要作的将主处理器(ChannelHandler)添加到主ChannelPipeline里,将 从线程组,从处理器(ChannelHandler),从配置,从属性封装成ServerBootstrapAcceptor处理器添加到主ChannelPipeline里,供后续触发客户端链接事件使用。
group().register()先是取了主线程组EventLoopGroup委托EventLoopGroup注册Selector,主线程组中也是调用next()委托给单线程EventLoop去处理
next().register(channel);
最后在EventLoop的register()中委托给Channel中的Unsafe处理(Unsafe是在Channel建立的时候被建立的)
channel.unsafe().register(this, promise);
咱们来看下Unsafe中的register()的源码:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; //若是eventLoop是当前线程直接执行,不然交给传eventLoop线程处理 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
Unsafe中的register()作的就是判断next()获取的线程是不是当前线程,若是是同步执行注册,不然异步执行注册,按例子中默认配置会异步调用register0()注册。
咱们来看下register0()的源码
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //调用JDK去注册Selector doRegister(); neverRegistered = false; registered = true; //设置注册成功通知监听器 safeSetSuccess(promise); //触发注册成功事件 pipeline.fireChannelRegistered(); //若是是第一次则触发激活成功事件 if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
register0()作了4件事
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
触发注册成功事件和若是绑定成功触发激活成功事件实际上是在主pipeline中链式调用用户的配置ChannelHandler,调用过程以下
这里会有个疑问绑定是在注册selector以后进行的为何这里能够触发激活成功事件,其实在safeSetSuccess(promise)代码执行以后已经通知监听器并开始执行绑定相关的代码了。
触发激活成功事件后若是配置了会触发开始读事件
public ChannelPipeline fireChannelActive() { head.fireChannelActive(); //若是配置了会触发开始读事件 if (channel.config().isAutoRead()) { channel.read(); } return this; }
读事件也是链式调用调用过程以下:
能够看到最后调用HeadContext里的read()方法
public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
委托给unsafe去处理,处理的方式就是修改interestOps添加读操做位,由IOSelector去触发读事件
selectionKey.interestOps(interestOps | readInterestOp);
在NioServerSocketChannel中readInterestOp其实对应的是JDK中的SelectionKey.OP_ACCEPT,在ServerSocketChannel构造方法中定义:
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
服务端启动后最早关心是客户端链接的,若是没有客户端的链接,后续没法IO操做,因此这里注册了SelectionKey.OP_ACCEPT来监听客户端的链接。
以上就是初始化Channel并注册Selector的流程
下面说下调用doBind0()绑定监听端口并开始监听流程,doBind0()代码以下:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
绑定端口是异步进行的,这里调用channel.eventLoop()获取的线程默认就是以前注册Selector的流程中next()获取的线程。绑定的流程其实也是在主pipeline中链式调用用户的配置ChannelHandler,如图2,能够看到最后调用HeadContext里的bind()方法
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
也是委托给unsafe去处理,处理的方式就是调用JDK里的绑定接口去绑定
javaChannel().socket().bind(localAddress);
若是绑定成功且以前没触发激活成功事件则触发之,这里的流程跟以前讲的同样就不累述。
以上就是绑定监听端口并开始监听的流程。