并发编程之定时任务&定时线程池原理解析

点赞再看,养成习惯,公众号搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JavaStudy 已收录,有个人系列文章。html

前言

线程池的具体实现有两种,分别是ThreadPoolExecutor 默认线程池和ScheduledThreadPoolExecutor 定时线程池,上一篇已经分析过ThreadPoolExecutor原理与使用了,本篇咱们来重点分析下ScheduledThreadPoolExecutor的原理与使用。java

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 与 ThreadPoolExecutor 线程池的概念有些区别,它是一个支持任务周期性调度的线程池。git

ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor,同时经过实现 ScheduledExecutorSerivce 来扩展基础线程池的功能,使其拥有了调度能力。其整个调度的核心在于内部类 DelayedWorkQueue ,一个有序的延时队列。github

定时线程池类的类结构图以下: 编程

ScheduledThreadPoolExecutor 的出现,很好的弥补了传统 Timer 的不足,具体对比看下表:数组

Timer ScheduledThreadPoolExecutor
线程 单线程 多线程
多任务 任务之间相互影响 任务之间不影响
调度时间 绝对时间 相对时间
异常 单任务异常,后续任务受影响 无影响

工做原理

它用来处理延时任务或定时任务 它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三种提交任务的方式:markdown

  1. schedule,特定时间延时后执行一次任务
  2. scheduledAtFixedRate,固定周期执行任务(与任务执行时间无关,周期是固定的)
  3. scheduledWithFixedDelay,固定延时执行任务(与任务执行时间有关,延时从上一次任务完成后开始)

它采用 DelayedWorkQueue 存储等待的任务数据结构

  1. DelayedWorkQueue 内部封装了一个 PriorityQueue ,它会根据 time 的前后时间排序,若 time 相同则根据 sequenceNumber 排序;
  2. DelayedWorkQueue 也是一个无界队列;

由于前面讲阻塞队列实现的时候,已经对DelayedWorkQueue进行了说明,更多内容请查看《阻塞队列 — DelayedWorkQueue源码分析》多线程

工做线程的执行过程:并发

  • 工做线程会从DelayedWorkerQueue取已经到期的任务去执行;
  • 执行结束后从新设置任务的到期时间,再次放回DelayedWorkerQueue。

take方法是何时调用的呢? 在ThreadPoolExecutor中,getTask方法,工做线程会循环地从workQueue中取任务。但定时任务却不一样,由于若是一旦getTask方法取出了任务就开始执行了,而这时可能尚未到执行的时间,因此在take方法中,要保证只有在到指定的执行时间的时候任务才能够被取走。

PS:对于以上原理的理解,能够经过下面的源码分析加深印象。

源码分析

构造方法

ScheduledThreadPoolExecutor有四个构造形式:

public ScheduledThreadPoolExecutor(int corePoolSize) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    	new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    	new DelayedWorkQueue(), threadFactory, handler);
}
复制代码

固然咱们也可使用工具类Executors的newScheduledThreadPool的方法,快速建立。注意这里使用的DelayedWorkQueue

ScheduledThreadPoolExecutor没有提供带有最大线程数的构造函数的,默认是Integer.MAX_VALUE,说明其能够无限制的开启任意线程执行任务,在大量任务系统,应注意这一点,避免内存溢出。

核心方法

核心方法主要介绍ScheduledThreadPoolExecutor的调度方法,其余方法与 ThreadPoolExecutor 一致。调度方法均由 ScheduledExecutorService 接口定义:

public interface ScheduledExecutorService extends ExecutorService {
    // 特定时间延时后执行一次Runnable
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 特定时间延时后执行一次Callable
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    // 固定周期执行任务(与任务执行时间无关,周期是固定的)
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
     // 固定延时执行任务(与任务执行时间有关,延时从上一次任务完成后开始)
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}
复制代码

咱们再来看一下接口的实现,具体是怎么来实现线程池任务的提交。由于最终都回调用 delayedExecute 提交任务。因此,咱们这里只分析schedule方法,该方法是指任务在指定延迟时间到达后触发,只会执行一次。源代码以下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    //参数校验
    if (command == null || unit == null)
        throw new NullPointerException();
    //这里是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
    //而后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
   //包装好任务之后,就进行提交了
	delayedExecute(t);
    return t;
}
复制代码

delayedExecute 任务提交方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //若是线程池已经关闭,则使用拒绝策略把提交任务拒绝掉
	if (isShutdown())
        reject(task);
    else {
		//与ThreadPoolExecutor不一样,这里直接把任务加入延迟队列
        super.getQueue().add(task);//使用用的DelayedWorkQueue
		//若是当前状态没法执行任务,则取消
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
       		//这里是增长一个worker线程,避免提交的任务没有worker去执行
        	//缘由就是该类没有像ThreadPoolExecutor同样,woker满了才放入队列
          	ensurePrestart();
    }
}
复制代码

咱们能够看到提交到线程池的任务都包装成了 ScheduledFutureTask,继续往下咱们再来研究下。 造方

ScheduledFutureTask

从ScheduledFutureTask类的定义能够看出,ScheduledFutureTask类是ScheduledThreadPoolExecutor类的私有内部类,继承了FutureTask类,并实现了RunnableScheduledFuture接口。也就是说,ScheduledFutureTask具备FutureTask类的全部功能,并实现了RunnableScheduledFuture接口的全部方法。ScheduledFutureTask类的定义以下所示:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> 复制代码

ScheduledFutureTask类继承图以下:

成员变量

SchduledFutureTask接收的参数(成员变量):

// 任务开始的时间
private long time;
// 任务添加到ScheduledThreadPoolExecutor中被分配的惟一序列号
private final long sequenceNumber;
// 任务执行的时间间隔
private final long period;
//ScheduledFutureTask对象,实际指向当前对象自己
RunnableScheduledFuture<V> outerTask = this;
//当前任务在延迟队列中的索引,可以更加方便的取消当前任务
int heapIndex;
复制代码

解析

  • sequenceNumber:任务添加到ScheduledThreadPoolExecutor中被分配的惟一序列号,能够根据这个序列号肯定惟一的一个任务,若是在定时任务中,若是一个任务是周期性执行的,可是它们的sequenceNumber的值相同,则被视为是同一个任务。
  • time:下一次执行任务的时间。
  • period:任务的执行周期。
  • outerTask:ScheduledFutureTask对象,实际指向当前对象自己。此对象的引用会被传入到周期性执行任务的ScheduledThreadPoolExecutor类的reExecutePeriodic方法中。
  • heapIndex:当前任务在延迟队列中的索引,这个索引可以更加方便的取消当前任务。

构造方法

ScheduledFutureTask类继承了FutureTask类,并实现了RunnableScheduledFuture接口。在ScheduledFutureTask类中提供了以下构造方法。

ScheduledFutureTask(Runnable r, V result, long ns) {
	super(r, result);
	this.time = ns;
	this.period = 0;
	this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
	super(r, result);
	this.time = ns;
	this.period = period;
	this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
	super(callable);
	this.time = ns;
	this.period = 0;
	this.sequenceNumber = sequencer.getAndIncrement();
}
复制代码

FutureTask的构造方法以下:

public FutureTask(Runnable runnable, V result) {
	this.callable = Executors.callable(runnable, result);
	this.state = NEW;       // ensure visibility of callable
}
复制代码

经过源码能够看到,在ScheduledFutureTask类的构造方法中,首先会调用FutureTask类的构造方法为FutureTask类的callable和state成员变量赋值,接下来为ScheduledFutureTask类的time、period和sequenceNumber成员变量赋值。理解起来比较简单。

getDelay方法

咱们先来看getDelay方法的源码,以下所示:

//获取下次执行任务的时间距离当前时间的纳秒数
public long getDelay(TimeUnit unit) {
	return unit.convert(time - now(), NANOSECONDS);
}
复制代码

getDelay方法比较简单,主要用来获取下次执行任务的时间距离当前系统时间的纳秒数。

compareTo方法

ScheduledFutureTask类在类的结构上实现了Comparable接口,compareTo方法主要是对Comparable接口定义的compareTo方法的实现。源码以下所示:

public int compareTo(Delayed other) {
	if (other == this) 
		return 0;
	if (other instanceof ScheduledFutureTask) {
		ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
		long diff = time - x.time;
		if (diff < 0)
			return -1;
		else if (diff > 0)
			return 1;
		else if (sequenceNumber < x.sequenceNumber)
			return -1;
		else
			return 1;
	}
	long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
	return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
复制代码

这段代码看上去好像是对各类数值类型数据的比较,本质上是对延迟队列中的任务进行排序。排序规则为:

  • 首先比较延迟队列中每一个任务下次执行的时间,下次执行时间距离当前时间短的任务会排在前面;
  • 若是下次执行任务的时间相同,则会比较任务的sequenceNumber值,sequenceNumber值小的任务会排在前面。

isPeriodic方法

isPeriodic方法的源代码以下所示:

//判断是不是周期性任务
public boolean isPeriodic() {
	return period != 0;
}
复制代码

这个方法主要是用来判断当前任务是不是周期性任务。这里只要判断运行任务的执行周期不等于0就能肯定为周期性任务了。由于不管period的值是大于0仍是小于0,当前任务都是周期性任务。

setNextRunTime方法

setNextRunTime方法的做用主要是设置当前任务下次执行的时间,源码以下所示:

private void setNextRunTime() {
	long p = period;
	//固定频率,上次执行任务的时间加上任务的执行周期
	if (p > 0)
		time += p;
	//相对固定的延迟执行,当前系统时间加上任务的执行周期
	else
		time = triggerTime(-p);
}
复制代码

这里再一次证实了使用isPeriodic方法判断当前任务是否为周期性任务时,只要判断period的值是否不等于0就能够了。

  • 由于若是当前任务时固定频率执行的周期性任务,会将周期period看成正数来处理;
  • 若是是相对固定的延迟执行当前任务,则会将周期period看成负数来处理。

这里咱们看到在setNextRunTime方法中,调用了ScheduledThreadPoolExecutor类的triggerTime方法。接下来咱们看下triggerTime方法的源码。

ScheduledThreadPoolExecutor类的triggerTime方法

triggerTime方法用于获取延迟队列中的任务下一次执行的具体时间。源码以下所示。

private long triggerTime(long delay, TimeUnit unit) {
	return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
	return now() +
		((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
复制代码

这两个triggerTime方法的代码比较简单,就是获取下一次执行任务的具体时间。有一点须要注意的是:delay < (Long.MAX_VALUE >> 1判断delay的值是否小于Long.MAX_VALUE的一半,若是小于Long.MAX_VALUE值的一半,则直接返回delay,不然须要处理溢出的状况。

咱们看到在triggerTime方法中处理防止溢出的逻辑使用了ScheduledThreadPoolExecutor类的overflowFree方法,接下来,咱们就看看ScheduledThreadPoolExecutor类的overflowFree方法的实现。

ScheduledThreadPoolExecutor类的overflowFree方法

overflowFree方法的源代码以下所示:

private long overflowFree(long delay) {
	//获取队列中的节点
	Delayed head = (Delayed) super.getQueue().peek();
	//获取的节点不为空,则进行后续处理
	if (head != null) {
		//从队列节点中获取延迟时间
		long headDelay = head.getDelay(NANOSECONDS);
		//若是从队列中获取的延迟时间小于0,而且传递的delay
		//值减去从队列节点中获取延迟时间小于0
		if (headDelay < 0 && (delay - headDelay < 0))
			//将delay的值设置为Long.MAX_VALUE + headDelay
			delay = Long.MAX_VALUE + headDelay;
	}
	//返回延迟时间
	return delay;
}
复制代码

经过对overflowFree方法的源码分析,能够看出overflowFree方法本质上就是为了限制队列中的全部节点的延迟时间在Long.MAX_VALUE值以内,防止在compareTo方法中溢出。

cancel方法

cancel方法的做用主要是取消当前任务的执行,源码以下所示:

public boolean cancel(boolean mayInterruptIfRunning) {
	//取消任务,返回任务是否取消的标识
	boolean cancelled = super.cancel(mayInterruptIfRunning);
	//若是任务已经取消
	//而且须要将任务从延迟队列中删除
	//而且任务在延迟队列中的索引大于或者等于0
	if (cancelled && removeOnCancel && heapIndex >= 0)
		//将当前任务从延迟队列中删除
		remove(this);
	//返回是否成功取消任务的标识
	return cancelled;
}
复制代码

这段代码理解起来相对比较简单,首先调用取消任务的方法,并返回任务是否已经取消的标识。若是任务已经取消,而且须要移除任务,同时,任务在延迟队列中的索引大于或者等于0,则将当前任务从延迟队列中移除。最后返回任务是否成功取消的标识。

run方法

run方法能够说是ScheduledFutureTask类的核心方法,是对Runnable接口的实现,源码以下所示:

public void run() {
	//当前任务是不是周期性任务
	boolean periodic = isPeriodic();
	//线程池当前运行状态下不能执行周期性任务
	if (!canRunInCurrentRunState(periodic))
		//取消任务的执行
		cancel(false);
	//若是不是周期性任务
	else if (!periodic)
		//则直接调用FutureTask类的run方法执行任务
		ScheduledFutureTask.super.run();
	//若是是周期性任务,则调用FutureTask类的runAndReset方法执行任务
	//若是任务执行成功
	else if (ScheduledFutureTask.super.runAndReset()) {
		//设置下次执行任务的时间
		setNextRunTime();
		//重复执行任务
		reExecutePeriodic(outerTask);
	}
}
复制代码

整理一下方法的逻辑:

  1. 首先判断当前任务是不是周期性任务。若是线程池当前运行状态下不能执行周期性任务,则取消任务的执行,不然执行步骤2;
  2. 若是当前任务不是周期性任务,则直接调用FutureTask类的run方法执行任务,会设置执行结果,而后直接返回,不然执行步骤3;
  3. 若是当前任务是周期性任务,则调用FutureTask类的runAndReset方法执行任务,不会设置执行结果,而后直接返回,不然执行步骤4;
  4. 若是任务执行成功,则设置下次执行任务的时间,同时,将任务设置为重复执行。

这里,调用了FutureTask类的run方法和runAndReset方法,而且调用了ScheduledThreadPoolExecutor类的reExecutePeriodic方法。接下来,咱们分别看下这些方法的实现。

FutureTask类的run方法

FutureTask类的run方法源码以下所示:

public void run() {
    //状态若是不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
    //状态若是是NEW,则尝试把当前执行线程保存在runner字段中
    //若是赋值失败则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //任务异常
                setException(ex);
            }
            if (ran)
                //任务正常执行完毕
                set(result);
        }
    } finally {

        runner = null;
        int s = state;
        //若是任务被中断,执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
复制代码

代码的总体逻辑为:

  • 判断当前任务的state是否等于NEW,若是不为NEW则说明任务或者已经执行过,或者已经被取消,直接返回;
  • 若是状态为NEW则接着会经过unsafe类把任务执行线程引用CAS的保存在runner字段中,若是保存失败,则直接返回;
  • 执行任务;若是任务执行发生异常,则调用setException()方法保存异常信息。

FutureTask类的runAndReset方法

方法的源码以下所示:

protected boolean runAndReset() {
	if (state != NEW ||
		!UNSAFE.compareAndSwapObject(this, runnerOffset,
									 null, Thread.currentThread()))
		return false;
	boolean ran = false;
	int s = state;
	try {
		Callable<V> c = callable;
		if (c != null && s == NEW) {
			try {
				c.call(); // don't set result
				ran = true;
			} catch (Throwable ex) {
				setException(ex);
			}
		}
	} finally {
		// runner must be non-null until state is settled to
		// prevent concurrent calls to run()
		runner = null;
		// state must be re-read after nulling runner to prevent
		// leaked interrupts
		s = state;
		if (s >= INTERRUPTING)
			handlePossibleCancellationInterrupt(s);
	}
	return ran && s == NEW;
}
复制代码

FutureTask类的runAndReset方法与run方法的逻辑基本相同,只是runAndReset方法会重置当前任务的执行状态。

ScheduledThreadPoolExecutor类的reExecutePeriodic方法

reExecutePeriodic重复执行任务方法,源代码以下所示:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
	//线程池当前状态下可以执行任务
	if (canRunInCurrentRunState(true)) {
		//与ThreadPoolExecutor不一样,这里直接把任务加入延迟队列
        super.getQueue().add(task);//使用用的DelayedWorkQueue
		//线程池当前状态下不能执行任务,而且成功移除任务
		if (!canRunInCurrentRunState(true) && remove(task))
			//取消任务
			task.cancel(false);
		else
			//这里是增长一个worker线程,避免提交的任务没有worker去执行
            //缘由就是该类没有像ThreadPoolExecutor同样,woker满了才放入队列 
			ensurePrestart();
	}
}
复制代码

整体来讲reExecutePeriodic方法的逻辑比较简单,须要注意的是:调用reExecutePeriodic方法的时候已经执行过一次任务,因此,并不会触发线程池的拒绝策略;传入reExecutePeriodic方法的任务必定是周期性的任务。

DelayedWorkQueue

ScheduledThreadPoolExecutor之因此要本身实现阻塞的工做队列,是由于 ScheduleThreadPoolExecutor 要求的工做队列有些特殊。

DelayedWorkQueue是一个基于堆的数据结构,相似于DelayQueue和PriorityQueue。在执行定时任务的时候,每一个任务的执行时间都不一样,因此DelayedWorkQueue的工做就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面(注意:这里的顺序并非绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不必定是顺序的)。

堆结构以下图: 可见,DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可使用数组表示,能够转换成以下的数组: 在这种结构中,能够发现有以下特性: 假设“第一个元素” 在数组中的索引为 0 的话,则父结点和子结点的位置关系以下:

  • 索引为  的左孩子的索引是  ( 2 i + 1 ) (2 * i + 1)
  • 索引为  的右孩子的索引是  ( 2 i + 2 ) (2 * i + 2)
  • 索引为  的父结点的索引是  f l o o r ( ( i 1 ) / 2 ) floor((i-1)/2)

为何要使用DelayedWorkQueue呢?

  • 定时任务执行时须要取出最近要执行的任务,因此任务在队列中每次出队时必定要是当前队列中执行时间最靠前的,因此天然要使用优先级队列。
  • DelayedWorkQueue是一个优先级队列,它能够保证每次出队的任务都是当前队列中执行时间最靠前的,因为它是基于堆结构的队列,堆结构在执行插入和删除操做时的最坏时间复杂度是 O(logN)

由于前面讲阻塞队列实现的时候,已经对DelayedWorkQueue进行了说明,更多内容请查看《阻塞队列 — DelayedWorkQueue源码分析》

总结

  1. 与Timer执行定时任务比较,相比Timer,ScheduledThreadPoolExecutor有说明优势?(文章前面分析过)
  2. ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,因此它也是一个线程池,也有 coorPoolSize 和 workQueue,可是 ScheduledThreadPoolExecutor特殊的地方在于,本身实现了优先工做队列 DelayedWorkQueue ;
  3. ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService,因此就有了任务调度的方法,如 schedule 、 scheduleAtFixedRate  、 scheduleWithFixedDelay ,同时注意他们之间的区别;
  4. 内部类 ScheduledFutureTask 继承者FutureTask,实现了任务的异步执行而且能够获取返回结果。同时实现了Delayed接口,能够经过getDelay方法获取将要执行的时间间隔;
  5. 周期任务的执行实际上是调用了FutureTask的 runAndReset 方法,每次执行完不设置结果和状态。
  6.  DelayedWorkQueue的数据结构,它是一个基于最小堆结构的优先队列,而且每次出队时可以保证取出的任务是当前队列中下次执行时间最小的任务。同时注意一下优先队列中堆的顺序,堆中的顺序并非绝对的,但要保证子节点的值要比父节点的值要大,这样就不会影响出队的顺序。

整体来讲,ScheduedThreadPoolExecutor的重点是要理解下次执行时间的计算,以及优先队列的出队、入队和删除的过程,这两个是理解ScheduedThreadPoolExecutor的关键。

PS:以上代码提交在 Githubgithub.com/Niuh-Study/…

PS:这里有一个技术交流群(扣扣群:1158819530),方便你们一块儿交流,持续学习,共同进步,有须要的能够加一下。

文章持续更新,能够公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

相关文章
相关标签/搜索