文本将主要讲述 ThreadPoolExecutor
一个特殊的子类 ScheduledThreadPoolExecutor
,主要用于执行周期性任务;因此在看本文以前最好先了解一下 ThreadPoolExecutor
,能够参考 ThreadPoolExecutor 详解;另外 ScheduledThreadPoolExecutor
中使用了延迟队列,主要是基于彻底二叉堆实现的,能够参考 彻底二叉堆;html
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}
在源码中能够看到,ScheduledThreadPoolExecutor
的状态管理、入队操做、拒绝操做等都是继承于 ThreadPoolExecutor
;ScheduledThreadPoolExecutor
主要是提供了周期任务和延迟任务相关的操做;java
就 ScheduledThreadPoolExecutor
的运行逻辑而言,大体能够表述为:dom
其内部结构如图所示:ide
这里须要注意的:函数
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private final long sequenceNumber; // 任务序号,从 AtomicLong sequencer 获取,当延迟时间相同时,序号小的先出 private long time; // 下次任务执行时间 private final long period; // 0 表示非周期任务,正值表示固定频率周期任务,负值表示固定延迟周期任务 RunnableScheduledFuture<V> outerTask = this; // 重复执行的任务,传入的任务可使用 decorateTask() 从新包装 int heapIndex; // 队列索引 }
其中最重要的方法必然是 run 方法了:源码分析
public void run() { boolean periodic = isPeriodic(); // 是否为周期任务,period != 0 if (!canRunInCurrentRunState(periodic)) // 当前状态可否继续运行,详细测试后面还会讲到 cancel(false); // 取消任务 else if (!periodic) // 不是周期任务时,直接运行 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { // 时周期任务 setNextRunTime(); // 设置下次执行时间 reExecutePeriodic(outerTask); // 从新入队 } }
public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); // 设置中断状态 if (cancelled && removeOnCancel && heapIndex >= 0) // 当设置 removeOnCancel 状态时,移除任务 remove(this); // 默认为 false return cancelled; }
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 若是当前状态能够执行 super.getQueue().add(task); // 则从新入队 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); // 确保有线程执行任务 } }
此外还有 DelayedWorkQueue,可是这里不许备讲了,能够查看 彻底二叉堆 了解实现的原理;测试
scheduleAtFixedRate
和 scheduleWithFixedDelay
是咱们最经常使用的两个方法,可是他们的区别可能不是很清楚,这里重点讲一下,this
// 测试 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { sleep(1000); // 睡眠 1s, log.info("run task"); }, 1, 2, TimeUnit.SECONDS); // 延迟 1s,周期 2s
// 打印
[19:41:28,489 INFO ] [pool-1-thread-1] - run task
[19:41:30,482 INFO ] [pool-1-thread-1] - run task
[19:41:32,483 INFO ] [pool-1-thread-1] - run task
[19:41:34,480 INFO ] [pool-1-thread-1] - run task线程
能够看到的确时固定周期 2s 执行的,可是若是任务执行时间超过周期呢?3d
// 测试 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { int i = 2000 + random.nextInt(3) * 1000; sleep(i); log.info("run task, sleep :{}", i); }, 1, 2, TimeUnit.SECONDS); // 延迟 1s,周期 2s
// 打印
[19:42:53,428 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:55,430 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:59,430 INFO ] [pool-1-thread-1] - run task, sleep :4000
[19:43:02,434 INFO ] [pool-1-thread-1] - run task, sleep :3000
[19:43:06,436 INFO ] [pool-1-thread-1] - run task, sleep :4000
能够看到若是任务执行时间超出周期时,下一次任务会马上运行;就好像周期是一个有弹性的袋子,能装下运行时间的时候,是固定大小,装不下的时候就会被撑大,图像化表示以下:
// 测试 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { int i = 1000 + random.nextInt(5) * 1000; sleep(i); log.info("run task, sleep :{}", i); }, 1, 2, TimeUnit.SECONDS); // 延迟 1s,周期 2s
// 打印
[20:05:40,682 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:05:45,686 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:05:49,689 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:05:55,690 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:06:01,692 INFO ] [pool-1-thread-1] - run task, sleep :4000
能够看到不管执行时间是多少,其结果都是在执行完毕后,停顿固定的时间,而后执行下一次任务,其图形化表示为:
public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask( command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask( callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
能够看到全部的周期任务,最终执行的都是 delayedExecute
方法,其中 decorateTask
是一个钩子函数,其之类能够利用他对任务进行重构过滤等操做;
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); // 若是线程池已经关闭,则拒绝任务 else { super.getQueue().add(task); // 任务入队 if (isShutdown() && // 再次检查,线程池是否关闭 !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); // 确保有线程执行任务 } }
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 注意这里添加的是正值 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); // 注意这里添加的是负值 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
从上面代码能够看到 scheduleAtFixedRate
和 scheduleWithFixedDelay
只有周期任务的时间不一样,其余的都同样,那么下面咱们看一下他们的任务时间计算;
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } private void setNextRunTime() { long p = period; if (p > 0) // 正值表示 scheduleAtFixedRate time += p; // 无论任务执行时间,直接加上周期时间,也就是一次任务超时,会影响后续任务的执行, // 超时的时候,getDelay 是负值,因此在延迟队列中必然排在最前面,马上被取出执行 else time = triggerTime(-p); // 计算触发时间 } long triggerTime(long delay) { // 这里能够看到,每次的确是在当前时间的基础上,加上延迟时间; return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
这里特别要注意 scheduleAtFixedRate 一次任务超时,会持续影响后面的任务周期安排,因此在设定周期的时候要特别注意; 例如:
// 测试 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { int i = random.nextInt(5) * 1000; sleep(i); log.info("run task, sleep :{}", i); }, 1, 2, TimeUnit.SECONDS);
// 打印
[20:29:11,310 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:16,304 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:19,304 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:29:21,305 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:29:22,305 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:23,306 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:27,306 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:30,307 INFO ] [pool-1-thread-1] - run task, sleep :3000
如图所示:
private volatile boolean continueExistingPeriodicTasksAfterShutdown; //关闭后继续执行周期任务,默认false private volatile boolean executeExistingDelayedTasksAfterShutdown = true; //关闭后继续执行延迟任务,默认true private volatile boolean removeOnCancel = false; // 取消任务是,从队列中删除任务,默认 false @Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 继续延迟任务 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 继续周期任务 if (!keepDelayed && !keepPeriodic) { // 都是 false,直接清除 for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { // Traverse snapshot to avoid iterator exceptions for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } tryTerminate(); }