JDK7引入了Fork/Join框架,所谓Fork/Join框架,我的解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果。这样就能使用多线程的方式来执行一个任务。数组
JDK7引入的Fork/Join有三个核心类:多线程
ForkJoinPool,执行任务的线程池并发
ForkJoinWorkerThread,执行任务的工做线程框架
ForkJoinTask,一个用于ForkJoinPool的任务抽象类。this
由于ForkJoinTask比较复杂,抽象方法比较多,平常使用时通常不会继承ForkJoinTask来实现自定义的任务,而是继承ForkJoinTask的两个子类:spa
RecursiveTask:子任务带返回结果时使用线程
RecursiveAction:子任务不带返回结果时使用code
对于Fork/Join框架的原理,Doug Lea的文章:A Java Fork/Join Framework对象
在看了网上的不少例子以后,发如今自定义任务类实现compute方法的逻辑通常是这样的:blog
if 任务足够小 直接返回结果 else 分割成N个子任务 依次调用每一个子任务的fork方法执行子任务 依次调用每一个子任务的join方法合并执行结果
而执行该自定义任务的调用的则是ForkJoinPool的execute方法,所以首先来看的就是ForkJoinPool的execute方法,看看和普通线程池执行任务有什么不一样:
public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); }
所以forkOrSubmit是真正执行ForkJoinTask的方法:
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else // 正常执行的时候是主线程调用的,所以关注addSubmission addSubmission(task); }
那么咱们首先要关注的是addSubmission方法,发觉所作的事情和普通线程池很相似,就是把任务加入到队列中,不一样的是直接使用Unsafe操做内存来添加任务对象
private void addSubmission(ForkJoinTask<?> t) { final ReentrantLock lock = this.submissionLock; lock.lock(); try { // 队列只是普通的数组而不是普通线程池的BlockingQueue, // 唤醒worker线程的工做由下面的signalWork来完成 // 使用Unsafe进行内存操做,把任务放置在数组中 ForkJoinTask<?>[] q; int s, m; if ((q = submissionQueue) != null) { long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; if (s - queueBase == m) // 数组已满,为数组扩容 growSubmissionQueue(); } } finally { lock.unlock(); } // 通知有新任务来了:两种操做,有空闲线程则唤醒该线程 // 不然若是能够新建worker线程则为这个任务新建worker线程 // 若是不能够就返回了,等到有空闲线程来执行这个任务 signalWork(); }
接下来要弄清楚就是在compute中fork时,按道理来讲这个动做是和主任务在同一个线程中执行,fork是若是把子任务变成多线程执行的:
public final ForkJoinTask<V> fork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this; }
在上面分析forkOrSubmit的时候一样见到了ForkJoinWorkerThread的pushTask方法调用,那么来看这个方法:
final void pushTask(ForkJoinTask<?> t) { // 代码的基本逻辑和ForkJoinPool的addSubmission方法基本一致 // 都是把任务加入了任务队列中,这里是加入到ForkJoinWorkerThread // 内置的任务队列中 ForkJoinTask<?>[] q; int s, m; if ((q = queue) != null) { // ignore if queue removed long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; // or use putOrderedInt // 这里不太明白 if ((s -= queueBase) <= 2) pool.signalWork(); else if (s == m) growQueue(); } }
看到这里一会儿陷入了僵局,为何ForkJoinWorkerThread要内建一个队列呢,并且若是子任务仍旧在同一个线程内的话,何以实现并发执行子任务呢?下一篇文章继续。