从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每一个小任务的结果获得这个大任务的结果。html
这种思想和MapReduce很像(input --> split --> map --> reduce --> output)java
主要有两步:算法
它的模型大体是这样的:线程池中的每一个线程都有本身的工做队列(PS:这一点和ThreadPoolExecutor不一样,ThreadPoolExecutor是全部线程公用一个工做队列,全部线程都从这个工做队列中取任务),当本身队列中的任务都完成之后,会从其它线程的工做队列中偷一个任务执行,这样能够充分利用资源。api
工做窃取(work-stealing)算法是指某个线程从其余队列里窃取任务来执行。工做窃取的运行流程图以下:数组
那么为何须要使用工做窃取算法呢?app
假如咱们须要作一个比较大的任务,咱们能够把这个任务分割为若干互不依赖的子任务,为了减小线程间的竞争,因而把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应,好比A线程负责处理A队列里的任务。可是有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其余线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。框架
工做窃取算法的优势是充分利用线程进行并行计算,并减小了线程间的竞争,其缺点是在某些状况下仍是存在竞争,好比双端队列里只有一个任务时。而且消耗了更多的系统资源,好比建立多个线程和多个双端队列。dom
An ExecutorService for running ForkJoinTasks.异步
A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.async
ForkJoinPool与其它的ExecutorService区别主要在于它使用“工做窃取”:线程池中的全部线程都企图找到并执行提交给线程池的任务。当大量的任务产生子任务的时候,或者同时当有许多小任务被提交到线程池中的时候,这种处理是很是高效的。特别的,当在构造方法中设置asyncMode为true的时候这种处理更加高效。
ForkJoinTask表明运行在ForkJoinPool中的任务。
主要方法:
子类:
A thread managed by a ForkJoinPool, which executes ForkJoinTasks.
ForkJoinWorkerThread表明ForkJoinPool线程池中的一个执行任务的线程。
接下来,简略的看一下关键代码来加深对Fork/Join的理解。
WorkQueue是一个ForkJoinPool中的内部类,它是线程池中线程的工做队列的一个封装,支持任务窃取。
什么叫线程的任务窃取呢?就是说你和你的一个伙伴一块儿吃水果,你的那份吃完了,他那份没吃完,那你就偷偷的拿了他的一些水果吃了。存在执行2个任务的子线程,这里要讲成存在A,B两个个WorkQueue在执行任务,A的任务执行完了,B的任务没执行完,那么A的WorkQueue就从B的WorkQueue的ForkJoinTask数组中拿走了一部分尾部的任务来执行,能够合理的提升运行和计算效率。
能够看到:
前面咱们说过,每一个线程都有一个WorkQueue,而WorkQueue中有执行任务的线程(ForkJoinWorkerThread owner),还有这个线程须要处理的任务(ForkJoinTask<?>[] array)。那么这个新提交的任务就是加到array中。
从代码中咱们能够清楚地看到,ForkJoinWorkThread持有ForkJoinPool和ForkJoinPool.WorkQueue的引用,以代表该线程属于哪一个线程池,它的工做队列是哪一个
能够看到,若是是ForkJoinWorkerThread运行过程当中fork(),则直接加入到它的工做队列中,不然,从新提交任务。
能够看到它们都会等待计算完成
下面盗两张图
批量发送消息
1 package com.cjs.boot.demo; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.ForkJoinPool; 6 import java.util.concurrent.RecursiveAction; 7 import java.util.concurrent.TimeUnit; 8 9 public class ForkJoinPoolDemo { 10 11 class SendMsgTask extends RecursiveAction { 12 13 private final int THRESHOLD = 10; 14 15 private int start; 16 private int end; 17 private List<String> list; 18 19 public SendMsgTask(int start, int end, List<String> list) { 20 this.start = start; 21 this.end = end; 22 this.list = list; 23 } 24 25 @Override 26 protected void compute() { 27 28 if ((end - start) <= THRESHOLD) { 29 for (int i = start; i < end; i++) { 30 System.out.println(Thread.currentThread().getName() + ": " + list.get(i)); 31 } 32 }else { 33 int middle = (start + end) / 2; 34 invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list)); 35 } 36 37 } 38 39 } 40 41 public static void main(String[] args) throws InterruptedException { 42 List<String> list = new ArrayList<>(); 43 for (int i = 0; i < 123; i++) { 44 list.add(String.valueOf(i+1)); 45 } 46 47 ForkJoinPool pool = new ForkJoinPool(); 48 pool.submit(new ForkJoinPoolDemo().new SendMsgTask(0, list.size(), list)); 49 pool.awaitTermination(10, TimeUnit.SECONDS); 50 pool.shutdown(); 51 } 52 53 }
求和
1 package com.cjs.boot.demo; 2 3 import java.util.concurrent.ExecutionException; 4 import java.util.concurrent.ForkJoinPool; 5 import java.util.concurrent.ForkJoinTask; 6 import java.util.concurrent.RecursiveTask; 7 8 public class ForkJoinTaskDemo { 9 10 private class SumTask extends RecursiveTask<Integer> { 11 12 private static final int THRESHOLD = 20; 13 14 private int arr[]; 15 private int start; 16 private int end; 17 18 public SumTask(int[] arr, int start, int end) { 19 this.arr = arr; 20 this.start = start; 21 this.end = end; 22 } 23 24 /** 25 * 小计 26 */ 27 private Integer subtotal() { 28 Integer sum = 0; 29 for (int i = start; i < end; i++) { 30 sum += arr[i]; 31 } 32 System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum); 33 return sum; 34 } 35 36 @Override 37 protected Integer compute() { 38 39 if ((end - start) <= THRESHOLD) { 40 return subtotal(); 41 }else { 42 int middle = (start + end) / 2; 43 SumTask left = new SumTask(arr, start, middle); 44 SumTask right = new SumTask(arr, middle, end); 45 left.fork(); 46 right.fork(); 47 48 return left.join() + right.join(); 49 } 50 } 51 } 52 53 public static void main(String[] args) throws ExecutionException, InterruptedException { 54 int[] arr = new int[100]; 55 for (int i = 0; i < 100; i++) { 56 arr[i] = i + 1; 57 } 58 59 ForkJoinPool pool = new ForkJoinPool(); 60 ForkJoinTask<Integer> result = pool.submit(new ForkJoinTaskDemo().new SumTask(arr, 0, arr.length)); 61 System.out.println("最终计算结果: " + result.invoke()); 62 pool.shutdown(); 63 } 64 65 }
ForkJoinPool.commonPool-worker-2: ∑(50~62)=678 ForkJoinPool.commonPool-worker-2: ∑(62~75)=897 ForkJoinPool.commonPool-worker-2: ∑(75~87)=978 ForkJoinPool.commonPool-worker-2: ∑(87~100)=1222 ForkJoinPool-1-worker-1: ∑(0~12)=78 ForkJoinPool-1-worker-1: ∑(12~25)=247 ForkJoinPool-1-worker-1: ∑(25~37)=378 ForkJoinPool-1-worker-1: ∑(37~50)=572 ForkJoinPool-1-worker-2: ∑(75~87)=978 ForkJoinPool-1-worker-3: ∑(50~62)=678 ForkJoinPool-1-worker-5: ∑(62~75)=897 ForkJoinPool.commonPool-worker-7: ∑(0~12)=78 ForkJoinPool.commonPool-worker-3: ∑(37~50)=572 ForkJoinPool-1-worker-4: ∑(87~100)=1222 ForkJoinPool.commonPool-worker-2: ∑(25~37)=378 ForkJoinPool.commonPool-worker-5: ∑(12~25)=247 最终计算结果: 5050
api文档中的两个示例
1 package com.cjs.boot.demo; 2 3 import java.util.Arrays; 4 import java.util.concurrent.*; 5 6 public class RecursiveActionDemo { 7 8 private static class SortTask extends RecursiveAction { 9 10 static final int THRESHOLD = 100; 11 12 final long[] array; 13 final int lo, hi; 14 15 public SortTask(long[] array, int lo, int hi) { 16 this.array = array; 17 this.lo = lo; 18 this.hi = hi; 19 } 20 21 public SortTask(long[] array) { 22 this(array, 0, array.length); 23 } 24 25 public void sortSequentially(int lo, int hi) { 26 Arrays.sort(array, lo, hi); 27 } 28 29 public void merge(int lo, int mid, int hi) { 30 long[] buf = Arrays.copyOfRange(array, lo, mid); 31 for (int i = 0, j = lo, k = mid; i < buf.length; j++) { 32 array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++]; 33 } 34 } 35 36 @Override 37 protected void compute() { 38 if (hi - lo < THRESHOLD) { 39 sortSequentially(lo, hi); 40 }else { 41 int mid = (lo + hi) >>> 1; 42 invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi)); 43 merge(lo, mid, hi); 44 } 45 } 46 } 47 48 public static void main(String[] args) throws ExecutionException, InterruptedException { 49 long[] array = new long[120]; 50 for (int i = 0; i < array.length; i++) { 51 array[i] = (long) (Math.random() * 1000); 52 } 53 System.out.println(Arrays.toString(array)); 54 55 ForkJoinPool pool = new ForkJoinPool(); 56 pool.submit(new SortTask(array)); 57 pool.awaitTermination(5, TimeUnit.SECONDS); 58 pool.shutdown(); 59 60 } 61 62 }
1 package com.cjs.boot.demo; 2 3 import java.util.concurrent.*; 4 5 public class RecursiveTaskDemo { 6 7 private static class Fibonacci extends RecursiveTask<Integer> { 8 9 final int n; 10 11 public Fibonacci(int n) { 12 this.n = n; 13 } 14 15 @Override 16 protected Integer compute() { 17 if (n <= 1) { 18 return n; 19 }else { 20 Fibonacci f1 = new Fibonacci(n - 1); 21 f1.fork(); 22 Fibonacci f2 = new Fibonacci(n - 1); 23 return f2.compute() + f1.join(); 24 } 25 } 26 } 27 28 public static void main(String[] args) throws InterruptedException, ExecutionException { 29 ForkJoinPool pool = new ForkJoinPool(); 30 Future<Integer> future = pool.submit(new Fibonacci(10)); 31 System.out.println(future.get()); 32 pool.shutdown(); 33 } 34 35 }
http://gee.cs.oswego.edu/dl/papers/fj.pdf
http://ifeve.com/talk-concurrency-forkjoin/
https://www.cnblogs.com/senlinyang/p/7885964.html
https://blog.csdn.net/u012403290/article/details/70917810