Java多线程进阶(四一)—— J.U.C之executors框架:ScheduledThreadPoolExecutor

图片描述

本文首发于一世流云专栏: https://segmentfault.com/blog...

1、ScheduledThreadPoolExecutor简介

executors框架概述一节中,咱们曾经提到过一种可对任务进行延迟/周期性调度的执行器(Executor),这类Executor通常实现了ScheduledExecutorService这个接口。ScheduledExecutorService在普通执行器接口(ExecutorService)的基础上引入了Future模式,使得能够限时或周期性地调度任务。java

ScheduledThreadPoolExecutor的类继承关系以下图,该图中除了本节要讲解的ScheduledThreadPoolExecutor外,其它部分已经在前2节详细介绍过了:segmentfault

clipboard.png

从上图中能够看到,ScheduledThreadPoolExecutor实际上是继承了ThreadPoolExecutor这个普通线程池,咱们知道ThreadPoolExecutor中提交的任务都是实现了Runnable接口,可是ScheduledThreadPoolExecutor比较特殊,因为要知足任务的延迟/周期调度功能,它会对全部的Runnable任务都进行包装,包装成一个RunnableScheduledFuture任务。设计模式

clipboard.png

RunnableScheduledFuture是Future模式中的一个接口,关于Future模式,咱们后续会专门章节讲解,这里只要知道RunnableScheduledFuture的做用就是能够异步地执行【延时/周期任务】。

另外,咱们知道在ThreadPoolExecutor中,须要指定一个阻塞队列做为任务队列。ScheduledThreadPoolExecutor中也同样,不过特殊的是,ScheduledThreadPoolExecutor中的任务队列是一种特殊的延时队列(DelayQueue)。多线程

咱们曾经在juc-collections框架中,分析过该种阻塞队列,DelayQueue底层基于优先队列(PriorityQueue)实现,是一种“堆”结构,经过该种阻塞队列能够实现任务的延迟到期执行(即每次从队列获取的任务都是最早到期的任务)。框架

ScheduledThreadPoolExecutor在内部定义了DelayQueue的变种——DelayedWorkQueue,它和DelayQueue相似,只不过要求全部入队元素必须实现RunnableScheduledFuture接口。异步

2、ScheduledThreadPoolExecutor基本原理

构造线程池

咱们先来看下ScheduledThreadPoolExecutor的构造,其实在executors框架概述中讲Executors时已经接触过了,Executors使用newScheduledThreadPool工厂方法建立ScheduledThreadPoolExecutor:性能

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

咱们来看下ScheduledThreadPoolExecutor的构造器,内部其实都是调用了父类ThreadPoolExecutor的构造器,这里最须要注意的就是任务队列的选择——DelayedWorkQueue,咱们后面会详细介绍它的实现原理。this

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}

线程池的调度

ScheduledThreadPoolExecutor的核心调度方法是schedulescheduleAtFixedRatescheduleWithFixedDelay,咱们经过schedule方法来看下整个调度流程:spa

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,
            triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

上述的decorateTask方法把Runnable任务包装成ScheduledFutureTask,用户能够根据本身的须要覆写该方法:线程

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
注意: ScheduledFutureTask是RunnableScheduledFuture接口的实现类,任务经过 period字段来表示任务类型
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
 
    /**
     * 任务序号, 自增惟一
     */
    private final long sequenceNumber;
 
    /**
     * 首次执行的时间点
     */
    private long time;
 
    /**
     * 0: 非周期任务
     * >0: fixed-rate任务
     * <0: fixed-delay任务
     */
    private final long period;
 
    /**
     * 在堆中的索引
     */
    int heapIndex;
 
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // ...
}
ScheduledThreadPoolExecutor中的任务队列—— DelayedWorkQueue,保存的元素就是ScheduledFutureTask。DelayedWorkQueue是一种 堆结构,time最小的任务会排在堆顶(表示最先过时),每次出队都是取堆顶元素,这样最快到期的任务就会被先执行。若是两个ScheduledFutureTask的time相同,就比较它们的序号——sequenceNumber,序号小的表明先被提交,因此就会先执行。

schedule的核心是其中的delayedExecute方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())   // 线程池已关闭
        reject(task);   // 任务拒绝策略
    else {
        super.getQueue().add(task);                 // 将任务入队
 
        // 若是线程池已关闭且该任务是非周期任务, 则将其从队列移除
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);  // 取消任务
        else
            ensurePrestart();   // 添加一个工做线程
    }
}

经过delayedExecute能够看出,ScheduledThreadPoolExecutor的整个任务调度流程大体以下图:

clipboard.png

咱们来分析这个过程:

  1. 首先,任务被提交到线程池后,会判断线程池的状态,若是不是RUNNING状态会执行拒绝策略。
  2. 而后,将任务添加到阻塞队列中。(注意,因为DelayedWorkQueue是无界队列,因此必定会add成功)
  3. 而后,会建立一个工做线程,加入到核心线程池或者非核心线程池:

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    经过ensurePrestart能够看到,若是核心线程池未满,则新建的工做线程会被放到核心线程池中。若是核心线程池已经满了,ScheduledThreadPoolExecutor不会像ThreadPoolExecutor那样再去建立归属于非核心线程池的工做线程,而是直接返回。也就是说,在ScheduledThreadPoolExecutor中,一旦核心线程池满了,就不会再去建立工做线程。

这里思考一点,何时会执行else if (wc == 0)建立一个归属于非核心线程池的工做线程?
答案是,当经过setCorePoolSize方法设置核心线程池大小为0时,这里必需要保证任务可以被执行,因此会建立一个工做线程,放到非核心线程池中。

最后,线程池中的工做线程会去任务队列获取任务并执行,当任务被执行完成后,若是该任务是周期任务,则会重置time字段,并从新插入队列中,等待下次执行。这里注意从队列中获取元素的方法:

  • 对于核心线程池中的工做线程来讲,若是没有超时设置(allowCoreThreadTimeOut == false),则会使用阻塞方法take获取任务(由于没有超时限制,因此会一直等待直到队列中有任务);若是设置了超时,则会使用poll方法(方法入参须要超时时间),超时还没拿到任务的话,该工做线程就会被回收。
  • 对于非工做线程来讲,都是调用poll获取队列元素,超时取不到任务就会被回收。



上述就是ScheduledThreadPoolExecutor的核心调度流程,经过咱们的分析能够看出,相比ThreadPoolExecutor,ScheduledThreadPoolExecutor主要有如下几点不一样:

  1. 整体的调度控制流程略有区别;
  2. 任务的执行方式有所区别;
  3. 任务队列的选择不一样。

最后,咱们来看下ScheduledThreadPoolExecutor中的延时队列——DelayedWorkQueue

延时队列

DelayedWorkQueue,该队列和已经介绍过的DelayQueue区别不大,只不过队列元素是RunnableScheduledFuture:

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private int size = 0;
 
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
 
    private Thread leader = null;
 
    // ...
}

DelayedWorkQueue是一个无界队列,在队列元素满了之后会自动扩容,它并无像DelayQueue那样,将队列操做委托给PriorityQueue,而是本身从新实现了一遍堆的核心操做——上浮、下沉。我这里再也不赘述这些堆操做,读者能够参考PriorityBlockingQueue自行阅读源码。

咱们关键来看下addtakepoll这三个队列方法,由于ScheduledThreadPoolExecutor的核心调度流程中使用到了这三个方法:

public boolean add(Runnable e) {
    return offer(e);
}
 
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}

add、offer内部都调用了下面这个方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;           // 队列已满, 扩容
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);       // 堆上浮操做
        }
        
        if (queue[0] == e) {    // 当前元素是首个元素
            leader = null;
            available.signal(); // 唤醒一个等待线程
        }   
    } finally {
        lock.unlock();
    }
    return true;
}

take方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (; ; ) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)          // 队列为空
                available.await();      // 等待元素入队
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)         // 元素已到期
                    return finishPoll(first);
 
                // 执行到此处, 说明队首元素还未到期
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // 当前线程成功leader线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

注意:上述leader表示一个等待获取队首元素的出队线程,这是一种称为“Leader-Follower pattern”的多线程设计模式(读者能够参考DelayQueue中的讲解)。

每次出队元素时,若是队列为空或者队首元素还未到期,线程就会在condition条件队列等待。通常的思路是无限等待,直到出现一个入队线程,入队元素后将一个出队线程唤醒。
为了提高性能,当队列非空时,用 leader保存第一个到来并尝试出队的线程,并设置它的等待时间为队首元素的剩余期限,这样当元素过时后,线程也就本身唤醒了,不须要入队线程唤醒。这样作的好处就是提高一些性能。

3、总结

本节介绍了ScheduledThreadPoolExecutor,它是对普通线程池ThreadPoolExecutor的扩展,增长了延时调度、周期调度任务的功能。归纳下ScheduledThreadPoolExecutor的主要特色:

  1. 对Runnable任务进行包装,封装成ScheduledFutureTask,该类任务支持任务的周期执行、延迟执行;
  2. 采用DelayedWorkQueue做为任务队列。该队列是无界队列,因此任务必定能添加成功,可是当工做线程尝试从队列取任务执行时,只有最早到期的任务会出队,若是没有任务或者队首任务未到期,则工做线程会阻塞;
  3. ScheduledThreadPoolExecutor的任务调度流程与ThreadPoolExecutor略有区别,最大的区别就是,先往队列添加任务,而后建立工做线程执行任务。

另外,maximumPoolSize这个参数对ScheduledThreadPoolExecutor其实并无做用,由于除非把corePoolSize设置为0,这种状况下ScheduledThreadPoolExecutor只会建立一个属于非核心线程池的工做线程;不然,ScheduledThreadPoolExecutor只会新建归属于核心线程池的工做线程,一旦核心线程池满了,就再也不新建工做线程。

相关文章
相关标签/搜索