在前面的全部示例中,由新的线程(由其Runnable
对象定义)和线程自己(由Thread
对象定义)完成的任务之间存在紧密的联系,这适用于小型应用程序,但在大型应用程序中,将线程管理和建立与应用程序的其他部分分开是有意义的,封装这些函数的对象称为执行器,如下小节详细描述了执行器。html
java.util.concurrent
包定义了三个执行器接口:java
Executor
,一个支持启动新任务的简单接口。ExecutorService
,Executor
的子接口,它添加了有助于管理生命周期的功能,包括单个任务和执行器自己。ScheduledExecutorService
,ExecutorService
的子接口,支持未来和/或按期执行任务。一般,引用执行器对象的变量被声明为这三种接口类型之一,而不是执行器类类型。git
Executor接口提供单个方法execute
,旨在成为常见线程建立语法的替代方法,若是r
是Runnable
对象,而且e
是Executor
对象,则能够替换github
(new Thread(r)).start();
为算法
e.execute(r);
可是,execute
的定义不太具体,低级别语法建立一个新线程并当即启动它,根据Executor
实现,execute
可能会作一样的事情,但更有可能使用现有的工做线程来运行r
,或者将r
放在队列中以等待工做线程变为可用(咱们将在线程池的部分中描述工做线程)。segmentfault
java.util.concurrent
中的执行器实现旨在充分利用更高级的ExecutorService
和ScheduledExecutorService
接口,尽管它们也能够与基本Executor
接口一块儿使用。api
ExecutorService接口使用相似但更通用的submit
方法补充execute
,与execute
同样,submit
接受Runnable
对象,但也接受Callable对象,这容许任务返回一个值。submit
方法返回一个Future对象,该对象用于检索Callable
返回值并管理Callable
和Runnable
任务的状态。数组
ExecutorService
还提供了提交大量Callable
对象的方法,最后,ExecutorService
提供了许多用于管理执行器关闭的方法,为了支持当即关闭,任务应该正确处理中断。服务器
ScheduledExecutorService接口使用schedule
补充其父级ExecutorService
的方法,在指定的延迟后执行Runnable
或Callable
任务,此外,接口定义了scheduleAtFixedRate
和scheduleWithFixedDelay
,它们以定义的间隔重复执行指定的任务。多线程
java.util.concurrent
中的大多数执行器实现都使用由工做线程组成的线程池,这种线程与它执行的Runnable
和Callable
任务分开存在,一般用于执行多个任务。
使用工做线程能够最小化因为建立线程而带来的开销,线程对象使用大量内存,在大型应用程序中,分配和释放许多线程对象会产生大量的内存管理开销。
一种常见类型的线程池是固定线程池,这种类型的池始终具备指定数量的线程,若是一个线程在它仍在使用时以某种方式被终止,它将自动被一个新线程替换,任务经过内部队列提交到池中,当活动任务多于线程时,该队列将保存额外的任务。
固定线程池的一个重要优势是使用它的应用程序能够优雅地降级,要理解这一点,请考虑一个Web服务器应用程序,其中每一个HTTP请求都由一个单独的线程处理。若是应用程序只是为每一个新的HTTP请求建立一个新线程,而且系统接收的请求数量超过了能够当即处理的数量,当全部这些线程的开销超过系统容量时,应用程序将忽然中止响应全部请求。因为能够建立的线程数量有限制,应用程序不会像HTTP请求进入时那样快地为它们提供服务,而是以系统可以承受的最快速度为它们提供服务。
建立使用固定线程池的执行器的一种简单方法是在java.util.concurrent.Executors中调用newFixedThreadPool工厂方法,该类还提供如下工厂方法:
ScheduledExecutorService
版本。若是上述工厂方法提供的执行器均没法知足你的需求,构造java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的实例将为你提供额外选项。
fork/join框架是ExecutorService
接口的一个实现,可帮助你利用多个处理器,它专为能够递归分解成小块的工做而设计,目标是使用全部可用的处理能力来加强应用程序的性能。
与任何ExecutorService
实现同样,fork/join框架将任务分配给线程池中的工做线程,fork/join框架是不一样的,由于它使用了工做窃取算法,没有事情可作的工做线程能够从仍然忙碌的其余线程中窃取任务。
fork/join框架的中心是ForkJoinPool类,它是AbstractExecutorService
类的扩展,ForkJoinPool
实现了核心工做窃取算法,能够执行ForkJoinTask进程。
使用fork/join框架的第一步是编写执行工做片断的代码,你的代码应相似于如下伪代码:
if (个人工做部分足够小) 直接作这项工做 else 把个人工做分红两块 调用这两块并等待结果
将此代码包装在ForkJoinTask
子类中,一般使用其更专业的类型之一,RecursiveTask(能够返回结果)或RecursiveAction。
在ForkJoinTask
子类准备就绪后,建立表示要完成的全部工做的对象,并将其传递给ForkJoinPool
实例的invoke()
方法。
为了帮助你了解fork/join框架的工做原理,请考虑如下示例,假设你想模糊图像,原始源图像由整数数组表示,其中每一个整数包含单个像素的颜色值,模糊的目标图像也由与源相同大小的整数数组表示。
经过一次一个像素地处理源数组来完成模糊,将每一个像素与其周围像素进行平均(对红色、绿色和蓝色组件进行平均),并将结果放置在目标数组中,因为图像是大型数组,所以此过程可能须要很长时间,经过使用fork/join框架实现的算法,你能够利用多处理器系统上的并发处理,这是一个可能的实现:
public class ForkBlur extends RecursiveAction { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; // Processing window size; should be odd. private int mBlurWidth = 15; public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Reassemble destination pixel. int dpixel = (0xff000000 ) | (((int)rt) << 16) | (((int)gt) << 8) | (((int)bt) << 0); mDestination[index] = dpixel; } } ...
如今,你实现抽象的compute()
方法,该方法能够直接执行模糊或将其拆分为两个较小的任务,简单的数组长度阈值有助于肯定是执行仍是拆分工做。
protected static int sThreshold = 100000; protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); }
若是之前的方法在RecursiveAction
类的子类中,那么将任务设置为在ForkJoinPool
中运行是很简单的,涉及如下步骤:
建立一个表明要完成的全部工做的任务。
// source image pixels are in src // destination image pixels are in dst ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
建立将运行任务的ForkJoinPool
。
ForkJoinPool pool = new ForkJoinPool();
运行任务。
pool.invoke(fb);
有关完整源代码(包括建立目标图像文件的一些额外代码),请参阅ForkBlur示例。
除了使用fork/join框架来实如今多处理器系统上同时执行任务的自定义算法(例如ForkBlur.java
示例),Java SE中已经使用fork/join框架实现了一些一般有用的功能,在Java SE 8中引入的一种这样的实现被java.util.Arrays类用于其parallelSort()
方法,这些方法相似于sort()
,但经过fork/join框架利用并发性。在多处理器系统上运行时,大型数组的并行排序比顺序排序更快,可是,这些方法如何利用fork/join框架超出了Java教程的范围,有关此信息,请参阅Java API文档。
fork/join框架的另外一个实现由java.util.streams
包中的方法使用,这是Project Lambda计划用于Java SE 8版本的一部分,有关更多信息,请参阅Lambda表达式部分。