Netty源码分析第2章(NioEventLoop)---->第6节: 执行select操做

 

Netty源码分析第二章: NioEventLoophtml

 

第六节: 执行select操做ide

 

分析完了selector的建立和优化的过程, 这一小节分析select相关操做oop

跟到跟到select操做的入口,NioEventLoop的run方法:源码分析

protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //轮询io事件(1)
                    select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默认是50
            final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //记录下开始时间
                final long ioStartTime = System.nanoTime(); try { //处理轮询到的key(2)
 processSelectedKeys(); } finally { //计算耗时
                    final long ioTime = System.nanoTime() - ioStartTime; //执行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代码省略
 } }

代码比较长, 其实主要分为三部分:优化

1. 轮询io事件ui

2. 处理轮询到的keythis

3. 执行taskspa

这一小节, 主要剖析第一部分, 轮询io事件线程

首先switch块中默认会走到SelectStrategy.SELECT中, 执行select(wakenUp.getAndSet(false))方法rest

参数wakenUp.getAndSet(false)表明当前select操做是未唤醒状态

进入到select(wakenUp.getAndSet(false))方法中:

private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; //当前系统的纳秒数
        long currentTimeNanos = System.nanoTime(); //截止时间=当前时间+队列第一个任务剩余执行时间
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //阻塞时间(毫秒)=(截止时间-当前时间+0.5毫秒)
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //进行阻塞式的select操做
            int selectedKeys = selector.select(timeoutMillis); //轮询次数
            selectCnt ++; //若是轮询到一个事件(selectedKeys != 0), 或者当前select操做须要唤醒(oldWakenUp), //或者在执行select操做时已经被外部线程唤醒(wakenUp.get()), //或者任务队列已经有任务(hasTask), 或者定时任务队列中有任务(hasScheduledTasks())
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } //省略 //记录下当前时间
            long time = System.nanoTime(); //当前时间-开始时间>=超时时间(条件成立, 执行过一次select操做, 条件不成立, 有可能发生空轮询)
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //表明已经进行了一次阻塞式select操做, 操做次数重置为1
                selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //省略日志代码 //若是空轮询的次数大于一个阈值(512), 解决空轮询的bug
 rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //代码省略
    } catch (CancelledKeyException e) { //省略
 } }

首先经过 long currentTimeNanos = System.nanoTime() 获取系统的纳秒数

继续往下看:

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

delayNanos(currentTimeNanos)表明距定时任务中第一个任务剩余多长时间, 这个时间+当前时间表明此次操做不能超过的时间, 由于超过以后定时任务不能严格按照预约时间执行, 其中定时任务队列是已经按照执行时间有小到大排列好的队列, 因此第一个任务则是最近须要执行的任务, selectDeadLineNanos就表明了当前操做不能超过的时间

而后就进入到了无限for循环

for循环中咱们关注:

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L

 selectDeadLineNanos - currentTimeNanos+500000L 表明截止时间-当前时间+0.5毫秒的调整时间, 除以1000000表示将计算的时间转化为毫秒数

最后算出的时间就是selector操做的阻塞时间, 并赋值到局部变量的timeoutMillis

 

后面有个判断 if(imeoutMillis<0) , 表明当前时间已经超过了最后截止时间+0.5毫秒,  selectCnt == 0 表明没有进行select操做, 知足这两个条件, 则执行selectNow()以后, selectCnt赋值为1以后跳出循环

 

若是没超过截止时间, 就进行了 if(hasTasks() && wakenUp.compareAndSet(false, true)) 判断

 

这里咱们关注hasTasks()方法, 这里是判断当前NioEventLoop所绑定的taskQueue是否有任务, 若是有任务, 则执行selectNow()以后, selectCnt赋值为1以后跳出循环(跳出循环以后去执行任务队列中的任务)

 

hasTasks()方法能够本身跟一下, 很是简单

若是没有知足上述条件, 就会执行 int selectedKeys = selector.select(timeoutMillis) 进行阻塞式轮询, 而且自增轮询次数, 然后会进行以下判断:

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; }

selectedKeys != 0表明已经有轮询到的事件, oldWakenUp表明当前select操做是否须要唤醒, wakenUp.get()说明已经被外部线程唤醒, hasTasks()表明任务队列是否有任务, hasScheduledTasks()表明定时任务队列是否任务, 知足条件之一, 就跳出循环

 

 long time = System.nanoTime() 记录了当前的时间, 以后有个判断:

 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) 这里的意思是当前时间-阻塞时间>方法开始执行的时间, 这里说明已经完整的执行完成了一个阻塞的select()操做, selectCnt设置成1

 

若是此条件不成立, 说明没有完整执行select()操做, 可能触发了一次空轮询, 根据前一个selectCnt++这步咱们知道, 每触发一次空轮询selectCnt都会自增

以后会进入第二个判断 SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 

 

其中SELECTOR_AUTO_REBUILD_THRESHOLD默认是512, 这个判断意思就是空轮询的次数若是超过512, 则会认为是发生了epoll bug, 这样会经过rebuildSelector()方法从新构建selector, 而后将从新构建的selector赋值到局部变量selector, 执行一次selectNow(), selectCnt初始化1, 跳出循环

 

rebuildSelector()方法中, netty是如何解决epoll bug:

public void rebuildSelector() { //是不是由其余线程发起的
    if (!inEventLoop()) { //若是是其余线程发起的, 将rebuildSelector()封装成任务队列, 由NioEventLoop进行调用
        execute(new Runnable() { @Override public void run() { rebuildSelector(); } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { //从新建立一个select
        newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } int nChannels = 0; for (;;) { try { //拿到旧select中全部的key
            for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { Object a = key.attachment(); //代码省略 //获取key注册的事件
                    int interestOps = key.interestOps(); //将key注册的事件取消
 key.cancel(); //注册到从新建立的新的selector中
                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a); //若是channel是NioChannel
                    if (a instanceof AbstractNioChannel) { //从新赋值
                        ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { //代码省略
 } } } catch (ConcurrentModificationException e) { continue; } break; } selector = newSelector; //代码省略
}

首先会判断是否是当前NioEventLoop线程执行的, 若是不是, 则将构建方法封装成task由当前NioEventLoop执行

 final Selector oldSelector = selector 表示拿到旧的selector

而后经过 newSelector = openSelector() 建立新的selector

经过for循环遍历全部注册在selector中的key

 Object a = key.attachment() 是获取channel, 第一章讲过, 在注册时, 将自身做为属性绑定在key

for循环体中, 经过 int interestOps = key.interestOps() 获取其注册的事件

key.cancel()将注册的事件进行取消

 SelectionKey newKey = key.channel().register(newSelector, interestOps, a) 将channel以及注册的事件注册在新的selector

 if (a instanceof AbstractNioChannel) 判断是否是NioChannel

若是是NioChannel, 则经过 ((AbstractNioChannel) a).selectionKey = newKey 将自身的属性selectionKey赋值为新返回的key

 selector = newSelector 将自身NioEventLoop属性selector赋值为新建立的newSelector

至此, 就是netty解决epoll bug的步骤, 其实就是建立一个新的selector, 将旧selector中注册的channel和事件从新注册到新的selector, 而后将自身selector属性替换成新建立的selector

 

 

上一节: 优化selector

下一节: 处理IO事件

相关文章
相关标签/搜索