ScheduledThreadPoolExecutor 也是一个线程池类,是线程池类 ThreadPoolExecutor 的子类。除了 ThreadPoolExecutor 相关的方法以外,它还增长了执行定时任务和周期性任务的方法。它的类签名和继承结构以下:app
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {}
异步
先看它的一个内部嵌套类 DelayedWorkQueue,该类是一个延迟队列,它的类签名和继承结构以下:源码分析
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {}
flex
ScheduledThreadPoolExecutor 有以下四个构造器:ui
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
this
它继承了 FutureTask 类(可参考前文「JDK源码分析-FutureTask」的分析),且实现了 RunnableScheduledFuture 接口,该接口定义以下:spa
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
// 一个任务是否周期性执行的,如果则能够重复执行;不然只能运行一次
boolean isPeriodic();
}
线程
RunnableScheduledFuture 只定义了一个方法 isPeriodic,该方法用于判断一个任务是不是周期性执行的。它继承的 RunnableFuture 接口在前文 FutureTask 类中已进行分析,而 ScheduledFuture 接口如下:3d
先看它的主要成员变量:rest
// 定时任务执行的时间(单位:纳秒)
private long time;
/**
* 重复执行的任务的时间间隔(单位:纳秒)
* 正数表示固定频率(fixed-rate)执行
* 负数表示固定延迟(fixed-delay)执行
* 零表示非重复执行的任务
*/
private final long period;
// reExecutePeriodic 方法中从新排队的任务
RunnableScheduledFuture<V> outerTask = this;
// 延迟队列中的索引位置,便于快速取消
int heapIndex;
构造器:
/**
* 构造器一:用给定的触发时间(纳秒),建立一个一次性任务
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 构造器二:用给定的触发时间和间隔(纳秒),建立一个周期性任务
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 构造器三:用给定的触发时间(纳秒),建立一个一次性任务
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
该类是一个任务类,即 Runnable 接口的实现类,所以它最核心的就是 run 方法,以下:
public void run() {
// 是否为周期性任务
boolean periodic = isPeriodic();
// 若任务不能执行,则取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 若为非周期性任务
else if (!periodic)
// 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 run 方法执行
ScheduledFutureTask.super.run();
// 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 runAndReset 方法执行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); // 设置下一次执行时间
reExecutePeriodic(outerTask); // 周期性执行
}
}
reExecutePeriodic 方法以下:
/**
* 该方法主要是将周期性任务从新排队
* 它的实现与 delayedExecute 方法(后面分析)逻辑有些相似
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
schedule 方法 1:其做用是延迟指定的时间后执行任务(即执行定时任务),只会执行一次。
public ScheduledFuture > schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 把用户提交的 Runnable 对象包装为 RunnableScheduledFuture 对象
// decorateTask 方法默认返回第二个参数
// decorateTask 方法的修饰符是 protected,可根据需求自行扩展
RunnableScheduledFuture > t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 执行给定的任务
delayedExecute(t);
return t;
}
delayExecute 方法:
/*
* 延迟或周期性任务的主要执行方法。
* 若线程池已关闭,则拒绝该任务(执行拒绝策略);
* 不然将任务添加到工做队列,如有须要启动一个线程去执行。
* 若在添加任务时关闭了线程池,则将其从队列移除并取消该任务
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 若线程池已关闭,则执行拒绝策略
if (isShutdown())
reject(task);
else {
// 将该任务添加到任务队列(即前面的延迟队列)
super.getQueue().add(task);
// 若当前任务没法执行,则将其从队列移除而且取消执行(相似事务的回滚操做)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
// 任务能够执行,如有须要新增线程以执行该任务
else
ensurePrestart();
}
}
schedule 方法 2:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
scheduleAtFixedRate 方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 将 Runnable 对象包装为 ScheduledFutureTask 对象
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleWithFixedDelay 方法:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
这两个方法是 Executor 接口和 ExecutorService 接口所定义的方法,代码实现以下:
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
它们内部直接调用了 schedule(Runnable) 方法。另外两个 submit 方法:
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}