netty server 源码分析

netty支持BIO、NIO,也通过一些小版本尝试过AIO,但是相比于AIO没有优势。

通过前面对linux五种I/O模型进行分析,同步I/O也异步I/O的区别主要是异步I/O只关注什么时候数据复制完成了,同步I/O也要关注数据什么时候准备好了、也要关注什么时候数据从内核复制到用户空间完成了

由于普遍使用netty的NIO,从netty的NIO开始看。

1.EventLoopGroup boosGroup = new NioEventLoopGroup();       EventLoopGroup workerGroup = new NioEventLoopGroup();

创建了两个EventLoopGroup,看构造方法。

EventLoopGroup首先调用其重载方法,获取SelectorProvider,这个就是java源码分析第一步一样的获取Selecotor时获取的。在mac下获取的是KQueueSelectorProvider。

然后调用EventLoopGroup的父类MultithreadEventLoopGroup构造方法。

此方法中会判断默认传入的线程数是否为0,如果为0,设置为处理器数

*2。通过static块获取系统的cpu数量。然后调用MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup的构造函数。

第一步判断传入的threadFactory,如果为空,则创建DefaultThreadFactory作为默认的线程工厂类。然后根据线程数创建SingleThreadEventExecutor。通过线程数是否为2的次方设置选择器。如果过为2的次方选择器设置为PowerOfTwoEventExecutorChooser。否则为GenericEventExecutorChooser。发现以上2个类获取下一个得到的结果都是一样的,按顺序选择children数组中的线程,不同之外就是如果是2的平方的形式,因为是按位运算,所以执行的效率上是非常快的。而其他的选择因为要进行除余运算,所以在效率上应该会差一点。

然后循环线程数,然后调用newChild(),这个是在子类实现的,NioEventLoopGroup.newChild(),创建了NioEventLoop。查看NioEventLoop构造器,第一步调用父类构造器SingleThreadEventLoop,然后SingleThreadEventLoop再调用其父类构造器SingleThreadEventExecutor。在传入的threadFactory创建了一个thread。查看创建的线程主要是调用了NioEventLoop的run()方法。回到NioEventLoop的构造方法中,将传入的selectorProvider设置为provider。然后调用openSelector();

第一步selector = provider.openSelector();即打开selector。和java原生的NIO使用方法一致。然后通过反射获取SelectorImpl判断是否需要优化原生selector,如果不需要直接返回,这个方法后面的操作主要是通过反射优化Set类型的selectedKeys, publicSelectedKeysNetty自己使用实现Set接口SelectedSelectionKeySet代替原来的Set实现,防止在selectedKeyspublicSelectedKeys在读写时出现java.util.ConcurrentModificationException的异常。然后整个new NioEventLoopGroup()就完成了。

然后创建ServerBootstrap()

ServerBootStrap通过建造者模式进行创建。第一步调用group注入EventLoopGroup。第二步调用channel参数为NioServerSocketChannel.class。然后调用ServerBootStrap方法的channelFactory。参数为BootstrapChannelFactory。泛型为NioServerSocketChannel.class。然后设置channelFactory为传入的值。此步骤在后面会用到channelFactory创建NioServerSocketChannel.class。第三步调用option设置参数。第四步调用childHandler() 设置childHandler参数

调用AbstractBootstrap.bind()此方法第一步调用validate()进行参数校验,然后调用doBind()

第一步调用initAndRegister()获取ChannelFuture

initAndRegister()第一步通过在ServerBootstrap创建时设置的BootstrapChannelFactorynewChannel()反射方法创建NioServerSocketChannel

查看NioServerSocketChannel的构造函数,调用了newSocket()

可以看到调用了provider. openServerSocketChannel()。返回了ServerSocketChannelImpl。此步骤对应java源码的ServerSocketChannel.open()。获取javachannel然后调用NioServerSocketChannel的父类AbstractNioMessageChannel构造器,AbstractNioMessageChannel调用自己的父类AbstractNioChannel的构造器。第一步先调用其父类AbstractChannel然后设置channel为非阻塞。也是在java源码设置channel为非阻塞模式。

 

然后查看AbstractChannel的构造函数。

此函数先将传入的channel设置为parent。然后调用newUnsafe()生成Unsafe方法。实际在AbstractNioMessageChannel中实现了此方法。创建了NioMessageUnsafe(),为后续事件处理做准备。然后创建一个DefaultChannelPipeline()。这个相当于一个容器,其是一个双向链表,后面请求入站和出站就逐个执行相应的时间。后面的业务逻辑就是在里面实现。Netty保证顺序执行。看DefaultChannelPipeline的构造函数。

channel设置到容器中。则此channel和当前容器就进行了绑定。然后创建了TailContextHeadContext并将其设置到链表的头部和尾部。

Todo  TailContextHeadContext的构造器。

initAndRegister()第二步骤调用init()初始化channel。此方法在ServerBootStrap中。

init()方法第一步获取通过创建ServerBootStrap中调用options方法设置的options设置到channlconfig中。同样将attrs设置到channelattr

然后获取前面在创建NioServerSocketChannel时创建的容器pipline;调用

addList将构造ServerBootStrap中的handler添加到链表中。实际实现可以看addList方法的具体实现。

回到initAndRegister(),初始化完channel后,获取在构造ServerBootStrap创建的NioEventLoopGroupboos线程,执行register()方法。执行NioEventLoopGroup父类MultithreadEventLoopGroup,先调用next()方法逐条获取NioEventLoop。里面调用的是前面创建的chooser。然后调用NioEventLoopregister()将所有的channel注册到selector中。调用NioEventLoop的父类SingleThreadEventLoopregister()

然后调用构造NioServerSocketChannel创建的NioMessageUnsaferegister()NioMessageUnsafe的继承关系为

NioMessageUnsafe未实现register()交给其父类AbstractUnsaferegister()方法。

先对必要参数进行校验,然后将channel与当前NioEventLoop进行关联,此步骤表示一个channel只和一个线程进行绑定。然后执行register0()

再执行AbstractNioUnsafedoRegister()

调用javaChannel()获取当前channel,然后执行register(),实际执行的为ServerSocketChannel的父类AbstractSelectableChannelregister(),

此方法也即java源码中register的注册。

initAndRegister()执行完后,回到doBind()继续执行doBind0()

获取channel中的NioEventLoop。然后调用NioEventLoopexecute ()。实际执行NioEventLoop的父类SingleThreadEventExecutor.execute();

判断当前线程是否已经启动,当前线程还没有启动,执行else。调用startThread方法启动线程。其执行NioEventLooprun()

 

此方法首先判断当前是否有任务,如果有任务,则执行selectNow(),内部也即调用selector.selectNow();前面在看javanio源码时,也即调用select方法查询就绪的事件,此方法非阻塞方法,会立即返回。如果没有任务,则执行select()方法,此方法修复了javanioselect空轮询问题,实际就是如果空转了512次,调用rebuildSelector();重新生成selector

ioRatioI/Oexecutor里任务执行的比率,如果设为100则表示I/O的优先级最高,默认为50

然后执行processSelectedKeys();判断selectedKeys是否为空,这里为OP_ACCEPTED,执行processSelectedKeysOptimizedprocessSelectedKeysPlainprocessSelectedKeysOptimized逻辑差不多。

循环selectionkey。判断attachment是否为AbstractNioChannel的子类,属于作者的扩展,然后执行processSelectedKey(),

先判断数据是否可用,如果不可用,调用unsafe关闭当前channel

可以看到这块的逻辑和java nio的逻辑基本一直,对感兴趣的事件进行处理。

 

上图为javanio的处理,以OP_READ为例

调用unsaferead方法。实际调用前面初始化NioServerSocketChannel时创建的AbstractNioUnsaferead(),

 

第一步断言线程是否启动,娇艳channel的配置是否可读。

因为存在TCP粘包/拆包,所以循环读,调用doReadMessages(),读取后调用

pipeline.fireChannelRead(readBuf.get(i));也即调用当前channel的容器DefaultChannelPipelinefireChannelRead()

 

实际的调用流程如图。