Netty源码分析第二章: NioEventLoophtml
第八节: 执行任务队列oop
继续回到NioEventLoop的run()方法:源码分析
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //轮询io事件(1)
select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默认是50
final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //记录下开始时间
final long ioStartTime = System.nanoTime(); try { //处理轮询到的key(2)
processSelectedKeys(); } finally { //计算耗时
final long ioTime = System.nanoTime() - ioStartTime; //执行task(3)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代码省略
} }
咱们看处处理完轮询到的key以后, 首先记录下耗时, 而后经过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务学习
咱们知道ioRatio默认是50, 因此执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:fetch
跟进runAllTasks方法:优化
protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务
fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务
Runnable task = pollTask(); //task为空, 则直接返回
if (task == null) { //跑完全部的任务执行收尾的操做
afterRunningAllTasks(); return false; } //若是队列不为空 //首先算一个截止时间(+50毫秒, 由于执行任务, 不要超过这个时间)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每个任务
for (;;) { safeExecute(task); //标记当前跑完的任务
runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间
if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间
lastExecutionTime = ScheduledFutureTask.nanoTime(); //若是超过截止时间则不执行(nanoTime()是耗时的)
if (lastExecutionTime >= deadline) { break; } } //若是没有超过这个时间, 则继续从普通任务队列拿任务
task = pollTask(); //直到没有任务执行
if (task == null) { //记录下最后执行时间
lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工做
afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到能够执行的任务添加到taskQueue中this
咱们跟进fetchFromScheduledTaskQueue()方法:spa
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //从定时任务队列中抓取第一个定时任务 //寻找截止时间为nanoTime的任务
Runnable scheduledTask = pollScheduledTask(nanoTime); //若是该定时任务队列不为空, 则塞到普通任务队列里面
while (scheduledTask != null) { //若是添加到普通任务队列过程当中失败
if (!taskQueue.offer(scheduledTask)) { //则从新添加到定时任务队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //继续从定时任务队列中拉取任务 //方法执行完成以后, 全部符合运行条件的定时任务队列, 都添加到了普通任务队列中
scheduledTask = pollScheduledTask(nanoTime); } return true; }
long nanoTime = AbstractScheduledEventExecutor.nanoTime() 表明从定时任务初始化到如今过去了多长时间日志
Runnable scheduledTask= pollScheduledTask(nanoTime) 表明从定时任务队列中拿到小于nanoTime时间的任务, 由于小于初始化到如今的时间, 说明该任务须要执行了netty
跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //拿到定时任务队列
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; //peek()方法拿到第一个任务
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { //从队列中删除
scheduledTaskQueue.remove(); //返回该任务
return scheduledTask; } return null; }
咱们看到首先得到当前类绑定的定时任务队列的成员变量
若是不为空, 则经过scheduledTaskQueue.peek()弹出第一个任务
若是当前任务小于传来的时间, 说明该任务须要执行, 则从定时任务队列中删除
咱们继续回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //从定时任务队列中抓取第一个定时任务 //寻找截止时间为nanoTime的任务
Runnable scheduledTask = pollScheduledTask(nanoTime); //若是该定时任务队列不为空, 则塞到普通任务队列里面
while (scheduledTask != null) { //若是添加到普通任务队列过程当中失败
if (!taskQueue.offer(scheduledTask)) { //则从新添加到定时任务队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //继续从定时任务队列中拉取任务 //方法执行完成以后, 全部符合运行条件的定时任务队列, 都添加到了普通任务队列中
scheduledTask = pollScheduledTask(nanoTime); } return true; }
弹出须要执行的定时任务以后, 咱们经过taskQueue.offer(scheduledTask)添加到taskQueue中, 若是添加失败, 则经过scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)从新添加到定时任务队列中
若是添加成功, 则经过pollScheduledTask(nanoTime)方法继续添加, 直到没有须要执行的任务
这样就将定时任务队列须要执行的任务添加到了taskQueue中
回到runAllTasks(long timeoutNanos)方法中:
protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务
fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务
Runnable task = pollTask(); //task为空, 则直接返回
if (task == null) { //跑完全部的任务执行收尾的操做
afterRunningAllTasks(); return false; } //若是队列不为空 //首先算一个截止时间(+50毫秒, 由于执行任务, 不要超过这个时间)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每个任务
for (;;) { safeExecute(task); //标记当前跑完的任务
runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间
if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间
lastExecutionTime = ScheduledFutureTask.nanoTime(); //若是超过截止时间则不执行(nanoTime()是耗时的)
if (lastExecutionTime >= deadline) { break; } } //若是没有超过这个时间, 则继续从普通任务队列拿任务
task = pollTask(); //直到没有任务执行
if (task == null) { //记录下最后执行时间
lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工做
afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先经过 Runnable task = pollTask() 从taskQueue中拿一个任务
任务不为空, 则经过 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 计算一个截止时间, 任务的执行时间不能超过这个时间
而后在for循环中经过safeExecute(task)执行task
咱们跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) { try { //直接调用run()方法执行
task.run(); } catch (Throwable t) { //发生异常不终止
logger.warn("A task raised an exception. Task: {}", task, t); } }
这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 表明发生异常不终止, 继续往下执行
回到runAllTasks(long timeoutNanos)方法:
protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务
fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务
Runnable task = pollTask(); //task为空, 则直接返回
if (task == null) { //跑完全部的任务执行收尾的操做
afterRunningAllTasks(); return false; } //若是队列不为空 //首先算一个截止时间(+50毫秒, 由于执行任务, 不要超过这个时间)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每个任务
for (;;) { safeExecute(task); //标记当前跑完的任务
runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间
if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间
lastExecutionTime = ScheduledFutureTask.nanoTime(); //若是超过截止时间则不执行(nanoTime()是耗时的)
if (lastExecutionTime >= deadline) { break; } } //若是没有超过这个时间, 则继续从普通任务队列拿任务
task = pollTask(); //直到没有任务执行
if (task == null) { //记录下最后执行时间
lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工做
afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
每次执行完task, runTasks自增
这里 if ((runTasks & 0x3F) == 0) 表明是否执行了64个任务, 若是执行了64个任务, 则会经过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到如今的时间, 若是这个时间超过了截止时间, 则退出循环
若是没有超过截止时间, 则经过 task = pollTask() 继续弹出任务执行
这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要缘由是由于获取系统时间是个比较耗时的操做, 这里是netty的一种优化方式
若是没有task须要执行, 则经过afterRunningAllTasks()作收尾工做, 最后记录下最后的执行时间
以上就是有关执行任务队列的相关逻辑
第二章总结
本章学习了有关NioEventLoopGroup的建立, NioEventLoop的建立和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 经过本章学习, 咱们应该掌握以下内容:
1. NioEventLoopGroup如何选择分配NioEventLoop
2. NioEventLoop如何开启
3. NioEventLoop如何进行select操做
4. NioEventLoop如何执行task