最近新接手的项目里大量使用了ScheduledThreadPoolExecutor类去执行一些定时任务,以前一直没有机会研究这个类的源码,此次趁着机会好好研读一下。面试
原文地址:www.jianshu.com/p/18f4c95ac…bash
该类主要仍是基于ThreadPoolExecutor类进行二次开发,因此对Java线程池执行过程还不了解的同窗建议先看看我以前的文章。
当面试官问线程池时,你应该知道些什么?数据结构
与ThreadPoolExecutor不一样,向ScheduledThreadPoolExecutor中提交任务的时候,任务被包装成ScheduledFutureTask对象加入延迟队列并启动一个woker线程。函数
用户提交的任务加入延迟队列时,会按照执行时间进行排列,也就是说队列头的任务是须要最先执行的。而woker线程会从延迟队列中获取任务,若是已经到了任务的执行时间,则开始执行。不然阻塞等待剩余延迟时间后再尝试获取任务。优化
任务执行完成之后,若是该任务是一个须要周期性反复执行的任务,则计算好下次执行的时间后会从新加入到延迟队列中。ui
首先看下ScheduledThreadPoolExecutor类的几个构造函数:this
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}复制代码
注:这里构造函数都是使用super,其实就是ThreadPoolExecutor的构造函数
这里有三点须要注意:spa
再次说明:上面三点的理解须要先了解ThreadPoolExecutor的知识点。线程
当咱们建立出一个调度线程池之后,就能够开始提交任务了。这里依次分析一下三个经常使用API的源码:rest
首先是schedule方法,该方法是指任务在指定延迟时间到达后触发,只会执行一次。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//参数校验
if (command == null || unit == null)
throw new NullPointerException();
//这里是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
//而后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//包装好任务之后,就进行提交了
delayedExecute(t);
return t;
}复制代码
重点看一下提交任务的源码:
private void delayedExecute(RunnableScheduledFuture<?> task) {
//若是线程池已经关闭,则使用拒绝策略把提交任务拒绝掉
if (isShutdown())
reject(task);
else {
//与ThreadPoolExecutor不一样,这里直接把任务加入延迟队列
super.getQueue().add(task);
//若是当前状态没法执行任务,则取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//这里是增长一个worker线程,避免提交的任务没有worker去执行
//缘由就是该类没有像ThreadPoolExecutor同样,woker满了才放入队列
ensurePrestart();
}
}复制代码
这里的关键点其实就是super.getQueue().add(task)行代码,ScheduledThreadPoolExecutor类在内部本身实现了一个基于堆数据结构的延迟队列。add方法最终会落到offer方法中,一块儿看下:
public boolean offer(Runnable x) {
//参数校验
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//查看当前元素数量,若是大于队列长度则进行扩容
int i = size;
if (i >= queue.length)
grow();
//元素数量加1
size = i + 1;
//若是当前队列尚未元素,则直接加入头部
if (i == 0) {
queue[0] = e;
//记录索引
setIndex(e, 0);
} else {
//把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列
//把须要最先执行的任务放在前面
siftUp(i, e);
}
//若是新加入的元素就是队列头,这里有两种状况
//1.这是用户提交的第一个任务
//2.新任务进行堆调整之后,排在队列头
if (queue[0] == e) {
//这个变量起优化做用,后面说
leader = null;
//加入元素之后,唤醒worker线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}复制代码
经过上面的逻辑,咱们把提交的任务成功加入到了延迟队列中,前面说了加入任务之后会开启一个woker线程,该线程的任务就是从延迟队列中不断取出任务执行。这些都是跟ThreadPoolExecutor相同的,咱们看下从该延迟队列中获取元素的源码:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//取出队列中第一个元素,即最先须要执行的任务
RunnableScheduledFuture<?> first = queue[0];
//若是队列为空,则阻塞等待加入元素时唤醒
if (first == null)
available.await();
else {
//计算任务执行时间,这个delay是当前时间减去任务触发时间
long delay = first.getDelay(NANOSECONDS);
//若是到了触发时间,则执行出队操做
if (delay <= 0)
return finishPoll(first);
first = null;
//这里表示该任务已经分配给了其余线程,当前线程等待唤醒就能够
if (leader != null)
available.await();
else {
//不然把给任务分配给当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//当前线程等待任务剩余延迟时间
available.awaitNanos(delay);
} finally {
//这里线程醒来之后,何时leader会发生变化呢?
//就是上面的添加任务的时候
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//若是队列不为空,则唤醒其余woker线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}复制代码
这里为何会加入一个leader变量来分配阻塞队列中的任务呢?缘由是要减小没必要要的时间等待。好比说如今队列中的第一个任务1分钟后执行,那么用户提交新的任务时会不断的加入woker线程,若是新提交的任务都排在队列后面,也就是说新的woker如今都会取出这第一个任务进行执行延迟时间的等待,当该任务到触发时间时,会唤醒不少woker线程,这显然是没有必要的。
当任务被woker线程取出之后,会执行run方法,因为此时任务已经被包装成了ScheduledFutureTask对象,那咱们来看下该类的run方法:
public void run() {
boolean periodic = isPeriodic();
//若是当前线程池已经不支持执行任务,则取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//若是不须要周期性执行,则直接执行run方法而后结束
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//若是须要周期执行,则在执行完任务之后,设置下一次执行时间
setNextRunTime();
//把任务从新加入延迟队列
reExecutePeriodic(outerTask);
}
}复制代码
上面就是schedule方法完整的执行过程。
ScheduledThreadPoolExecutor类中关于周期性执行的任务提供了两个方法scheduleAtFixedRate跟scheduleWithFixedDelay,一块儿看下区别。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
//删除没必要要的逻辑,重点看区别
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
//两者惟一区别
unit.toNanos(period));
//...
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
//...
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
//两者惟一区别
unit.toNanos(-delay));
//..
}复制代码
前者把周期延迟时间传入ScheduledFutureTask中,然后者却设置成负数传入,区别在哪里呢?看下当任务执行完成之后的收尾工做中设置任务下次执行时间的方法setNextRunTime源码:
private void setNextRunTime() {
long p = period;
//大于0是scheduleAtFixedRate方法,表示执行时间是根据初始化参数计算的
if (p > 0)
time += p;
else
//小于0是scheduleWithFixedDelay方法,表示执行时间是根据当前时间从新计算的
time = triggerTime(-p);
}复制代码
也就是说当使用scheduleAtFixedRate方法提交任务时,任务后续执行的延迟时间都已经肯定好了,分别是initialDelay,initialDelay + period,initialDelay + 2 * period以此类推。
而调用scheduleWithFixedDelay方法提交任务时,第一次执行的延迟时间为initialDelay,后面的每次执行时间都是在前一次任务执行完成之后的时间点上面加上period延迟执行。
ScheduledThreadPoolExecutor能够说是在ThreadPoolExecutor上面进行了一些扩展操做,它只是从新包装了任务以及阻塞队列。该类的阻塞队列DelayedWorkQueue是基于堆去实现的,本文没有太详细介绍堆结构插入跟删除数据的调整工做,感兴趣的同窗能够私信或者评论交流。