使用 java8 lambda 表达式大半年了,一直都知道底层使用的是 Fork/Join 框架,今天终于有机会来学学 Fork/Join 框架了。java
Fork/Join 框架是 Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每一个小任务结果后获得大任务结果的框架。面试
Fork/Join 的运行流程示意图:
算法
好比,一个 1+2+3+...+100 的工做任务,咱们能够把它 Fork 成 10 个子任务,分别计算这 10 个子任务的运行结果。最后再把 10 个子任务的结果 Join 起来,汇总成最后的结果。数组
为了减小线程间的竞争,一般把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应。可是,有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其它线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。线程的这种执行方式,咱们称之为“工做窃取”算法。并发
实现 Fork/Join 框架的设计,大抵须要两步:框架
首先咱们须要建立一个 ForkJoin 任务,把大任务分割成子任务,若是子任务不够小,则继续往下分,直到分割出的子任务足够小。dom
在 Java 中咱们可使用 ForkJoinTask 类,它提供在任务中执行 fork() 和 join() 操做的机制,一般状况下,咱们只须要继承它的子类:异步
分割的子任务分别放在双端队列里,而后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,而后合并这些数据。ide
在 Java 中任务的执行须要经过 ForkJoinPool 来执行。性能
来一个阿里面试题:百万级 Integer 数据量的一个 array 求和。
public class ArrayCountTask extends RecursiveTask<Long> { /** * 阈值 */ private static final Integer THRESHOLD = 10000; private Integer[] array; private Integer start; private Integer end; public ArrayCountTask(Integer[] array, Integer start, Integer end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { long sum = 0; // 最小子任务计算 if (end - start <= THRESHOLD) { for (int i = start; i < end; i++) { sum += array[i]; } } else { // 把大于阈值的任务继续往下拆分,有点相似递归的思惟。 recursive 就是递归的意思。 int middle = (start + end) >>> 1; ArrayCountTask leftArrayCountTask = new ArrayCountTask(array, start, middle); ArrayCountTask rightArrayCountTask = new ArrayCountTask(array, middle, end); // 执行子任务 //leftArrayCountTask.fork(); //rightArrayCountTask.fork(); // invokeAll 方法使用 invokeAll(leftArrayCountTask, rightArrayCountTask); //等待子任务执行完,并获得其结果 Long leftJoin = leftArrayCountTask.join(); Long rightJoin = rightArrayCountTask.join(); // 合并子任务的结果 sum = leftJoin + rightJoin; } return sum; } }
public static void main(String[] args) { // 1. 造一个 int 类型的百万级别数组 Integer[] array = new Integer[150000000]; for (int i = 0; i < array.length; i++) { array[i] = new Random().nextInt(100); } // 2.普通方式计算结果 long start = System.currentTimeMillis(); long sum = 0; for (int i = 0; i < array.length; i++) { sum += array[i]; } long end = System.currentTimeMillis(); System.out.println("普通方式计算结果:" + sum + ",耗时:" + (end - start)); long start2 = System.currentTimeMillis(); // 3.fork/join 框架方式计算结果 ArrayCountTask arrayCountTask = new ArrayCountTask(array, 0, array.length); ForkJoinPool forkJoinPool = new ForkJoinPool(); sum = forkJoinPool.invoke(arrayCountTask); long end2 = System.currentTimeMillis(); System.out.println("fork/join 框架方式计算结果:" + sum + ",耗时:" + (end2 - start2)); // 结论: // 1. 电脑 i5-4300m,双核四线程 // 2. 数组量少的时候,fork/join 框架要进行线程建立/切换的操做,性能不明显。 // 3. 数组量超过 100000000,fork/join 框架的性能才开始体现。 }
ForkJoinTask 与通常任务的主要区别在于它须要实现 compute 方法,在这个方法里,首先须要判断任务是否足够小,若是足够小就直接执行任务。若是不足够小,就必须分割成两个子任务,每一个子任务在调用 fork 方法时,又会进入 compute 方法,看看当前子任务是否须要继续分割成子任务,若是不须要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并获得其结果。
在执行子任务时调用 fork 方法并非最佳的选择,最佳的选择是 invokeAll 方法。由于执行 compute() 方法的线程自己也是一个 worker 线程,当对两个子任务调用 fork() 时,这个worker 线程就会把任务分配给另外两个 worker,可是它本身却停下来等待不干活了!这样就白白浪费了 Fork/Join 线程池中的一个 worker 线程,致使了4个子任务至少须要7个线程才能并发执行。
好比甲把 400 分红两个 200 后,fork() 写法至关于甲把一个 200 分给乙,把另外一个 200 分给丙,而后,甲成了监工,不干活,等乙和丙干完了他直接汇报工做。乙和丙在把 200 分拆成两个 100 的过程当中,他俩又成了监工,这样,原本只须要 4 个工人的活,如今须要 7 个工人才能完成,其中有3个是不干活的。
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成。ForkJoinTask 数组负责将存放程序提交给 ForkJoinPool 的任务;而 ForkJoinWorkerThread 数组负责执行这些任务,ForkJoinWorkerThread 体现的就是“工做窃取”算法。
ForkJoinPool 使用 submit 或 invoke 提交的区别:invoke 同步执行,调用以后须要等待任务完成,才能执行后面的代码;submit 是异步执行,只有在 Future 调用 get 的时候会阻塞。
ForkJoinPool 继承自 AbstractExecutorService, 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。