根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这样5个类。本文就对JDK1.7中增长这5个工具类实现作简要分析。java
0. JDK中ForkJoin实现概述数组
在JavaSE7的API和JDK1.7中,分别集成了支持ForkJoin的五个类:并发
ForkJoinPool维护了多个线程构成的数组,维护了任务提交队列,给出了多个线程之间工做窃取的实现。给出了任务类型适配,和提交任务逻辑的实现。须要和线程紧密配合。框架
而ForkJoinWorkerThread则继承了java.lang.Thread类,维护了线程本身的队列,同一个任务fork()操做原则上会添加到同一个线程队列中。而这个线程类须要和ForkJoinPool紧密合做,有指向对应ForkJoinPool对象的引用。less
ForkJoinTask则实现了Future接口,除了对接口的实现外,主要是fork()和join()操做。注意,貌似fork()只有ForkJoinWorkerThread 中才能执行。dom
两个子类RecursiveAction和RecursiveTask则实现比较简单,区别就在于返回值的处理不一样。工具
1. ForkJoinPoolthis
ForkJoinPool是实现了 Fork Join 的线程池。看JDK源码咱们知道ForkJoinPool是extends AbstractExecutorService的,也就是说间接地实现了Executor和ExecutorService接口。实际上也就意味着ForkJoinPool是继ThreadPoolExecutor后的又一个Executor(Service)的具体实现。线程
1.1. 构建初始化code
咱们先看ForkJoinPool的构造方法,一共有3个重载的实现。有一个单参数的默认实现,一般咱们使用这个就足够了,这最终会以默认的参数调用3参数的构造方法。咱们再来看3个参数的构造方法实现。其中:
1.2. 任务提交
前面已经提到,ForkJoinPool也是Executor(Service)的实现,那么execute()和submit()这样向ThreadPoolExecutor提交任务的方法对于ForkJoinPool来讲也是同样有效的。
须要说明的是,除了增长支持ForkJoinTask对象参数的重载实现外,还在Runnable和Callable参数的方法中对原始的Runnable和Callable对象作了到ForkJoinTask的适配,使用的分别是ForkJoinTask的静态内部类AdaptedRunnable和AdaptedCallable的对象。而这两个类型参数对应的方法最终都会调用ForkJoinTask参数的方法:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); return task; }
咱们接下来再看下任务提交中被调用到的forkOrSubmit()方法:
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else addSubmission(task); }
逻辑很容易理解,先判断ForkJoinPool的状态,若已中止,则抛异常返回。以后若是当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,不然将新的任务放入ForkJoinPool的提交队列中,并通知线程工做。
1.3. 线程的启动和工做
前面已经强调过,ForkJoinPool和ForkJoinWorkerThread是紧密相关,耦合在一块儿的。Thread的start()会调用run(),而ForkJoinWorkerThread类重写了run()方法,会调用对应的线程池ForkJoinPool对象的work()方法。
咱们来看一下work()方法的实现。
final void work(ForkJoinWorkerThread w) { boolean swept = false; // true on empty scans long c; while (!w.terminate && (int)(c = ctl) >= 0) { int a; // active count if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) swept = scan(w, a); else if (tryAwaitWork(w, c)) swept = false; } }
里面主要是一个while循环体,只要当前的线程和线程池不是处于终止状态,则这个循环一直执行。执行的内容则是这样的,若是可以根据scan()方法获得任务,并执行,不然进入阻塞状态。
咱们来看一下scan()方法的实现。
private boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // mask 0 avoids useless scans if only one active int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) // staleness check return false; for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; ForkJoinWorkerThread v = ws[k & m]; if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { int d = (v.queueBase = b + 1) - v.queueTop; v.stealHint = w.poolIndex; if (d != 0) signalWork(); // propagate if nonempty w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); return false; // store next seed } else if (j < 0) { // xorshift r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } else ++k; } if (scanGuard != g) // staleness check return false; else { // try to take submission ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; // all queues empty } }
看起来很复杂,实际的原理则很简单,就是先尝试作任务窃取( Work Stealing ),若是不知足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程自己也维护了线程内fork和join任务操做获得的队列,结合起来,整体执行任务的顺序就是:
1.4. ForkJoinPool的其它属性
除了上述提到的操做,ForkJoin中还维护了
等数据属性。
2. ForkJoinWorkerThread
ForkJoinWorkerThread扩展于Thread类,但提供了不少支持ForkJoin的特性。
上文在介绍ForkJoinPool的时候已经对这个类作了不少描述,也强调过线程类ForkJoinWorkerThread和ForkJoinPool相互依赖,放在一块儿才有意义。实际上,还要提到描述Fork Join任务的类ForkJoinTask。
除了上面提到的之外,对于ForkJoinWorkerThread这个类,再稍微提一下这样几个点:
3. ForkJoinTask及两个抽象子类
ForkJoinTask是ForkJoin框架中的主体,是ForkJoin中任务的体现。这个类实现了Future和Serializable接口。除了Futrue接口要知足的方法外,我想有这样3个方法是有必要知道的,分别是fork()、join()和exec()。
对于fork(),这个也许你们都很熟悉了,在这里也就是分解出子任务的执行。这个在实现上很简单那,就是在当前线程ForkJoinWorkerThread对象维护的队列中加入新的子任务。实现以下:
public final ForkJoinTask fork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this; }
须要注意的是fork()方法的调用是在当前线程对象为ForkJoinWorkerThread的条件下。
咱们再来看看对应的join()实现:
public final V join() { if (doJoin() != NORMAL) return reportResult(); else return getRawResult(); }
显然,它有调用了doJoin()方法,咱们再来深刻了解下。
private int doJoin() { Thread t; ForkJoinWorkerThread w; int s; boolean completed; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { if ((s = status) < 0) return s; if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) return setCompletion(NORMAL); } return w.joinTask(this); } else return externalAwaitDone(); }
大概的逻辑是这样的,在当前线程对象为ForkJoinWorkerThread的条件下,从队列中取回当前任务ForkJoinTask对象,并尝试在调用线程对其直接执行,不然当前线程调用wait()阻塞等待。更深刻的理解可续继续查阅源码。
最后,咱们再来看看exec()方法,这个是在ForkJoinTask中是没有给出实现的。
在JDK中,有ForkJoinTask的两个抽象子类RecursiveAction和RecursiveTask,他们分别给出了exec()的实现,这也是这两个子类主要作的事情,其实是调用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未给出实现的。
实际上,compute()方法就是Fork Join要执行的内容,是Fork Join任务的实质,须要开发者给出。
而RecursiveAction和RecursiveTask就是方便开发者使用Fork Join的,RecursiveAction和RecursiveTask这两个类的区别仅仅是返回结果的状况不一样。而这个compute()方法就是留给开发者继承扩展使用的。这个会在下篇文章详细讲述。