「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」前端
ThreadPoolExecutor是Java的线程池并发代名词,多线程开发基本都是基于这个去作具体的业务开发。虽然以为本身回了,网上帖子已经有不少的文章写这个,可是是本身一一点写的,终归是要比看别人的理解更加深入,因此最近本身在对java知识的系统梳理。 那么接下来主要分析下这个多线程框架的原理。java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
复制代码
面试靠的最可能是这个构造函数中7个参数的做用,面试
成员变零后端
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程容量
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
面试最喜欢问的是 ctl变量的表明什么意义? ctl变量的的的用高3位表示线程池的状态,用低29位表示线程个数,二者经过 | 操做,拼接出ctl变量,也就是线程池的最大线程数capacity是 (2^29)-1。markdown
首先咱们来看平时业务代码是提交任务到线程池执行的函数是经过execute或者submit方法, 区别就是submit返回具备Future,execute返回void,的、那么接下来咱们主要分析execute 的执行流程,submit涉及到线程异步返回,以后会另外单独分析,那么下面这个execute函数 就能看出线程池的整个执行流程,多线程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 当线程池的核心线程数设置为0状况下,那么这时workerCountOf(recheck)为0,这时就开启非线程数处理队列任务
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
复制代码
线程池执行任务流程图以下: 我相信大概的流程通常同窗是清楚的:并发
实际源码中执行流程还有一些小细节容易被忽略的地点框架
线程池新增工做任务主要addWorker方法。因为代码比较长,我就在 代码里写好注释less
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
//第一个条件: 线程至少不是运行状态,那么就是shutdown stop tidying,terminated状态
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//第二个条件: 当前线程池是shutdown状态且任务队列非空而且工做任务第一个任务是空的取反条件,这个含义是当除了SHUTDOWN状态且第一个任务为空且任务队列不为空
// 状况下,直接返回false,增长Work线程失败
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
// 线程池是running状态
(rs == SHUTDOWN && firstTask == null)) {
//线程池处于shutdown状态而且第一个task为空
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加入工做线程的集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
//记录最大线程数
largestPoolSize = s;
workerAdded = true;
}
} finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
添加工做线程主要步骤异步
若是WorkerAdded失败,则从Worder的Set移除刚才加入Worker线程,并将线程池的线程数减1,
首先来看下Work的类的成员变量的构造函数,从下面的Work的代码,能够看到它是实现了 RUnnable接口,上一节Worker启动是调用了它的start方法,真正由操做系统调度执行 的其run方法,那么接下来重点看下run的工做流程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//初始化状态为-1,表示不能被中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
复制代码
下面代码中Work的run直接调用runWork,并传入自身对象, 开始一个循环判断 第一个任务后者从任务队列中取任务不为空,就开始上锁,而后执行任务,若是任务 队列为空了,则处理Work的退出。
/** Delegates main run loop to outer runWorker */
public void run() {
//直接调用runWorker函数
runWorker(this);
}
final void runWorker(Worker w) {
// Wokder当前线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//将state值赋值为0,这样就运行中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环判断第一个Task获取从获取任务
while (task != null || (task = getTask()) != null) {
//获取当前Work的锁,处理任务,也就是当前Work线程处理是同步处理任务的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//线程池的状态至少是stop,即便stop,tidying.terminated状态
if ((runStateAtLeast(ctl.get(), STOP)
//检查线程是否中断且清楚中断
|| (Thread.interrupted()
&&
//再次检查线程池的状态至少是STOP
runStateAtLeast(ctl.get(), STOP))) &&
//再次判断是否中断
!wt.isInterrupted())
//中断线程
wt.interrupt();
try {
//执行业务任务前处理(钩子函数)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 这里就是执行提交线程池的Runnable的任务的run方法 task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行业务任务后处理(钩子函数)
afterExecute(task, thrown);
}
} finally {
//执行结束重置为空,回到while循环拿下一个
task = null;
//处理任务加1
w.completedTasks++;
//释放锁,处理下一个任务
w.unlock();
}
}
//代码执行到这里,表明业务的任务没有异常,否则不会走到这里,
//由于上一层try没有catch异常的,而业务执行出现异常,最里层
//虽然catch了异常,可是也都经过throw向外抛出
completedAbruptly = false;
} finally {
//若是循环结束,则处理Work退出工做,表明任务拿不到任务,即任务队列没有任务了
processWorkerExit(w, completedAbruptly);
}
}
复制代码
下面就来看下getTask获取任务队列的处理逻辑 、 若是这里返回null,即runWorker循环退出,则会处理finnaly中processWorkExit, 处理Work线程的退出,下面是getWork返回null的状况:
private Runnable getTask() {
//超时标志
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//获取线程状态
int c = ctl.get();
//线程状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 若是线程池状态值至少是SHUTDOWN状态,
if (rs >= SHUTDOWN
线程池状态值至少是STOP状态,或者是任务队列是空
&& (rs >= STOP || workQueue.isEmpty())) {
// CAS将worker线程数减1
decrementWorkerCount();
return null;
}
//计算线程池线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut参数设置为true,或则线程池的线程数大于corePoolSize, 表示须要超时的Worker须要退出,
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程数大于最大线程数 || 已经超时
if ((wc > maximumPoolSize || (timed && timedOut))
// 线程数大于1 或者 任务队列为空
&& (wc > 1 || workQueue.isEmpty())) {
// CAS将线程数减1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 须要处理超时的Worker,则获取任务队列中任务等待的时间
//就是线程池构造函数中keepAliveTime时间,若是不处理超时的Worker
//则直接调用take一直阻塞等待任务队列中有任务,拿到就返回Runnale任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
Worker的退出处理: 1 从上面分析知道completedAbruptly是任务执行时是否出现异常标志, 若是任务执行过程出错,则将线程池的线程数量减1 2.加线程池的mainLock的全局锁,这里主要区分Worker执行任务中,拿的是Worker内部的锁,完成任务加1,将worker从Worker的集合移除, 3. 执行tryTerminate函数,是否线程池线程池是否关闭 4. 根据线程池状态是否补充非核心的Worker线程去处理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//任务执行时出现异常,则减去工做
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//拿到线程池的主锁
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//完成任务加1
completedTaskCount += w.completedTasks;
//将worker从Worker的集合移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试线程池关闭
tryTerminate();
//获取线程池的ctl
int c = ctl.get();
//若是线程池的状态值小于STOP,即便SHUTDOWN RUNNING
if (runStateLessThan(c, STOP)) {
//任务执行没有异常
if (!completedAbruptly) {
//allowCoreThreadTimeOut参数true,则min=0,表示不须要线程常驻。
//负责是有corePoolSize个线程常驻线程池
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//若是线程池数大于最小,也就是不须要补充线程执行任务队列的任务
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 走到这里表示线程池的线程数为0,而任务队列又不为空,得补充一个线程处理任务 addWorker(null, false);
}
}
复制代码
tryTerminate的逻辑是处理线程池关闭的场景
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//线程池是RUNNING状态
if (isRunning(c) ||
//线程池状态至少是TIDYING
runStateAtLeast(c, TIDYING) ||
//线程池状态是SHUTDOWN可是队列不为空
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//只有最后一个线程才能走到这里,处理线程池从TIDYIING状态
//到TERMINATED状态
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//钩子函数
terminated();
} finally {
//设置线程池TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用awaitTermination的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
复制代码
当线程池没法处理任务时的处理策略:
1.默认拒绝策略是AbortPolicy 直接抛出RejectedExecutionException异常
2.DiscardPolicy 直接丢弃任务
3.DiscardOldestPolicy 丢弃任务队列中最老的任务,这里以前理解是直接丢弃,其实看了源码以后,其实它仍是当线程池还咩有关闭时,尝试去提交该任务到线程池去执行
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
复制代码
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
复制代码
总结 本文主要就线程池的状态转换、工做线程Worker建立以及执行任务队列中任务的流程、拒绝策略的详细分析。