虽然目前处理器核心数已经发展到很大数目,可是按任务并发处理并不能彻底充分的利用处理器资源,由于通常的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分红多个单元,每一个单元分别获得执行,最后合并每一个单元的结果。java
Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每一个小任务结果后获得大任务结果的框架。算法
指的是某个线程从其余队列里窃取任务来执行。使用的场景是一个大任务拆分红多个小任务,为了减小线程间的竞争,把这些子任务分别放到不一样的队列中,而且每一个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。可是会出现这样一种状况:A线程处理完了本身队列的任务,B线程的队列里还有不少任务要处理。A是一个很热情的线程,想过去帮忙,可是若是两个线程访问同一个队列,会产生竞争,因此A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行(任务是一个个独立的小任务),这样感受A线程像是小偷在窃取B线程的东西同样。并发
工做窃取算法的优势:框架
利用了线程进行并行计算,减小了线程间的竞争。异步
工做窃取算法的缺点:ide
一、若是双端队列中只有一个任务时,线程间会存在竞争。this
二、窃取算法消耗了更多的系统资源,如会建立多个线程和多个双端队列。spa
Fork/Join中两个重要的类:线程
一、ForkJoinTask:使用该框架,须要建立一个ForkJoin任务,它提供在任务中执行fork和join操做的机制。通常状况下,咱们并不须要直接继承ForkJoinTask类,只须要继承它的子类,它的子类有两个:设计
a、RecursiveAction:用于没有返回结果的任务。
b、RecursiveTask:用于有返回结果的任务。
二、ForkJoinPool:任务ForkJoinTask须要经过ForkJoinPool来执行。
1 package test; 2 3 import java.util.concurrent.ExecutionException; 4 import java.util.concurrent.ForkJoinPool; 5 import java.util.concurrent.Future; 6 import java.util.concurrent.RecursiveTask; 7 8 9 public class CountTask extends RecursiveTask<Integer> 10 { 11 private static final long serialVersionUID = 1L; 12 //阈值 13 private static final int THRESHOLD = 2; 14 private int start; 15 private int end; 16 17 public CountTask(int start, int end) 18 { 19 this.start = start; 20 this.end = end; 21 } 22 23 @Override 24 protected Integer compute() 25 { 26 int sum = 0; 27 //判断任务是否足够小 28 boolean canCompute = (end - start) <= THRESHOLD; 29 if(canCompute) 30 { 31 //若是小于阈值,就进行运算 32 for(int i=start; i<=end; i++) 33 { 34 sum += i; 35 } 36 } 37 else 38 { 39 //若是大于阈值,就再进行任务拆分 40 int middle = (start + end)/2; 41 CountTask leftTask = new CountTask(start,middle); 42 CountTask rightTask = new CountTask(middle+1,end); 43 //执行子任务 44 leftTask.fork(); 45 rightTask.fork(); 46 //等待子任务执行完,并获得执行结果 47 int leftResult = leftTask.join(); 48 int rightResult = rightTask.join(); 49 //合并子任务 50 sum = leftResult + rightResult; 51 52 } 53 return sum; 54 } 55 56 public static void main(String[] args) 57 { 58 ForkJoinPool forkJoinPool = new ForkJoinPool(); 59 CountTask task = new CountTask(1,6); 60 //执行一个任务 61 Future<Integer> result = forkJoinPool.submit(task); 62 try 63 { 64 System.out.println(result.get()); 65 } 66 catch (InterruptedException e) 67 { 68 e.printStackTrace(); 69 } 70 catch (ExecutionException e) 71 { 72 e.printStackTrace(); 73 } 74 75 } 76 77 }
这个程序是将1+2+3+4+5+6拆分红1+2;3+4;5+6三个部分进行子程序进行计算后合并。
一、leftTask.fork();
1 public final ForkJoinTask<V> fork() { 2 Thread t; 3 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 4 ((ForkJoinWorkerThread)t).workQueue.push(this); 5 else 6 ForkJoinPool.common.externalPush(this); 7 return this; 8 }
fork方法内部会先判断当前线程是不是ForkJoinWorkerThread的实例,若是知足条件,则将task任务push到当前线程所维护的双端队列中。
1 final void push(ForkJoinTask<?> task) { 2 ForkJoinTask<?>[] a; ForkJoinPool p; 3 int b = base, s = top, n; 4 if ((a = array) != null) { // ignore if queue removed 5 int m = a.length - 1; // fenced write for task visibility 6 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); 7 U.putOrderedInt(this, QTOP, s + 1); 8 if ((n = s - b) <= 1) { 9 if ((p = pool) != null) 10 p.signalWork(p.workQueues, this); 11 } 12 else if (n >= m) 13 growArray(); 14 } 15 }
在push方法中,会调用ForkJoinPool的signalWork方法唤醒或建立一个工做线程来异步执行该task任务。
二、
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
经过doJoin方法返回的任务状态来判断,若是不是NORMAL,则抛异常:
private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); }
来看下doJoin方法:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
先查看任务状态,若是已经完成,则直接返回任务状态;若是没有完成,则从任务队列中取出任务并执行。