“分而治之” 一直是一个有效的处理大量数据的方法。著名的 MapReduce 也是采起了分而治之的思想。简单来讲,就是若是你要处理1000个数据,可是你并不具有处理1000个数据的能力,那么你能够只处理其中的10个,而后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始的1000个数据的处理结果。java
Fork 一词的原始含义是吃饭用的叉子,也有分叉的意思。在Linux 平台中,函数 fork()用来建立子进程,使得系统进程能够多一个执行分支。在 Java 中也沿用了相似的命名方式。并发
而 Join() 的含义和 Thread 类的 join 相似,表示等待。也就是使用 fork() 后系统多了一个执行分支(线程),因此须要等待这个执行分支执行完毕,才有可能获得最终的结果,所以 join 就是表示等待。框架
在实际使用中,若是毫无顾忌的使用 fork 开启线程进行处理,那么颇有可能致使系统开启过多的线程而严重影响性能。因此,在JDK中,给出一个 ForkJoinPool 线程池,对于 fork() 方法并不急着开启线程,而是提交给 ForkJoiinPool 线程池进行处理,以节省系统资源。ide
因为线程池的优化,提交的任务和线程数量并非一对一的关系。在绝大多数状况下,一个物理线程其实是须要处理多个逻辑任务的。所以,每一个线程必然须要拥有一个任务队列。所以,在实际执行过程当中,可能遇到这么一种状况:线程A已经把本身的任务都处理完了,而线程B还有一堆任务等着处理,此时,线程A就会“帮助” 线程B,从线程 B的任务队列中拿一个任务来处理,尽量的达到平衡。值得注意的是:当线程试图帮助别人时,老是从任务队列的底部开始拿数据,而线程试图执行本身的任务时,则从相反的顶部开始拿。所以这种行为也十分有利于避免数据竞争。函数
咱们看看线程池 ForkJoinPool 的一个接口:高并发
/** * Submits a ForkJoinTask for execution. * * @param task the task to submit * @param <T> the type of the task's result * @return the task * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; }
你能够向 ForkJoinPool 线程池提交一个 ForkJoinTask 任务。所谓 ForkJoinTask 任务就是支持 fork () 分解以及 join()等待的任务。 ForkJoinTask 有两个重要的子类,RecursiveAction 和 RecursiveTask。他们粉笔表示没有返回值的任务和能够携带返回值的任务。有点像 Rannable 和 Callable。性能
下面来要给简单的例子展现 Fork/Join 框架的使用。这里用来计算求和。优化
/** * Fork/Join 核心思想:分而治之 * * 著名的 MapReduce 也是这个思想。将任务进行分解,而后合并全部的结果。 * */ public class CountTask extends RecursiveTask<Long> { /** * 阀值 */ static final int THRESHOLD = 10000; long start; long end; public CountTask(long start, long end) { this.start = start; this.end = end; } /** * 有返回值的 * @return */ @Override protected Long compute() { long sum = 0; // 当阀值小于10000则不分解了 boolean canCompute = (end - start) < THRESHOLD; if (canCompute) { for (long i = start; i <= end; i++) { sum += i; } } else { // 2000 long step = (start + end) / 100; ArrayList<CountTask> subTasks = new ArrayList<>(); long pos = start; for (int i = 0; i < 100; i++) { long lastOne = pos + step; if (lastOne > end) { lastOne = end; } //0-2000 个计算任务 * 100 CountTask subTask = new CountTask(pos, lastOne); pos += step + 1; subTasks.add(subTask); subTask.fork();// fork } for (CountTask t : subTasks) { sum += t.join(); } } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0, 200000L); // 将一个大的任务提交到池中 ForkJoinTask<Long> result = forkJoinPool.submit(task); long res = 0; try { // 等待运算结果 res = result.get(); System.out.println("sum = " + res); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
因为计算求和必须须要返回值,所以咱们选择了 RecursiveTask 做为任务的模型。首先咱们构造了一个大任务,提交给线程池,线程池会返回一个携带结果的任务,经过 get 方法能够获得最终结果。若是执行 get 方法时任务没有结束,那么主线程就会在 get 方法等待。this
再看看 CountTask 的实现,首先 CountTask 继承自 RecursiveTask ,能够携带返回值,这里的返回值类型设置为 long,定义一个 THRESHOLD 设置了任务分解的规模,也就是若是须要求和的总数大于 THRESHOLD 个,那么任务就须要再次分解,不然就直接执行。 每次分解时,简单的将原有任务划分红100个规模相等的小任务,并使用 fork() 提交子任务。以后,等待全部的子任务结束,并将结果再次求和。线程
再使用 ForkJoin的时候注意:若是任务的划分层次很深,一直得不到返回,那么可能出现两种状况: 第一,系统内的线程数量愈来愈多,致使性能严重降低。第二,函数的调用层次变的很深,最终致使栈溢出。
此外,ForkJoin 线程池使用一个无锁的栈来管理空闲线程,若是一个工做线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中,待未来有任务可用时,再从栈中唤醒这些线程。
本文来源自 《Java 高并发程序设计》,没有什么本身的看法。由于使用场景太少了。不过仍是能够看看源码来涨涨姿式的。嘿嘿。
good luck !!!!