ForkJoinPool 是Java 1.7 引入的一种新的并发框架—— ForkJoin Framework。如下是重要的几点特性:java
ForkJoinPool & ForkJoinTask 概述:算法
引用:https://www.infoq.cn/article/fork-join-introduction并发
好比计算1至1000的正整数之和,使用ForkJoinPool 如何进行并行的计算。框架
package common.forkjoinpool; public interface Calculator { long sumUp(long[] numbers); }
package common.forkjoinpool; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; public ForkJoinCalculator() { // 也可使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length - 1)); } private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 当须要计算的数字小于6时,直接计算结果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 不然,把任务一分为二,递归计算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } }
package common.forkjoinpool; import java.util.stream.LongStream; public class Main { public static void main(String[] args) { long[] numbers = LongStream.rangeClosed(1, 1000).toArray(); Calculator calculator = new ForkJoinCalculator(); System.out.println(calculator.sumUp(numbers)); // 打印结果500500 } }
这段代码的核心方法是 less
@Override protected Long compute() { // 当须要计算的数字小于6时,直接计算结果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 不然,把任务一分为二,递归计算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } }
经过 compute() 方法,分解任务,分而治之。async
ForkJoinTask 继承关系以下,ide
RecursiveTask 和 RecursiveAction 的区别:函数
RecursiveAction性能
它是一种没有任何返回值的任务。只是作一些工做,好比写数据到磁盘,而后就退出了。 一个RecursiveAction能够把本身的工做分割成更小的几块, 这样它们能够由独立的线程或者CPU执行。 咱们能够经过继承来实现一个RecursiveAction。ui
RecursiveTask
它是一种会返回结果的任务。能够将本身的工做分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 能够有几个水平的分割和合并。
RecursiveAction以下使用方法,
package common.forkjointask; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class MyRecursiveAction extends RecursiveAction { /** * 每一个"小任务"最多只打印20个数 */ private static final int MAX = 20; private int start; private int end; public MyRecursiveAction(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { //当end-start的值小于MAX时,开始打印 if ((end - start) < MAX) { for (int i = start; i < end; i++) { System.out.println(Thread.currentThread().getName() + "-i的值" + i); } } else { // 将大任务分解成两个小任务 int middle = (start + end) / 2; MyRecursiveAction left = new MyRecursiveAction(start, middle); MyRecursiveAction right = new MyRecursiveAction(middle, end); left.fork(); right.fork(); } } public static void main(String[] args) throws InterruptedException { // 建立包含Runtime.getRuntime().availableProcessors()返回值做为个数的并行线程的ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); // 提交可分解的PrintTask任务 forkJoinPool.submit(new MyRecursiveAction(0, 1000)); while (!forkJoinPool.isTerminated()) { forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); } // 关闭线程池 forkJoinPool.shutdown(); } }
这段代码虽然打印了0-999这一千个数字,可是并非连续打印的,这是由于程序将这个打印任务进行了分解,分解后的任务会并行执行,因此不会按顺序打印。
翻看源码,ForkJoinPool 有这么一个 字段,
/** * Common (static) pool. Non-null for public use unless a static * construction exception, but internal usages null-check on use * to paranoically avoid potential initialization circularities * as well as to simplify generated code. */ static final ForkJoinPool common;
这就是 commonPool ,是ForkJoinPool 在类加载时候建立的,
/** * Creates and returns the common pool, respecting user settings * specified via system properties. */ private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
在makeCommonPool 方法中建立commonPool的时候 调用的构造函数以下,
/** * Creates a {@code ForkJoinPool} with the given parameters, without * any security checks or parameter validation. Invoked directly by * makeCommonPool. */ private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
重要参数解释(咱们仍是结合英文注释来看):
1. parallelism:并行度( the parallelism level),默认状况下跟咱们机器的cpu个数保持一致,使用 Runtime.getRuntime().availableProcessors()能够获得咱们机器运行时可用的CPU个数
2. factory:建立新线程的工厂( the factory for creating new threads)。默认状况下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
3. handler:线程异常状况下的处理器,该处理器在线程执行任务时因为某些没法预料到的错误而致使任务线程中断时进行一些处理,默认状况为null。
4. asyncMode:这个参数要注意,在ForkJoinPool中,每个工做线程都有一个独立的任务队列,asyncMode表示工做线程内的任务队列是采用何种方式进行调度,能够是先进先出FIFO,也能够是后进先出LIFO。若是为true,则线程池中的工做线程则使用先进先出方式进行任务调度,默认状况下是false。
ForkJoinPool
有一个 Async Mode ,效果是工做线程在处理本地任务时也使用 FIFO 顺序。这种模式下的ForkJoinPool
更接近因而一个消息队列,而不是用来处理递归式的任务。
fork()
作的工做只有一件事,既是把任务推入当前工做线程的工做队列里。能够参看如下的源代码:
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
join()
的工做则复杂得多,也是 join()
可使得线程免于被阻塞的缘由——不像同名的 Thread.join()
。
join()
的线程是不是 ForkJoinThread 线程。若是不是(例如 main 线程),则阻塞当前线程,等待任务完成。若是是,则不阻塞。将上述流程画成序列图的话就是这个样子:
public static void main(String[] args) throws InterruptedException { // 建立包含Runtime.getRuntime().availableProcessors()返回值做为个数的并行线程的ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); // 提交可分解的PrintTask任务 forkJoinPool.submit(new MyRecursiveAction(0, 1000)); while (!forkJoinPool.isTerminated()) { forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); } // 关闭线程池 forkJoinPool.shutdown(); }
其实除了前面介绍过的每一个工做线程本身拥有的工做队列之外,ForkJoinPool
自身也拥有工做队列,这些工做队列的做用是用来接收由外部线程(非 ForkJoinThread
线程)提交过来的任务,而这些工做队列被称为 submitting queue 。
submit()
和 fork()
其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操做)。submitting queue 和其余 work queue 同样,是工做线程”窃取“的对象,所以当其中的任务被一个工做线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。
参考:http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/
=======END=======