前两篇文章已经对Fork Join的设计和JDK中源码的简要分析。这篇文章,咱们来简单地看看咱们在开发中怎么对JDK提供的工具类进行应用,以提升咱们的需求处理效率。html
Fork Join这东西确实用好了能给咱们的任务处理提升效率,也为开发带来方便。但Fork Join不是那么容易用好的,咱们先来看几个例子(反例)。java
####1. 反例错误分析api
咱们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (由于是反例,就不提供超连接了,只以普通文本给出URL)数组
这篇文章是我学习和整理Fork Join时搜索到的一篇文章,其实总的来讲这篇文章前面分析得仍是比较好的,只是给出的第一个例子(有返回结果的RecursiveTask应用的例子)没有正确地对Fork Join进行应用。为了方便分析,仍是贴下这个例子中具体的的代码吧。并发
public class Calculator extends RecursiveTask { private static final int THRESHOLD = 100; private int start; private int end; public Calculator(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if((start - end) < THRESHOLD){ for(int i = start; i< end;i++){ sum += i; } }else{ int middle = (start + end) /2; Calculator left = new Calculator(start, middle); Calculator right = new Calculator(middle + 1, end); // 下三行高亮, 哈` left.fork(); right.fork(); sum = left.join() + right.join(); } return sum; } }
咱们看到其中一段已经高亮的代码,显示对两个子任务进行fork()调用,即分别提交给当前线程的任务队列,依次加到末尾。紧接着,又按照调用fork()的顺序执行两个子任务对象的join()方法。oracle
其实,这样就有一个问题,在每次迭代中,第一个子任务会被放到线程队列的倒数第二个位置,第二个子任务是最后一个位置。当执行join()调用的时候,因为第一个子任务不在队列尾而不能经过执行ForkJoinWorkerThread的unpushTask()方法取出任务并执行,线程最终只能挂起阻塞,等待通知。而Fork Join原本的作法是想经过子任务的合理划分,避免过多的阻塞状况出现。这样,这个例子中的操做就违背了Fork Join的初衷,每次子任务的迭代,线程都会由于第一个子任务的join()而阻塞,加大了代码运行的成本,提升了资源开销,不利于提升程序性能。ide
除此以外,这段程序仍是不能进入Fork Join的过程,由于还有一个低级错误。看下第1五、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将致使全部任务都将以循环累加的方式完成,而不会执行fork()和join()。工具
因而可知,Fork Join的使用仍是要注意对其自己的理解和对开发过程当中细节的把握的。咱们看下JDK中RecursiveAction和RecursiveTask这两个类。性能
####2. RecursiveAction分析及应用实例学习
这两个类都是继承了ForkJoinTask,自己给出的实现逻辑并很少不复杂,在JDK的类文件中,它的注释比源码还要多。咱们能够看下它的实现代码。
public abstract class RecursiveAction extends ForkJoinTask<Void> { private static final long serialVersionUID = 5232453952276485070L; protected abstract void compute(); public final Void getRawResult() { return null; } protected final void setRawResult(Void mustBeNull) { } protected final boolean exec() { compute(); return true; } }
咱们看到其中两个方法是关于处理空返回值的方法。而exec方法则是调用了compute(),这个compute就是咱们使用Fork Join时须要本身实现的逻辑。
咱们能够看下API中给出的一个最简单最具体的例子:
class IncrementTask extends RecursiveAction { final long[] array; final int lo; final int hi; IncrementTask(long[] array, int lo, int hi) { this.array = array; this.lo = lo; this.hi = hi; } protected void compute() { if (hi - lo < THRESHOLD) { for (int i = lo; i < hi; ++i) array[i]++; } else { int mid = (lo + hi) >>> 1; invokeAll(new IncrementTask(array, lo, mid), new IncrementTask(array, mid, hi)); } } }
大体的逻辑就是,对给定一个特定数组的某段,进行逐个加1的操做。咱们看到else中的代码块,显示取一个lo和hi的中间值,此后分割成两个子任务,并进行invokeAll()调用。咱们来看下继承自FutureTask的invokeAll()方法实现。很简单:
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { t2.fork(); t1.invoke(); t2.join(); }
对于参数中的两个子任务,对第二个子任务进行fork(),即放入线程对应队列的结尾,而后执行第一个子任务,再调用第二个子任务的join(),实际上就是跳转到第二个子任务,进行执行(固然若是不能执行,就须要阻塞等待了)。
其实invokeAll()是个重载方法,同名的还有另外两个,基本逻辑都是同样的,咱们拿出一个通用一点的来看一下:
public static void invokeAll(ForkJoinTask<?>... tasks) { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { ForkJoinTask<?> t = tasks[i]; if (t == null) { if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); else if (t.doInvoke() < NORMAL && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = tasks[i]; if (t != null) { if (ex != null) t.cancel(false); else if (t.doJoin() < NORMAL && ex == null) ex = t.getException(); } } if (ex != null) UNSAFE.throwException(ex); }
咱们发现第一个子任务(i==0的状况)没有进行fork,而是直接执行,其他的通通先调用fork()放入任务队列,以后再逐一join()。其实咱们注意到一个要点就是第一个任务不要fork()再join(),也就是上面中例子的错误所在,这样会形成阻塞,而不能充分利用Fork Join的特色,也就不能保证任务执行的性能。
Oracle的JavaSE7 API中在RecursiveAction里还有一个更复杂的例子,是计算double数组平方和的,因为代码较长,就不列在这里了。整体思路和上面是同样的,额外增长了动态阈值的判断,感兴趣的想深刻理解的能够到这里去参考一下。 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html
####3. RecursiveTask简要说明
其实说完了RecursiveAction,RecursiveTask能够用“同理”来解释。实现代码也很简单:
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { private static final long serialVersionUID = 5232453952276485270L; V result; protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } protected final boolean exec() { result = compute(); return true; } }
咱们看到惟一不一样的是返回结果的处理,其他均可以和RecursiveAction同样使用。
####4. Fork Join应用小结
Fork Join是为咱们提供了一个很是好的“分而治之”思想的实现平台,而且在必定程度上实现了“变串行并发为并行”。但Fork Join不是万能的页不彻底是通用的,对于可很好分解成子任务的场景,咱们能够对其进行应用,更多时候要考虑需求和应用场景,而且注意其使用要点才行。