ThreadPoolExecutor算是JUC中最经常使用的类之一了。ThreadPoolExecutor,顾名思义,thread-pool-executor,硬翻译就是“线程-池-执行者”;java中,经过ThreadPoolExecutor能够很容易的建立一个线程池。可是咱们为何要使用线程池?呢?它可以带来什么样的优点呢?它又是怎么实现的呢?OK,带着这几个问题,咱们来学习一下JAVA中的线程池技术。java
关于这个问题其实有点鸡肋,我以为再问这个问题以前更应该问为何要有线程池。那为何呢?数组
this is a 例子:bash
快递行业最近两年发展的灰常火热,据说工资也很是的高,搞得我一每天的都没有心思去好好写代码了...
微信
以前的小快递公司都是没有固定的快递员的,就是说,每次去送一件快递,站点负责人就须要去找一我的来帮忙送,送完以后就没有而后了(固然,钱仍是要给的)。
框架
可是后来随着货愈来愈多,找人给钱成本太大,并且农忙时还须要花很长时间去找人,因此就雇用了5我的,签了合同,长期为站点配送。
ide
之前都是随时用随时找,如今不是,如今是成立了一个物流公司,开了一个配送部,配送部门规定正式配送员最多只能有五我的。函数
以前配送的缺点是什么:oop
成立配送部以后解决的问题post
OK,咱们以上述例子来对应理解线程池的基本原理学习
先来看下,JAVA对ThreadPoolExecutor的类申明:
public class ThreadPoolExecutor extends AbstractExecutorService 复制代码
在【初识】-JUC·Executor框架中给出了Executor的继承体系。ThreadPoolExecutor就是具有线程池功能的集成者。
//构造方法一
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//构造方法二
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
//构造方法三
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//构造方法四
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
从上面的代码能够看出,构造方法(1、2、三)都是经过调用(四)来作具体属性初始化的。那么咱们直接来看构造方法四;在构造方法四中总共须要7个参数,先来看下每一个参数的具体含义:
corePoolSize
核心线程数大小。那么什么是核心线程数呢,咱们能够类比于上面例子中的配送部中签定劳动合同的人的个数。
maximumPoolSize
最大线程数。加入说如今是双十一期间,快递异常的多,配送部的5我的彻底忙不过来,并且仓库也满了,怎么办呢?这个时候就须要再招聘一些临时配送员,假设maximumPoolSize为10,那么也就是说,临时招聘能够招5我的,配送部签定正式劳动合同的人和签定临时合同的人加一块不能超过配送部规定的最大人数(10人)。因此说,maximumPoolSize就是线程池可以容许的存在的最大线程的数量。
keepAliveTime
存活时间。为何要有这个呢?想一下,双十一过去了,货物已经配送的差很少了。临时合同写的是若是临时配送员2天没有配送了,那配送部就有权利终止临时合同,如今已经达到2天这个点了,须要开除这些临时配送专员了。对于线程池来讲,keepAliveTime就是用来表示,当除核心线程池以外的线程超过keepAliveTime时间以后,就须要被系统回收了。
unit
keepAliveTime的时间单位。
workQueue
工做队列。这个就至关于一个仓库,如今配送部5我的都在配送,可是还不断的有新的快递达到,这个时候就须要一个仓库来存放这些快递。对于线程池来讲,当核心线程都有本身的任务处理,而且还有任务进来的时候,就会将任务添加到工做队列中去。
threadFactory
线程工厂。就是用来建立线程的。能够类比成招聘组,会给每一个线程分配名字或者编号这样。
handler
RejectedExecutionHandler 用来描述拒绝策略的。假设如今个人仓库也知足,而且配送部已经达到10我的了。怎么办呢,那么只能采用一些策略来拒绝任务了。
// runState is stored in the high-order bits
//RUNNING;该状态的线程池接收新任务,而且处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN;该状态的线程池不接收新任务,但会处理阻塞队列中的任务;
private static final int SHUTDOWN = 0 << COUNT_BITS;
//STOP;不接收新任务,也不处理阻塞队列中的任务,而且会中断正在运行的任务;
private static final int STOP = 1 << COUNT_BITS;
//全部的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态
private static final int TIDYING = 2 << COUNT_BITS;
//线程池完全终止,就变成TERMINATED状态。
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
下面是在网上发现的一位大牛的图;感受能够较为直观的描述状态的变动
有几个点须要注意。
public void execute(Runnable command) {
//任务为null,直接抛出空指针异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//若是线程数大于等于基本线程数,将任务加入队列
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
复制代码
经过实现这个接口去建立一个新的线程
public interface ThreadFactory {
Thread newThread(Runnable r);
}
复制代码
经过addWorker方法来添加,其实在excute中只是做为一个提交任务的入口,实际的处理逻辑都是在addWorker这个方法里来完成的。addWorker有两个参数:
先来看下这个方法的前半部分:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//自旋方式
for (;;) {
//获取当前线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
//若是状态是STOP,TIDYING,TERMINATED状态的话,则会返回false
//若是状态是SHUTDOWN,可是firstTask不为空或者workQueue为空的话,那么直接返回false。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//经过自旋的方式,判断要添加的worker是否为corePool范畴以内的
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
复制代码
//若是超过CAPACITY限制了则直接返回false
wc >= CAPACITY
复制代码
//判断当前的workerCount是否大于corePoolsize,不然则判断是否大于maximumPoolSize //具体的比较取决于入参core是true仍是false。
wc >= (core ? corePoolSize : maximumPoolSize)
复制代码
若是上面两个有一个知足了,则直接返回false。
下面是判断WorkerCount经过CAS操做增长1是否成功,成功的话就到此结束
if (compareAndIncrementWorkerCount(c))
break retry;
复制代码
若是不成功,则再次判断当前线程池的状态,若是如今获取到的状态与进入自旋的状态不一致的话,那么则经过continue retry从新进行状态的判断。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
复制代码
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
//建立一个新的Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
//
if (t != null) {
//加锁
mainLock.lock();
try {
// 在锁定的状况下从新检查。
// 在一下状况退出:ThreadFactory 建立失败或者在获取锁以前shut down了
int c = ctl.get();
int rs = runStateOf(c);
//状态校验
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 预先检查t是能够启动的
throw new IllegalThreadStateException();
//添加至workers中
workers.add(w);
int s = workers.size();
//若是超过了历史最大线程数,则将当前池数量设置为历史最大线程记录数
if (s > largestPoolSize)
largestPoolSize = s;
//标识添加工做线程成功
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//若是添加成功则启动当前工做线程
if (workerAdded) {
t.start();
//并将当前线程状态设置为已启动
workerStarted = true;
}
}
} finally {
//添加失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
固然咱们也能够自定义拒绝策略。
一、ArrayBlockingQueue
基于数组的阻塞队列,长度有限
二、LinkedBlockingQuene
基于链表的阻塞队列,长度无限,使用这个可能会致使咱们的拒绝策略失效。由于能够无限的建立新的工做线程。
三、PriorityBlockingQueue
具备优先级的无界阻塞队列;
三、SynchronousQuene
SynchronousQuene是一个是一个不存储元素的BlockingQueue;每个put操做必需要等待一个take操做,不然不能继续添加元素。因此这个比较特殊,它不存咱们的任务,也就说说它的每一个put操做必须等到另外一个线程调用take操做,不然put操做一直处于阻塞状态。
这个是ThreadPoolExecutor的一个内部类,表示一个工做线程。重要的是这个内部类实现了AbstractQueuedSynchronizer(AQS:抽象队列同步器)抽象类。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
private static final long serialVersionUID = 6138294804551838833L;
/** 当前work持有的线程 */
final Thread thread;
/** 运行的初始任务。 可能为空。*/
Runnable firstTask;
/** 每一个线程完成任务的计数器 */
volatile long completedTasks;
/** * 构造函数 */
Worker(Runnable firstTask) {
// 禁止中断,直到runWorker
setState(-1);
//想提交的任务交给当前工做线程
this.firstTask = firstTask;
//经过线程工厂建立一个新的线程
this.thread = getThreadFactory().newThread(this);
}
/** 将run方法的执行委托给外部runWorker */
public void run() {
runWorker(this);
}
// 是否锁定
//
// 0表明解锁状态。
// 1表明锁定状态。
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁(重写AQS的方法)
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁(重写AQS的方法)
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//加锁
public void lock() { acquire(1); }
//尝试加锁
public boolean tryLock() { return tryAcquire(1); }
//解锁
public void unlock() { release(1); }
//是否锁定
public boolean isLocked() { return isHeldExclusively(); }
//若是启动则中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
复制代码
最后来看下runWorker这个方法(ThreadPoolExecutor中的方法):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
下面是对注释的蹩脚翻译,欢迎吐槽,但注意尺度,O(∩_∩)O哈哈~
主要工做循环运行。重复地从队列中获取任务并执行它们,同时处理一些问题:
异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler拥有关于用户代码遇到的任何问题的准确信息。
本文是JUC的第二篇,意在经过查看源码来了解线程池的具体工做原理。文中若是存在不当的描述,但愿小伙伴们可以及时提出。灰常感谢!
欢迎关注微信公众号,干货满满哦~