虽然目前处理器核心数已经发展到很大数目,可是按任务并发处理并不能彻底充分的利用处理器资源,由于通常的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分红多个单元,每一个单元分别获得执行,最后合并每一个单元的结果。javascript
Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每一个小任务结果后获得大任务结果的框架java
指的是某个线程从其余队列里窃取任务来执行。使用的场景是一个大任务拆分红多个小任务,为了减小线程间的竞争,把这些子任务分别放到不一样的队列中,而且每一个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。可是会出现这样一种状况:A线程处理完了本身队列的任务,B线程的队列里还有不少任务要处理。A是一个很热情的线程,想过去帮忙,可是若是两个线程访问同一个队列,会产生竞争,因此A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行(任务是一个个独立的小任务),这样感受A线程像是小偷在窃取B线程的东西同样。算法
工做窃取算法的优势:并发
利用了线程进行并行计算,减小了线程间的竞争。框架
工做窃取算法的缺点:异步
一、若是双端队列中只有一个任务时,线程间会存在竞争。ide
二、窃取算法消耗了更多的系统资源,如会建立多个线程和多个双端队列。this
Fork/Join中两个重要的类:线程
一、ForkJoinTask:使用该框架,须要建立一个ForkJoin任务,它提供在任务中执行fork和join操做的机制。通常状况下,咱们并不须要直接继承ForkJoinTask类,只须要继承它的子类,它的子类有两个:设计
a、RecursiveAction:用于没有返回结果的任务。
b、RecursiveTask:用于有返回结果的任务。
二、ForkJoinPool:任务ForkJoinTask须要经过ForkJoinPool来执行。
package test; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L; //阈值 private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //判断任务是否足够小 boolean canCompute = (end - start) <= THRESHOLD; if(canCompute) { //若是小于阈值,就进行运算 for(int i=start; i<=end; i++) { sum += i; } } else { //若是大于阈值,就再进行任务拆分 int middle = (start + end)/2; CountTask leftTask = new CountTask(start,middle); CountTask rightTask = new CountTask(middle+1,end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务执行完,并获得执行结果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); //合并子任务 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1,6); //执行一个任务 Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
这个程序是将1+2+3+4+5+6拆分红1+2;3+4;5+6三个部分进行子程序进行计算后合并。
一、leftTask.fork();
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
fork方法内部会先判断当前线程是不是ForkJoinWorkerThread的实例,若是知足条件,则将task任务push到当前线程所维护的双端队列中。
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
在push方法中,会调用ForkJoinPool的signalWork方法唤醒或建立一个工做线程来异步执行该task任务。
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
经过doJoin方法返回的任务状态来判断,若是不是NORMAL,则抛异常:
private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); }
来看下doJoin方法:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
先查看任务状态,若是已经完成,则直接返回任务状态;若是没有完成,则从任务队列中取出任务并执行。
map
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final List<Integer> doubleNumbers = numbers.stream() .map(number -> number * 2) .collect(Collectors.toList());
结果:[2, 4, 6, 8]
也能够搞成其余的类型,初始List是Integer,也能够变成String
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final List<String> numberIndex = numbers.stream() .map(number -> "#" + number) .collect(Collectors.toList());
结果:[#1, #2, #3, #4]
reduce
1.不提供初始值的reduce,返回值是Optional,表示可能为空,使用orElseGet能够返回一个null时的默认值
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final Optional<Integer> sum = numbers.stream() .reduce((a, b) -> a + b); sum.orElseGet(() -> 0);
结果:10
这里的(a, b) -> a + b的类型实际上是BinaryOperator,它接受两个类型相同的参数
当把numbers改成Arrays.asList()时,结果为0。
2.使用初始值的reduce,由于提供了初始值,因此返回值再也不是Optional
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final Integer sum = numbers.stream() .reduce(0, (a, b) -> a + b);
结果:10