观看本文章以前,最好看一下这篇文章熟悉下ThreadPoolExecutor基础知识。java
讲解本篇文章从下面一个例子开始,test1()和test2()方法都会抛出RejectedExecutionException异常,ThreadPoolExecutor默认的拒绝任务策略是AbortPolicy。test1()中线程池中corePoolSize和maximumPoolSize都为2,阻塞队列的长度是10,线程池最多能处理12个任务。当超过12个任务时,就会拒绝新的任务,抛出RejectedExecutionException。而test2()中的任务没有超过线程池的阀值,可是在线程池调用shutdown()后,线程池的状态会变成shutdown,此时不接收新任务,但会处理正在运行的任务和在阻塞队列中等待处理的任务。因此咱们在shutdown()以后再调用submit(),会抛出RejectedExecutionException异常。有了这个例子的基础,咱们再来分析源码,会好过一点。安全
/**
* @author cmazxiaoma
* @version V1.0
* @Description: 分析抛出RejectedExecutionException问题
* @date 2018/8/16 14:35
*/
public class RejectedExecutionExceptionTest {
public static void main(String[] args) {
// test1();
test2();
}
/**
* 提交的任务数量超过其自己最大能处理的任务量
*/
public static void test1() {
CustomThreadPoolExecutor customThreadPoolExecutor =
new CustomThreadPoolExecutor(2, 2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 13; i++) {
CustomThreadPoolExecutor.CustomTask customTask
= new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(60 * 60);
System.out.println("线程" + Thread.currentThread().getName()
+ "正在执行...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}, "success");
if (i == 12) {
// throw RejectedExectionException
customThreadPoolExecutor.submit(customTask);
} else {
customThreadPoolExecutor.submit(customTask);
}
}
customThreadPoolExecutor.shutdown();
}
/**
* 当线程池shutdown()后,会中断空闲线程。可是正在运行的线程和处于阻塞队列等待执行的线程不会中断。
* shutdown(),不会接收新的线程。
*/
public static void test2() {
CustomThreadPoolExecutor customThreadPoolExecutor =
new CustomThreadPoolExecutor(2, 2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 2; i++) {
CustomThreadPoolExecutor.CustomTask customTask
= new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(60 * 60);
System.out.println("线程" + Thread.currentThread().getName()
+ "正在执行...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}, "success");
customThreadPoolExecutor.submit(customTask);
}
customThreadPoolExecutor.shutdown();
CustomThreadPoolExecutor.CustomTask customTask
= new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(60 * 60);
System.out.println("线程" + Thread.currentThread().getName()
+ "正在执行...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}, "success");
customThreadPoolExecutor.submit(customTask);
}
}
复制代码
关于线程池执行过程,咱们看下面一幅图,就能明白个大概。 1.当线程池中的线程数量小于corePoolSize,就会建立新的线程来处理添加的任务直至线程数量等于corePoolSize。bash
2.当线程池中的线程数量大于等于corePoolSize且阻塞队列(workQueue)未满,就会把新添加的任务放到阻塞队列中。网络
3.当线程池中的线程数量大于等于corePoolSize且阻塞队列满了,就会建立线程来处理添加的任务直到线程数量等于maximumPoolSize多线程
4.若是线程池的数量大于maximumPoolSize,会根据RejectedExecutionHandler策略来拒绝任务。AbortPolicy就是其中的一种拒绝任务策略。 并发
submit()相比于execute()而言,多了RunnableFuture<Void> ftask = newTaskFor(task, null);
这一步,把task包装成RunnableFuture类型的ftask。因此submit()有返回值,返回值类型是Future<?>,能够经过get()获取线程执行完毕后返回的值。还能够经过isDone()
、isCancelled()
、cancel(boolean mayInterruptIfRunning)
这些方法进行某些操做。好比判断线程是否执行完毕、判断线程是否被取消,显式取消启动的线程的操做。ide
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
复制代码
线程池去处理被提交的任务,很明显经过execute()方法提交的任务必需要实现Runnable接口。oop
咱们来仔细看下execute()注释,发现它说到:若是任务不能被成功提交获得执行,由于线程池已经处于shutdown状态或者是任务数量已经达到容器上限,任务会被RejectedExecutionHandler处理进行拒绝操做。很明显,注释已经告诉上文抛出RejectedExecutionException异常的答案了。有时候真的要仔细看注释!!!多看注释,事半功倍。源码分析
咱们来看execute()中作了什么操做。ui
1.获取线程池的状态,若是线程池中的线程数量小于corePoolSize,调用addWorker(command, true)
建立新的线程去处理command任务。若是addWorker()返回失败,咱们再次获取线程池的状态。由于addWorker()失败的缘由可能有:线程池已经处于shutdown状态不接收新的任务或者是存在并发,在workerCountOf(c) < corePoolSize
这块代码后,有其余的线程建立了worker线程,致使worker线程的数量大于等于corePoolSize
2.若是线程池的数量大于等于corePoolSize,且线程池的状态处于RUNNING状态,咱们将任务放到阻塞队列中。当任务成功放入阻塞队列中,咱们仍然须要一个双重校验的机制去判断是否应该建立新的线程去处理任务。
由于会存在这些状况:有些线程在咱们上次校验后已经死掉、线程池在上次校验后忽然关闭处于shutdown状态。考虑到这些缘由,咱们必须再次校验线程池的状态。若是线程池的状态不处于RUNNING状态,那么就行回滚操做,把刚才入队的任务移除掉,后续经过reject(command)
执行拒绝任务策略。
若是线程池处于RUNNING状态且线程池中线程数量等于0或者从阻塞队列中删除任务失败(意味着:这个任务已经被其余线程处理掉了)且线程池中线程数量等于0,那么调用addWorker(null, false)
新建一个worker线程,去消费workQueue中里面的任务
3.若是线程池不处于RUNNING状态或者任务没法成功入队(此时阻塞队列已经满了),此时须要建立新的线程扩容至maximumPoolSize。若是addWorker(command, false)
返回false,那么经过reject(command)
执行拒绝任务策略。
这里再唠叨几句,调用addWorker()有这4种传参的方式,适用于不一样场景。
1.addWorker(command, true)
当线程池中的线程数量少于corePoolSize,会把command包装成worker而且放入到workers集合中。若是线程池中的线程数量超过了corePoolSize,会返回false。
2.addWorker(command, false)
当阻塞队列满了,一样会把command包装成worker而且放入到worker集合中。若是线程池中的线程数量超过了maximumPoolSize,会返回false。
3.addWorker(null, false)
说明firstTask是个空任务,一样把它包装成worker而且放入到worker集合中。若是线程池中的数量超过了maximumPoolSize,会返回false。这样firstTask为空的worker在线程执行的时候,也能够从阻塞队列中获取任务去处理。
4.addWorker(null, true)
:和上面同样,只是线程池的线程数量限制在corePoolSize,超过也是返回false。使用它的有prestartAllCoreThreads()
和prestartCoreThread()
这2个方法,其使用目的是预加载线程池中的核心线程。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } 复制代码
addWorker()主要是建立新的线程,而后执行任务。
1.首先判断线程池的状态是否知足建立worker线程的要求。
若是线程池的状态大于SHUTDOWN状态,那么此时处于STOP、TIDYING、TERMINATE状态,不能建立worker线程,返回false。
若是线程池处于shutdown状态且firstTask不等于null,此时也没法建立worker线程。由于处于shutdown状态的线程池不会去接收新的任务。
若是线程池处于shutdown状态且firstTask等于null且workQueue阻塞队列为空,此时就更没有必要建立worker线程了。由于firstTask为null,就是为了建立一个没有任务的worker线程去阻塞队列里面获取任务。而阻塞队列都已经为空,那么再建立一个firstTask为null的worker线程显然没有什么意思,返回false便可。
retry:
位置。若是CAS操做失败,说明workerCount被其余线程修改过。咱们再次获取ctl,判断当前线程池状态和以前的状态是否匹配。若是不匹配,说明线程池状态发生变动,继续循环操做。3.经过传入来的firstTask建立worker线程。Worker的构造方法中经过setState(-1)
设置state(同步状态)为-1。Worker继承了AbstractQueuedSynchronizer,其自己是一把不可重入锁。getThreadFactory().newThread(this)
建立新线程,由于Worker实现了Runnable接口,其自己也是一个可执行的任务。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
复制代码
4.咱们往workers添加worker线程时,经过ReentrantLock保证线程安全。只有在当前线程池处于RUNNING状态或者是处于SHUTDOWN状态且firstTask等于null的状况下,才能够添加worker线程。若是worker线程已经处于启动且未死亡的状态,会抛出IllegalThreadStateException异常。
添加完毕后,启动worker线程。若是worker线程启动成功返回true,启动失败调用addWorkerFailed()进行回滚操做。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
复制代码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
咱们来看下ThreadPoolExecutor的内部类Worker,上文已经说到Worker继承了AbstractQueuedSynchronizer类且实现了Runnable接口。因此说是一个可执行的任务,也是一把不可重入锁,具备排他性。
1.咱们建立Worker对象时,默认的state为-1。咱们中断的时候,要获取worker对象的锁(state从0 CAS到1)。获取锁成功后,才能进行中断。这说明了在初始化worker对象阶段,不容许中断。只有调用了runWorker()
以后,将state置为0,才能中断。
2.shutdown()中调用interruptIdleWorkers()中断空闲线程和shutdownNow()中调用interruptWorkers()中断全部线程。
interruptIdleWorkers()中中断空闲线程的前提是要获取worker对象的锁。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
复制代码
interruptWorkers()中中断全部线程时,不用调用tryLock()获取worker对象的锁,最终是经过worker中的interruptIfStarted()来中断线程。在这个方法中只有state大于等于0且线程不等于null且线程没有被中断过,才能进行中断操做。说明只有通过了runworker()
阶段才能进行中断操做。
这也是Worker为何要设计成不可重入的缘由,就是为了防止中断在运行中的任务,只会中断在等待从workQueue中经过getTask()获取任务的线程(由于他们没有上锁,此时state为0)。
如下这5种方法都会调用到interruptIdleWorkers()去中断空闲线程。
setCorePoolSize()
setKeepAliveTime(long time, TimeUnit unit)
setMaximumPoolSize(int maximumPoolSize)
shutdown()
allowCoreThreadTimeOut(boolean value)
复制代码
还有一点必须强调。Task没有真正的被执行,执行的是Work线程。Work线程中只是调用到了Task中的run()方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
复制代码
1.work线程启动后,会调用其run()方法。run()方法再去调用runWorker(this)方法。
2.执行任务以前,获取work线程中的task,而后释放worker的锁。让state状态从-1 CAS到0。当state为0,说明能够去中断此线程。
3.以轮询的方式经过getTask()从阻塞队列中获取task,当task为null,跳出轮询。
4.开始执行任务的时候,经过lock()获取锁,将state从0 CAS到1。任务执行完毕时,经过unlock()释放锁。
5.若是线程池处于STOP、TIDYING、TERMINATE状态,要中断worker线程。
6.经过beforeExecute(wt, task)和afterExecute(task, thrown)对task进行前置和后置处理。
7.在task.run()、beforeExecute(wt, task)、afterExecute(task, thrown)发生异常时都会致使worker线程终止。经过调用processWorkerExit(w, completedAbruptly)
来进行worker退出操做。
8.在getTask()获取阻塞队列中的任务,若是队列中没有任务或者是获取任务超时,都会调用processWorkerExit(w, completedAbruptly)
来进行worker退出操做。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
上文已经提起过getTask()方法,主要是从阻塞队列获取task的。那么条件下task会返回null呢?咱们能够经过注释获得一些信息。
1.首先获取线程池运行状态,若是线程池的状态处于shutdown状态且workQueue为空,或者处于stop状态。而后调用decrementWorkerCount()递减workerCount,最后返回null。
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
复制代码
2.allowCoreThreadTimeOut默认为false。为false的时候,核心线程即时在空闲时也会保持活跃。为true的时候,核心线程在keepAliveTime时间范围内等待工做。若是线程池的数量超过maximumPoolSize或者等待任务超时或者workQueue为空,那么直接经过CAS减小workerCount数量,返回null。
3.若是timed为true,经过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
获取task,等待时间超过了keepAliveTime还没获取到task,直接返回null。若是timed为false,经过workQueue.take()
获取task。若是没有获取到task,会一直阻塞当前线程直到获取到task(当阻塞队列中加入了新的任务,会唤醒当前线程)为止。
4.若是获取task成功,就直接返回。若是获取task超时,timedOut会置为true,会在下一次循环中以返回null了结。
再强调一点,只有当线程池中的线程数量大于corePoolSize才会进行获取任务超时检查,这也体现线程池中的一种策略:当线程池中线程数量达到maximumPoolSize大小后,若是一直没有任务进来,会逐渐减小workerCount直到线程数量等于corePoolSize。
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
1.completedAbruptly为true,说明worker线程时忽然终止,说明执行task.run()发生了异常,因此要经过CAS减小workerCount的数量。 2.completedAbruptly为false,说明worker线程是正常终止,不须要对workerCount进行减小的操做。由于在getTask()中已经作了此操做。
3.对worker完成的任务数进行统计,而且从workers集合中移出。
4.调用tryTerminate()方法,尝试终止线程池。若是状态知足的话,线程池还存在线程,会调用interruptIdleWorkers(ONLY_ONE)
进行中断处理,使其进入退出流程。若是线程池中的线程数量等于0的话,经过CAS把线程池的状态更新到TIDYING。而后经过terminated()进行一些结束的处理,最后经过CAS把线程池状态更新到TERMINATED。最后的最后,调用termination.signalAll()
唤醒等待的线程,通知它们线程池已经终止。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
复制代码
5.获取线程池的状态。若是线程池的状态还处于RUNNING、SHUTDOWN,说明tryTerminate()
没有成功。若是worker线程是忽然终止的话,经过addWorker(null, false)
再建立一个没有task的worker线程去处理任务。
6.若是worker线程是正常终止的话,且当前线程池中的线程数量小于须要维护的数量,咱们也会经过addWorker(null, false)
再建立一个没有task的worker线程去处理任务。
7.默认状况下allowCoreThreadTimeOut为false,那么min就等于corePoolSize。那么线程池须要维护的线程数量就是corePoolSize个。若是allowCoreThreadTimeOut为true,min就等于0。在workQueue不等于空的状况,min会被赋值成1。此时线程池须要维护的线程池数量是1。
若是线程池处于shutdown状态,在workQueue不为空的状况下,线程池始终会维护corePoolSize个线程。当workQueue为空的话,线程池会逐渐销毁这corePoolSize个线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } 复制代码
你们好,我是cmazxiaoma(寓意是沉梦昂志的小马),感谢各位阅读本文章。 小弟不才。 若是您对这篇文章有什么意见或者错误须要改进的地方,欢迎与我讨论。 若是您以为还不错的话,但愿大家能够点个赞。 但愿个人文章对你能有所帮助。 有什么意见、看法或疑惑,欢迎留言讨论。
最后送上:心之所向,素履以往。生如逆旅,一苇以航。