本文是源起netty
专栏的第4篇文章,很明显前3篇文章已经在偏离主题的道路上愈来愈远。因而乎,我决定:继续保持……前端
首先看看源码类注释中的示例(未改变官方示例逻辑,只是增长了print输出和注释)java
import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; public class ScheduleExecutorServiceDemo { private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); public static void main(String args[]){ final Runnable beeper = new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep"); //TODO 沉睡吧,少年 //try { // TimeUnit.SECONDS.sleep(3L); //} catch (InterruptedException e) { // e.printStackTrace(); //} } }; //从0s开始输出beep,间隔1s final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 0, 1, TimeUnit.SECONDS); //10s以后中止beeperHandle的疯狂输出行为 scheduler.schedule(new Runnable() { public void run() { System.out.println("觉悟吧,beeperHandle!I will kill you!"); beeperHandle.cancel(true); } }, 10, TimeUnit.SECONDS); } }
scheduleAtFixedRate
也是该类经常使用的打开方式之一,网上不少文章会拿该方法与scheduleWithFixedDelay
进行对比,对比结果其实和方法名一致:后端
scheduleAtFixedRate //以固定频率执行 scheduleWithFixedDelay //延迟方式执行,间隔时间=间隔时间入参+任务执行时间
ScheduleExecutorService实则是Timer
的进化版,主要改进了Timer单线程方面的弊端,改进方式天然是线程池,ScheduleExecutorService的好基友ScheduledThreadPoolExecutor
华丽丽登场。其实ScheduledThreadPoolExecutor才是主角,ScheduleExecutorService扮演的是抛砖引玉中的砖……源码分析
先看下ScheduledThreadPoolExecutor类的江湖地位:this
既然继承自ThreadPoolExecutor,确乃线程池无疑。spa
本文以以下方法做为切入点:public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
线程
方法入参period
(译:周期)就是scheduleAtFixedRate所指的固定频率吗?
这个问题很好验证,把示例中这部分代码的注释去掉就能获得答案。调试
final Runnable beeper = new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep"); //TODO 沉睡吧,少年 //try { // TimeUnit.SECONDS.sleep(3L); //} catch (InterruptedException e) { // e.printStackTrace(); //} } };
答案就是,若是方法执行时间大于间隔周期period,则任务的下次执行时间将超过period的设定!rest
执行结果以下,能够看出任务间隔为3s,而不是period设置的1snetty
不由好奇,ScheduleExecutorService是怎么实现的多长时间以后执行下一个任务?有句话叫源码之下无秘密,so..let's do this !
从ScheduleExecutorService的初始化开始:
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
追随调用链Executors.newScheduledThreadPool
-> new ScheduledThreadPoolExecutor(corePoolSize)
,进入以下方法:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); //注意最后一个参数 }
线程池中的任务队列用的new DelayedWorkQueue()
,而DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类。
初始化部分关注到这一点便可,以后会是一些成员变量的赋值,不做解释。
接下来从scheduleAtFixedRate方法开始,进入它的实现方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
Runnable command
被封装成了ScheduledFutureTask
类,无独有偶,ScheduledFutureTask是ScheduledThreadPoolExecutor的另一个内部类。看下它的类关系图:
有没有发现ScheduledFutureTask实现了Comparable
接口?众所周知这个接口是以某种规则用来比较大小的,这里的规则就是任务的开始执行时间——ScheduledFutureTask的一个属性:
/** The time the task is enabled to execute in nanoTime units */ private long time;
compareTo
方法就是明证:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; //focus这里啊喂!!! if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
通常来讲,这些比较(compare)放在集合中才有意义,那ScheduledFutureTask以后会放在哪一个集合中吗?有些朋友可能已经猜到了,没错,ScheduledFutureTask后续会置于前文提到的DelayedWorkQueue中。
继续ScheduledThreadPoolExecutor.scheduleAtFixedRate
方法:
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); //醒醒,该你出场了
进入delayedExecute
方法:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); //代码一 - 任务加入DelayedWorkQueue if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); //代码二 - 任务开始 } }
追踪 代码一 位置的调用链:
-> DelayedWorkQueue.add
-> offer
-> siftUp(int k, RunnableScheduledFuture<?> key)
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
能够看到,siftUp方法实现了向DelayedWorkQueue添加任务时(add),开始时间靠后的任务(ScheduledFutureTask)会放在后面。
ok,回到 代码二 位置的ensurePrestart
方法,接着追:ensurePrestart
-> addWorker(Runnable firstTask, boolean core)
浓缩版addWorker方法以下:
private boolean addWorker(Runnable firstTask, boolean core){ ... //省略不少的验证逻辑 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try{ w = new Worker(firstTask); //代码三 - 封装成worker,new Worker会从线程池中获取线程 final Thread t = w.thread; if (t != null){ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); ... //省略部分状态控制逻辑 if (workerAdded){ t.start(); //代码四 - 执行Worker的run方法 workerStarted = true; } } }finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这里发现firstTask(ScheduledFutureTask)再次被封装成了Worker
(代码三),那么t.start()
(代码四),天然会执行Worker的run方法,跟下Worker.run
方法:Worker.run
-> runWorker(Worker w)
浓缩后的runWorker
:
final void runWorker(Worker w){ ... //省略部分代码 try{ while (task != null || (task = getTask()) != null){ //代码五 - getTask()获取任务 ... //省略部分代码 task.run(); //代码六 - 任务执行 ... //省略部分代码 } completedAbruptly = false; }finally{ processWorkerExit(w, completedAbruptly); } }
老规矩,5、六两处关键代码分别看一下:
getTask
最终定位到DelayedWorkQueue.take
方法,这里只分析延时任务的执行状况public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) //代码八 - leader线程就是下一次的工做线程 available.await(); else { Thread thisThread = Thread.currentThread(); //代码七 - 指定leader线程 leader = thisThread; try { available.awaitNanos(delay); //等待 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
对于延时任务来讲,线程池中第一个调用take的线程进来会做为leader线程(代码七),而后等待。结束等待的位置在哪?在ScheduledFutureTask.run
的调用中!(我做断点调试的时候,这个等待时间老是很大,通常两个小时以上,彷佛直接用await就成?这一点确有疑问)。
而线程池中的其它线程调用take时,发现leader已经被第一个线程抢了,只能等着(代码八)
task.run()
也就是ScheduledFutureTask.run
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { //对于延时任务,会进入这个分支 setNextRunTime(); reExecutePeriodic(outerTask); } }
对于延时任务,会执行ScheduledFutureTask.super.runAndReset()
:
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { //代码九 - 阻塞式等待beeper完成 c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
runAndReset方法会等待最初定义的beeper逻辑执行完成(代码九),这也解释了为何scheduleAtFixedRate
的下次任务执行时间会有可能超过参数period
的设定!
而后调用reExecutePeriodic
:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); //队列中再次加入任务 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); //再次回到ensurePrestart方法 } }
reExecutePeriodic
方法看上去是否是似曾相识,与本小节(3.延时执行)开端的delayedExecute
方法对比下:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); //任务加入DelayedWorkQueue if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); //任务开始 } }
都是加入队列,而后任务开始!
而DelayedWorkQueue.add
中到底作了什么?以前没有分析,在这里看一下:DelayedWorkQueue.add
-> offer
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; //将leader赋值清除 available.signal(); //代码十 - 通知线程 } } finally { lock.unlock(); } return true; }
能够看到,就是在offer
方法(代码十),将DelayedWorkQueue.take
中的available.awaitNanos(delay)
唤醒了!
是否是已经绕晕了?很正常,由于源码终归是须要本身去读个几遍才能理清整个脉络。因此老铁们,加油!
最后的总结仍是不能缺乏的,一个定时任务的执行流程是这样的:
1.任务开始时,将任务ScheduledFutureTask
放入队列DelayedWorkQueue
。任务放入过程会计算该任务的开始执行时间,执行时间靠前的放入队列的前端,执行时间靠后的放入队列的后端。
2.以后的ensurePrestart
方法,先从线程池中获取线程,该线程会从队列DelayedWorkQueue
中获取ScheduledFutureTask
。
获取过程DelayedWorkQueue.take
先计算任务的延时时间delay
,有两种状况:
delay>0 须要延时,出现以下局面:
long delay = first.getDelay(NANOSECONDS); //计算延时时间delay //已不须要延时,当即获取任务 if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting //须要延时的任务(与此同时有任务正在执行) if (leader != null) //其它线程进来时,有leader线程存在了,等待 available.await(); else { Thread thisThread = Thread.currentThread(); //第一个进入这里的线程会成为leader leader = thisThread; try { available.awaitNanos(delay); //等待 } finally { if (leader == thisThread) leader = null; } }
3.获取任务后,进入执行环节Worker.run
-> ScheduledFutureTask.run
。执行过程会阻塞式等待任务完成,这也是任务执行时间可能会超过period的缘由!任务执行结束会再次放入任务,这样又回到步骤1,反复执行。