在前面的文章"ForkJoin框架之ForkJoinTask"中梳理了ForkJoin框架的简要运行格架和异常处理流程,显然要理解ForkJoin框架的调度,包含工做窃取等思想,须要去ForkJoinPool中了解,而对于ForkJoinTask的拓展和使用则须要了解它的一些子类,前文中偶尔会提到ForkJoinTask的一个子类:CountedCompleter,直译为计数的完成器.java
前文也说过,JAVA8的并行流其实就是基于了ForkJoin框架实现,所以并行流其实就在使用咱们前面提到的工做窃取和分治思想.为了方便对于ForkJoinTask的理解,本文将详述CountedCompleter(同时在ForkJoinPool中也须要了解它),以及前文提到的工做线程ForkJoinWorkerThread,并简单看一看并行流.算法
根据doug的注释,CoutedCompleter是一个特殊的ForkJoinTask,它会在触发完成动做时,检查有没有挂起action,若没有则执行一个完成动做.这个概念有些抽象,必须结合源码和源码做者给出的示例加以理解,一样的,理解了它,也就理解了CountedCompleter的扩展类的实现方式,从而能阅读懂有关的源码(如并行流中涉及到运行集拆分,结果合并,运算调度等源码).编程
它也是一个抽象类,基于ForkJoinTask的exec函数进行了若干扩展.api
public abstract class CountedCompleter<T> extends ForkJoinTask<T> //任务的完成者,很明显这是一个全局的栈结构(暂时这么理解吧,其实也不太严格). final CountedCompleter<?> completer; //重要字段,表明完成前挂起的任务数量,用volatile修饰. volatile int pending; //带有completer的构造器. protected CountedCompleter(CountedCompleter<?> completer) { this.completer = completer; } //不带completer的构造器 protected CountedCompleter() { this.completer = null; } //抽象的compute方法,它是相似ForkJoinTask的扩展方式. public abstract void compute(); //重写的exec方法 protected final boolean exec() { //直接调用compute方法并返回false.回到ForkJoinTask类中的doExec方法,能够看到 //调用了exec后若获得true值,将会执行setCompletion(NORMAL)动做.且该动做将在首次唤醒等待结果的线程. //此处return了false,将不去执行上述操做.详情参考上篇文章. compute(); return false; }
以上是CountedCompleter的签名,字段,构造器和核心的抽象方法compute,其实整个CountedCompleter就是在围着这点东西转,首先看一看与ForkJoinTask的结合.数组
显然,CountedCompleter简单重写了ForkJoinTask的exec方法简单调用抽象的compute方法并返回false,当出现异常时,流程不变,但当compute方式正常完成的状况,将不可能进行父类后续的设置完成和唤醒操做.所以它必须由CountedCompleter自定义的完成.安全
而CountedCompleter也确实暴露了一些公有函数,可是调用的时机却要用户继承它以后决定.咱们先来继续一些辅助源码并理解Completer的设计理念,稍后再来看它的完成方法.数据结构
//onCompletion勾子方法,默认空实现. //CountedCompleter在tryComplete方法中会在符合完成的第一个条件(无挂起任务)的状况下执行它. //complete方法也会对它有无条件地调用. //关于这两个方法稍后详述. //它的实现取决于要实现的操做,并行流中的一些ops会在此处进行一些中间结果处理,好比结果集的合并(reduce操做). public void onCompletion(CountedCompleter<?> caller) { } //重写ForkJoinTask中的方法.上篇源码分享文章中提过,在ForkJoinTask的setExceptionalCompletion会调用internalPropagateException //传递异常,并且是个空实现,而在CountedCompleter中实现了该方法,并在内部调用onExceptionalCompletion void internalPropagateException(Throwable ex) { CountedCompleter<?> a = this, s = a; //循环判断每个task是否要传递异常给它的completer //无方法体的while循环.道格大神的代码神迹. while (a.onExceptionalCompletion(ex, s) && //要传递给completer且具有completer且completer还不是完成态(正常或非正常) (a = (s = a).completer) != null && a.status >= 0 && //则令completer去记录异常完成,若记录成功则进入下一轮循环. a.recordExceptionalCompletion(ex) == EXCEPTIONAL) ; //由于onExceptionalCompletion固定返回true,若没有中间完成的任务,直到最后一个completer,也就是root, //root不具有completer,将中断循环. } //异常完成勾子方法. //按上一节的概念,当ForkJoinTask执行出错,即exec->compute出错时,最终会调到此勾子.或当手动completeExceptionally或cancel时. public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) { //直接返回true,显然也是一个供扩展的方法.返回true表明异常应该传递给this的completer. return true; } //返回completer public final CountedCompleter<?> getCompleter() { return completer; } //返回挂起任务数量. public final int getPendingCount() { return pending; } //设置挂起任务数量 public final void setPendingCount(int count) { pending = count; } //原子地为挂起任务数量添加delta public final void addToPendingCount(int delta) { U.getAndAddInt(this, PENDING, delta); } //原子地将当前挂起任务数量从expected更改到count public final boolean compareAndSetPendingCount(int expected, int count) { return U.compareAndSwapInt(this, PENDING, expected, count); } //将当前任务的挂起数量原子减至0. public final int decrementPendingCountUnlessZero() { int c; do {} while ((c = pending) != 0 && !U.compareAndSwapInt(this, PENDING, c, c - 1)); return c; } //返回root completer.逻辑很简单. public final CountedCompleter<?> getRoot() { CountedCompleter<?> a = this, p; while ((p = a.completer) != null) a = p; return a; }
以上是几个工具函数,逻辑也很简单,仅有一处可能留有疑问:完成态/异常态是如何传递的.多线程
如今你们应该理解为何ForkJoinTask要将internalPropagateException置为空实现了,显然,对于不一样方式的实现,确实须要不一样的传递行为.CountedCompleter保存了一个相似"栈结构"的任务链,虽然提早讲到栈底即为root任务(固然root在底部仍是顶部自己不重要),显然任何一个子任务出现了问题,与它关联的父任务的行为显然要有一个明确的由子类定义的规则.app
咱们看到在重写的internalPropagateException方法中,不停地判断当前任务是否要将异常信号传递给链上的下一个任务(on方法始终返回true,不要紧咱们能够在子类中重写),而后让未完成的completer去记录同一个异常ex.框架
那么问题来了,只要completer已完成过(正常完成过异常完成或取消),显然while循环中断,completer和它的后续completer将不会被处理(1).一样,若传递异常的任务自己就是另外一个或几个任务的completer,它的异常信息显然不会反向传递(2).
对于问题(1),显然若是后续的completer已出现过异常,必然也会走一遍一样的逻辑,传递给后面的completer,若是它正常完成,也必然要有相应向后传递的行为,不然没法解决(1),咱们接下来即论述相关方法.
对于问题(2),显然问题(1)中描述的状况与此有所交集,若是咱们创建了一个CountedCompleter任务,并在compute方法中大肆fork子任务入队,fork以后不等子任务完成,也不获取子任务的执行结果,直接将父任务setCompletion或者setExceptionalCompletion,子任务仍是会继续执行的.
为了便于理解,咱们继续来看与任务的完成有关的方法.
//尝试完成根任务或减小栈链下游的某一个completer的挂起数(包含它自身). public final void tryComplete() { //1.初始用a保存this,后续为当前操做任务,用s保存a. CountedCompleter<?> a = this, s = a; for (int c;;) { //2.第一次进入或在6形成竞态的某一次循环中,a(this或this的completer链中的某一个)的的挂起任务数为0,表明它挂起的任务都完成了. if ((c = a.pending) == 0) { //3.a的勾子方法,若已经运行过4,且判断条件为假未能到5并在下一次循环从新回到3的状况,a!=s且a是s的completer, //在对onCompletion重写时,能够根据this与参数是否相等进行判断,如并行流聚合时能够根据这个条件进行结果集的合并. a.onCompletion(s); //4.将a指向本身的completer,s指向原来的a. if ((a = (s = a).completer) == null) { //5.原来a的completer不存在,即a不是root,不须要再传递了,让root进行quietlyComplete并返回. //此时说明整条链上的competer挂起任务所有是0. s.quietlyComplete(); return; } //隐藏的7.当原a的completer存在(a不是root)的状况,继续对该complter判断挂起任务数或尝试减1,对下一个元素开启下一轮循环. } //6.对this的completer栈的某一次循环时发现了挂起任务数不为0的,则对该completer的挂起数减1, //表示它挂起的任务完成了一个,并返回.若在此时刚好出现了竞态,另外一条链上的任务抢先减一,则当前 //的a要进入下一循环,它可能会在2处判断经过,进入到链上的下一个completer的传播逻辑. else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) return; } } //基本等效于tryComplete,只是不执行onCompletion,tryComplete会在判断链上某个completer的挂起任务数是0当即执行onCompletion. public final void propagateCompletion() { CountedCompleter<?> a = this, s = a; for (int c;;) { if ((c = a.pending) == 0) { if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) return; } } //complete方法,逻辑简单,丝绝不考虑挂起数,直接执行当前task的几个完成方法,并尝试对completer进行tryComplete. //它不改变本身的挂起任务数,但会让completer对栈上的其余completer或自身尝试减小挂起数或完成root. public void complete(T rawResult) { CountedCompleter<?> p; setRawResult(rawResult);//使用参数设置为当前任务的结果,尽管它为空方法. onCompletion(this);//直接调用onCompletion勾子. quietlyComplete();//安静地将status置为NORMAL. if ((p = completer) != null) //本身不改变自身挂起数,也不尝试完成root,但让completer尝试去向下执行这些操做. p.tryComplete(); } //没办法单独理解这个方法名.官方注释是和nextComplete放置在循环中使用. public final CountedCompleter<?> firstComplete() { for (int c;;) { if ((c = pending) == 0) //1.当前task没有挂起任务数,则返回它. return this; else if (U.compareAndSwapInt(this, PENDING, c, c - 1)) //2.不然尝试减小一个挂起任务数并返回null.但当出现竞态时,可能致使未能进入2而在下一次循环进入1. return null; } } //结合前面的firstComplete互相理解,它会对当前任务判断是否有completer,有则对该completer进行firstComplete, //不然将当前任务安静完成并返回null. //故结果只能返回null或completer public final CountedCompleter<?> nextComplete() { CountedCompleter<?> p; if ((p = completer) != null) //有completer且completer已无挂起任务数,则返回completer, //有completer且completer有挂起任务数,则尝试对该任务数减一并返回null.出现竞态则可能返回该completer. return p.firstComplete(); else { //无completer,安静完成当前任务并返回null. quietlyComplete(); return null; } } //等同于getRoot().quietlyComplete() public final void quietlyCompleteRoot() { for (CountedCompleter<?> a = this, p;;) { if ((p = a.completer) == null) { a.quietlyComplete(); return; } a = p; } } //若是当前任务未完成,尝试去出栈执行,并处理至多给定数量的其余未处理任务,且对这些未处理任务 //来讲,当前任务处于它们的完成路径上(即这些任务是completer栈链的前置任务),实现特殊的工做窃取. public final void helpComplete(int maxTasks) { Thread t; ForkJoinWorkerThread wt; if (maxTasks > 0 && status >= 0) { if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) //当前线程是ForkJoinWorkerThread,尝试执行当前任务并尝试从线程的工做队列中尝试帮助前置任务执行. (wt = (ForkJoinWorkerThread)t).pool. helpComplete(wt.workQueue, this, maxTasks); else //使用common池的externalHelpComplete方法. ForkJoinPool.common.externalHelpComplete(this, maxTasks); } }
上一段代码整体逻辑不难,有如下几点总结:
1.显然tryComplete方法在调用后的最终结果只有两个:本身或completer链前方的某一个completer的挂起任务数减1(1),本身或completer链前方某一个completer(root)的quietlyComplete被执行(2).简单来讲,就是让root进行quietlyComplete(链上每个挂起任务数都是0)或让链上的某一个completer减小一个挂起任务.
2.tryComplete方法只会对root进行quietlyComplete,进而setComplete(NORMAL),对于链上的其余任务,最多会帮助挂起数减一,而不会把它们置为完成态,可是线程池在执行任务时,或者直接对一个链上的completer进行invoke,doExec甚至get等操做时,这些方法会将该中间completer进行setComplete.
3.每个CountedCompleter均可能有本身的completer栈链,每个CountedCompleter也能够位于其余CountedCompleter的栈链上且上游不惟一而下游惟一一(倒树形),任何一条栈链只能有一个root,root的completer为null.
4.从tryComplete方法来看正常运行状况下的规则,每个CountedCompleter的tryComplete只能向前影响到链上的另外一个completer,由于实现数量的增长方法有好几处,用户在实现时,随时可能将一些completer的数量设置成任意的数,故能够出现前面tryComplete注释中隐藏的7的状况,即存在一个completer,它的下一个completer的挂起数是0,它却能将下下个completer安静完成或将其挂起数减一,即跨无挂起数节点传递.
5.前面列出的helpComplete方法是CountedCompleter的特殊工做窃取方法(或者也不能叫做窃取,由于非common池状况窃取的是本身线程的任务,common池则依赖于一个探测值),具体的窃取细节在ForkJoinPool中,将在后面的文章中论述,但简单的逻辑已经在注释中描述清楚,把它归到这一块,也是由于它与前面描述的逻辑有所纠葛.124提到了tryComplete的向前影响结果,而在实际的应用中,咱们可能会有各类各样的情景,ForkJoin框架没法阻止咱们对ForkJoinTask的exec函数进行任意式的扩展,也没法阻止咱们对CountedCompleter的compute任意扩展,那么如何在咱们任意拓展的情景下保持效率和健壮?好比下面这个使用场景:
a.创建一种ForkJoinTask,直接继承CountedCompleter并重写compute方法,则它能够运行在ForkJoinPool中.
b.咱们接下来在compute方法中屡次根据计算结果集的大小进行拆分并递归fork子任务入池,父任务成为子任务的completer,同时compute方法自身也负责不可拆分的计算逻辑,并在自身这一块计算结束后,可能等待全部fork入池的子任务结束,也可能不等待子任务,直接结束父任务,让线程空出来作其余的事.
c.全部子任务结束后,使用一个合并函数合并子任务的结果集和自身的结果,并做为最终的结果.而后tryComplete(若是b中使用了join,或者判断当前任务是root).
显然,b中fork出的子任务,也一样要执行bc的逻辑.那么可能出现这样的状况:
不一样的父任务子任务在ForkJoinPool最初始压入当前工做线程的队列中,但随时可能被其余工做线程甚至外部线程偷去执行.
父任务抢先抢得运行资源,运行完本身计算的部分,而入池的子任务及子孙任务有大量未完成.
难道父任务的执行线程就这样干等?在前一篇文章中说过,ForkJoin框架适宜多计算,轻io,轻阻塞的状况,且自己就是为了不线程忙的忙死饿的饿死,所以每一个任务等待子任务执行结束是不可取的,这或许也是为何有了ForkJoinTask,却还要有CountedCompleter的缘由之一吧.
若咱们在任何每个任务中只是单纯地将该分出去的子任务fork入池并执行本身那一部分,并不让当前线程join子任务呢?(事实上不join子任务刚好能够将当前线程的资源腾出来作其余的事)
因此,除了前面5中提到的若干种(124)向前影响completer栈链的挂起数或root的完成态,还须要一个能向栈链后方有所影响的操做,好比帮助子任务的完成,毕竟子任务也是b中fork出来且由本身入队的.
helpComplete方法就能够作到这一点,它在ForkJoinPool中,它仅应在当前任务未完成时使用,首先它会尝试将当前任务从出队列并执行(ForkJoinPool::popCC及成功后续doExec,LIFO),出队失败则表示正在被执行甚至被偷去执行.出队这一步以后,再尝试本身的线程工做队列中找出本身的子孙任务(FIFO)并进行执行(ForkJoinPool::pollAndExecCC).
而若执行完某个父任务的工做线程必然会调用tryComplete等有关方法,将自身或栈链后方的某一个completer的挂起数减一,甚至由于一些不合理的api使用(如直接更改了后方某个任务的挂起数量)而直接终止了root,将root任务标记成完成态.(注意前面强调的"运行完本身计算的部分",这就是否认本句话的关键了,前面也说明"helpComplete仅在当前任务未完成时使用",显然,完成了本身负责的计算内容并不表明当前任务完成了,由于它的子任务尚未完成,所以它不会调用tryComplete,而且能够去帮助子任务)
同时,执行完父任务负责的计算内容的任务线程也会去找它栈链后方的其余任务,按照b的逻辑,这将是它的子任务,帮助它们完成,每完成一个子任务(子任务无子任务,再也不help的状况),会进行tryComplete传递一次.
余下的方法很简单.
//重写自ForkJoinTask的结果,前文也说过CountedCompleter也不维护result,返回null. //但并行流或者一些其余并行操做能够实现此结果,好比ConcurrentHashMap中支持的map reduce操做. public T getRawResult() { return null; } //同上,默认空,一些子类会有特别的实现. protected void setRawResult(T t) { }
显然,completer栈链上的全部任务是能够并行执行的,且每个完成均可以向后tryComplete一次,并在其后能够帮助前面的任务完成,而咱们若实现上述两个方法,彻底能够将自身运算的结果设置进去,在root被安静完成后,ForkJoinTask将能够get到结果(或join也将返回结果),可在此时合并计算结果,有些结果显然是能够并行的.
一些操做,好比find类型,任何一个子任务完成了find,就能够直接让root结束,而后直接让整条栈链上的任务cancelIgnoringExceptions.
一些须要聚合每个任务结果的操做,好比reduce类型,须要每一个父任务根据子任务的结果去reduce,它的父任务再根据他和兄弟任务的结果reduce,最终合并到root.显然,mapper由子任务实现,reducer由父任务实现.
一些接近find或reduce类型(或者说find的变种),好比filter,每个任务都会有结果,这个结果多是本身负责的原集中的一部分子集,也可能就是个空集,父任务合并每一个子任务的结果集,直到root.
排序类型的操做,如使用归并排序,显然每一个父任务便是divider也是merger,分解出的每一个子集交给子任务去计算,父任务再去负责merge.
......
以上是ForkJoinTask的抽象子类CountedCompleter的源码分析,接下来咱们继续分析工做线程.
只要对java的线程结构稍有了解,ForkJoinWorkerThread的源码十分简单,且前面提过,ForkJoinTask被声称是一个轻量于普通线程和Future的实体,而它在ForkJoinPool中的运行载体即是ForkJoinWorkerThread,这个轻量究竟体如今何处?
//类签名,直接继承自Thread public class ForkJoinWorkerThread extends Thread { //每一个ForkJoinWorkerThread都只能属于一个线程池,且保存该池的引用. final ForkJoinPool pool; //每一个ForkJoinWorkerThread都有一个工做队列, 显然队列中的任务就是该线程干活的最小单位了.它也是工做窃取机制的核心. final ForkJoinPool.WorkQueue workQueue; //构造函数,建立时指定线程池. protected ForkJoinWorkerThread(ForkJoinPool pool) { // 线程名称 super("aForkJoinWorkerThread"); this.pool = pool; //将工做线程注册到ForkJoinPool后会返回一个工做队列,供当前线程使用和供其余线程偷取. this.workQueue = pool.registerWorker(this); } //带线程组的构造器 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc) { super(threadGroup, null, "aForkJoinWorkerThread"); //inheritedAccessControlContext是从Thread继承下来的,字面意思是继承的访问控制上下文,设置为acc. U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc); //注册入池以前,清除掉本地化信息 eraseThreadLocals(); this.pool = pool; this.workQueue = pool.registerWorker(this); }
//返回注册的池.
public ForkJoinPool getPool() { return pool; } //返回当前线程工做队列在池中的索引,每一个队列都会维护一个在池中的索引. public int getPoolIndex() { return workQueue.getPoolIndex(); } /** * Initializes internal state after construction but before * processing any tasks. If you override this method, you must * invoke {@code super.onStart()} at the beginning of the method. * Initialization requires care: Most fields must have legal * default values, to ensure that attempted accesses from other * threads work correctly even before this thread starts * processing tasks. */ //空函数,可交给子类实现,按照官方注释,它的做用是在构造以后(这个构造不是指new出线程对象, //而是在run方法已进入的时候,说明"构造"是指线程已经完成了建立可以正常运行),处理任务以前. protected void onStart() { } //工做线程终止时的勾子方法,负责执行一些有关的清理操做.可是若要重写它,必须在方法的 //最后调用super.onTermination.参数exception是形成该线程终止的异常.如果正常结束, //则它是null. protected void onTermination(Throwable exception) { } //核心方法. public void run() { //doug在这一块标注"只运行一次",查看ForkJoinPool的源码, //ForkJoinPool中会有一个WorkQueue的数组,在取消线程的注册后, //本线程关联的WorkQueue会从该数组移除,但WorkQueue中的array不会置空. if (workQueue.array == null) { Throwable exception = null; try { //前面说过的预先操做 onStart(); //用线程池的runWorker方法执行,传入队列. pool.runWorker(workQueue); } catch (Throwable ex) { //发生异常,中断前记录下来 exception = ex; } finally { try { //将记录下来的异常调用勾子方法. onTermination(exception); } catch (Throwable ex) { if (exception == null) //执行勾子方法自己出现了异常,记录下来 exception = ex; } finally { //调用线程池的解除注册方法,会将本线程的WorkQueue从数组中移除,同时使用上述异常. pool.deregisterWorker(this, exception); } } } } //擦除本地变量.把当前线程的两个ThreadLocalMap所有置空 final void eraseThreadLocals() { U.putObject(this, THREADLOCALS, null); U.putObject(this, INHERITABLETHREADLOCALS, null); } //每正常运行完一次顶级task,就调用一次它.这个顶级任务自带易误解天性,其实能够理解为每一次从队列取出的任务. void afterTopLevelExec() { } //自带子类.它不具有任何特殊权限,也不是用户定义的任何线程组的成员,每次运行完一个顶级任务, //则擦除本地化变量. static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread { //自已建立默认线程组. private static final ThreadGroup innocuousThreadGroup = createThreadGroup(); //访问控制上下文支持权限. private static final AccessControlContext INNOCUOUS_ACC = new AccessControlContext( new ProtectionDomain[] { new ProtectionDomain(null, null) }); //构造函数. InnocuousForkJoinWorkerThread(ForkJoinPool pool) { super(pool, innocuousThreadGroup, INNOCUOUS_ACC); } @Override void afterTopLevelExec() { //在每一次从队列取出的"顶级"任务运行后即擦除本地化变量. eraseThreadLocals(); } @Override public ClassLoader getContextClassLoader() { //若是获取线程上下文类加载器,永远直接返回系统类加载器. return ClassLoader.getSystemClassLoader(); } //尝试对未捕获异常处理器的设置,忽略. @Override public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { } //禁止直接设置线程的上下文类加载器. @Override public void setContextClassLoader(ClassLoader cl) { throw new SecurityException("setContextClassLoader"); } //建立一个以顶级线程组为父的线程组. private static ThreadGroup createThreadGroup() { try { sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class; Class<?> gk = ThreadGroup.class; long tg = u.objectFieldOffset(tk.getDeclaredField("group")); long gp = u.objectFieldOffset(gk.getDeclaredField("parent")); //当前线程的所属组. ThreadGroup group = (ThreadGroup) u.getObject(Thread.currentThread(), tg); //循环条件,当前线程的所属组不是null while (group != null) { //不停地循环向上取parent ThreadGroup parent = (ThreadGroup)u.getObject(group, gp); if (parent == null) //发现无parent的线程组,说明是系统顶级线程组,用它当parent建立一个"无害"线程组返回. return new ThreadGroup(group, "InnocuousForkJoinWorkerThreadGroup"); //有parent,把它赋给group开启下一轮循环. group = parent; } } catch (Exception e) { //有异经常使用Error包装抛出. throw new Error(e); } //不能return就抛出Error. throw new Error("Cannot create ThreadGroup"); } }
以上是工做线程的代码,粗略总结一下它和普通线程的区别.
首先,它内部会维护一个工做队列,用它来实现任务调度和窃取.
其次,它提供了一些扩展,如每次顶层任务运行结束,清理ThreadLocal,这也是一种保护机制,避免同线程的本地化数据随之污染.但粗略去看ForkJoinPool的代码,发现它只是在每次从队列取出并运行完一个任务后清除,并称这个为"顶级循环",这倒也没错,但这个任务并不能称之为顶级任务,由于这里的任务类型是ForkJoinTask,不必定是CountedCompleter等明显标识了依赖关系的子类,因此父任务和子任务被塞进一个队列,即便未被窃取,只由当前线程执行,两次的本地化数据也是不一样的.
不过若是咱们在ForkJoinTask的exec方法中加入本地化,或在CountedCompleter中加入本地化,显然每个在今生成的子任务都会在相应的线程执行doExec时设置这些属性,并在执行结束后清除.
最后官方提供的默认子类,以及一些线程组,优先级,权限等做者也未深刻研究,可是咱们构建线程池的时候有一个参数就是"线程工厂",了解下它或许能对后续的ForkJoinPool源码阅读有所帮助.
接下来简述一个官方提供的案例,并以此聊一聊并行流.
第一节论述了CountedCompleter,显然它做为一个抽象类,只是定义了某一些环节,以及一些环节的子环节的组合过程,而具体的实现与使用它定义的api则由用户实现,它的源码中并没有使用(固然也能够看一些子类,但比较复杂),在CountedCompleter的源码注释中,道格大神提供了若干案例,这里举出两个来简要说明一下前面论述过的使用方式,也能够为下一节论述官方提供的子类(并行流api中)提供阅读基础.
第一个是并行的可窃取的分治查找算法.
@Test public void testDivideSearch(){ Integer[] array = new Integer[10000000]; for(int i = 0; i < array.length; i++){ array[i] = i+1; } AtomicReference<Integer> result = new AtomicReference<>(); Integer find = new Searcher<>(null, array, result, 0, array.length - 1,this::match).invoke(); LOGGER.info("查找结束,任务返回:{},result:{}",find,result.get()); } static class Searcher<E> extends CountedCompleter<E> { final E[] array; final AtomicReference<E> result; final int lo, hi; final Function<E,Boolean> matcher; Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi,Function<E,Boolean> matcher){ super(p); this.array = array; this.result = result; this.lo = lo; this.hi = hi; this.matcher = matcher; } @Override public void compute() { int l = this.lo;int h = this.hi; while(result.get() == null && h >= l){ if(h - l >=2){ int mid = (l + h)>>>1; //添加挂起任务数量,这样当出现tryComplete时能够触发root的结束(未查到) addToPendingCount(1); new Searcher<E>(this,array,result,mid,h,matcher).fork(); h = mid; }else{ E x = array[l]; if(matcher.apply(x) && result.compareAndSet(null,x)){ super.quietlyCompleteRoot(); } break; } } //当前未有任何一个线程查到结果,当前任务也完成了子集查找,减小一个挂起数量,若挂起数已减至0则终止. if(null == result.get()) tryComplete(); } } private boolean match(Integer x) { return x > 2000000 && x%2 ==0 && x%3 == 0 && x%5 ==0 && x %7 ==0; }
该案例的逻辑很简单,给定一个很是大的数组,充分利用本机的资源去查找知足一个条件的元素.为了方便,在具体的查找数据上选定了整型,查找的条件也很是简单.
在该案例中,会对结果进行分治,首先分治出足够多的子任务,剩下的不需再分的父任务由当前线程完成,子任务则压入工做队列,其余空闲的线程就会来偷取子任务并执行.当有任务一个子任务查找到相应的数字后,即将它存放到result,并安静地完成根任务.
此时整个任务链处在一个很是尴尬的状况:查找到结果的子任务将root设置为完成,而整条链上的非root任务均未完成.但因循环条件不知足,退出了循环.此时查到result已有值,并不执行最后的tryComplete,执行结束,任务的status依旧为未完成,是否有重复执行的问题?
答案是没有问题,由于ForkJoinTask绝对会在ForkJoinPool中调度(哪怕是common池),在common池中,任务执行前必须出队,尽管compute方法在本例中没有将这些任务设置为完成,但任务不会被二次执行.可见,上一章中费大力介绍的status字段也有无用的时候.
可是除了root任务须要使用到获取结果的功能,须要保证status是负数,它产生的子孙任务还有什么用呢?全部compute方法会由于循环停止而结束,此后的这些任务不存在任何外部引用,会被gc清理,即便存在外部引用,用它去获取子孙任务的执行状况或result也没有任何意义.
显然这个案例解决了至少两个疑问,一是怎么实现一个保存result的ForkJoinTask,二是ForkJoin框架如何在查找方面大幅提高性能,很明显,相比单线程遍历的办法,此例多线程查询,且任何一个子任务在并行条件下完成了查询,整个大任务都可以终止.
第二个是传说中的map reduce.大数据中常使用此概念(跨节点).
在并行流中,map能够表明非阻断操做,reduce能够表明阻断操做,可是reduce一样能够并行地执行.
道格在注释上给出了两个map reduce案例,咱们只看第一个,它也是后续并行流一节咱们要看的例子比较相近的解法.方法二有些绕,较难理解,但也优雅.
@Test public void testMapReduce() { Integer[] array = {1, 2, 3}; //方法一. Integer result = new MapRed<>(null, array, (a)->a+2, (a,b)->a+b, 0,array.length).invoke(); LOGGER.info("方法一result:{}",result); //方法二我就不抄了,就在官方注释上. result = new MapReducer<>(null, array, (a) -> a + 1 , (a, b) -> a + b, 0, array.length, null).invoke(); LOGGER.info("方法二result:{}", result); } /** * 第一种map reduce方式,很好理解. * @param <E> */ private class MapRed<E> extends CountedCompleter<E> { final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapRed<E> sibling;//兄弟节点的引用 E result; MapRed(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; MapRed<E> left = new MapRed(this, array, mapper, reducer, lo, mid); MapRed<E> right = new MapRed(this, array, mapper, reducer, mid, hi); left.sibling = right; right.sibling = left; //只挂起右任务 setPendingCount(1); right.fork(); //直接运算左任务. left.compute(); } else { if (hi > lo) result = mapper.apply(array[lo]); //它会依次调用onCompletion.而且是本身调本身或completer调子, //且只有左右两个子后完成的能调成功(父任务的挂起数达到0). tryComplete(); } } public void onCompletion(CountedCompleter<?> caller) { //忽略本身调本身. if (caller != this) { //参数是子任务. MapRed<E> child = (MapRed<E>) caller; MapRed<E> sib = child.sibling; //设置父的result. if (sib == null || sib.result == null) result = child.result; else result = reducer.apply(child.result, sib.result); } } public E getRawResult() { return result; } } //mapper和reducer简单的不能再简单. @FunctionalInterface private static interface MyMapper<E> { E apply(E e); } @FunctionalInterface private static interface MyReducer<E> { E apply(E a, E b); }
上面的逻辑也很简单,首先就是对任务的分解,简单的将任务分为左和右,左直接由父任务执行(可能再分),右则入池,全部子任务直到不能再分(叶子任务)以map为result,每一个叶子任务完成后会调用tryComplete.
这个动做会触发一系列的completer栈元素的挂起数降低或完成,显然,若是把completer理解为一个普通树(这是做者不多见到的非二叉树的状况,尽管这个例子写成了二叉树,咱们彻底能够在compute中将父任务一分为多,而不是限2个),从叶子节点开始,每一个叶子节点完成(result是mapper的结果)会尝试onCompletion并减小父节点的挂起任务数,但只有同父节点的最后一个兄弟节点能够进入onCompletion设置父节点的结果,而且因为这个设置过程的前提是父节点符合挂起任务数为0,所以符合循环继续的条件,叶子节点的动做会继续向上判断父节点的父节点,直到root为止.假设线程数量足够,保证每一个子任务都有一个线程处理,那么深度每上一层,就会有一半(非二叉树的状况每一个父节点只能有一个经过)的执行叶子节点任务的线程因不符合某个任务的挂起数量为0的条件而退出,这样逐级传导,最后到root调用它最后一个子节点的onCompletion,使用reducer进行合并.
本例中进行结果合并的写法(onCompletion)只适合二叉树,有兴趣的读者能够看看道格在注释中给出的第二种写法,几叉均可以.并且该实现很优雅,并未写onCompletion函数,可是写法真心够绕的.
在JAVA8中支持了lamda表达式的同时,也支持了函数式编程,由此出现了一种新型的计算方式:流式计算,也出现了一种让包括做者在内不少人兴奋不已的编程方式:响应式编程.
流式计算的核心在于Stream api,流有不少分类,好比并行流和串行流,这点能够顾名思义,一样的,流中的每个操做均可以划分类型,好比阻断操做和非阻断操做.
java中实现并行流就是基于这些操做,CountedCompleter的一些子类就是这些操做的类型,显然,如在前一篇文章所说,使用了并行流,就是使用了ForkJoin框架.
当咱们使用下面的代码,会发生什么操做?
Stream.of(1,2,3,4,5).parallel().map(x -> x + 1).reduce((a, b) -> a + b).get(); //map只是将动做简单地记了下来,包装起来,等到阻断操做时才会真正执行. 位于ReferencePipeline public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper);//非空检查 //返回一个无状态操做. return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { //典型的适配器模式.将action一概封装为Sink. return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; } //阻断操做reduce位于 ReferencePipeline public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) { return evaluate(ReduceOps.makeRef(accumulator)); } //AbstractPipeline final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); } //TerminalOp阻断操做接口的默认方法 default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator) { if (Tripwire.ENABLED) Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default"); return evaluateSequential(helper, spliterator); } //看ReduceOps 它返回了一内部类ReduceTask public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return new ReduceTask<>(this, helper, spliterator).invoke().get(); } //内部类ReduceTask间接继承自CountedCompleter private static final class ReduceTask<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>> extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { private final ReduceOp<P_OUT, R, S> op; ReduceTask(ReduceOp<P_OUT, R, S> op, PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator) { super(helper, spliterator); this.op = op; } ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) { super(parent, spliterator); this.op = parent.op; } //老外起的名子,造小孩. @Override protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { //和上面的例子很是类似的代码,只是封装更好. return new ReduceTask<>(this, spliterator); } @Override protected S doLeaf() { //叶子节点作这个. return helper.wrapAndCopyInto(op.makeSink(), spliterator); } //重写了前面提过的onCompletion函数 @Override public void onCompletion(CountedCompleter<?> caller) { if (!isLeaf()) { //不是叶子节点.这条件,和前面我们分析的多么匹配. //计算左结果 S leftResult = leftChild.getLocalResult(); //联合右结果. leftResult.combine(rightChild.getLocalResult()); //联合完的结果就是当前completer的结果. setLocalResult(leftResult); } // 直接父类是AbstractTask,它会对父,左右子帮助gc. super.onCompletion(caller); } } //AbstractTask帮助gc public void onCompletion(CountedCompleter<?> caller) { spliterator = null; leftChild = rightChild = null; } //更多实现细节自阅...
显然,并行流(至少我举的这个例子)是基于ForkJoin框架的.分治的思想与前面道格的例子类似,只是更加优雅和封装更好.有了前面的基础,若要详细熟悉并行流原理,须要进一步了解的只有他们的继承树,分割聚合组件等边角料,核心的调度思想已经再也不是困难.
回到问题,当咱们使用并行流时发生了什么?首先是非阻断操做时,与串行流状况一样,也是先将action封装成适配器,仅在阻断操做发生时的调度不一样,并行流在阻断操做下使用ForkJoin框架进行调度,任务的分割则使用它的Splitor,结果的合并也有它的Combiner.其余的流程与上面的案例无异.
1.CountedCompleter使用普通树的结构存放动做,可是它又是另类的树,由于子节点能找到父节点,父节点却找不到子节点,而只知道子节点表明的动做未执行的数量,所以或许从访问方式的角度来看仍是用栈来理解更好.在这里树既是数据结构,也是一个另类的操做栈.只从一个completer往下看,它是个栈,但从父节点的角度来说,它是一个访问不到子节点的普通树(或许咱们不该该强行为它套上一个数据结构,否则总以为不三不四,可是用树这个形状便于理解).每一个节点会存放挂起任务数量,每一个节点的任务完成未必会设置它本身的完成态,但会尝试将completer父元素栈(或者树的一条线)上的每一个任务挂起数量减一或将根节点安静置为完成态.关于具体的理解和代码实现,以及如何保存一个任务的运行结果,能够参考前面案例的章节,也能够以此为基础去看并行流的源码,但也要相应的理解并行流为了便捷实现而提供的各类分割合并组件.
2.ForkJoinWorkerThread是运行在ForkJoinPool中的主要线程,它内部维护了一个工做任务队列,并存放了该队列在线程池中的间接索引.借此实现任务的窃取,避免过于空闲等待,任务fork会直接push到该队列,第一次扩容时,才给该队列初始化任务数组,当线程从池中卸载时,不会清除掉该数组,这样线程没法再次启动.线程的启动有一些勾子,官方提供的线程工厂有两个,一个直接建立ForkJoinWorkerThread,另外一个建立它的子类
InnocuousForkJoinWorkerThread,它除了一些安全策略外,最大的区别在于ForkJoinWorkerThread在注册入池前进行本地化数据的清理,而它则每次完成一个主任务处理就清理一次.
3.并行流是ForkJoin框架的一个典型应用,JAVA8 Stream api中的并行流定义了大量的以CountedCompleter为基础的操做.利用分割/合并和周边组件实现了基于ForkJoin框架的并行计算调度.