ScheduledThreadPoolExecutor源码学习

1、首先先看一下线程池的建立:线程

corePoolSize:线程池核心线程数量翻译

maximumPoolSize:线程池最大线程数量rest

keepAliverTime:当活跃线程数大于核心线程数时,空闲的多余线程最大存活时间继承

unit:存活时间的单位接口

workQueue:存听任务的队列队列

handler:超出线程范围和队列容量的任务的处理程序资源

2、知道这几个参数以后,看一下线程池的原理:get

提交一个任务到线程池中,线程池的处理流程以下:源码

一、判断线程池里的核心线程是否都在执行任务,若是不是(核心线程空闲或者还有核心线程没有被建立)则建立一个新的工做线程来执行任务。若是核心线程都在执行任务,则进入下个流程。it

二、线程池判断工做队列是否已满,若是工做队列没有满,则将新提交的任务存储在这个工做队列里。若是工做队列满了,则进入下个流程。

三、判断线程池里的线程是否都处于工做状态,若是没有,则建立一个新的工做线程来执行任务。若是已经满了,则交给饱和策略来处理这个任务。

拒绝策略有3种:以下

3、在分析ScheduledThreadPoolExecutor以前,咱们先看看ThreadPoolExecutor的源码,而后在看ScheduledThreadPoolExecutor,对比分析ScheduledThreadPoolExecutor产生的缘由和使用场景。

1.ThreadPoolExecutor 的源码

由此能够看到ThreadPoolExecutor本质上就是个Executor,因此他的的核心是execute方法。咱们来看一下execute方法:

在具体看这个方法以前,能够看看这个方法的注释(本身翻译吧,英语太菜,只会直译):

1. 若是运行线程数小于核心线程数,则尝试用给定的命令做为第一个任务启动一个新线程。调用addWorker会原子性地检查runState和workerCount,以此来防止错误警报,由于错误警报会在不该该添加线程的时候添加线程。

2. 若是一个任务能够成功添加到队列,那么咱们仍然须要再次检查是否应该添加一个线程(当自上次检查以来已有的线程已经死亡),或者在进入这个方法后关闭线程池。所以,咱们从新检查状态,若是必要的话,当线程池关闭时回滚队列;当没有线程的时候,则启动一个新线程。

3.若是任务没法添加到队列,则尝试添加新线程。若是它失败了,咱们知道咱们被关闭或饱和,因此拒绝任务。

这个方法的注释,就是前面写的,线程池的处理流程。

而后还能够看看这个类的这2段注释,更好的理解这个流程:

如今来分析下execute方法:

首先ctl的参数,一眼看上去不知道是什么,而后咱们先看看这个类关于这个参数的定义:

再跟一下这个ctl:

因此ctl.get()就是获取主池控制状态ctl。

当正在运行的线程数小于核心线程数,看一下addWorker(command,true)方法:

 

关于这个方法,先看一下这段注释:

而后能够在前面看看runState的状态,基本上这个方法就能本身看明白了。其中须要注意的是:

这个线程是禁止打断的执行运行。

还有就是最后finally作了个判断,若是workStarted为false,会从线程池中移除该线程。

而后再看一下Worker:

继承了AQS类,能够方便的实现工做线程的停止操做;

实现了Runnable接口,能够将自身做为一个任务在工做线程中执行;

当前提交的任务firstTask做为参数传入Worker的构造方法;

因此咱们应该看一下他的run方法:

这个runWorker方法(上面的图方法没截全),主要是如下几步:

unlock释放锁,表示可中断;

firstTask不为null或者从线程池中获取的线程不为null,线程加锁,检查线程池状态,看是否可中断,能够的话就进行中断;

执行beforeExecute方法,run方法,afterExecute方法;

释放锁。

而后在看一下getTask方法:

这个方法看一下注释,基本上就没啥了。

看完这些以后,再返回到execute方法:

关于ThreadPoolExecutor的execute方法就分析完了。
execute是任务的执行,再看一下任务的提交,submit方法:

ThreadPoolExecutor并无submit方法,而他的父类AbstractExecutorService实现了ExecutorService的submit方法。

这个execute方法已经分析过了,如今就看看newTaskFor方法:

既然有这么多状态,确定有对状态的获取方法:

看一下get()方法:

内部经过awaitDone方法对主线程进行阻塞:

若是主线程被中断,则抛出中断异常;
判断FutureTask当前的state,若是大于COMPLETING,说明任务已经执行完成,则直接返回;
若是当前state等于COMPLETING,说明任务已经执行完,这时主线程只需经过yield方法让出cpu资源,等待state变成NORMAL;
经过WaitNode类封装当前线程,并经过UNSAFE添加到waiters链表;
最终经过LockSupport的park或parkNanos挂起线程;
 

看完这个以后,在看一下FutureTask的类图关系:

而后看一下FutureTask的run方法:

 

这样的话,ThreadPoolExecutor 的源码部分就分析差很少了。

而后若是想给这个类添加额外的功能,能够看看这个示例:

2.而后看一下ScheduledThreadPoolExecutor:

这个类有这样一段注释:

咱们仍是跟ThreadPoolExecutor同样的方式,研究一下这个类:

先看execute方法:

调用delayedExecute(t)延迟任务的执行:

刚加进去的线程,确定是走else的,看一下ensurePrestart方法:

走到这里就能够看出来,跟ThreadPoolExecutor的addWorker是同样的了。

因此咱们须要好好看看这个部分:

先看一下triggerTime方法:获取下一次具体执行时间

而后看一下ScheduledFutureTask:

这个FutureTask是否是很熟悉,这个在前面ThreadPoolExecutor的时候分析过,而后ScheduledFutureTask从新了写了run方法:

看完这个以后,再看一下ScheduledFutureTask的构造方法:

time:下次任务执行时的时间;
period:执行周期;
sequenceNumber:保存任务被添加到ScheduledThreadPoolExecutor中的序号。

再看一下这个类:

这个就总体串起来了。

定时任务,通常会调用ScheduledThreadPoolExecutor这2个方法:

scheduleAtFixedRate方法
该方法设置了执行周期,下一次执行时间至关因而上一次的执行时间加上period,它是采用已固定的频率来执行任务:

scheduleWithFixedDelay方法
该方法设置了执行周期,与scheduleAtFixedRate方法不一样的是,下一次执行时间是上一次任务执行完的系统时间加上period,于是具体执行时间不是固定的,但周期是固定的,是采用相对固定的延迟来执行任务:

这里的unit.toNanos(-delay)),再看一下setNextRunTime()方法,就能理解了,这里把周期设置为负数来表示是相对固定的延迟执行。

再看ScheduledThreadPoolExecutor的submit方法:

也是走到schedule方法,剩下的就本身看看吧。

(shutdown的部分没有写,能够本身看看)

相关文章
相关标签/搜索