咱们在上一篇学习了ThreadPoolExecutor的实现原理:Java并发包源码学习系列:线程池ThreadPoolExecutor源码解析java
本篇咱们来学习一下在它基础之上的扩展:ScheduledThreadPoolExecutor。它继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口,是一个能够在指定必定延迟时间后或者定时进行任务调度执行的线程池。编程
public class TestScheduledThreadPool { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public static void main (String[] args) throws InterruptedException { scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run () { System.out.println("command .. " + new Date()); } }, 0, 1, TimeUnit.SECONDS); } }
简单看一个demo吧,这里使用Executors工具类建立ScheduledExecutorService,起始就是实例化了一个ScheduledThreadPoolExecutor,固然咱们自定义也是能够的。并发
接着调用scheduleAtFixedRate
方法,指定延迟为0,表示当即执行, 指定period为1,以1s为周期定时执行该任务。ide
从总体感知ScheduledThreadPoolExecutor的执行函数
//Executors.java public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor.java public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledExecutorService表明可在指定延迟后或周期性地执行线程任务线程池,提供了以下4个方法:工具
public interface ScheduledExecutorService extends ExecutorService { // 指定command任务将在delay延迟后执行 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); // 指定callable任务将在delay延迟后执行 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); // 指定command任务将在delay延迟后执行,并且以设定频率重复执行 // initialDelay + period 开始, initialDelay + n * period 处执行 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); // 建立并执行一个在给定初始延迟后首次启用的按期操做,随后在每一次执行终止和下一次执行开始之间 // 都存在给定的延迟。若是任务在任一一次执行时遇到异常,就会取消后续执行; // 不然,只能经过程序来显式取消或终止该任务 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
能够按照DelayQueue中的Delayed的元素理解,是具体放入延迟队列中的东西,能够看到实现了getDelay和compareTo方法。学习
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** FIFO队列中的序列号,time相同,序列号小的排在前面 */ private final long sequenceNumber; /** 任务将要被执行的时间,也就是过时时间 */ private long time; /** * period == 0 当前任务是一次性的, 执行完毕后就退出 * period > 0 当前任务是fixed-delay任务,是固定延迟的定时可重复执行任务 * period < 0 当前任务是fixed-rate任务,是固定频率的定时可重复执行任务 */ private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this; /** * Index into delay queue, to support faster cancellation. */ int heapIndex; //... 省略构造函数 // 当前任务还剩多久过时 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 队列中的比较策略 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; // time相同,序列号小的排在前面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } //... 省略其余方法 }
FutureTask内部使用一个state变量表示任务状态。this
public class FutureTask<V> implements RunnableFuture<V> { /** * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; // 初始状态 private static final int COMPLETING = 1; // 执行中 private static final int NORMAL = 2; // 正常运行结束 private static final int EXCEPTIONAL = 3; // 运行中异常 private static final int CANCELLED = 4; // 任务被取消 private static final int INTERRUPTING = 5; // 任务正在被中断 private static final int INTERRUPTED = 6; // 任务已经被中断 }
提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay时间后开始执行。.net
若是提交的任务不是周期性的任务,任务只会执行一次。线程
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 参数校验 if (command == null || unit == null) throw new NullPointerException(); // 任务转换: 把command任务转换为ScheduledFutureTask RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 添加任务到延迟队列 delayedExecute(t); return t; } // 将延迟时间转换为绝对时间, private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } // 将当前的那描述加上延迟的nanos后的long型值 long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); // 调用FutureTask的构造方法 this.time = ns; this.period = 0; // 这里表示任务是一次性的 this.sequenceNumber = sequencer.getAndIncrement(); } } // FutureTask.java public class FutureTask<V> implements RunnableFuture<V> { public FutureTask(Runnable runnable, V result) { // 将runnable转化为callable this.callable = Executors.callable(runnable, result); // 设置当前的任务状态为NEW this.state = NEW; // ensure visibility of callable } }
private void delayedExecute(RunnableScheduledFuture<?> task) { // 若是线程池关闭, 则执行拒绝策略 if (isShutdown()) reject(task); else { // 将任务添加到延迟队列 super.getQueue().add(task); // 检查线程池状态,若是已经关闭,则从延迟队列里面删除刚才添加的任务 // 但此时可能线程池中的线程已经从任务队列里面移除了该任务 // 此时须要调用cancel 取消任务 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 确保至少一个线程在处理任务 ensurePrestart(); } }
判断当前任务是否应该被取消。
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
periodic参数经过isPeriodic()
获得,若是period为0,则为false。
相应的isRunningOrShutdown方法传入的参数就应该是executeExistingDelayedTasksAfterShutdown,默认为true,表示:其余线程调用了shutdown命令关闭线程池后,当前任务仍是要执行
确保至少一个线程在处理任务:若是线程个数小于核心线程池数则新增一个线程,不然若是当前线程数为0,则新增一个线程。
void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 增长核心线程数 if (wc < corePoolSize) addWorker(null, true); // 若是corePoolSize==0 也添加一个线程 else if (wc == 0) addWorker(null, false); }
具体执行任务的线程是Worker线程,任务执行是Worker线程调用任务的润方法执行,这里的任务是ScheduledFutureTask,也就是调用它的run方法。
public void run() { // 是否只执行一次 period != 0 boolean periodic = isPeriodic(); // 取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任务只执行一次, 调用FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定时执行 else if (ScheduledFutureTask.super.runAndReset()) { // 设置下一次运行时间 setNextRunTime(); // 从新加入延迟队列 reExecutePeriodic(outerTask); } }
public void run() { // 若是任务不是NEW状态 直接返回 // 若是是NEW, 可是cas设置当前任务的持有者为当前线程失败 也直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 再次判断任务的状态,避免两次判断状态之间有其余线程对任务状态进行修改 if (c != null && state == NEW) { V result; boolean ran; try { // 执行任务 result = c.call(); // 执行成功 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 若是执行任务成功 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { // CAS 将当前任务的状态 从 NEW 转化 为 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 走到这里只有一个线程会到这里,设置任务状态 为NORMAL 正常结束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
protected void setException(Throwable t) { // CAS 将当前任务的状态 从 NEW 转化 为 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 走到这里只有一个线程会到这里,设置任务状态 为EXCEPTIONAL,非正常结束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
针对任务类型为fixed-delay,当任务执行完毕后,让其延迟固定时间后再次运行,原理是:
// initialDelay : 提交任务后延迟多少时间开始执行任务 // delay : 当任务执行完毕后延长多少时间后再次运行任务 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 参数校验 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 任务转换 period < 0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 添加任务到队列 delayedExecute(t); return t; }
注意这里构造的ScheduledFutureTask的period<0,会致使boolean periodic = isPeriodic();
的结果是true,所以在ScheduledFutureTask的run逻辑中,会调用FutureTask的runAndReset()方法。
具体执行任务的线程是Worker线程,任务执行是Worker线程调用任务的润方法执行,这里的任务是ScheduledFutureTask,也就是调用它的run方法。
public void run() { // 是否只执行一次 period != 0 boolean periodic = isPeriodic(); // 取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任务只执行一次, 调用FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定时执行 else if (ScheduledFutureTask.super.runAndReset()) { // 设置下一次运行时间 setNextRunTime(); // 从新加入延迟队列 reExecutePeriodic(outerTask); } }
相比于FutureTask的run方法,该方法逻辑差很少,但缺乏了:在任务正常执行完后设置状态的步骤。缘由在于:让任务成为可重复执行的任务。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 若是当前任务正常执行完毕而且任务状态为NEW 则返回true, 不然返回false return ran && s == NEW; }
若是该方法返回true,将会调用setNextRunTime()设置下一次的运行时间,接着调用reExecutePeriodic(outerTask)从新加入任务队列。
// 设置下一次运行时间 private void setNextRunTime() { long p = period; if (p > 0) time += p; else // 延迟-p的时间 time = triggerTime(-p); }
针对任务类型为fixed-rate,相对起始时间点以固定频率调用指定的任务。
// initialDelay : 提交任务后延迟多少时间开始执行任务 // period 固定周期 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // period > 0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
它和scheduleWithFixedDelay相似,区别在于:
最终的执行规则为:initialDelay + n * period 的 刻执行任务,若是当前任务执行的时间到了,不会并发执行,下一次执行的任务将会延迟执行。