JDK源码分析-ScheduledThreadPoolExecutor

概述

ScheduledThreadPoolExecutor 也是一个线程池类,是线程池类 ThreadPoolExecutor 的子类。除了 ThreadPoolExecutor 相关的方法以外,它还增长了执行定时任务和周期性任务的方法。它的类签名和继承结构以下:app

public class ScheduledThreadPoolExecutor        extends ThreadPoolExecutor        implements ScheduledExecutorService {}异步

能够看到,它继承了 ThreadPoolExecutor 类(参考 「 JDK源码分析-ThreadPoolExecutor 」),而且实现了 ScheduledExecutorService 接口(参考 「 JDK源码分析-ScheduledExecutorService 」),所以具备两者的特性。 下面分析其代码实现。

代码分析

内部嵌套类 DelayedWorkQueue

先看它的一个内部嵌套类 DelayedWorkQueue,该类是一个延迟队列,它的类签名和继承结构以下:源码分析

static class DelayedWorkQueue extends AbstractQueue<Runnable>    implements BlockingQueue<Runnable> {}flex

Delay edWorkQue ue 类与前文分析的 DelayQueue 「JDK源码分析-DelayQueue」实现原理相似 ,这里就再也不赘述。

构造器

ScheduledThreadPoolExecutor 有以下四个构造器:ui

public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue());}
public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), threadFactory);}
public ScheduledThreadPoolExecutor(int corePoolSize,                                   RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), handler);}
public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory,                                   RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), threadFactory, handler);}
this

这几个构造器都是直接调用父类 ThreadPoolExecutor 的构造器,只是传入了不一样的参数。而其中的参数 workQueue 都传入了上面的延迟队列 DelayedWorkQueue。

内部类 ScheduledFutureTask

ScheduledThreadPoolExecutor 还有一个内部类 ScheduledFutureTask,它的继承结构以下:

它继承了 FutureTask 类(可参考前文「JDK源码分析-FutureTask」的分析),且实现了 RunnableScheduledFuture 接口,该接口定义以下:spa

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {    // 一个任务是否周期性执行的,如果则能够重复执行;不然只能运行一次    boolean isPeriodic();}线程

RunnableScheduledFuture 只定义了一个方法 isPeriodic,该方法用于判断一个任务是不是周期性执行的。它继承的 RunnableFuture 接口在前文 FutureTask 类中已进行分析,而 ScheduledFuture 接口如下:3d


它的内部并未定义方法,只是整合了 Delayed 接口和 Future 接口,Delayed 接口前文也已分析,下面分析该类的主要代码。

先看它的主要成员变量:rest

// 定时任务执行的时间(单位:纳秒)private long time;
/** * 重复执行的任务的时间间隔(单位:纳秒) * 正数表示固定频率(fixed-rate)执行 * 负数表示固定延迟(fixed-delay)执行 * 零表示非重复执行的任务 */private final long period;
// reExecutePeriodic 方法中从新排队的任务RunnableScheduledFuture<V> outerTask = this;
// 延迟队列中的索引位置,便于快速取消int heapIndex;

构造器:

/** * 构造器一:用给定的触发时间(纳秒),建立一个一次性任务 */ScheduledFutureTask(Runnable r, V result, long ns) {    super(r, result);    this.time = ns;    this.period = 0;    this.sequenceNumber = sequencer.getAndIncrement();}
/** * 构造器二:用给定的触发时间和间隔(纳秒),建立一个周期性任务 */ScheduledFutureTask(Runnable r, V result, long ns, long period) {    super(r, result);    this.time = ns;    this.period = period;    this.sequenceNumber = sequencer.getAndIncrement();}
/** * 构造器三:用给定的触发时间(纳秒),建立一个一次性任务 */ScheduledFutureTask(Callable<V> callable, long ns) {    super(callable);    this.time = ns;    this.period = 0;    this.sequenceNumber = sequencer.getAndIncrement();}

ScheduledFutureTask 有三个构造器,可分为两类:分别是建立一次性任务(一和三)和周期性任务(二)。 其中一和 三仍是 Runnable 和 Callable 的区别。

该类是一个任务类,即 Runnable 接口的实现类,所以它最核心的就是 run 方法,以下:

public void run() {    // 是否为周期性任务    boolean periodic = isPeriodic();    // 若任务不能执行,则取消    if (!canRunInCurrentRunState(periodic))        cancel(false);    // 若为非周期性任务    else if (!periodic)        // 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 run 方法执行        ScheduledFutureTask.super.run();    // 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 runAndReset 方法执行    else if (ScheduledFutureTask.super.runAndReset()) {        setNextRunTime(); // 设置下一次执行时间        reExecutePeriodic(outerTask); // 周期性执行    }}

reExecutePeriodic 方法以下:

/** * 该方法主要是将周期性任务从新排队 * 它的实现与 delayedExecute 方法(后面分析)逻辑有些相似 */void reExecutePeriodic(RunnableScheduledFuture<?> task) {    if (canRunInCurrentRunState(true)) {        super.getQueue().add(task);        if (!canRunInCurrentRunState(true) && remove(task))            task.cancel(false);        else            ensurePrestart();    }}


schedule & scheduleAtFixedRate & scheduleWithFixedDelay

这几个就是执行定时任务和周期性任务的方法,它们是对前文 「JDK源码分析-ScheduledExecutorService」接口所定义的方法实现,可参考前文的分析。

schedule 方法 1:其做用是延迟指定的时间后执行任务(即执行定时任务),只会执行一次。

public ScheduledFuture<?> schedule(Runnable command,                                   long delay,                                   TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    // 把用户提交的 Runnable 对象包装为 RunnableScheduledFuture 对象    // decorateTask 方法默认返回第二个参数    // decorateTask 方法的修饰符是 protected,可根据需求自行扩展    RunnableScheduledFuture<?> t = decorateTask(command,        new ScheduledFutureTask<Void>(command, null,                                      triggerTime(delay, unit)));    // 执行给定的任务    delayedExecute(t);    return t;}

delayExecute 方法:

/* * 延迟或周期性任务的主要执行方法。 * 若线程池已关闭,则拒绝该任务(执行拒绝策略); * 不然将任务添加到工做队列,如有须要启动一个线程去执行。 * 若在添加任务时关闭了线程池,则将其从队列移除并取消该任务 */private void delayedExecute(RunnableScheduledFuture<?> task) {    // 若线程池已关闭,则执行拒绝策略    if (isShutdown())        reject(task);    else {        // 将该任务添加到任务队列(即前面的延迟队列)        super.getQueue().add(task);        // 若当前任务没法执行,则将其从队列移除而且取消执行(相似事务的回滚操做)        if (isShutdown() &&            !canRunInCurrentRunState(task.isPeriodic()) &&            remove(task))            task.cancel(false);        // 任务能够执行,如有须要新增线程以执行该任务        else            ensurePrestart();    }}

schedule 方法 2:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,                                       long delay,                                       TimeUnit unit) {    if (callable == null || unit == null)        throw new NullPointerException();    RunnableScheduledFuture<V> t = decorateTask(callable,        new ScheduledFutureTask<V>(callable,                                   triggerTime(delay, unit)));    delayedExecute(t);    return t;}

该方法与前者相似,差异在于这里的参数类型是 Callable,前者是 Runnable 类型,其余操做同样。

scheduleAtFixedRate 方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                              long initialDelay,                                              long period,                                              TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    if (period <= 0)        throw new IllegalArgumentException();    // 将 Runnable 对象包装为 ScheduledFutureTask 对象    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,                                      null,                                      triggerTime(initialDelay, unit),                                      unit.toNanos(period));    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t;}

该方法与前面的 schedule 方法相似,区别仅在于使用了不一样的 ScheduledFutureTask 对象,其余的执行流程几乎同样。

scheduleWithFixedDelay 方法:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,                                                 long initialDelay,                                                 long delay,                                                 TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    if (delay <= 0)        throw new IllegalArgumentException();    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,                                      null,                                      triggerTime(initialDelay, unit),                                      unit.toNanos(-delay));    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t;}

该方法与  scheduleAtFixedRate 方法基本同样,区别仅在于构建 ScheduledFutureTask 对象时参数 period 不一样(一正一负,用以区分类型)。

execute & submit 方法

这两个方法是 Executor 接口和 ExecutorService 接口所定义的方法,代码实现以下:

public void execute(Runnable command) {    schedule(command, 0, NANOSECONDS);}
public Future<?> submit(Runnable task) {    return schedule(task, 0, NANOSECONDS);}
它们内部直接调用了 schedule(Runnable) 方法。另外两个 submit 方法

public <T> Future<T> submit(Runnable task, T result) {    return schedule(Executors.callable(task, result), 0, NANOSECONDS);}
public <T> Future<T> submit(Callable<T> task) {    return schedule(task, 0, NANOSECONDS);}

它们部直接调用了 schedule(Callable) 方法。

小结

1. ScheduledThreadPoolExecutor 是线程池的实现类之一;
2. 它继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口;
3. 提供了异步提交任务的 execute 方法和 submit 方法;
4. 供了执行定时任务的 schedule 方法和周期性任务的 scheduleAtFixedRate/scheduleWithFixedDelay 方法(使用延迟队列实现)。


相关阅读:
JDK源码分析-ThreadPoolExecutor
JDK源码分析-ScheduledExecutorService
JDK源码分析-DelayQueue
JDK源码分析-FutureTask


相关文章
相关标签/搜索