在前面的文章中,咱们介绍了定时任务类 Timer ,他是 JDK 1.3 中出现的,位于 java.util 包下。而今天说的 ScheduledThreadPoolExecutor
的是在 JUC 包下,是 JDK1.5 新增的。java
今天就来讲说这个类。程序员
该类内部结构和 Timer
仍是有点相似的,也是 3 个类:数据结构
ScheduledThreadPoolExecutor
:程序员使用的接口。DelayedWorkQueue
: 存储任务的队列。ScheduledFutureTask
: 执行任务的线程。构造方法介绍:this
// 使用给定核心池大小建立一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 使用给定初始参数建立一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
// 使用给定的初始参数建立一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
// 使用给定初始参数建立一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
复制代码
ScheduledThreadPoolExecutor
最多支持 3 个参数:核心线程数量,线程工厂,拒绝策略。spa
为何没有最大线程数量?因为 ScheduledThreadPoolExecutor
内部是个无界队列,maximumPoolSize
也就没有意思了。线程
再介绍一下他的 API 方法,请原谅我将 JDK 文档照抄过来了,就当是备忘吧,以下:rest
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 callable 的任务。 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 runnable 的任务。 void execute(Runnable command) // 使用所要求的零延迟执行命令。 boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的状况下、是否继续执行现有按期任务的策略。 boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的状况下是否继续执行现有延迟任务的策略。 BlockingQueue<Runnable> getQueue() // 返回此执行程序使用的任务队列。 boolean remove(Runnable task) // 从执行程序的内部队列中移除此任务(若是存在),从而若是还没有开始,则其再也不运行。 <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) // 建立并执行在给定延迟后启用的 ScheduledFuture。 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) // 建立并执行在给定延迟后启用的一次性操做。 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 建立并执行一个在给定初始延迟后首次启用的按期操做,后续操做具备给定的周期;也就是将在 initialDelay 后开始执行,而后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 建立并执行一个在给定初始延迟后首次启用的按期操做,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。 void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的状况下是否继续执行现有按期任务的策略。 void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的状况下是否继续执行现有延迟任务的策略。 void shutdown() // 在之前已提交任务的执行中发起一个有序的关闭,可是不接受新任务。 List<Runnable> shutdownNow() // 尝试中止全部正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。 <T> Future<T> submit(Callable<T> task) // 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。 Future<?> submit(Runnable task) // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 <T> Future<T> submit(Runnable task, T result) // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 复制代码
最常用的几个方法以下:code
// 使用给定核心池大小建立一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 建立并执行在给定延迟后启用的一次性操做。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 建立并执行一个在给定初始延迟后首次启用的按期操做,后续操做具备给定的周期;也就是将在 initialDelay 后开始执行,而后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 建立并执行一个在给定初始延迟后首次启用的按期操做,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
复制代码
除了默认的构造方法,还有 3 个 schedule
方法。咱们将分析他们内部的实现。cdn
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
咱们感兴趣的就是这个 DelayedWorkQueue
队列了。他也是一个阻塞队列。这个队列的数据结构是堆。同时,这个 queue
也是可比较的,比较什么呢?任务必须实现 compareTo
方法,这个方法的比较逻辑是:比较任务的执行时间,若是任务的执行时间相同,则比较任务的加入时间。对象
所以,ScheduledFutureTask
有 2 个变量:
time
: 任务的执行时间。sequenceNumber
:任务的加入时间。这两个变量就是用来比较任务的执行顺序的。整个调度的顺序就是这个逻辑。
刚刚说了,有 3 个 schedule
方法:
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
建立并执行在给定延迟后启用的一次性操做。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
建立并执行一个在给定初始延迟后首次启用的按期操做,后续操做具备给定的周期;也就是将在initialDelay
后开始执行,而后在 initialDelay+period
后执行,接着在initialDelay + 2 * period
后执行,依此类推。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
建立并执行一个在给定初始延迟后首次启用的按期操做,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
第一个方法执行在给定的时间后,执行一次就结束。
有点意思的地方是 第二个方法和 第三个方法,他们直接的区别。
这两个方法均可以重复的调用。可是,重复调用的逻辑有所区别,这里就是比 Timer 好用的地方。
他们的共同点在于:必须等待上个任务执行完毕才能执行下个任务。
不一样点在于:他们调度的时间粗略是不一样的。
scheduleAtFixedRate
方法的执行周期都是固定的,也就是,他是以上一个任务的开始执行时间做为起点,加上以后的 period 时间,调度下次任务。
scheduleWithFixedDelay
方法则是以上一个任务的结束时间做为起点,加上以后的 period 时间,调度下次任务。
有什么区别呢?
如何任务执行时间很短,那就没上面区别。可是,若是任务执行时间很长,超过了 period 时间,那么区别就出来了。
咱们假设一下。
咱们设置 period
时间为 2 秒,而任务耗时 5 秒。
这个两个方法的区别就体现出来了。
scheduleAtFixedRate
方法将会在上一个任务结束完毕马上执行,他和上一个任务的开始执行时间的间隔是 5 秒(由于必须等待上一个任务执行完毕)。
scheduleWithFixedDelay
方法将会在上一个任务结束后,注意:**再等待 2 秒,**才开始执行,那么他和上一个任务的开始执行时间的间隔是 7 秒。
因此,咱们在使用 ScheduledThreadPoolExecutor
的过程当中须要注意任务的执行时间不能超过间隔时间,若是超过了,最好使用scheduleAtFixedRate
方法,防止任务堆积。
固然,也和具体的业务有关。不能一律而论。但必定要注意这两个方法的区别。
咱们看看 scheduleAtFixedRate 方法的内部实现。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
复制代码
建立一个 ScheduledFutureTask
对象,而后装饰一个这个Future
,该类实现是直接返回,子类能够有本身的实现,在这个任务外装饰一层。
而后执行 delayedExecute
方法,最后返回 Future
。
这个 ScheduledFutureTask
实现了不少接口,好比 Future
,Runnable
, Comparable
,Delayed
等。
ScheduledFutureTask
的构造方法以下:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
复制代码
层层递进,该类首先经过一个原子静态 int
对象这只任务的入队编号,而后建立一个 Callable
,这个 Callable 是一个适配器,适配了Runnable
和Callable
,也就是将Runnable
包装成 callabe
, 他的 call
方法就是调用给定任务的 run 方法。固然,这里的 result
是没有什么做用的。
若是你传递的是一个 callable
,那么,就调用 FutureTask
的 run
方法,设置真正的返回值。
这里使用了适配器模式,仍是挺有趣的。
总的来讲,这个 ScheduledFutureTask
基于 FutureTask
, 关于 FutureTask
咱们以前从源码介绍过了。
而他本身重写了几个方法:compareTo
, getDelay
, run
,isPeriodic
4 个方法。
咱们仍是要看看 delayedExecute
的实现。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 添加进队列。
super.getQueue().add(task);
// 若是线程池关闭了,且不能够在当前状态下运行任务,且从队列删除任务成功,就给任务打上取消标记。
// 第二个判断是由两个变量控制的(下面是默认值):
// continueExistingPeriodicTasksAfterShutdown = false 表示关闭的时候应取消周期性任务。默认关闭
// executeExistingDelayedTasksAfterShutdown = true。表示关闭的时候应取消非周期性的任务。默认不关闭。
// running 状态下,canRunInCurrentRunState 一定返回 ture。
// 非 running 状态下,canRunInCurrentRunState 根据上面的两个值返回。
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 开始执行任务
ensurePrestart();
}
}
复制代码
说说上面的方法。
ScheduledFutureTask
的 compareTo
方法来的,先比较执行时间,再比较添加顺序。总体的过程如图:
注意上面的图,若是是周期性的任务,则会在执行完毕后,归还队列。
从哪里能够看出来呢?
ScheduledFutureTask
的 run
方法:
public void run() {
// 是不是周期性任务
boolean periodic = isPeriodic();
// 若是不能够在当前状态下运行,就取消任务(将这个任务的状态设置为CANCELLED)。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 若是不是周期性的任务,调用 FutureTask # run 方法
else if (!periodic)
ScheduledFutureTask.super.run();
// 若是是周期性的。
// 执行任务,但不设置返回值,成功后返回 true。(callable 不能够重复执行)
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次执行时间
setNextRunTime();
// 再次将任务添加到队列中
reExecutePeriodic(outerTask);
}
}
复制代码
逻辑以下:
FutureTask
的 run
方法。runAndReset
方法。而管理整个执行过程的就是ScheduledThreadPoolExecutor
的父类 ThreadPoolExecutor
的runWorker
方法。其中,该方法会从队列中取出数据,也就是调用队列的 take
方法。
关于DelayedWorkQueue
的take
方法,其中有个 leader
变量,若是 leader 不是空,说明已经有线程在等待了,那就阻塞当前线程,若是是空,说明,队列的第一个元素已经被更新了,就设置当前线程为 leader
.
这是一个 Leader-Follower
模式,Doug Lea 说的。
固然,take
方法总体的逻辑仍是不变的。从队列的头部拿数据。使用 Condition
作线程之间的协调。
关于 ScheduledThreadPoolExecutor
调度类,咱们分析的差很少了,总结一下。
ScheduledThreadPoolExecutor
是个定时任务线程池,相似 Timer
,可是比 Timer
强大,健壮。
好比不会像 Timer 那样,任务异常了,整个调度系统就完全无用了。
也比Timer
多了Rate
模式(Rate 和 Delay
)。
这两种模式的区别就是任务执行的起点时间不一样,Rate
是从上一个任务的开始执行时间开始计算;Delay
是从上一个任务的结束时间开始计算。
所以,若是任务自己的时间超过了间隔时间,那么这两种模式的间隔时间将会不一致。
而任务的排序是经过 ScheduledFutureTask
的 compareTo
方法排序的,规则是先比较执行时间,若是时间相同,再比较加入时间。
还要注意一点就是:若是任务执行过程当中异常了,那么将不会再次重复执行。由于 ScheduledFutureTask
的 run
方法没有作catch
处理。因此程序员须要手动处理,相对于 Timer 异常就直接费了调度系统来讲,要好不少。
ScheduledThreadPoolExecutor
的是实现基于 ThreadPoolExecutor
,大部分功能都是重用的父类,只是本身在执行完毕以后,从新设置时间,并再次将任务还到了队列中,造成了定时任务。