NioEventLoop的启动时机是在服务端的NioServerSocketChannel中的ServerSocketChannel初始化完成,且注册在NioEventLoop后执行的, 下一步就是去绑定端口,可是在绑定端口前,须要完成NioEventLoop的启动工做, 由于程序运行到这个阶段为止,依然只有MainThread一条线程,下面就开始阅读源码看NioEventLoop如何开启新的线程自立家门的java
总想说 NioEventLoop的总体结构,像极了这个图git
该图为,是我画的NioEventLoop启动的流程草图,很糙,可是不画它,总觉的少了点啥...github
NioEventLoop的继承体系图编程
NioEventLoop
的线程开启之路程序的入口是AbstractBootStrap, 这个抽象的启动辅助类, 找到它准备绑定端口的doBind0()
方法,下面是源码:数组
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. // todo 此方法在触发 channelRegistered() 以前调用, 给用户一个机会,在 channelRegistered() 中设置pipeline // todo 这是 eventLoop启动的逻辑 , 下面的Runable就是一个 task任务, 什么任务的呢? 绑定端口 // todo 进入exeute() System.out.println("00000"); channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // todo channel绑定端口而且添加了一个listenner channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
咱们关注上面的channel.execute(Runable)
方法, 若是咱们直接使用鼠标点击进去,会进入java.util.concurrent
包下的Executor
接口, 缘由是由于,它是NioEventLoop继承体系的超顶级接口,见上图, 咱们进入它的实现类,SingleThreadEventExcutor
, 也就是NioEventLoop
的间接父类, 源码以下:promise
// todo eventLoop事件循环里面的task,会在本类SingleThreadEventExecutor里面: execute() 执行 @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } // todo 一样判断当前线程是否是 eventLoop里面的那条惟一的线程, 若是是的话, 就把当前任务放到任务队列里面等着当前的线程执行 // todo ,不是的话就开启新的线程去执行这个新的任务 // todo , eventLoop一辈子只会绑定一个线程,服务器启动时只有一条主线程,一直都是在作初始化的工做,并无任何一次start() // todo 因此走的是else, 在else中首先开启新的线程,然后把任务添加进去 boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { // todo 开启线程 , 进入查看 startThread(); // todo 把任务丢进队列 addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
如今执行这些代码的线程依然是主线程,主线程手上有绑定端口任务,可是它想把这个任务提交给NioEventLoop去执行,因而它就作出下面的判断安全
boolean inEventLoop = inEventLoop(); // 方法实现 @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
可是发现,主线程并非NioEventLoop惟一绑定的那个线程, 因而他就准备下面两件事:服务器
开启新线程的逻辑在下面,我删除了一些收尾,以及判断的代码,保留了主要的逻辑网络
private void doStartThread() { assert thread == null; // todo 断言线程为空, 而后才建立新的线程 executor.execute(new Runnable() { // todo 每次Execute 都是在使用 默认的线程工厂,建立一个线程并执行 Runable里面的任务 @Override public void run() { // todo 获取刚才建立出来的线程,保存在NioEventLoop中的 thread 变量里面, 这里其实就是在进行那个惟一的绑定 thread = Thread.currentThread(); updateLastExecutionTime(); try { // todo 实际启动线程, 到这里 NioEventLoop 就启动完成了 SingleThreadEventExecutor.this.run(); } }
主要作了两件事第一波高潮来了 1. 调用了NioEventLoop的线程执行器的execute
,这个方法的源码在下面,能够看到,excute,其实就是在建立线程, 线程建立完成后,当即把新建立出来的线程看成是NioEventLoop
相伴终生的线程;数据结构
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } // todo 必须实现 Executor 里面惟一的抽象方法, execute , 执行性 任务 @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
建立/绑定完成了新的线程后,第二波高潮来了, SingleThreadEventExecutor.this.run();
这行代码的意思是,调用本类的Run()
方法,这个Run()
方法就是真正在干活的事件循环,可是呢, 在本类中,Run()
是一个抽象方法,所以咱们要去找他的子类,那么是谁重写的这个Run()
呢? 就是NioEventLoop, 它根据本身需求,重写了这个方法
小结: 到如今,NioEventLoop
的线程已经开启了,下面的重头戏就是看他是如何进行事件循环的
NioEventLoop
的事件循环run()
咱们来到了NioEventLoop
的run()
, 他是个无限for循环, 主要完成了下面三件事
这是NioEventLoop
的run()
的源码,删除了部分注解和收尾工做,
/** * todo select() 检查是否有IO事件 * todo ProcessorSelectedKeys() 处理IO事件 * todo RunAllTask() 处理异步任务队列 */ @Override protected void run() { for (; ; ) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // todo 轮询IO事件, 等待事件的发生, 本方法下面的代码是处理接受到的感性趣的事件, 进入查看本方法 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; // todo 默认50 // todo 若是ioRatio==100 就调用第一个 processSelectedKeys(); 不然就调用第二个 if (ioRatio == 100) { try { // todo 处理 处理发生的感性趣的事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // todo 用于处理 本 eventLoop外的线程 扔到taskQueue中的任务 runAllTasks(); } } else {// todo 由于ioRatio默认是50 , 因此来else // todo 记录下开始的时间 final long ioStartTime = System.nanoTime(); try { // todo 处理IO事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // todo 根据处理IO事件耗时 ,控制 下面的runAllTasks执行任务不能超过 ioTime 时间 final long ioTime = System.nanoTime() - ioStartTime; // todo 这里面有聚合任务的逻辑 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } } }
下面进入它的select()
,咱们把select()
称做: 基于deadline的任务穿插处理逻辑
下面直接贴出它的源码:下面的代码中我写了一些注解了, 主要是分以下几步走
selector.selectNow();
直接退出selector.selectNow();
直接退出select(time)
,默认是1秒这时可会会出现空轮询的Bug// todo 循环接受IO事件 // todo 每次进行 select() 操做时, oldWakenUp被标记为false private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { ///todo ----------------------------------------- 以下部分代码, 是 select()的deadLine及任务穿插处理逻辑----------------------------------------------------- // todo selectCnt这个变量记录了 循环 select的次数 int selectCnt = 0; // todo 记录当前时间 long currentTimeNanos = System.nanoTime(); // todo 计算出估算的截止时间, 意思是, select()操做不能超过selectDeadLineNanos这个时间, 不让它一直耗着,外面也可能有任务等着当前线程处理 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // -------for 循环开始 ------- for (; ; ) { // todo 计算超时时间 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) {// todo 若是超时了 , 而且selectCnt==0 , 就进行非阻塞的 select() , break, 跳出for循环 if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // todo 判断任务队列中时候还有别的任务, 若是有任务的话, 进入代码块, 非阻塞的select() 而且 break; 跳出循环 //todo 经过cas 把线程安全的把 wakenU设置成true表示退出select()方法, 已进入时,咱们设置oldWakenUp是false if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } ///todo ----------------------------------------- 如上部分代码, 是 select()的deadLine及任务穿插处理逻辑----------------------------------------------------- ///todo ----------------------------------------- 以下, 是 阻塞式的select() ----------------------------------------------------- // todo 上面设置的超时时间没到,并且任务为空,进行阻塞式的 select() , timeoutMillis 默认1 // todo netty任务,如今能够放心大胆的 阻塞1秒去轮询 channel链接上是否发生的 selector感性的事件 int selectedKeys = selector.select(timeoutMillis); // todo 表示当前已经轮询了SelectCnt次了 selectCnt++; // todo 阻塞完成轮询后,立刻进一步判断 只要知足下面的任意一条. 也将退出无限for循环, select() // todo selectedKeys != 0 表示轮询到了事件 // todo oldWakenUp 当前的操做是否须要唤醒 // todo wakenUp.get() 可能被外部线程唤醒 // todo hasTasks() 任务队列中又有新任务了 // todo hasScheduledTasks() 当时定时任务队列里面也有任务 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } ///todo ----------------------------------------- 如上, 是 阻塞式的select() ----------------------------------------------------- if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } // todo 每次执行到这里就说明,已经进行了一次阻塞式操做 ,而且尚未监听到任何感兴趣的事件,也没有新的任务添加到队列, 记录当前的时间 long time = System.nanoTime(); // todo 若是 当前的时间 - 超时时间 >= 开始时间 把 selectCnt设置为1 , 代表已经进行了一次阻塞式操做 // todo 每次for循环都会判断, 当前时间 currentTimeNanos 不能超过预订的超时时间 timeoutMillis // todo 可是,如今的状况是, 虽然已经进行了一次 时长为timeoutMillis时间的阻塞式select了, // todo 然而, 我执行到当前代码的 时间 - 开始的时间 >= 超时的时间 // todo 可是 若是 当前时间- 超时时间< 开始时间, 也就是说,并无阻塞select, 而是当即返回了, 就代表这是一次空轮询 // todo 而每次轮询 selectCnt ++; 因而有了下面的判断, if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && // todo selectCnt若是大于 512 表示cpu确实在空轮询, 因而rebuild Selector selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); // todo 它的逻辑建立一个新的selectKey , 把老的Selector上面的key注册进这个新的selector上面 , 进入查看 rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. // todo 解决了Select空轮询的bug selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //// -----------for 循环结束 -------------- if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } }
咱们能够看到,上面的run()
方法,通过两次判断后进入了指定时长的阻塞式轮询,而咱们常说的空轮询bug,指的就是原本该阻塞住轮询,可是却直接返回了, 在这个死循环中,它的畅通执行极可能使得CPU的使用率飙升, 因而把这种状况说是jdk的selector的空轮询的bug
一个分支语句 if(){}else{}
, 首先他记录下,如今执行判断时的时间, 而后用下面的公式判断
当前的时间t1 - 预订的deadLine截止时间t2 >= 开始进入for循环的时间t3
咱们想, 若是说,上面的阻塞式select(t2)
没出现任何问题,那么 我如今来检验是否出现了空轮询是时间t1 = t2+执行其余代码的时间, 若是是这样, 上面的等式确定是成立的, 等式成立说没bug, 顺道把selectCnt = 1;
可是若是出现了空轮询,select(t2)
并无阻塞,而是之间返回了, 那么如今的时间 t1 = 0+执行其余代码的时间, 这时的t1相对于上一个没有bug的大小,明显少了一个t2, 这时再用t1-t2 均可能是一个负数, 等式不成立,就进入了else的代码块, netty接着判断,是不是真的在空轮询, 若是说循环的次数达到了512次, netty就肯定真的出现了空轮询, 因而nettyrebuild()
Selector ,重新开启一个Selector, 循环老的Selector上面的上面的注册的时间,从新注册进新的 Selector上,用这个中替换Selector的方法,解决了空轮询的bug
selectedkeys
中的?ok, run()
的三部曲第一步轮询已经完成了, 下一步就是处理轮询出来的感兴趣的IO事件,processSelectedKeys()
,下面咱们进入这个方法, 若是这个selectedKeys不为空,就进去processSelectedKeysOptimized();
继续处理IO事件,
比较有趣的是,这个selectedKeys是谁? ,别忘了咱们是在NioEventLoop
中,是它开启了Selector,也是他使用反射的手段将Selector,存放感兴趣事件的HashSet集合替换成了SelectedSelectionKeySet
这个名叫set,实为数组的数据结构, 当时的状况以下:
SelectedSelectionKeySet
的实例 selectedKeySet
unwrappedSelector
中的 selectedKeysField
字段,替换成 selectedKeySet
selectedKeys = selectedKeySet;
看到第三步没? 也就是说,咱们如今再想获取装有感兴趣Key的 HashSet集合,已经不可能了,取而代之的是更优秀的selectedKeySet
,也就是下面咱们使用的selectedKeys
,因而咱们想处理感性趣的事件,直接从selectedKeys
中取, Selector轮询到感兴趣的事件,也会直接往selectedKeys
中放
private void processSelectedKeys() { // todo selectedKeys 就是通过优化后的keys(底层是数组) if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
下面接着跟进processSelectedKeysOptimized();
,关于这个方法的有趣的地方,我写在这段代码的下面
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // todo 数组输出空项, 从而容许在channel 关闭时对其进行垃圾回收 // See https://github.com/netty/netty/issues/2363 // todo 数组中当前循环对应的keys质空, 这种感兴趣的事件只处理一次就行 selectedKeys.keys[i] = null; // todo 获取出 attachment,默认状况下就是注册进Selector时,传入的第三个参数 this===> NioServerSocketChannel // todo 一个Selector中可能被绑定上了成千上万个Channel, 经过K+attachment 的手段, 精确的取出发生指定事件的channel, 进而获取channel中的unsafe类进行下一步处理 final Object a = k.attachment(); // todo if (a instanceof AbstractNioChannel) { // todo 进入这个方法, 传进入 感兴趣的key + NioSocketChannel processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
NioEventLoop
是如何在千百条channel中,精确获取出现指定感兴趣事件的channel的?上面这个方法,就是在真真正正的处理IO事件, 看看这段代码, 咱们发现了这样一行代码
final Object a = k.attachment();
而且,判断出Key的类型后,执行处理逻辑的代码中的入参都是同样的processSelectedKey(a,k)
, 这是在干什么呢?
其实,咱们知道,每一个NioEventLoop
开始干活后,会有不少客户端的链接channel前来和它创建链接,一个事件循环同时为多条channel服务,并且一条channel的整个生命周期都只和一个NioEventLoop关联
如今好了,事件循环的选择器轮询出了诸多的channel中有channel出现了感兴趣的事件,下一步处理这个事件的前提得知道,到底是哪一个channel?
使用的attachment特性,早在Channel注册进Selector时,进存放进去了,下面是Netty中,Channel注册进Selector的源码
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // todo javaChannel() -- 返回SelectableChanel 可选择的Channel,换句话说,能够和Selector搭配使用,他是channel体系的顶级抽象类, 实际的类型是 ServerSocketChannel // todo eventLoop().unwrappedSelector(), -- > 获取选择器, 如今在AbstractNioChannel中 获取到的eventLoop是BossGroup里面的 // todo 到目前看, 他是把ServerSocketChannel(系统建立的) 注册进了 EventLoop的选择器 // todo 到目前为止, 虽然注册上了,可是它不关心任何事件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) {
这里的 最后一个参数是 this是当前的channel , 意思是把当前的Channel当成是一个 attachment(附件) 绑定到selector上 做用以下:
ok, 如今就捋清楚了,挖坑,填坑的过程; 下面进入processSelectedKey(SelectionKey k, AbstractNioChannel ch)
执行IO任务, 源码以下: 咱们能够看到,具体的处理IO的任务都是用Channel的内部类unSafe()完成的, 到这里就不往下跟进了, 后续写新博客连载
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // todo 这个unsafe 也是可channel 也是和Channel进行惟一绑定的对象 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // todo 确保Key的合法 final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { // todo 确保多线程下的安全性 return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } // todo NioServerSocketChannel和selectKey都合法的话, 就进入下面的 处理阶段 try { // todo 获取SelectedKey 的 关心的选项 int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. // todo 在read() write()以前咱们须要调用 finishConnect() 方法, 不然 NIO JDK抛出异常 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps( ); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // todo 一样是检查 readOps是否为零, 来检查是否出现了 jdk 空轮询的bug if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
上面的处理IO事件结束后,第三波高潮就来了,处理任务队列中的任务, runAllTask(timeOutMinils)
, 他也是有生命时长限制的 deadline, 它主要完成了以下的几步:
源码以下:
protected boolean runAllTasks(long timeoutNanos) { // todo 聚合任务, 会把定时任务放入普通的任务队列中 进入查看 fetchFromScheduledTaskQueue(); // todo 从普通的队列中拿出一个任务 Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } // todo 计算截止时间, 表示任务的执行,最好别超过这个时间 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; // todo for循环执行任务 for (;;) { // todo 执行任务, 方法里调用 task.run(); safeExecute(task); runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. // todo 由于 nanoTime();的执行也是个相对耗时的操做,所以没执行完64个任务后,检查有没有超时 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } // todo 拿新的任务 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } // todo 每一个任务执行结束都有个收尾的构造 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
NioEventLoop
如何聚合任务?聚合任务就是把已经到执行时间的任务从定时任务队列中所有取出 ,放入普通任务队列而后执行, 咱们进入上的第一个方法fetchFromScheduledTaskQueue
,源码以下,
private boolean fetchFromScheduledTaskQueue() { // todo 拉取第一个聚合任务 long nanoTime = AbstractScheduledEventExecutor.nanoTime(); // todo 从任务丢列中取出 截止时间是 nanoTime的定时任务 , // todo 往定时队列中添加 ScheduledFutureTask任务, 排序的基准是 ScheduledFutureTask 的compare方法,按照时间,从小到大 // todo 因而当咱们发现队列中的第一个任务,也就是截止时间最近的任务的截止时间比咱们的 Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { // todo scheduledTask != null表示定时任务该被执行了, 因而将定时任务添加到 普通任务队列 if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. // todo 若是添加失败了, 把这个任务重新放入到定时任务队列中, 再尝试添加 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } // todo 循环,尝试拉取定时任务 , 循环结束后,全部的任务所有会被添加到 task里面 scheduledTask = pollScheduledTask(nanoTime); } return true; }
根据指定的截止时间,从定时任务队列中取出任务,定时任务队列中任务按照时间排序,时间越短的,排在前面, 时间相同,按照添加的顺序排序, 如今的任务就是检查定时任务队列中任务,尝试把里面的任务挨个取出来,因而netty使用这个方法Runnable scheduledTask = pollScheduledTask(nanoTime);
而后立刻在while(){}
循环中判断是否存在, 这个方法实现源码以下, 不难看出,他是在根据时间判断
/** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}. * todo 根据给定的纳秒值,返回 Runable定时任务 , 而且,每次使用都要冲洗使用是nanoTime() 来矫正时间 */ protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } // todo 若是定时任务的截止时间<= 咱们穿进来的时间, 就把他返回 if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } // todo 不然返回kong,表示当前全部的定时任务都没到期, 没有能够执行的 return null; }
通过循环以后,到期的任务,全被添加到 taskQueue里面了,下面就是执行TaskQueue里面的任务
safeExecute(task);
方法,执行任务队列中的任务
源码以下: 实际上就行执行了 task这个Runable的Run方法
/** * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}. */ protected static void safeExecute(Runnable task) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } }
总结一下: 到如今为止,EventLoop已经启动了, 一说到NioEventLoop老是想起上图, 如今他能够接受新的链接接入,轮询,处理任务...