当咱们须要执行大量的小任务时,有经验的Java开发人员都会采用线程池来高效执行这些小任务。然而,有一种任务,例如,对超过1000万个元素的数组进行排序,这种任务自己能够并发执行,但如何拆解成小任务须要在任务执行的过程当中动态拆分。这样,大任务能够拆成小任务,小任务还能够继续拆成更小的任务,最后把任务的结果汇总合并,获得最终结果,这种模型就是Fork/Join模型。java
Java7引入了Fork/Join框架,咱们经过RecursiveTask这个类就能够方便地实现Fork/Join模式。数组
下面为一个Fork/Join例子:并发
import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; /** * Java7引入了Fork/Join框架,咱们经过RecursiveTask这个类就能够方便地实现Fork/Join模式。 * 如:对一个大数组进行并行求和的RecursiveTask */ public class SumTask extends RecursiveTask<Long>{ static final int THRESHOLD = 100; long[] array; int start; int end; SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { System.out.println("当前执行任务的线程name="+Thread.currentThread().getName()); //先判断任务是否是足够小,若是足够小,就直接计算并返回结果(注意模拟了1秒延时) if (end - start <= THRESHOLD) { // 若是任务足够小,直接计算: long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(String.format("compute %d~%d = %d", start, end, sum)); return sum; } // 任务太大,一分为二: int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); // subtask1.fork(); // subtask2.fork(); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } public static void main(String[] args) throws Exception { // 建立随机数组成的数组: long[] array = new long[400]; for(int i=0;i<400;i++){ array[i]= i; } // fork/join task: ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发数4 ForkJoinTask<Long> task = new SumTask(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = fjp.invoke(task);//提交一个Fork/Join任务并发执行,而后得到异步执行的结果 long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); } }
注:invokeAll(subtask1, subtask2); 和 subtask1.fork(); subtask2.fork(); 是有区别的,查看JDK的invokeAll()
方法的源码就能够发现,invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,可是,它还会留一个任务本身执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程,故为了节约线程池中的线程利用率,应该用invokeAll()方法框架