线程池ThreadPoolExecutor

为何须要线程池?
线程池可以对线程进行统一分配,调优和监控:
- 下降资源消耗(防止线程不停的建立与销毁,减小了资源消耗)
- 提升响应速度
- 提升线程的可管理性java
核心参数
源码内部使用了一个Integer类型的原子变量来记录线程池状态(高三位)和线程池线程数(其他)。数组
状态 |
高三位 |
表现 |
RUNNING |
-1(111) |
接收并容许新任务 |
SHUTDOWN |
0(000) |
拒绝新任务,但处理阻塞队列里的任务 |
STOP |
1(001) |
拒绝新任务,放弃执行任务,并中断正在处理的任务 |
TIDYING |
2(010) |
全部任务执行完后,当前线程池活动数为0,将要调用 terminated 方法 |
TERMINATED |
3(011) |
terminated()方法执行完毕 |
//原子变量 ctl 存储线程池状态(高三位)与线程数(其余低位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程数的表示位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//111 - 000~
private static final int RUNNING = -1 << COUNT_BITS;
//000 - 000~ 拒绝新任务,但处理阻塞队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001 - 000~ 放弃执行任务,并中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
//010 - 000~ 全部任务执行完后,当前线程池活动数为0,将要调用 terminated 方法
private static final int TIDYING = 2 << COUNT_BITS;
//011 - 000~ 终止状态
private static final int TERMINATED = 3 << COUNT_BITS;
//取高三位
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//取低其余位
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//获取 ctl 值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//=========================分割线
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
private final Condition termination = mainLock.newCondition();
private volatile boolean allowCoreThreadTimeOut;
private int largestPoolSize;//历史最大建立线程数
private long completedTaskCount;//完成的任务数
//-------------------------分割线
//构造方法相关的参数
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
//阻塞队列
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
核心构造方法
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程空闲至销毁等待时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//工做队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//饱和策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
拒绝策略比较
拒绝策略 |
表现 |
ThreadPoolExecutor.AbortPolicy() 默认 |
丢弃任务并抛出RejectedExecutionException异常 |
ThreadPoolExecutor.CallerRunsPolicy() |
由调用线程(提交任务的线程)处理该任务 |
ThreadPoolExecutor.DiscardOldestPolicy() |
丢弃队列最前面的任务,而后提交当前任务 |
ThreadPoolExecutor.DiscardPolicy() |
丢弃任务,可是不抛出异常。 |
阻塞队列的比较
队列名 |
表现 |
ArrayBlockingQueue |
基于数组的有界队列 |
LinkedBlockingQueue |
基于链表的无界队列 |
SynchronousQueue |
最多只有一个元素的队列 |
PriorityBlockingQueue |
优先级队列![]() |
提交任务的源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 1.是否比核心线程数少,是则新建一个核心线程完成该任务
* 2.尝试添加到阻塞队列
* 3.尝试新建线程完成任务,失败则线程池已满或关闭,执行拒绝策略
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//1
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//3
reject(command);
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
/* 检查队列是否只在必要时为空,返回false
* 1.若是线程池状态为 shutdown 之后的状态
* 2.若是线程池状态为 shutdown 而且有了第一个任务
* 3.若是线程池状态为 shutdown 且任务队列为空
*/
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
//线程池正在运行 或 shutdown状态且第一个任务为空
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // 线程处于运行态,状态错误 precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//添加任务
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//启动任务
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程池提交任务流程图

线程池的其余方法 简
方法名 |
表现 |
public void shutdown(); |
线程池再也不接收新任务,但还会完成工做队列里的任务 |
public List<Runnable> shutdownNow(); |
线程池再也不接收新任务,且会放弃工做队列的任务,正在执行的任务会被中断,返回被丢弃任务的集合 |
public boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; |
当线程调用该方法后,当前线程会被阻塞,直到线程池状态变成 TERMINATED 或超时才返回 |