在以前的博文--ThreadPoolExecutor源码解读已经对ThreadPoolExecutor的实现原理与源码进行了分析。ScheduledExecutorService也是咱们在开发中常常会用到的一种ExecutorService,JDK中它的默认实现类为ScheduledThreadPoolExecutor。本文针对ScheduledThreadPoolExecutor的设计原理与实现源码进行分析解读。html
首先来看一下ScheduledThreadPoolExecutor的继承关系。
java
这里有必要来介绍一下ScheduledExecutorService接口。
ScheduledExecutorService自己继承了ExecutorService接口,并为调度任务额外提供了两种模式数组
咱们知道RunnableFuture接口是ThreadPoolExecutor对内和对外的桥梁。对内它的形态是Runnable来执行任务,对外它的形态是Future。
那么对于ScheduledThreadPoolExecutor来讲,RunnableScheduledFuture是它的内外桥梁,对内形态为Runnable,对外形态为ScheduledFuture。安全
ScheduledFutureTask是ScheduledThreadPoolExecutor对于RunnableScheduledFuture的默认实现,而且继承了FutureTask。
它覆盖了FutureTask的run方法来实现对延时执行、周期执行的支持,简单来讲它的套路就是对于延时任务则调用FutureTask#run而对于周期性任务则调用FutureTask#runAndReset而且在成功以后根据fixed-delay/fixed-rate模式来设置下次执行时间并从新将任务塞到工做队列中。
对于ScheduledFutureTask#run方法来讲它并不须要关心run的时候是否到了能够执行的时间,由于这个职责会由ScheduledThreadPoolExecutor中的工做队列来完成,以保证只有在任务能够被执行的时候才会被Worker线程从队列中取出。并发
DelayedWorkQueue是ScheduledThreadPoolExecutor中阻塞队列的实现,它内部使用了小根堆来使得自身具备优先队列的功能,而且经过Leader/Follower模式避免线程没必要要的等待。
从DelayedWorkQueue中取出任务时,任务必定已经至少到了能够被执行的时间。ide
分析ScheduledThreadPoolExecutor的源码,主要会分红三个部分:ScheduledFutureTask, DelayedWorkQueue以及ScheduledThreadPoolExecutor自己。源码分析
ScheduledFutureTask是ScheduledThreadPoolExecutor中的一个内部类。
咱们能够看到,它的接口继承线大致是两条:RunnableFuture和ScheduledFuture,而RunnableScheduledFuture是二者的合体。this
outerTask的主要目的就是让周期任务在第二次及以后的运行时跑的都是decorateTask返回的包装结果。线程
ScheduledFutureTask一般状况下就是线程池中Worker线程拿到的Runnable对象。注意这里说的是一般状况,由于ScheduledThreadPoolExecutor容许咱们经过decorateTask方法包装原先的ScheduledFutureTask,相比之下这并不常见。设计
public void run() { // 是否周期性,就是判断period是否为0。 boolean periodic = isPeriodic(); // 检查任务是否能够被执行。 if (!canRunInCurrentRunState(periodic)) cancel(false); // 若是非周期性任务直接调用run运行便可。 else if (!periodic) ScheduledFutureTask.super.run(); // 若是成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。 else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); // 须要从新将任务(outerTask)放到工做队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor自己API时说起。 reExecutePeriodic(outerTask); } } private void setNextRunTime() { long p = period; /* * fixed-rate模式,时间设置为上一次时间+p。 * 提一句,这里的时间其实只是能够被执行的最小时间,不表明到点就要执行。 * 若是此次任务还没执行完是确定不会执行下一次的。 */ if (p > 0) time += p; /** * fixed-delay模式,计算下一次任务能够被执行的时间。 * 简单来讲差很少就是当前时间+delay值。由于代码走到这里任务就已经结束了,now()能够认为就是任务结束时间。 */ else time = triggerTime(-p); } long triggerTime(long delay) { /* * 若是delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。 * * 不然为了不队列中出现因为溢出致使的排序紊乱,须要调用overflowFree来修正一下delay(若是有必要的话)。 */ return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } /** * 主要就是有这么一种状况: * 某个任务的delay为负数,说明当前能够执行(其实早该执行了)。 * 工做队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。 * * 那么就有可能出现一个delay为正数,减去另外一个为负数的delay,结果上溢为负数,则会致使compareTo产生错误的结果。 * * 为了特殊处理这种状况,首先判断一下队首的delay是否是负数,若是是正数不用管了,怎么减都不会溢出。 * 不然能够拿当前delay减去队首的delay来比较看,若是不出现上溢,则整个队列都ok,排序不会乱。 * 否则就把当前delay值给调整为Long.MAX_VALUE + 队首delay。 */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
public boolean cancel(boolean mayInterruptIfRunning) { // 先调用父类FutureTask#cancel来取消任务。 boolean cancelled = super.cancel(mayInterruptIfRunning); /* * removeOnCancel开关用于控制任务取消后是否应该从队列中移除。 * * 若是已经成功取消,而且removeOnCancel开关打开,而且heapIndex >= 0(说明仍然在队列中), * 则从队列中删除该任务。 */ if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; }
DelayedWorkQueue是ScheduledThreadPoolExecutor使用的工做队列。它内部维护了一个小根堆,根据任务的执行开始时间来维护任务顺序。但不一样的地方在于,它对于ScheduledFutureTask类型的元素额外维护了元素在队列中堆数组的索引,用来实现快速取消。DelayedWorkQueue用了ReentrantLock+Condition来实现管程保证数据的线程安全性。
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) // 容量扩增50%。 grow(); size = i + 1; // 第一个元素,其实这里也能够统一进行sift-up操做,不必特判。 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 插入堆尾。 siftUp(i, e); } // 若是新加入的元素成为了堆顶,则原先的leader就无效了。 if (queue[0] == e) { leader = null; // 因为原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。 available.signal(); } } finally { lock.unlock(); } return true; }
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { /* * 循环读取当前堆中最小也就执行开始时间最近的任务。 * 若是当前队列为空无任务,则在available条件上等待。 * 不然若是最近任务的delay<=0则返回这个任务以执行,不然的话根据是否能够做为leader分类: * 若是能够做为leader,则根据delay进行有时限等待。 * 不然无限等待直至被leader唤醒。 */ for (;;) { RunnableScheduledFuture<?> first = queue[0]; // 若是当前队列无元素,则在available条件上无限等待直至有任务经过offer入队并唤醒。 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); // 若是delay小于0说明任务该马上执行了。 if (delay <= 0) // 从堆中移除元素并返回结果。 return finishPoll(first); /* * 在接下来等待的过程当中,first应该清为null。 * 由于下一轮从新拿到的最近须要执行的任务极可能已经不是这里的first了。 * 因此对于接下来的逻辑来讲first已经没有任何用处了,不应持有引用。 */ first = null; // 若是目前有leader的话,当前线程做为follower在available条件上无限等待直至唤醒。 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { /* * 若是从available条件中被唤醒当前线程仍然是leader,则清空leader。 * * 分析一下这里不等的状况: * 1. 原先thisThread == leader, 而后堆顶更新了,leader为null。 * 2. 堆顶更新,offer方法释放锁后,有其它线程经过take/poll拿到锁,读到leader == null,而后将自身更新为leader。 * * 对于这两种状况统一的处理逻辑就是只要leader为thisThread,则清leader为null用以接下来判断是否须要唤醒后继线程。 */ if (leader == thisThread) leader = null; } } } } } finally { /* * 若是当前堆中无元素(根据堆顶判断)则直接释放锁。 * * * 不然若是leader有值,说明当前线程必定不是leader,当前线程不用去唤醒后续等待线程。 * 不然由当前线程来唤醒后继等待线程。不过这并不表明当前线程原来是leader。 */ if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
因为和take方法套路差很少,这里不展开细讲了。
ScheduledThreadPoolExecutor支持任务取消的时候快速从队列中移除,由于大部分状况下队列中的元素是ScheduledFutureTask类型,内部维护了heapIndex也即在堆数组中的索引。
堆移除一个元素的时间复杂度是O(log n),前提是咱们须要知道待删除元素在堆数组中的位置。若是咱们不维护heapIndex则须要遍历整个堆数组来定位元素在堆数组的位置,这样光是扫描一次堆数组复杂度就O(n)了。而维护了heapIndex,就能够以O(1)的时间来确认位置,从而能够更快的移除元素。
public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); if (i < 0) return false; setIndex(queue[i], -1); /* * 堆的删除某个元素操做就是将最后一个元素移到那个元素。 * 这时候有可能须要向上调整堆,也可能须要向下维护。 * * 对于小根堆而言,若是移过去后比父元素小,则须要向上维护堆结构, * 不然将左右两个子节点中较小值与当前元素比较,若是当前元素较大,则须要向下维护堆结构。 */ int s = --size; RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; // 若是参数x就是堆数组中最后一个元素则删除操做已经完毕了。 if (s != i) { // 尝试向下维护堆。 siftDown(i, replacement); // 相等说明replacement比子节点都要小,尝试向上维护堆。 if (queue[i] == replacement) siftUp(i, replacement); } return true; } finally { lock.unlock(); } } private int indexOf(Object x) { if (x != null) { if (x instanceof ScheduledFutureTask) { int i = ((ScheduledFutureTask) x).heapIndex; // 再次判断i确实是本线程池的,由于remove方法的参数x彻底能够是个其它池子里拿到的ScheduledFutureTask。 if (i >= 0 && i < size && queue[i] == x) return i; } else { for (int i = 0; i < size; i++) if (x.equals(queue[i])) return i; } } return -1; }
在了解了ScheduledFutureTask与DelayedWorkQueue以后最后再来看ScheduledThreadPoolExecutor自己的方法,就显得容易不少。
这里咱们来介绍一些ScheduledThreadPoolExecutor以及父类ThreadPoolExecutor中的方法。
这个方法在任务提交时,任务运行时都会被调用以校验当前状态是否能够运行任务。
boolean canRunInCurrentRunState(boolean periodic) { /* * isRunningOrShutdown的参数为布尔值,true则表示shutdown状态也返回true,不然只有running状态返回ture。 * 若是为周期性任务则根据continueExistingPeriodicTasksAfterShutdown来判断是否shutdown了仍然能够执行。 * 不然根据executeExistingDelayedTasksAfterShutdown来判断是否shutdown了仍然能够执行。 */ return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
ScheduledThreadPoolExecutor任务提交的入口方法主要是execute, schedule, scheduleAtFixedRate以及scheduleWithFixedDelay这几类。
/** * 覆盖了父类execute的实现,以零延时任务的形式实现。 */ public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); // 包装ScheduledFutureTask。 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // fixed-rate模式period为正数。 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 包装ScheduledFutureTask,默认返回自己。 RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 将构造出的ScheduledFutureTask的outerTask设置为通过包装的结果。 sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // fixed-delay模式delay为正数。 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); // 包装ScheduledFutureTask,默认返回自己。 RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 将构造出的ScheduledFutureTask的outerTask设置为通过包装的结果。 sft.outerTask = t; delayedExecute(t); return t; }
private void delayedExecute(RunnableScheduledFuture<?> task) { // 非RUNNING态,根据饱和策略处理任务。 if (isShutdown()) reject(task); else { // 往work queue中插入任务。 super.getQueue().add(task); /* * 检查任务是否能够被执行。 * 若是任务不该该被执行,而且从队列中成功移除的话(说明没被worker拿取执行),则调用cancel取消任务。 */ if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) // 参数中false表示不试图中断执行任务的线程。 task.cancel(false); else ensurePrestart(); } } /** * 这是父类ThreadPoolExecutor的方法用于确保有worker线程来执行任务。 void ensurePrestart() { int wc = workerCountOf(ctl.get()); // worker数目小于corePoolSize,则添加一个worker。 if (wc < corePoolSize) addWorker(null, true); // wc==orePoolSize==0的状况也添加一个worker。 else if (wc == 0) addWorker(null, false); }
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 塞到工做队列中。 super.getQueue().add(task); // 再次检查是否能够执行,若是不能执行且任务还在队列中未被取走则取消任务。 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
onShutdown方法是ThreadPoolExecutor的一个钩子方法,会在shutdown方法中被调用,默认实现为空。而ScheduledThreadPoolExecutor覆盖了此方法用于删除并取消工做队列中的不须要再执行的任务。
@Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); // shutdown是否仍然执行延时任务。 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // shutdown是否仍然执行周期任务。 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 若是二者皆不可则对队列中全部RunnableScheduledFuture调用cancel取消并清空队列。 if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; /* * 不须要执行的任务删除并取消。 * 已经取消的任务也须要从队列中删除。 * 因此这里就判断下是否须要执行或者任务是否已经取消。 */ if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false); } } } } // 由于任务被从队列中清理掉,因此这里须要调用tryTerminate尝试跃迁executor的状态。 tryTerminate(); }
本文介绍了ScheduledThreadPoolExecutor的原理与源码实现。
ScheduledThreadPoolExecutor内部使用了ScheduledFutureTask来表示任务,即便对于execute方法也将其委托至schedule方法,以零延时的形式实现。同时ScheduledThreadPoolExecutor也容许咱们经过decorateTask方法来包装任务以实现定制化的封装。
而ScheduledThreadPoolExecutor内部使用的阻塞队列DelayedWorkQueue经过小根堆来实现优先队列的功能。因为DelayedWorkQueue是无界的,因此本质上对于ScheduledThreadPoolExecutor而言,maximumPoolSize并无意义。总体而言,ScheduledThreadPoolExecutor处理两类任务--延时任务与周期任务。经过ScheduledFutureTask.period的是否为零属于哪一类,经过ScheduledFutureTask.period的正负性来判断属于周期任务中的fixed-rate模式仍是fixed-delay模式。而且提供了经过参数来控制延时任务与周期任务在线程池被关闭时是否须要被取消并移除出队列(若是还在队列)以及是否容许执行(若是已经被worker线程取出)。
DelayedWorkQueue使用了Leader/Follower来避免没必要要的等待,只让leader来等待须要等待的时间,其他线程无限等待直至被唤醒便可。
DelayedWorkQueue全部的堆调整方法都维护了类型为ScheduledFutureTask的元素的heapIndex,以下降cancel的时间复杂度。
下面整理一下ScheduledThreadPoolExecutor中几个重要参数。