首先,咱们为何须要线程池?
让咱们先来了解下什么是 对象池 技术。某些对象(好比线程,数据库链接等),它们建立的代价是很是大的 —— 相比于通常对象,它们建立消耗的时间和内存都很大(并且这些对象销毁的代价比通常对象也大)。因此,若是咱们维护一个 池,每次使用完这些对象以后,并不销毁它,而是将其放入池中,下次须要使用时就直接从池中取出,即可以免这些对象的重复建立;同时,咱们能够固定 池的大小,好比设置池的大小为 N —— 即池中只保留 N 个这类对象 —— 当池中的 N 个对象都在使用中的时候,为超出数量的请求设置一种策略,好比 排队等候 或者 直接拒绝请求 等,从而避免频繁的建立此类对象。
线程池 即对象池的一种(池中的对象为线程 Thread
),相似的还有 数据库链接池(池中对象为数据库链接 Connection
)。合理利用线程池可以带来三个好处(参考文末的 References[1]):html
本文只介绍 Java 中线程池的基本使用,不会过多的涉及到线程池的原理。若是有兴趣的读者须要深刻理解线程池的实现原理,能够参考文末的 References。java
JDK 中线程池的基础架构以下:数据库
执行器 Executor
是顶级接口,只包含了一个 execute
方法,用来执行一个 Runnable
任务:segmentfault
执行器服务 ExecutorService
接口继承了 Executor
接口,ExecutorService
是全部线程池的基础接口,它定义了 JDK 中线程池应该实现的基本方法:缓存
线程池执行器 ThreadPoolExecutor
是基础线程池的核心实现,而且能够经过定制 ThreadPoolExecutor
的构造参数或者继承 ThreadPoolExecutor
,实现本身的线程池;多线程
ScheduledThreadPoolExecutor
继承自 ThreadPoolExecutor
,是能执行周期性任务或定时任务的线程池;架构
ForkJoinPool
是 JDK1.7 时添加的类,做为对 Fork/Join 型线程池的实现。ide
本文只介绍 ThreadPoolExecutor
线程池的使用,ScheduledThreadPoolExecutor
和 ForkJoinPool
会在以后的文章中介绍。工具
查看 ThreadPoolExecutor
的源码可知,在 ThreadPoolExecutor
的内部,将每一个池中的线程包装为了一个 Worker
:性能
而后在 ThreadPoolExecutor
中定义了一个 HashSet<Worker>
,做为 “池”:
设置一个合适的线程池(即自定义 ThreadPoolExecutor
)是比较麻烦的,所以 JDK 经过 Executors
这个工厂类为咱们提供了一些预先定义好的线程池:
一、固定大小的线程池
建立一个包含 nThreads 个工做线程的线程池,这 nThreads 个线程共享一个无界队列(即不限制大小的队列);当新任务提交到线程池时,若是当前没有空闲线程,那么任务将放入队列中进行等待,直到有空闲的线程来从队列中取出该任务并运行。
(经过 Runtime.getRuntime().availableProcessors()
能够得到当前机器可用的处理器个数,对于计算密集型的任务,固定大小的线程池的 nThreads 设置为这个值时,通常能得到最大的 CPU 使用率)
二、单线程线程池
建立一个只包含一个工做线程的线程池,它的功能能够简单的理解为 即 newFixedThreadPool
方法传入参数为 1 的状况。可是与 newFixedThreadPool(1)
不一样的是,若是线程池中这个惟一的线程意外终止,线程池会建立一个新线程继续执行以后的任务。
三、可缓存线程的线程池
建立一个可缓存线程的线程池。当新任务提交到线程池时,若是当前线程池中有空闲线程可用,则使用空闲线程来运行任务,不然新建一个线程来运行该任务,并将该线程添加到线程池中;并且该线程池会终止并移除那些超过 60 秒未被使用的空闲线程。因此这个线程池表现得就像缓存,缓存的资源为线程,缓存的超时时间为 60 秒。根据 JDK 的文档,当任务的运行时间都较短的时候,该线程池有利于提升性能。
咱们看到每一个构造线程池的工厂方法都有一个带 ThreadFactory
的重载形式。ThreadFactory
即线程池用来新建线程的工厂,每次线程池须要新建一个线程时,调用的就是这个 ThreadFactory
的 newThread
方法:
(若是不提供自定义的 ThreadFactory
,那么使用的就是 DefaultThreadFactory
—— Executors
内定义的内部类)
好比咱们要为线程池中的每一个线程提供一个特定的名字,那么咱们就能够自定义 ThreadFactory
并重写其 newThread
方法:
public class SimpleThreadFactory implements ThreadFactory { private AtomicInteger id = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Test_Thread-" + id.getAndIncrement()); return thread; } }
经过 JDK 的源码咱们能够知道,以上三种线程池的实现都是基于 ThreadPoolExecutor
:
下面咱们来看一下线程池的基础接口 ExecutorService
中每一个方法的含义。
首先是从 Executor
接口继承到的 execute
方法:
使用该方法即将一个 Runnable
任务交给线程池去执行。
submit
方法:
submit
方法会提交一个任务去给线程池执行,该任务能够是带返回结果的 Callable<V>
任务,也能够是一开始就指定结果的 Runnable
任务,或者不带结果的 Runnable
任务(此时即一开始指定结果为 null
)。submit
方法会返回一个与所提交任务相关联的 Future<V>
。经过 上一篇文章 咱们能够知道,Future<V>
的 get
方法能够等待任务执行完毕并返回结果。因此经过 Future<V>
,咱们能够与已经提交到线程池的任务进行交互。submit
提交任务及任务运行过程大体以下:
Runnable
或者 Callable<V>
任务;newTaskFor
方法构造出 FutureTask<V>
;(由 上一篇文章 可知,FutureTask<V>
实现了 Runnable
和 Future<V>
两个接口,从而 FutureTask<V>
能够做为 Runnable
交给 Worker
(Thread
)去运行,也能够做为一个 Future<V>
与任务交互)
![newTaskFor 方法][19]
execute
方法将 FutureTask<V>
交给当前的 Worker
去运行,并将 FutureTask<V>
以 Future<V>
返回;Worker
执行任务(即运行 run
方法),在任务完成后,为 Future<V>
(FutureTask<V>
) 设置结果 —— 设置结果以前,调用 Future<V>
的 get
方法会让调用线程处于阻塞状态;Future<V>
的 get
方法,得到任务的结果。invokeAll
方法:
invokeAll
方法能够一次执行多个任务,但它并不一样等于屡次调用 submit
方法。submit
方法是非阻塞的,每次调用 submit
方法提交任务到线程池以后,会当即返回与任务相关联的 Future<V>
,而后当前线程继续向后执行;
而 invokeAll
方法是阻塞的,只有当提交的多个任务都执行完毕以后,invokeAll
方法才会返回,执行结果会以List<Future<V>>
返回,该 List<Future<V>>
中的每一个 Future<V>
是和提交任务时的 Collection<Callable<V>>
中的任务 Callable<V>
一 一对应的。带 timeout 参数的 invokeAll
就是设置一个超时时间,若是超过这个时间 invokeAll
中提交的全部任务还有没所有执行完,那么没有执行完的任务会被取消(中断),以后一样以一个 List<Future<V>>
返回执行的结果。
invokeAny
方法:
invokeAny
方法也是阻塞的,与 invokeAll
方法的不一样之处在于,当所提交的一组任务中的任何一个任务完成以后,invokeAny
方法便会返回(返回的结果即是那个已经完成的任务的返回值),而其余任务会被取消(中断)。
举一个 invokeAny
使用的例子:电脑有 C、D、E、F 四个盘,咱们须要找一个文件,可是咱们不知道这个文件位于哪一个盘中,咱们即可以使用 invokeAny
,并提交四个任务(对应于四个线程)分别查找 C、D、E、F 四个盘,若是哪一个线程找到了这个文件,那么此时 invokeAny
便中止阻塞并返回结果,同时取消其余任务。
shutdown
方法:
shutdown
方法的做用是向线程池发送关闭的指令。一旦在线程池上调用 shutdown
方法以后,线程池便不能再接受新的任务;若是此时还向线程池提交任务,那么将会抛出 RejectedExecutionException
异常。以后线程池不会马上关闭,直到以前已经提交到线程池中的全部任务(包括正在运行的任务和在队列中等待的任务)都已经处理完成,才会关闭。
shutdownNow
方法:
与 shutdown
不一样,shutdownNow
会当即关闭线程池 —— 当前在线程池中运行的任务会所有被取消,而后返回线程池中全部正在等待的任务。
(值得注意的是,咱们 必须显式的关闭线程池,不然线程池不会本身关闭)
awaitTermination
方法:
awaitTermination
能够用来判断线程池是否已经关闭。调用 awaitTermination
以后,在 timeout 时间内,若是线程池没有关闭,则阻塞当前线程,不然返回 true
;当超过 timeout 的时间后,若线程池已经关闭则返回 true
,不然返回 false
。该方法通常这样使用:
shutdown
方法向线程池发送关闭的指令;awaitTermination
来检测到线程池是否已经关闭,能够得知线程池中全部的任务是否已经执行完毕;awaitTermination
方法的线程中止阻塞,并返回 true
;isShutdown()
方法,若是线程池已经调用 shutdown
或者 shutdownNow
,则返回 true
,不然返回 false
;
isTerminated()
方法,若是线程池已经调用 shutdown
而且线程池中全部的任务已经执行完毕,或者线程池调用了 shutdownNow
,则返回 true
,不然返回 false
。
经过以上介绍,咱们已经了解了 ExecutorService
中全部方法的功能,如今让咱们来实践 ExecutorService
的功能。
咱们继续使用 上一篇文章 的两个例子中的任务,首先是任务类型为 Runnable
的状况:
import java.util.*; import java.util.concurrent.*; public class RunnableTest { public static void main(String[] args) throws Exception { System.out.println("使用线程池运行 Runnable 任务:"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定为 5 的线程池 List<AccumRunnable> tasks = new ArrayList<>(10); for (int i = 0; i < 10; i++) { AccumRunnable task = new AccumRunnable(i * 10 + 1, (i + 1) * 10); tasks.add(task); threadPool.execute(task); // 让线程池执行任务 task } threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕以后,线程池会关闭 threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待线程池关闭,等待的最大时间为 1 小时 int total = 0; for (AccumRunnable task : tasks) { total += task.getResult(); // 调用在 AccumRunnable 定义的 getResult 方法得到返回的结果 } System.out.println("Total: " + total); } static final class AccumRunnable implements Runnable { private final int begin; private final int end; private int result; public AccumRunnable(int begin, int end) { this.begin = begin; this.end = end; } @Override public void run() { result = 0; try { for (int i = begin; i <= end; i++) { result += i; Thread.sleep(100); } } catch (InterruptedException ex) { ex.printStackTrace(System.err); } System.out.printf("(%s) - 运行结束,结果为 %d\n", Thread.currentThread().getName(), result); } public int getResult() { return result; } } }
运行结果:
能够看到 NetBeans 给出的运行时间为 2 秒 —— 由于每一个任务须要 1 秒的时间,而线程池中的线程个数固定为 5 个,因此线程池最多同时处理 5 个任务,于是 10 个任务总共须要 2 秒的运行时间。
任务类型为 Callable
:
import java.util.*; import java.util.concurrent.*; public class CallableTest { public static void main(String[] args) throws Exception { System.out.println("使用线程池运行 Callable 任务:"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定为 5 的线程池 List<Future<Integer>> futures = new ArrayList<>(10); for (int i = 0; i < 10; i++) { AccumCallable task = new AccumCallable(i * 10 + 1, (i + 1) * 10); Future<Integer> future = threadPool.submit(task); // 提交任务 futures.add(future); } threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕以后,线程池会关闭 int total = 0; for (Future<Integer> future : futures) { total += future.get(); // 阻塞,直到任务结束,返回任务的结果 } System.out.println("Total: " + total); } static final class AccumCallable implements Callable<Integer> { private final int begin; private final int end; public AccumCallable(int begin, int end) { this.begin = begin; this.end = end; } @Override public Integer call() throws Exception { int result = 0; for (int i = begin; i <= end; i++) { result += i; Thread.sleep(100); } System.out.printf("(%s) - 运行结束,结果为 %d\n", Thread.currentThread().getName(), result); return result; } } }
运行结果:
改写上面的代码,使用 invokeAll
来直接执行一组任务:
public static void main(String[] args) throws Exception { System.out.println("使用线程池 invokeAll 运行一组 Callable 任务:"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定为 5 的线程池 List<AccumCallable> tasks = new ArrayList<>(10); // tasks 为一组任务 for (int i = 0; i < 10; i++) { tasks.add(new AccumCallable(i * 10 + 1, (i + 1) * 10)); } List<Future<Integer>> futures = threadPool.invokeAll(tasks); // 阻塞,直到全部任务都运行完毕 int total = 0; for (Future<Integer> future : futures) { total += future.get(); // 返回任务的结果 } System.out.println("Total: " + total); threadPool.shutdown(); // 向线程池发送关闭的指令 }
运行结果:
线程池是很强大并且很方便的工具,它提供了对线程进行统一的分配和调控的各类功能。自 JDK1.5 时 JDK 添加了线程池的功能以后,通常状况下更推荐使用线程池来编写多线程程序,而不是直接使用 Thread
。
(invokeAny
也是很实用的方法,请有兴趣的读者本身实践)
References: