自jdk1.5开始,Java开始提供ScheduledThreadPoolExecutor类来支持周期性任务的调度,在这以前,这些工做须要依靠Timer/TimerTask或者其它第三方工具来完成。但Timer有着很多缺陷,如Timer是单线程模式,调度多个周期性任务时,若是某个任务耗时较久就会影响其它任务的调度;若是某个任务出现异常而没有被catch则可能致使惟一的线程死掉而全部任务都不会再被调度。ScheduledThreadPoolExecutor解决了不少Timer存在的缺陷。java
先来看看ScheduledThreadPoolExecutor的实现模型,它经过继承ThreadPoolExecutor来重用线程池的功能,里面作了几件事情:工具
public
void
run() {
if
(isPeriodic())
runPeriodic();
else
ScheduledFutureTask.
super
.run();
}
|
若是不是周期性任务就直接执行任务(也就是else部分),这个主要是用于实现ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit),后面会讲到它们的实现,这里先关注周期任务的执行方式。周期性任务执行的是runPeriodic(),看下它的实现:this
private
void
runPeriodic() {
boolean
ok = ScheduledFutureTask.
super
.runAndReset();
boolean
down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if
(ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isTerminating()))) {
long
p = period;
if
(p >
0
)
time += p;
else
time = triggerTime(-p);
ScheduledThreadPoolExecutor.
super
.getQueue().add(
this
);
}
// This might have been the final executed delayed
// task. Wake up threads to check.
else
if
(down)
interruptIdleWorkers();
}
|
这里能够看到,先执行了任务自己(ScheduledFutureTask.super.runAndReset),这个调用有一个返回值,来看下它的实现:spa
protected
boolean
runAndReset() {
return
sync.innerRunAndReset();
}
|
跟进去看下innerRunAndReset():线程
boolean
innerRunAndReset() {
if
(!compareAndSetState(
0
, RUNNING))
return
false
;
try
{
runner = Thread.currentThread();
callable.call();
// don't set result
runner =
null
;
return
compareAndSetState(RUNNING,
0
);
}
catch
(Throwable ex) {
innerSetException(ex);
return
false
;
}
}
|
能够发现,这里须要关注的是第三个return,也就是若是任务执行出现了异常,会被catch且返回false.rest
继续看runPeriodic()方法,if里面,若是刚才任务执行的返回值是true且线程池还在运行就在if块中的操做,若是线程池被关闭了就作else if里的操做。也就是说,若是以前的任务执行出现的异常返回了false,那么if里以及else if里的代码都不会执行了,那有什么影响?接下来看看if里作了什么。code
if里的代码很简单,分为两部分,一是计算这个任务下次调度的间隔,二是将任务从新放回队列中。回到出现异常的状况,若是刚才的任务执行出现了异常,就不会将任务再放回队列中,换而言之,也就是这个任务再也得不到调度了!可是,这并不影响其它周期任务的调度。blog
综上,能够看到,ScheduledThreadPoolExecutor执行周期性任务的模型就是:调度一次任务,计算并设置该任务下次间隔,将任务放回队列中供线程池执行。这里的队列起了很大的做用,且有一些特色:距离下次调度间隔短的任务老是在队首,队首的任务若距离下次调度的间隔时间大于0就没法从该队列的take()方法中拿到任务。排序
接下来看看ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit)这两个非周期性任务的实现方式,先看看它们的源码:继承
public
ScheduledFuture<?> schedule(Runnable command,
long
delay,
TimeUnit unit) {
if
(command ==
null
|| unit ==
null
)
throw
new
NullPointerException();
ScheduledFutureTask<?> t =
new
ScheduledFutureTask<Boolean>(command,
null
, triggerTime(delay, unit));
delayedExecute(t);
return
t;
}
|
public
<V> ScheduledFuture<V> schedule(Callable<V> callable,
long
delay,
TimeUnit unit) {
if
(callable ==
null
|| unit ==
null
)
throw
new
NullPointerException();
ScheduledFutureTask<V> t =
new
ScheduledFutureTask<V>(callable, triggerTime(delay, unit));
delayedExecute(t);
return
t;
}
|
private
void
delayedExecute(Runnable command) {
if
(isShutdown()) {
reject(command);
return
;
}
// Prestart a thread if necessary. We cannot prestart it
// running the task because the task (probably) shouldn't be
// run yet, so thread will just idle until delay elapses.
if
(getPoolSize() < getCorePoolSize())
prestartCoreThread();
super
.getQueue().add(command);
}
|
实现方式也很简单,在建立ScheduledThreadPoolExecutor内部任务(即ScheduledFutureTask)的时候就将调度间隔计算并设置好,若是当前线程数小于设置的核心线程数,就启动一个线程(多是线程池刚启动里面尚未线程,也多是里面的线程执行任务时挂掉了。若是线程池中的线程挂掉了而又没有调用这些schedule方法谁去补充挂掉的线程?不用担忧,线程池本身会处理的)去监听队列里的任务,而后将任务放到队列里,在任务执行间隔不大于0的时候,线程就能够拿到这个任务并执行。
周期性任务的入口(ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)和ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit))与非周期性任务是相似的,它们处理方式不一样的地方在于前文说到的ScheduledFutureTask#run()中。