本文首发于一世流云专栏: https://segmentfault.com/blog...
在executors框架概述一节中,咱们曾经提到过一种可对任务进行延迟/周期性调度的执行器(Executor),这类Executor通常实现了ScheduledExecutorService这个接口。ScheduledExecutorService在普通执行器接口(ExecutorService)的基础上引入了Future模式,使得能够限时或周期性地调度任务。java
ScheduledThreadPoolExecutor
的类继承关系以下图,该图中除了本节要讲解的ScheduledThreadPoolExecutor外,其它部分已经在前2节详细介绍过了:segmentfault
从上图中能够看到,ScheduledThreadPoolExecutor实际上是继承了ThreadPoolExecutor这个普通线程池,咱们知道ThreadPoolExecutor中提交的任务都是实现了Runnable接口,可是ScheduledThreadPoolExecutor比较特殊,因为要知足任务的延迟/周期调度功能,它会对全部的Runnable任务都进行包装,包装成一个RunnableScheduledFuture
任务。设计模式
RunnableScheduledFuture是Future模式中的一个接口,关于Future模式,咱们后续会专门章节讲解,这里只要知道RunnableScheduledFuture的做用就是能够异步地执行【延时/周期任务】。
另外,咱们知道在ThreadPoolExecutor中,须要指定一个阻塞队列做为任务队列。ScheduledThreadPoolExecutor中也同样,不过特殊的是,ScheduledThreadPoolExecutor中的任务队列是一种特殊的延时队列(DelayQueue)。多线程
咱们曾经在juc-collections框架中,分析过该种阻塞队列,DelayQueue底层基于优先队列(PriorityQueue)实现,是一种“堆”结构,经过该种阻塞队列能够实现任务的延迟到期执行(即每次从队列获取的任务都是最早到期的任务)。框架
ScheduledThreadPoolExecutor在内部定义了DelayQueue的变种——DelayedWorkQueue
,它和DelayQueue相似,只不过要求全部入队元素必须实现RunnableScheduledFuture接口。异步
咱们先来看下ScheduledThreadPoolExecutor的构造,其实在executors框架概述中讲Executors时已经接触过了,Executors使用newScheduledThreadPool
工厂方法建立ScheduledThreadPoolExecutor:性能
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
咱们来看下ScheduledThreadPoolExecutor的构造器,内部其实都是调用了父类ThreadPoolExecutor的构造器,这里最须要注意的就是任务队列的选择——DelayedWorkQueue,咱们后面会详细介绍它的实现原理。this
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
ScheduledThreadPoolExecutor的核心调度方法是schedule
、scheduleAtFixedRate
、scheduleWithFixedDelay
,咱们经过schedule方法来看下整个调度流程:spa
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
上述的decorateTask方法把Runnable任务包装成ScheduledFutureTask,用户能够根据本身的须要覆写该方法:线程
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { return task; }
注意:
ScheduledFutureTask是RunnableScheduledFuture接口的实现类,任务经过
period
字段来表示任务类型
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** * 任务序号, 自增惟一 */ private final long sequenceNumber; /** * 首次执行的时间点 */ private long time; /** * 0: 非周期任务 * >0: fixed-rate任务 * <0: fixed-delay任务 */ private final long period; /** * 在堆中的索引 */ int heapIndex; ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // ... }
ScheduledThreadPoolExecutor中的任务队列—— DelayedWorkQueue,保存的元素就是ScheduledFutureTask。DelayedWorkQueue是一种 堆结构,time最小的任务会排在堆顶(表示最先过时),每次出队都是取堆顶元素,这样最快到期的任务就会被先执行。若是两个ScheduledFutureTask的time相同,就比较它们的序号——sequenceNumber,序号小的表明先被提交,因此就会先执行。
schedule的核心是其中的delayedExecute方法:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) // 线程池已关闭 reject(task); // 任务拒绝策略 else { super.getQueue().add(task); // 将任务入队 // 若是线程池已关闭且该任务是非周期任务, 则将其从队列移除 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); // 取消任务 else ensurePrestart(); // 添加一个工做线程 } }
经过delayedExecute能够看出,ScheduledThreadPoolExecutor的整个任务调度流程大体以下图:
咱们来分析这个过程:
而后,会建立一个工做线程,加入到核心线程池或者非核心线程池:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
经过ensurePrestart能够看到,若是核心线程池未满,则新建的工做线程会被放到核心线程池中。若是核心线程池已经满了,ScheduledThreadPoolExecutor不会像ThreadPoolExecutor那样再去建立归属于非核心线程池的工做线程,而是直接返回。也就是说,在ScheduledThreadPoolExecutor中,一旦核心线程池满了,就不会再去建立工做线程。
这里思考一点,何时会执行else if (wc == 0)建立一个归属于非核心线程池的工做线程?
答案是,当经过setCorePoolSize方法设置核心线程池大小为0时,这里必需要保证任务可以被执行,因此会建立一个工做线程,放到非核心线程池中。
最后,线程池中的工做线程会去任务队列获取任务并执行,当任务被执行完成后,若是该任务是周期任务,则会重置time字段,并从新插入队列中,等待下次执行。这里注意从队列中获取元素的方法:
allowCoreThreadTimeOut == false
),则会使用阻塞方法take获取任务(由于没有超时限制,因此会一直等待直到队列中有任务);若是设置了超时,则会使用poll方法(方法入参须要超时时间),超时还没拿到任务的话,该工做线程就会被回收。
上述就是ScheduledThreadPoolExecutor的核心调度流程,经过咱们的分析能够看出,相比ThreadPoolExecutor,ScheduledThreadPoolExecutor主要有如下几点不一样:
最后,咱们来看下ScheduledThreadPoolExecutor中的延时队列——DelayedWorkQueue。
DelayedWorkQueue,该队列和已经介绍过的DelayQueue区别不大,只不过队列元素是RunnableScheduledFuture:
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private int size = 0; private final ReentrantLock lock = new ReentrantLock(); private final Condition available = lock.newCondition(); private Thread leader = null; // ... }
DelayedWorkQueue是一个无界队列,在队列元素满了之后会自动扩容,它并无像DelayQueue那样,将队列操做委托给PriorityQueue,而是本身从新实现了一遍堆的核心操做——上浮、下沉。我这里再也不赘述这些堆操做,读者能够参考PriorityBlockingQueue自行阅读源码。
咱们关键来看下add
、take
、poll
这三个队列方法,由于ScheduledThreadPoolExecutor的核心调度流程中使用到了这三个方法:
public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); }
add、offer内部都调用了下面这个方法:
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 队列已满, 扩容 if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); // 堆上浮操做 } if (queue[0] == e) { // 当前元素是首个元素 leader = null; available.signal(); // 唤醒一个等待线程 } } finally { lock.unlock(); } return true; }
take方法:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) // 队列为空 available.await(); // 等待元素入队 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 元素已到期 return finishPoll(first); // 执行到此处, 说明队首元素还未到期 first = null; if (leader != null) available.await(); else { // 当前线程成功leader线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
注意:上述leader表示一个等待获取队首元素的出队线程,这是一种称为“Leader-Follower pattern”的多线程设计模式(读者能够参考DelayQueue中的讲解)。
每次出队元素时,若是队列为空或者队首元素还未到期,线程就会在condition条件队列等待。通常的思路是无限等待,直到出现一个入队线程,入队元素后将一个出队线程唤醒。
为了提高性能,当队列非空时,用leader
保存第一个到来并尝试出队的线程,并设置它的等待时间为队首元素的剩余期限,这样当元素过时后,线程也就本身唤醒了,不须要入队线程唤醒。这样作的好处就是提高一些性能。
本节介绍了ScheduledThreadPoolExecutor,它是对普通线程池ThreadPoolExecutor的扩展,增长了延时调度、周期调度任务的功能。归纳下ScheduledThreadPoolExecutor的主要特色:
ScheduledFutureTask
,该类任务支持任务的周期执行、延迟执行;DelayedWorkQueue
做为任务队列。该队列是无界队列,因此任务必定能添加成功,可是当工做线程尝试从队列取任务执行时,只有最早到期的任务会出队,若是没有任务或者队首任务未到期,则工做线程会阻塞;ScheduledThreadPoolExecutor
的任务调度流程与ThreadPoolExecutor略有区别,最大的区别就是,先往队列添加任务,而后建立工做线程执行任务。另外,maximumPoolSize
这个参数对ScheduledThreadPoolExecutor其实并无做用,由于除非把corePoolSize设置为0,这种状况下ScheduledThreadPoolExecutor只会建立一个属于非核心线程池的工做线程;不然,ScheduledThreadPoolExecutor只会新建归属于核心线程池的工做线程,一旦核心线程池满了,就再也不新建工做线程。