JDK 1.7 时,标准类库添加了 ForkJoinPool
,做为对 Fork/Join 型线程池的实现。Fork 在英文中有 分叉 的意思,而 Join 有 合并 的意思。ForkJoinPool
的功能也是如此:Fork 将大任务分叉为多个小任务,而后让小任务执行,Join 是得到小任务的结果,而后进行合并,将合并的结果做为大任务的结果 —— 而且这会是一个递归的过程 —— 由于任务若是足够大,能够将任务多级分叉直到任务足够小。算法
因而可知,ForkJoinPool
能够知足 并行 地实现 分治算法(Divide-and-Conquer) 的须要。并发
ForkJoinPool
的类图以下:异步
能够看到 ForkJoinPool
实现了 ExecutorService
接口,因此首先 ForkJoinPool
也是一个 ExecutorService
(线程池)。于是 Runnable
和 Callable
类型的任务,ForkJoinPool
也能够经过 submit
、invokeAll
和 invokeAny
等方法来执行。可是标准类库还为 ForkJoinPool
定义了一种新的任务,它就是 ForkJoinTask<V>
。ide
ForkJoinTask
类图:高并发
ForkJoinTask<V>
用来专门定义 Fork/Join 型任务 —— 完成将大任务分割为小任务以及合并结果的工做。通常咱们不须要直接使用 ForkJoinTask<V>
,而是经过继承它的子类 RecursiveAction
和 RecursiveTask
并实现对应的抽象方法 —— compute
,来定义咱们本身的任务。其中,RecursiveAction
是不带返回值的 Fork/Join 型任务,因此使用此类任务并不产生结果,也就不涉及到结果的合并;而 RecursiveTask
是带返回值的 Fork/Join 型任务,使用此类任务的话,在任务结束前,咱们须要进行结果的合并。其中,经过 ForkJoinTask<V>
的 fork
方法,咱们能够产生子任务并执行;经过 join
方法,咱们能够得到子任务的结果。this
ForkJoinPool
可使用三种方法用来执行 ForkJoinTask
:spa
invoke
方法:线程
invoke
方法用来执行一个带返回值的任务(一般继承自RecursiveTask
),而且该方法是阻塞的,直到任务执行完毕,该方法才会中止阻塞并返回任务的执行结果。3d
submit
方法:code
除了从 ExecutorService
继承的 submit
方法外,ForkJoinPool
还定义了用来执行 ForkJoinTask
的 submit
方法 —— 通常该 submit
方法用来执行带返回值的ForkJoinTask
(一般继承自RecursiveTask
)。该方法是非阻塞的,调用以后将任务提交给 ForkJoinPool
去执行便当即返回,返回的即是已经提交到 ForkJoinPool
去执行的 task —— 由类图可知 ForkJoinTask
实现了 Future
接口,因此能够直接经过 task 来和已经提交的任务进行交互。
execute
方法:
除了从 Executor
得到的 execute
方法外,ForkJoinPool
也定义了用来执行ForkJoinTask
的 execute
方法 —— 通常该 execute
方法用来执行不带返回值的ForkJoinTask
(一般继承自RecursiveAction
) ,该方法一样是非阻塞的。
如今让咱们来实践下 ForkJoinPool
的功能:计算 π 的值。计算 π 的值有一个经过多项式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),并且多项式的项数越多,计算出的 π 的值越精确。
首先咱们定义用来估算 π 的 PiEstimateTask
:
class PiEstimateTask extends RecursiveTask<Double> { private final long begin; private final long end; private final long threshold; // 分割任务的临界值 public PiEstimateTask(long begin, long end, long threshold) { this.begin = begin; this.end = end; this.threshold = threshold; } @Override protected Double compute() { // 实现 compute 方法 if (end - begin <= threshold) { // 临界值之下,再也不分割,直接计算 int sign; // 符号,多项式中偶数位取 1,奇数位取 -1(位置从 0 开始) double result = 0.0; for (long i = begin; i < end; i++) { sign = (i & 1) == 0 ? 1 : -1; result += sign / (i * 2.0 + 1); } return result * 4; } // 分割任务 long middle = (begin + end) / 2; PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold); PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold); leftTask.fork(); // 异步执行 leftTask rightTask.fork(); // 异步执行 rightTask double leftResult = leftTask.join(); // 阻塞,直到 leftTask 执行完毕返回结果 double rightResult = rightTask.join(); // 阻塞,直到 rightTask 执行完毕返回结果 return leftResult + rightResult; // 合并结果 } }
而后咱们使用 ForkJoinPool
的 invoke
执行 PiEstimateTask
:
public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 计算 10 亿项,分割任务的临界值为 1 千万 PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); double pi = forkJoinPool.invoke(task); // 阻塞,直到任务执行完毕返回结果 System.out.println("π 的值:" + pi); forkJoinPool.shutdown(); // 向线程池发送关闭的指令 } }
运行结果:
咱们也可使用 submit
方法异步的执行任务(此处 submit
方法返回的 future 指向的对象即提交任务时的 task):
public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); Future<Double> future = forkJoinPool.submit(task); // 不阻塞 double pi = future.get(); System.out.println("π 的值:" + pi); System.out.println("future 指向的对象是 task 吗:" + (future == task)); forkJoinPool.shutdown(); // 向线程池发送关闭的指令 }
运行结果:
值得注意的是,选取一个合适的分割任务的临界值,对 ForkJoinPool
执行任务的效率有着相当重要的影响。临界值选取过大,任务分割的不够细,则不能充分利用 CPU;临界值选取太小,则任务分割过多,可能产生过多的子任务,致使过多的线程间的切换和加剧 GC 的负担从而影响了效率。因此,须要根据实际的应用场景选择一个合适的分割任务的临界值。
ForkJoinPool
相比于 ThreadPoolExecutor
,还有一个很是重要的特色(优势)在于,ForkJoinPool
具备 Work-Stealing (工做窃取)的能力。所谓 Work-Stealing,在 ForkJoinPool
中的实现为:线程池中每一个线程都有一个互不影响的任务队列(双端队列),线程每次都从本身的任务队列的队头中取出一个任务来运行;若是某个线程对应的队列已空而且处于空闲状态,而其余线程的队列中还有任务须要处理可是该线程处于工做状态,那么空闲的线程能够从其余线程的队列的队尾取一个任务来帮忙运行 —— 感受就像是空闲的线程去偷人家的任务来运行同样,因此叫 “工做窃取”。
Work-Stealing 的适用场景是不一样的任务的耗时相差比较大,即某些任务须要运行较长时间,而某些任务会很快的运行完成,这种状况下用 Work-Stealing 很合适;可是若是任务的耗时很平均,则此时 Work-Stealing 并不适合,由于窃取任务时不一样线程须要抢占锁,这可能会形成额外的时间消耗,并且每一个线程维护双端队列也会形成更大的内存消耗。因此 ForkJoinPool
并非 ThreadPoolExecutor
的替代品,而是做为对 ThreadPoolExecutor
的补充。
总结:ForkJoinPool
和 ThreadPoolExecutor
都是 ExecutorService
(线程池),但ForkJoinPool
的独特色在于:
ThreadPoolExecutor
只能执行 Runnable
和 Callable
任务,而 ForkJoinPool
不只能够执行 Runnable
和 Callable
任务,还能够执行 Fork/Join 型任务 —— ForkJoinTask
—— 从而知足并行地实现分治算法的须要;ThreadPoolExecutor
中任务的执行顺序是按照其在共享队列中的顺序来执行的,因此后面的任务须要等待前面任务执行完毕后才能执行,而 ForkJoinPool
每一个线程有本身的任务队列,并在此基础上实现了 Work-Stealing 的功能,使得在某些状况下 ForkJoinPool
能更大程度的提升并发效率。