本文首发于一世流云的专栏: https://segmentfault.com/blog...
算法领域有一种基本思想叫作“分治”,所谓“分治”就是将一个难以直接解决的大问题,分割成一些规模较小的子问题,以便各个击破,分而治之。java
好比:对于一个规模为N的问题,若该问题能够容易地解决,则直接解决;不然将其分解为K个规模较小的子问题,这些子问题互相独立且与原问题性质相同,递归地解这些子问题,而后将各子问题的解合并获得原问题的解,这种算法设计策略叫作分治法。算法
许多基础算法都运用了“分治”的思想,好比二分查找、快速排序等等。编程
基于“分治”的思想,J.U.C在JDK1.7时引入了一套Fork/Join框架。Fork/Join框架的基本思想就是将一个大任务分解(Fork)成一系列子任务,子任务能够继续往下分解,当多个不一样的子任务都执行完成后,能够将它们各自的结果合并(Join)成一个大结果,最终合并成大任务的结果:segmentfault
从上述Fork/Join框架的描述能够看出,咱们须要一些线程来执行Fork出的任务,在实际中,若是每次都建立新的线程执行任务,对系统资源的开销会很大,因此Fork/Join框架利用了线程池来调度任务。数组
另外,这里能够思考一个问题,既然由线程池调度,根据咱们以前学习普通/计划线程池的经验,必然存在两个要素:并发
通常的线程池只有一个任务队列,可是对于Fork/Join框架来讲,因为Fork出的各个子任务实际上是平行关系,为了提升效率,减小线程竞争,应该将这些平行的任务放到不一样的队列中去,如上图中,大任务分解成三个子任务:子任务一、子任务二、子任务3,那么就建立三个任务队列,而后再建立3个工做线程与队列一一对应。框架
因为线程处理不一样任务的速度不一样,这样就可能存在某个线程先执行完了本身队列中的任务的状况,这时为了提高效率,咱们可让该线程去“窃取”其它任务队列中的任务,这就是所谓的工做窃取算法。dom
“工做窃取”的示意图以下,当线程1执行完自身任务队列中的任务后,尝试从线程2的任务队列中“窃取”任务:异步
对于通常的队列来讲,入队元素都是在“队尾”,出队元素在“队首”,要知足“工做窃取”的需求,任务队列应该支持从“队尾”出队元素,这样能够减小与其它工做线程的冲突(由于正常状况下,其它工做线程从“队首”获取本身任务队列中的任务),知足这一需求的任务队列其实就是咱们在juc-collections框架中介绍过的双端阻塞队列—— LinkedBlockingDeque。
固然,出于性能考虑,J.U.C中的Fork/Join框架并无直接利用LinkedBlockingDeque做为任务队列,而是本身从新实现了一个。
为了给接下来的分析F/J框架组件作铺垫,咱们先经过一个简单示例看下Fork/Join框架的基本使用。async
假设有个很是大的long[]数组,经过FJ框架求解数组全部元素的和。
任务类定义,由于须要返回结果,因此继承RecursiveTask,并覆写compute方法。任务的fork经过ForkJoinTask的fork方法执行,join方法方法用于等待任务执行后返回:
public class ArraySumTask extends RecursiveTask<Long> { private final int[] array; private final int begin; private final int end; private static final int THRESHOLD = 100; public ArraySumTask(int[] array, int begin, int end) { this.array = array; this.begin = begin; this.end = end; } @Override protected Long compute() { long sum = 0; if (end - begin + 1 < THRESHOLD) { // 小于阈值, 直接计算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); subtask1.fork(); subtask2.fork(); long sum1 = subtask1.join(); long sum2 = subtask2.join(); sum = sum1 + sum2; } return sum; } }
调用方以下:
public class Main { public static void main(String[] args) { ForkJoinPool executor = new ForkJoinPool(); ArraySumTask task = new ArraySumTask(new int[10000], 0, 9999); ForkJoinTask future = executor.submit(task); // some time passed... if (future.isCompletedAbnormally()) { System.out.println(future.getException()); } try { System.out.println("result: " + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
注意:ForkJoinTask在执行的时候可能会抛出异常,可是没办法在主线程里直接捕获异常,因此ForkJoinTask提供了isCompletedAbnormally()
方法来检查任务是否已经抛出异常或已经被取消了,而且能够经过ForkJoinTask的getException
方法获取异常.
在前几小节中,咱们简要介绍了Fork/Join框架和它的使用。本节咱们将更进一步,深刻F/J框架,了解它的各个组件的关系和核心设计思想,本节不会涉及太多的源码分析,而是参考 Doug Lea的这篇论文《A Java Fork/Join Framework》,从宏观上分析F/J框架,而后分析整个框架的调度流程,阅读完本节后,在下一节——Fork/Join框架(2) 实现中,咱们再去深刻源码会轻松不少。
F/J框架的实现很是复杂,内部大量运用了位操做和无锁算法,撇开这些实现细节不谈,该框架主要涉及三大核心组件:ForkJoinPool
(线程池)、ForkJoinTask
(任务)、ForkJoinWorkerThread
(工做线程),外加WorkQueue
(任务队列):
ForkJoinPool做为Executors框架的一员,从外部看与其它线程池并无什么区别,仅仅是ExecutorService的一个实现类:
ForkJoinPool的主要工做以下:
invoke
/execute
/submit
方法提交任务);WorkQueue[]
)的初始化和管理;注意:ForkJoinPool提供了3类外部提交任务的方法: invoke、 execute、 submit,它们的主要区别在于任务的执行方式上。
ForkJoinPool对象的构建有两种方式:
ForkJoinPool.commonPool()
静态方法构造。JDK8之后,ForkJoinPool又提供了一个静态方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例,主要是为了简化线程池的构建,这个ForkJoinPool实例能够知足大多数的使用场景:
public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
ForkJoinPool对外提供的3种构造器,其实最终都调用了下面这个构造器:
/** * @param parallelism 并行级别, 默认为CPU核心数 * @param factory 工做线程工厂 * @param handler 异常处理器 * @param mode 调度模式: true表示FIFO_QUEUE; false表示LIFO_QUEUE * @param workerNamePrefix 工做线程的名称前缀 */ private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long) (-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
这些入参目前不用关注,咱们重点是mode
这个字段,ForkJoinPool支持两种模式:
mode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE
注意:这里的同步/异步并非指F/J框架自己是采用同步模式仍是采用异步模式工做,而是指其中的工做线程的工做方式。在F/J框架中,每一个工做线程(Worker)都有一个属于本身的任务队列(WorkQueue),这是一个底层采用数组实现的 双向队列。
同步是指:对于工做线程(Worker)自身队列中的任务,采用 后进先出(LIFO)的方式执行;异步是指:对于工做线程(Worker)自身队列中的任务,采用 先进先出(FIFO)的方式执行。
从Fork/Join框架的描述上来看,“任务”必需要知足必定的条件:
所以,J.U.C提供了一个抽象类——ForkJoinTask,来做为该类Fork/Join任务的抽象定义:
ForkJoinTask实现了Future接口,是一个异步任务,咱们在使用Fork/Join框架时,通常须要使用线程池来调度任务,线程池内部调度的其实都是ForkJoinTask任务(即便提交的是一个Runnable或Callable任务,也会被适配成ForkJoinTask)。
除了ForkJoinTask,Fork/Join框架还提供了两个它的抽象实现,咱们在自定义ForkJoin任务时,通常继承这两个类:
public abstract class RecursiveAction extends ForkJoinTask<Void> { /** * 该任务的执行,子类覆写该方法 */ protected abstract void compute(); public final Void getRawResult() { return null; } protected final void setRawResult(Void mustBeNull) { } protected final boolean exec() { compute(); return true; } }
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { /** * 该任务的执行结果. */ V result; /** * 该任务的执行,子类覆写该方法 */ protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } protected final boolean exec() { result = compute(); return true; } }
ForkJoinTask除了和ForkJoinPool 结合使用外,也能够单独使用,当咱们调用ForkJoinTask的fork方法时,其内部会经过
ForkJoinPool.commonPool()
方法建立线程池,而后将本身做为任务提交给线程池。
Fork/Join框架中,每一个工做线程(Worker)都有一个本身的任务队列(WorkerQueue), 因此须要对通常的Thread作些特性化处理,J.U.C提供了ForkJoinWorkerThread类做为ForkJoinPool中的工做线程:
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; // 该工做线程归属的线程池 final ForkJoinPool.WorkQueue workQueue; // 对应的任务队列 protected ForkJoinWorkerThread(ForkJoinPool pool) { super("aForkJoinWorkerThread"); // 指定工做线程名称 this.pool = pool; this.workQueue = pool.registerWorker(this); } // ... }
ForkJoinWorkerThread 在构造过程当中,会保存所属线程池信息和与本身绑定的任务队列信息。同时,它会经过ForkJoinPool的registerWorker
方法将本身注册到线程池中。
线程池中的每一个工做线程(ForkJoinWorkerThread)都有一个本身的任务队列(WorkQueue),工做线程优先处理自身队列中的任务(LIFO或FIFO顺序,由线程池构造时的参数
mode
决定),自身队列为空时,以FIFO的顺序随机窃取其它队列中的任务。
任务队列(WorkQueue)是ForkJoinPool与其它线程池区别最大的地方,在ForkJoinPool内部,维护着一个WorkQueue[]
数组,它会在外部首次提交任务)时进行初始化:
volatile WorkQueue[] workQueues; // main registry
当经过线程池的外部方法( submit、 invoke、 execute)提交任务时,若是WorkQueue[]
没有初始化,则会进行初始化;而后根据数组大小和线程随机数(ThreadLocalRandom.probe
)等信息,计算出任务队列所在的数组索引(这个索引必定是 偶数),若是索引处没有任务队列,则初始化一个,再将任务入队。也就是说,经过外部方法提交的任务必定是在偶数队列,没有绑定工做线程。
WorkQueue做为ForkJoinPool的内部类,表示一个双端队列。双端队列既能够做为栈使用(LIFO),也能够做为队列使用(FIFO)。ForkJoinPool的“工做窃取”正是利用了这个特色,当工做线程从本身的队列中获取任务时,默认老是以栈操做(LIFO)的方式从栈顶取任务;当工做线程尝试窃取其它任务队列中的任务时,则是FIFO的方式。
咱们在ForkJoinPool一节中曾讲过,能够指定线程池的同步/异步模式(mode参数),其做用就在于此。同步模式就是“栈操做”,异步模式就是“队列操做”,影响的就是工做线程从本身队列中取任务的方式。
ForkJoinPool中的工做队列能够分为两类:
文字描述不太好理解,咱们经过示意图来看下任务入队和“工做窃取”的整个过程:
假设如今经过ForkJoinPool的submit方法提交了一个FuturetTask任务,参考 使用示例。
初始状态下,线程池中的任务队列为空,workQueues == null
,也没有工做线程:
此时会初始化任务队列数组WorkQueue[]
,大小为2的幂次,而后在某个槽位(偶数位)初始化一个任务队列(WorkQueue
),并插入任务:
注意,因为是非工做线程经过外部方法提交的任务,因此这个任务队列并无绑定工做线程。
之因此是2的幂次,是因为ForkJoinPool采用了一种随机算法(相似 ConcurrentHashMap的随机算法),该算法经过线程池随机数(ThreadLocalRandom的probe值)和数组的大小计算出工做线程所映射的数组槽位,这种算法要求数组大小为2的幂次。
首次提交任务后,因为没有工做线程,因此会建立一个工做线程,同时在某个奇数槽的位置建立一个与它绑定的任务队列,以下图:
ForkJoinWorkThread_1会随机扫描workQueues中的队列,直到找到一个能够窃取的队列——workQueues[2]
,而后从该队列的base
端获取任务并执行,并将base
加1:
窃取到的任务是FutureTask,ForkJoinWorkThread_1最终会调用它的compute
方法(子类继承ForkJoinTask,覆写compute,参考本文的使用示例),该方法中会新建两个子任务,并执行它们的fork
方法:
@Override protected Long compute() { long sum = 0; if (end - begin + 1 < THRESHOLD) { // 小于阈值, 直接计算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); subtask1.fork(); subtask2.fork(); long sum1 = subtask1.join(); long sum2 = subtask2.join(); sum = sum1 + sum2; } return sum; }
以前说过,因为是由工做线程ForkJoinWorkThread_1来调用FutureTask的fork
方法,因此会将这两个子任务放入ForkJoinWorkThread_1自身队列中:
而后,ForkJoinWorkThread_1会阻塞等待任务1和任务2的结果(先在subtask1.join
等待):
long sum1 = subtask1.join(); long sum2 = subtask2.join();
从这里也能够看出,任务放到哪一个队列,实际上是 由调用线程来决定的(根据线程探针值probe计算队列索引)。若是调用线程是工做线程,则必然有本身的队列( task queue),则任务都会放到本身的队列中;若是调用线程是其它线程(如主线程),则建立没有工做线程绑定的任务队列( submissions queue),而后存入任务。
ForkJoinWorkThread_1调用两个子任务1和2的fork
方法,除了将它们放入本身的任务队列外,还会致使新增一个工做线程ForkJoinWorkThread_2:
ForkJoinWorkThread_2运行后会像ForkJoinWorkThread_1那样从其它队列窃取任务,以下图,从ForkJoinWorkThread_1队列的base
端窃取一个任务(直接执行,并不会放入本身队列):
窃取完成后,ForkJoinWorkThread_2会直接执行任务1,又回到了FutureTask子类的compute
方法,假设此时又fork
出两个任务——任务三、任务4,则ForkJoinWorkThread_2最终会在任务3的join
方法上等待:
若是此时还有其它工做线程,则重复上述步骤:
窃取、执行、入队、join阻塞、返回
。ForkJoinTask的join方法内部逻辑很是复杂,上述ForkJoinWorkThread_1和ForkJoinWorkThread_2目前都在等待任务的完成,但事实上,ForkJoinTask存在一种
互助机制,即工做线程之间能够互相帮助执行任务,这里不详细展开,只须要知道,ForkJoinWorkThread_1和ForkJoinWorkThread_2可能会被其它工做线程唤醒。
咱们这里假设ForkJoinWorkThread_2被其它某个工做线程唤醒,任务3和任务4的join方法依次返回告终果,那么任务1的结果也会返回,因而ForkJoinWorkThread_1也被唤醒(它在任务1的join上等待),而后ForkJoinWorkThread_1会继续执行任务2的join方法,若是任务2再也不分解,则最终返回任务1和任务2的合并结果,计算结束。
ForkJoinWorkThread_1和ForkJoinWorkThread_2唤醒执行完窃取到的任务后,尚未结束,它们还会去看看自身队列中有无任务能够执行。
/** * Executes the given task and any remaining local tasks. */ final void runTask(ForkJoinTask<?> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
上述ForkJoinPool.WorkQueue.runTask
方法中,doExec()
就是执行窃取的任务,而execLocalTasks
用于执行队列自己的任务。
咱们假设此时的线程池是下面这种状态:
工做线程ForkJoinWorkThread_1调用execLocalTasks方法一次性执行本身队列中的全部任务,这时分红两种状况:
1.异步模式(asyncMode==true)
若是构造线程池时,asyncMode为true,表示以异步模式执行工做线程自身队列中的任务,此时会从 base -> top
遍历并执行全部任务。
2.同步模式(asyncMode==false)
若是构造线程池时,asyncMode为false(默认状况),表示以同步模式执行工做线程自身队列中的任务,此时会从 top -> base
遍历并执行全部任务。
任务的入队老是在top
端,因此当以同步模式遍历时,其实至关于栈操做(从栈顶pop元素);
若是是异步模式,至关于队列的出队操做(从base端poll元素)。
异步模式比较适合于那些不须要返回结果的任务。其实若是将队列中的任务当作一棵树(无环连通图)的话,异步模式相似于图的广度优先遍历,同步模式相似于图的深度优先遍历
假设此处以默认的同步模式遍历,ForkJoinWorkThread_1从栈顶开始执行并移除任务,先执行任务2并移除,再执行任务1并:
本章简要概述了Fork/Join框架的思想、主要组件及基本使用,Fork/Join框架的核心包含四大组件:ForkJoinTask任务类、ForkJoinPool线程池、ForkJoinWorkerThread工做线程、WorkQueue任务队列。
本章经过示例,描述了各个组件的关系以及ForkJoin线程池的整个调度流程,F/J框架的核心来自于它的工做窃取及调度策略,能够总结为如下几点:
下一章将经过源码分析更深刻的理解Fork/Join调度过程。