本系列文章经补充和完善,已修订整理成书《Java编程的逻辑》(马俊昌著),由机械工业出版社华章分社出版,于2018年1月上市热销,读者好评如潮!各大网店和书店有售,欢迎购买:京东自营连接 html
![]()
上节,咱们初步探讨了Java并发包中的任务执行服务,实际中,任务执行服务的主要实现机制是线程池,本节,咱们就来探讨线程池。java
线程池,顾名思义,就是一个线程的池子,里面有若干线程,它们的目的就是执行提交给线程池的任务,执行完一个任务后不会退出,而是继续等待或执行新任务。线程池主要由两个概念组成,一个是任务队列,另外一个是工做者线程,工做者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务。git
线程池的概念相似于生活中的一些排队场景,好比在火车站排队购票、在医院排队挂号、在银行排队办理业务等,通常都由若干个窗口提供服务,这些服务窗口相似于工做者线程,而队列的概念是相似的,只是,在现实场景中,每一个窗口常常有一个单独的队列,这种排队难以公平,随着信息化的发展,愈来愈多的排队场合使用虚拟的统一队列,通常都是先拿一个排队号,而后按号依次服务。github
线程池的优势是显而易见的:编程
Java并发包中线程池的实现类是ThreadPoolExecutor,它继承自AbstractExecutorService,实现了ExecutorService,基本用法与上节介绍的相似,咱们就不赘述了。不过,ThreadPoolExecutor有一些重要的参数,理解这些参数对于合理使用线程池很是重要,接来下,咱们探讨这些参数。swift
ThreadPoolExecutor有多个构造方法,都须要一些参数,主要构造方法有:数组
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
第二个构造方法多了两个参数threadFactory和handler,这两个参数通常不须要,第一个构造方法会设置默认值。微信
参数corePoolSize, maximumPoolSize, keepAliveTime, unit用于控制线程池中线程的个数,workQueue表示任务队列,threadFactory用于对建立的线程进行一些配置,handler表示任务拒绝策略。下面咱们再来详细探讨下这些参数。多线程
线程池的大小主要与四个参数有关:并发
maximumPoolSize表示线程池中的最多线程数,线程的个数会动态变化,但这是最大值,无论有多少任务,都不会建立比这个值大的线程个数。
corePoolSize表示线程池中的核心线程个数,不过,这并非说,一开始就建立这么多线程,刚建立一个线程池后,实际上并不会建立任何线程。
通常状况下,有新任务到来的时候,若是当前线程个数小于corePoolSiz,就会建立一个新线程来执行该任务,须要说明的是,即便其余线程如今也是空闲的,也会建立新线程。
不过,若是线程个数大于等于corePoolSiz,那就不会当即建立新线程了,它会先尝试排队,须要强调的是,它是"尝试"排队,而不是"阻塞等待"入队,若是队列满了或其余缘由不能当即入队,它就不会排队,而是检查线程个数是否达到了maximumPoolSize,若是没有,就会继续建立线程,直到线程数达到maximumPoolSize。
keepAliveTime的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于corePoolSize时,额外空闲线程的存活时间,也就是说,一个非核心线程,在空闲等待新任务时,会有一个最长等待时间,即keepAliveTime,若是到了时间仍是没有新任务,就会被终止。若是该值为0,表示全部线程都不会超时终止。
这几个参数除了能够在构造方法中进行指定外,还能够经过getter/setter方法进行查看和修改。
public void setCorePoolSize(int corePoolSize) public int getCorePoolSize() public int getMaximumPoolSize() public void setMaximumPoolSize(int maximumPoolSize) public long getKeepAliveTime(TimeUnit unit) public void setKeepAliveTime(long time, TimeUnit unit) 复制代码
除了这些静态参数,ThreadPoolExecutor还能够查看关于线程和任务数的一些动态数字:
//返回当前线程个数
public int getPoolSize() //返回线程池曾经达到过的最大线程个数 public int getLargestPoolSize() //返回线程池自建立以来全部已完成的任务数 public long getCompletedTaskCount() //返回全部任务数,包括全部已完成的加上全部排队待执行的 public long getTaskCount() 复制代码
ThreadPoolExecutor要求的队列类型是阻塞队列BlockingQueue,咱们在76节介绍过多种BlockingQueue,它们均可以用做线程池的队列,好比:
若是用的是无界队列,须要强调的是,线程个数最多只能达到corePoolSize,到达corePoolSize后,新的任务总会排队,参数maximumPoolSize也就没有意义了。
另外一面,对于SynchronousQueue,咱们知道,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,不然,老是会建立新线程,直到达到maximumPoolSize。
若是队列有界,且maximumPoolSize有限,则当队列排满,线程个数也达到了maximumPoolSize,这时,新任务来了,如何处理呢?此时,会触发线程池的任务拒绝策略。
默认状况下,提交任务的方法如execute/submit/invokeAll等会抛出异常,类型为RejectedExecutionException。
不过,拒绝策略是能够自定义的,ThreadPoolExecutor实现了四种处理方式:
它们都是ThreadPoolExecutor的public静态内部类,都实现了RejectedExecutionHandler接口,这个接口的定义为:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
复制代码
当线程池不能接受任务时,调用其拒绝策略的rejectedExecution方法。
拒绝策略能够在构造方法中进行指定,也能够经过以下方法进行指定:
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) 复制代码
默认的RejectedExecutionHandler是一个AbortPolicy实例,以下所示:
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
复制代码
而AbortPolicy的rejectedExecution实现就是抛出异常,以下所示:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
复制代码
咱们须要强调下,拒绝策略只有在队列有界,且maximumPoolSize有限的状况下才会触发。
若是队列无界,服务不了的任务老是会排队,但这不见得是指望的,由于请求处理队列可能会消耗很是大的内存,甚至引起内存不够的异常。
若是队列有界但maximumPoolSize无限,可能会建立过多的线程,占满CPU和内存,使得任何任务都难以完成。
因此,在任务量很是大的场景中,让拒绝策略有机会执行是保证系统稳定运行很重要的方面。
线程池还能够接受一个参数,ThreadFactory,它是一个接口,定义为:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
复制代码
这个接口根据Runnable建立一个Thread,ThreadPoolExecutor的默认实现是Executors类中的静态内部类DefaultThreadFactory,主要就是建立一个线程,给线程设置一个名称,设置daemon属性为false,设置线程优先级为标准默认优先级,线程名称的格式为: pool-<线程池编号>-thread-<线程编号>。
若是须要自定义一些线程的属性,好比名称,能够实现自定义的ThreadFactory。
线程个数小于等于corePoolSize时,咱们称这些线程为核心线程,默认状况下:
不过,ThreadPoolExecutor有以下方法,能够改变这个默认行为。
//预先建立全部的核心线程
public int prestartAllCoreThreads() //建立一个核心线程,若是全部核心线程都已建立,返回false public boolean prestartCoreThread() //若是参数为true,则keepAliveTime参数也适用于核心线程 public void allowCoreThreadTimeOut(boolean value) 复制代码
类Executors提供了一些静态工厂方法,能够方便的建立一些预配置的线程池,主要方法有:
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newCachedThreadPool() 复制代码
newSingleThreadExecutor基本至关于调用:
public static ExecutorService newSingleThreadExecutor() {
return new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
只使用一个线程,使用无界队列LinkedBlockingQueue,线程建立后不会超时终止,该线程顺序执行全部任务。该线程池适用于须要确保全部任务被顺序执行的场合。
newFixedThreadPool的代码为:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
使用固定数目的n个线程,使用无界队列LinkedBlockingQueue,线程建立后不会超时终止。和newSingleThreadExecutor同样,因为是无界队列,若是排队任务过多,可能会消耗很是大的内存。
newCachedThreadPool的代码为:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
它的corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,keepAliveTime是60秒,队列为SynchronousQueue。
它的含义是,当新任务到来时,若是正好有空闲线程在等待任务,则其中一个空闲线程接受该任务,不然就老是建立一个新线程,建立的总线程个数不受限制,对任一空闲线程,若是60秒内没有新任务,就终止。
实际中,应该使用newFixedThreadPool仍是newCachedThreadPool呢?
在系统负载很高的状况下,newFixedThreadPool能够经过队列对新任务排队,保证有足够的资源处理实际的任务,而newCachedThreadPool会为每一个任务建立一个线程,致使建立过多的线程竞争CPU和内存资源,使得任何实际任务都难以完成,这时,newFixedThreadPool更为适用。
不过,若是系统负载不过高,单个任务的执行时间也比较短,newCachedThreadPool的效率可能更高,由于任务能够不经排队,直接交给某一个空闲线程。
在系统负载可能极高的状况下,二者都不是好的选择,newFixedThreadPool的问题是队列过长,而newCachedThreadPool的问题是线程过多,这时,应根据具体状况自定义ThreadPoolExecutor,传递合适的参数。
关于提交给线程池的任务,咱们须要特别注意一种状况,就是任务之间有依赖,这种状况可能会出现死锁。好比任务A,在它的执行过程当中,它给一样的任务执行服务提交了一个任务B,但须要等待任务B结束。
若是任务A是提交给了一个单线程线程池,就会出现死锁,A在等待B的结果,而B在队列中等待被调度。
若是是提交给了一个限定线程个数的线程池,也有可能出现死锁,咱们看个简单的例子:
public class ThreadPoolDeadLockDemo {
private static final int THREAD_NUM = 5;
static ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
static class TaskA implements Runnable {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Future<?> future = executor.submit(new TaskB());
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("finished task A");
}
}
static class TaskB implements Runnable {
@Override
public void run() {
System.out.println("finished task B");
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
executor.execute(new TaskA());
}
Thread.sleep(2000);
executor.shutdown();
}
}
复制代码
以上代码使用newFixedThreadPool建立了一个5个线程的线程池,main程序提交了5个TaskA,TaskA会提交一个TaskB,而后等待TaskB结束,而TaskB因为线程已被占满只能排队等待,这样,程序就会死锁。
怎么解决这种问题呢?
替换newFixedThreadPool为newCachedThreadPool,让建立线程再也不受限,这个问题就没有了。
另外一个解决方法,是使用SynchronousQueue,它能够避免死锁,怎么作到的呢?对于普通队列,入队只是把任务放到了队列中,而对于SynchronousQueue来讲,入队成功就意味着已有线程接受处理,若是入队失败,能够建立更多线程直到maximumPoolSize,若是达到了maximumPoolSize,会触发拒绝机制,无论怎么样,都不会死锁。咱们将建立executor的代码替换为:
static ExecutorService executor = new ThreadPoolExecutor(
THREAD_NUM, THREAD_NUM, 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
复制代码
只是更改队列类型,运行一样的程序,程序不会死锁,不过TaskA的submit调用会抛出异常RejectedExecutionException,由于入队会失败,而线程个数也达到了最大值。
本节介绍了线程池的基本概念,详细探讨了其主要参数的含义,理解这些参数对于合理使用线程池是很是重要的,对于相互依赖的任务,须要特别注意,避免出现死锁。
ThreadPoolExecutor实现了生产者/消费者模式,工做者线程就是消费者,任务提交者就是生产者,线程池本身维护任务队列。当咱们碰到相似生产者/消费者问题时,应该优先考虑直接使用线程池,而非从新发明轮子,本身管理和维护消费者线程及任务队列。
在异步任务程序中,一种常见的场景是,主线程提交多个异步任务,而后有任务完成就处理结果,而且按任务完成顺序逐个处理,对于这种场景,Java并发包提供了一个方便的方法,使用CompletionService,让咱们下一节来探讨它。
(与其余章节同样,本节全部代码位于 github.com/swiftma/pro…)
未完待续,查看最新文章,敬请关注微信公众号“老马说编程”(扫描下方二维码),从入门到高级,深刻浅出,老马和你一块儿探索Java编程及计算机技术的本质。用心原创,保留全部版权。