能够说,线程池是Java并发场景中应用到的最多并发框架了。几乎全部须要异步或者并发执行的任务程序均可以使用线程池。在开发过程当中,合理的使用线程池会带来如下3个好处:前端
世界上没有完美无瑕的事情,任何事情都有正反两面。若是滥用线程池或者使用不当,也有可能带来安全隐患。所以,必须合理的使用线程池才能使得收益最大化。接下来,咱们就来系统的了解线程池,以便可以达到“合理使用”的境界。java
当任务提交到线程池以后,线程池是如何处理这些任务的呢?它是这样处理的的:数据库
从以上图中,能够看见,当提交了一个新任务时,线程池的处理过程以下:安全
以上2/3步骤是否能够调换顺序呢?实际上,线程池之因此采用以上的设计思路,是由于,1/3步骤都是要获取全局锁的。若是任务频繁提交执行,此时将加重锁的竞争,而2步骤是不须要额外的全局锁的竞争。
带着以上的认知,咱们来剖析一下源码:架构
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 首先获取ctl变量的值(ctl变量及其重要,在这里咱们只需知道它能够同时表明线程数和线程池状态便可)
int c = ctl.get();
// 基于ctl计算出当前的线程数量,若是小于设定的核心线程数。
if (workerCountOf(c) < corePoolSize) {
// 则经过addWorker建立一个线程并执行当前任务,addWorker方法内部须要获取全局锁
if (addWorker(command, true))
return;
// 若是addWorker返回失败标志,则从新获取当前ctl的值。
c = ctl.get();
}
// 基于ctl获取当前线程池状态,若是是RUNNING状态而且任务添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 则从新检查线程池的状态,若是当前不是RUNNING状态了,则移除当前任务。
if (! isRunning(recheck) && remove(command))
若是成功,则执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0) // 若是当前线程数为0
// 则经过addWorker方法初始化ctl
addWorker(null, false);
}
// 若是失败,则经过addWorker方法建立新的线程来执行任务,
// 若是addWorker方法返回false标志,说明此时建立新的线程来执行任务失败了。
// 此时说明线程池已满,或者线程池已经不是RUNNING状态了。
else if (!addWorker(command, false))
// 此时将执行拒绝策略
reject(command);
}
复制代码
经过以上源码解析,可以清晰的了解一个任务提交到线程池是如何处理的了。源码中有两个重要的地方尚未讲解。一是ctl变量的做用;二是addWorker方法的解析。这两个点能够说是线程池的精髓所在了。并发
ctl变量是一个AtomicInteger类型,它包含了两个概念:框架
In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable. If this is ever an issue in the future, the variable can be changed to be an AtomicLong, and the shift/mask constants below adjusted. But until the need arises, this code is a bit faster and simpler using an int.异步
以上大概意思就是:为了让有效线程数和线程池的状态可以用一个int变量表示,将线程数限制在了2^29-1(约为5亿),这样的话就能够用低29位来表示有效线程数,高3位来表示线程的状态。为了更好的理解,咱们直接上源码:函数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
基于源码,能够得出它们的实际值:
COUNT_BITS = Integer.SIZE - 3;即:COUNT_BITS = 32 -3 = 29;
CAPACITY = (1 << 29) - 1;即:CAPACITY = 2^29 - 1 约等于5亿;
RUNNING = -1 << 29; --> 高3位为:111
SHUTDOWN = 0 << 29; --> 高3位为:000
STOP = 1 << 29; --> 高3位为:001
TIDYING = 2 << 29; --> 高3位为:010
TERMINATED= 3 << 29; --> 高3位为:011性能
有了以上的计算,咱们再来看:
runStateO()方法是获取当前线程池状态的方法,它的计算公式为: ctl & ~ CAPACITY.
~ CAPACITY获取到的值实际上就是高3位为1,低29位为0. 所以 ctl & ~ CAPACITY 获得的实际上就是ctl高3位的值。
同理,workerCountOf()方法获取到的实际上就是ctl低29位的值。表示为当前有效的线程数。
经过以上解析,应该就能够理解ctl变量的含义了!
在线程池中,线程并非Thread,而是基于Thread包装成了一个Worker。Worker是ThreadPoolExecutor的一个内部类。通所以,addWorker()方法实际上就是基于当前线程池的状态来决定是否构建Worker并执行。Worker执行万当前任务后,并不会直接退出,而是循环获取队列中的任务来执行,从源码中咱们能证实这个结论:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 若是task不为空,则执行task。
// rugo task为空,则从队列中继续获取task。
while (task != null || (task = getTask()) != null) {
w.lock();
// 判断线程池的状态和当前task的中断标志,是否知足继续执行的条件。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子函数:执行task以前的钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数:执行task以后调用的钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
从源码中,咱们能够得知,ThreadPoolExecutor框架预留了几个钩子函数.worker在执行任务的过程当中,会触发钩子函数,若是咱们须要执行一些特殊的业务(好比统计任务的执行时长),则能够继承ThreadPoolExecutor实现钩子函数来达到特定业务的目的。
钩子函数 | 说明 |
---|---|
beforeExecute() | 任务执行前触发 |
afterExecute() | 任务执行后触发 |
terminated() | 当线程池状态变成TIDYING时,会触发此方法 |
线程池一共有5种状态。
状态 | 解释 |
---|---|
RUNNING | 运行状态。当线程池建立成功后,线程池的状态就是RUNNING状态了。此状态下能够接收新的任务和执行队列中的任务 |
SHUTDOWN | 此状态下,线程池再也不接收新的任务了,可是会将执行中的任务和队列中的任务执行完成。 |
STOP | 此状态下,线程池再也不接收新的任务;再也不执行队列中的任务;会中断正在执行中的任务。 |
TIDYING | 此状态下,线程池中的workCount=0,线程池将调用调用terminated()方法。 |
TERMINATED | 当terminated()方法执行完成以后,线程池状态将变成TERMINATED,至此,线程池的生命周期完成。 |
线程池状态的流转:
TIP:线程池状态的流转和线程状态的流转是彻底不同的概念。
基于以上介绍,咱们对线程池的原理已经了然于胸。接下来,经过一个例子来看看实战中线程池的使用方式和技巧。
建立线程池的方式有不少种。好比:
// 1.建立固定大小的线程池
Executors.newFixedThreadPool();
// 2.建立一个基于SynchronousQueue队列的线程池.(此队列的特性在前文是有解析)
Executors.newCachedThreadPool();
// 3.建立一个具备延时功能的线程池(实际上就是基于DelayQueue实现,此队列在前文中解析)
Executors.newScheduledThreadPool();
// 4.建立一个只有一个线程的线程池
Executors.newSingleThreadExecutor();
复制代码
以上是快捷建立线程池的的4种方式,其实若是咱们再深刻理解一波的话,能够发现其实他们底层都是基于ThreadPoolExecutor提供的构造方法构建的线程池。这4种方式建立出来的线程池都具有必定的特性在里面,若是对于队列理解透彻的话,能够发现,它们的本质其实就是选择的队列不一样。从而能够基于队列提供的特色实现特殊的功能。
在实战中,除非应用场景比较简单,任务量不是很大的状况下,咱们能够采用这种快捷建立线程池的方式。但若是咱们在大中型工程中,则最好基于ThreadPoolExecutor自定义建立线程池,这样能够更加贴切实际的场景使用。
ThreadPoolExecutor提供了如下几个构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
复制代码
能够发现,经过原生构造器来构建线程池,是最灵活的。所以,在《阿里巴巴Java开发规范》中也强烈建议采用这种方式来建立线程池。每一个参数的含义以下:
参数名 | 含义 |
---|---|
corePoolSize | 核心线程数 |
maximumPoolSize | 最大线程数 |
keepAliveTime | 当线程数大于核心线程数,多余的线程的存活时长,和unit配合使用 |
unit | 和keepAliveTime配合使用 |
workQueue | 工做队列。当核心线程数满时,任务会先入队。若无特殊要求,应该尽可能选择有界队列!不然当任务激增时,有可能撑爆内存致使性能降低甚至崩溃。 |
threadFactory | 线程工厂类,用于建立线程。若是没有指定,则采用默认的工厂类 |
handler | 任务拒绝策略。当线程池不能再接收新的任务时,将执行任务拒绝策略。若是没有指定,则采用默认的拒绝策略。 |
当线程池满了以后,将会执行拒绝策略。线程池提供了4种拒绝策略。以下:
拒绝策略 | 说明 |
---|---|
AbortPolicy | 当线程池满时,将会拒绝接收新任务,并抛出RejectedExecutionException。若是没有指定拒绝策略,此为默认策略 |
CallerRunsPolicy | 当线程池满时,将会使用任务提交者所在的线程来执行这个任务。线程池自己会丢弃掉这个任务。 |
DiscardPolicy | 当线程池满时,将会默默地丢弃掉这个任务。tips:在实际开发当中,若是任务不影响业务,则能够采用此策略,不然断然不可采起这个策略。 |
DiscardOldestPolicy | 当线程池满时,将会默默地丢弃掉队列最前端的任务。而后执行提交的任务。tips:和以上同样 |
关闭线程池是颇有必要的。当应用程序须要退出时,能够经过注册回调函数来关闭线程池。若是咱们暴力关闭应用程序的话,会致使正在执行的任务和队列中的任务丢失。在企业工程中,这一点千万注意。 线程池提供了两种关闭方法:
关闭方式 | 说明 |
---|---|
shutdown() | 当调用此方法时,线程池状态会变成SHUTDOWN状态,此时线程池将不会再接收新的任务,但以及接收的任务会执行完毕。 |
shutdownNow() | 当调用此方法时,线程池状态会变成STOP状态,此时线程池将不会再接收新的任务,而且会中断全部正在执行中的任务以及丢弃掉队列中的任务。 |
在实际工程中,具体采起哪一种方式,应该根据实际状况来抉择。若是任务对业务有影响,则应当选择shutdown(),不然能够视状况选择shutdownNow()。
在Java应用中,线程属于稀有资源。那么线程数是设置的越大越好么?非也。在计算机体系中,若是想让性能发挥极致,应该是各个子系统之间的合理配置使用。对于线程数而言也是如此。要想合理的设置线程数,就必须首先分析人物的特性。能够从如下几个角度来分析:
咱们能够根据任务的不一样特性来综合考虑线程数的设置。通常而言。若是是CPU密集型,则应该分配尽量小的线程数:一般状况下,能够设置为CPU核数 + 1;若是是IO密集型,则线程并不老是在执行任务,则应该分配尽量大的线程数:一般状况下,能够设置为2 * CPU核数;若是是混合型,则能够将任务拆分红一个CPU密集型和一个IO密集型,只要两个任务执行的时间不会相差太大,则性能会比串行执行的效率要高,若是拆分后任务执行相差的时间过大,则没有必要拆分。
经过以上的介绍,如何用好线程池应该不是问题。对于一个完善的应用而言,应当还要有良好的监控能力,以便在任务执行出现问题时,能够快速的定位、分析、解决问题。
ThreadPoolExecutor提供了一些基本且好用的方法来监控线程池的运行状况:
方法名 | 说明 |
---|---|
getTaskCount() | 线程池队列中须要执行的任务数量 |
getCompletedTaskCount() | 线程池中已经执行完成的任务数量 |
getActiveCount() | 线程池中正在执行任务的线程数量 |
若是咱们想要更加全面的监控线程池的运行状态以及任务的执行过程。能够继承ThreadPoolExecutor来自定义线程池。
本篇文章围绕ThreadPoolExecutor,系统介绍了线程池的实现;以及实际项目中如何正确的使用线程池。经过本篇文章的写做,本身对于线程池的认识有多了一些不同的感受。好比clt变量的设计真的很精妙。像Doug Lea大神致以崇高的敬意!