本文首发于一世流云的专栏: https://segmentfault.com/blog...
前一章——Fork/Join框架(1) 原理,咱们从总体上对Fork/Join框架做了介绍。java
回顾一下,Fork/Join框架的核心实现类是ForkJoinPool线程池,其它核心组件包括:ForkJoinTask(任务)、ForkJoinWorkerThread(工做线程)、WorkQueue(任务队列)。segmentfault
这一章,咱们将深刻F/J框架的实现细节,看看ForkJoinPool线程池究竟有何特殊之处,F/J框架的整个任务调度流程又是怎样的。数组
在开始以前,先来看下下面这张图:app
上图包含了F/J框架的整个任务调度流程,这里先简要介绍下,以便读者在有个印象,后续的源码分析将彻底按照这张图进行。框架
F/J框架调度任务的流程一共能够分为四大部分。dom
任务提交是整个调度流程的第一步,F/J框架所调度的任务来源有两种:async
①外部提交任务ide
所谓外部提交任务,是指经过ForkJoinPool的execute
/submit
/invoke
方法提交的任务,或者非工做线程(ForkJoinWorkerThread)直接调用ForkJoinTask的fork
/invoke
方法提交的任务:函数
外部提交的任务的特色就是调用线程是非工做线程。这个过程涉及如下方法:源码分析
ForkJoinPool.submit
ForkJoinPool.invoke
ForkJoinPool.execute
ForkJoinTask.fork
ForkJoinTask.invoke
ForkJoinPool.externalPush
ForkJoinPool.externalSubmit
②工做线程fork任务
所谓工做线程fork任务,是指由ForkJoinPool所维护的工做线程(ForkJoinWorkerThread)从自身任务队列中获取任务(或从其它任务队列窃取),而后执行任务。
工做线程fork任务的特色就是调用线程是工做线程。这个过程涉及如下方法:
ForkJoinTask.doExec
WorkQueue.push
任务提交完成后,ForkJoinPool会根据状况建立或唤醒工做线程,以便执行任务。
ForkJoinPool并不会为每一个任务都建立工做线程,而是根据实际状况(构造线程池时的参数)肯定是唤醒已有空闲工做线程,仍是新建工做线程。这个过程仍是涉及任务队列的绑定、工做线程的注销等过程:
ForkJoinPool.signalWork
ForkJoinPool.tryAddWorker
ForkJoinPool.createWorker
ForkJoinWorkerThread.registerWorker
ForkJoinPool.deregisterWorker
任务入队后,由工做线程开始执行,这个过程涉及任务窃取、工做线程等待等过程:
ForkJoinWorkerThread.run
ForkJoinPool.runWorker
ForkJoinPool.scan
ForkJoinPool.runTask
ForkJoinTask.doExec
ForkJoinPool.execLocalTasks
ForkJoinPool.awaitWork
任务结果通常经过ForkJoinTask的join
方法得到,其主要流程以下图:
任务结果获取的核心涉及两点:
ForkJoinPool.helpStealer
ForkJoinPool.tryCompensate
经过第二部分,大体了解了F/J框架调度任务的流程,咱们来看下源码实现。
①外部提交任务
咱们经过ForkJoinPool的submit(ForkJoinTask<T> task)
方法来看下这个过程(其它提交任务的方法内部调用几乎同样,再也不赘述):
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; }
ForkJoinPool.submit
内部调用了externalPush
方法:
final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; // m & r & SQMASK必为偶数,因此经过externalPush方法提交的任务都添加到了偶数索引的任务队列中(没有绑定的工做线程) if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) // 队列里只有一个任务 signalWork(ws, q); // 建立或激活一个工做线程 return; } U.compareAndSwapInt(q, QLOCK, 1, 0); } // 未命中任务队列时(WorkQueue == null 或 WorkQueue[i] == null),会进入该方法 externalSubmit(task); }
当咱们首次建立了ForkJoinPool时,任务队列数组并无初始化,只有当首次提交任务时,才会初始化。
externalPush方法包含两部分:
/** * 完整版本的externalPush. * 处理线程池提交任务时未命中队列的状况和异常状况. */ private void externalSubmit(ForkJoinTask<?> task) { int r; // 线程相关的随机数 if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (; ; ) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; // CASE1: 线程池已关闭 if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } // CASE2: 初始化线程池 else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // 初始化工做队列数组, 数组大小必须为2的幂次 int p = config & SMASK; int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; ns = STARTED; // 线程池状态转化为STARTED } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } // CASE3: 入队任务 else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { signalWork(ws, q); return; } } move = true; // move on failure } // CASE4: 建立一个任务队列 else if (((rs = runState) & RSLOCK) == 0) { q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; // k为任务队列在队列数组中的索引: k == r & m & SQMASK, 在CASE3的IF判断中赋值 q.scanState = INACTIVE; // 任务队列状态为INACTIVE rs = lockRunState(); if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
externalSubmit方法的逻辑很清晰,一共分为4种状况:
②工做线程fork任务
工做线程fork的任务其实就是子任务,ForkJoinTask.fork
方法完成。
看下ForkJoinTask.fork
方法,当调用线程为工做线程时,直接添加到其自身队列中:
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) // 若是调用线程为【工做线程】 ((ForkJoinWorkerThread) t).workQueue.push(this); // 直接添加到线程的自身队列中 else ForkJoinPool.common.externalPush(this); // 外部(其它线程)提交的任务 return this; }
WorkQueue.push
方法,任务存入自身队列的栈顶(top
):
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); // 任务存入栈顶(top+1) if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); // 唤醒或建立一个工做线程 } else if (n >= m) growArray(); // 扩容 } }
若是当前 WorkQueue 为新建的等待队列(top - base <= 1
),则调用signalWork
方法为当前 WorkQueue 新建或唤醒一个工做线程;
若是 WorkQueue 中的任务数组容量太小,则调用growArray
方法对其进行两倍扩容,
从流程图能够看出,任务提交后,会调用signalWork
方法建立或唤醒一个工做线程,该方法的核心其实就两个分支:
/** * 尝试建立或唤醒一个工做线程. * * @param ws 任务队列数组 * @param q 当前操做的任务队列WorkQueue */ final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { // too few active // CASE1: 工做线程数不足 if ((sp = (int) c) == 0) { if ((c & ADD_WORKER) != 0L) tryAddWorker(c); // 增长工做线程 break; } // CASE2: 存在空闲工做线程,则唤醒 if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } }
先来看建立工做线程的方法tryAddWorker
,其实就是设置下字段值(活跃/总工做线程池数),而后调用createWorker
真正建立一个工做线程:
private void tryAddWorker(long c) { boolean add = false; do { // 设置活跃工做线程数、总工做线程池数 long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; // 建立工做线程 if (add) { createWorker(); break; } } } while (((c = ctl) & ADD_WORKER) != 0L && (int) c == 0); } private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { // 使用线程池工厂建立线程 if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); // 启动线程 return true; } } catch (Throwable rex) { ex = rex; } // 建立出现异常,则注销该工做线程 deregisterWorker(wt, ex); return false; }
若是建立过程当中出现异常,则调用deregisterWorker
注销线程:
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; // 1.移除workQueue if (wt != null && (w = wt.workQueue) != null) { // 获取ForkJoinWorkerThread的等待队列 WorkQueue[] ws; int idx = w.config & SMASK; // 计算workQueue索引 int rs = lockRunState(); // 获取runState锁和当前池运行状态 if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; // 移除workQueue unlockRunState(rs, rs & ~RSLOCK); // 解除runState锁 } // 2.减小CTL数 long c; // decrement counts do { } while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); // 3.处理被移除workQueue内部相关参数 if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } // 4.若是线程未终止,替换被移除的workQueue并唤醒内部线程 for (; ; ) { // possibly replace WorkQueue[] ws; int m, sp; // 尝试终止线程池 if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; // 唤醒被替换的线程,依赖于下一步 if ((sp = (int) (c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } // 建立工做线程替换 else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // don't need replacement break; } // 5.处理异常 if (ex == null) // help clean on way out ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); }
deregisterWorker方法用于工做线程运行完毕以后终止线程或处理工做线程异常,主要就是清除已关闭的工做线程或回滚建立线程以前的操做,并把传入的异常抛给 ForkJoinTask 来处理。
工做线程在构造的过程当中,会保存线程池信息和与本身绑定的任务队列信息。它经过ForkJoinPool.registerWorker
方法将本身注册到线程池中:
protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); }
final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); // 建立一个工做队列, 并于该工做线程绑定 WorkQueue w = new WorkQueue(this, wt); int i = 0; // 记录队列在任务队列数组中的索引, 始终为奇数 int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // 经计算后, i为奇数 if (ws[i] != null) { // 槽冲突, 即WorkQueue[i]位置已经有了任务队列 // 从新计算索引i int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } // 设置队列状态为SCANNING w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
前文讲过,工做线程(Worker)自身的任务队列,其数组下标始终是奇数,registerWorker方法的主要做用就是在任务队列数组WorkQueue[]
找到一个空的奇数位,而后在该位置建立WorkQueue
。
至此,线程池的任务提交工做和工做线程建立工做就所有完成了,接下来开始工做线程的执行。
ForkJoinWorkerThread启动后,会执行它的run方法,该方法内部调用了ForkJoinPool.runWorker
方法来执行任务:
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); // 钩子方法 pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
runWorker方法的核心流程以下:
final void runWorker(WorkQueue w) { w.growArray(); // 初始化任务队列 int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t; ; ) { // CASE1: 尝试获取一个任务 if ((t = scan(w, r)) != null) w.runTask(t); // 获取成功, 执行任务 // CASE2: 获取失败, 阻塞等待任务入队 else if (!awaitWork(w, r)) // 等待失败, 跳出该方法后, 工做线程会被注销 break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
注意:若是awaitWork
返回false,等不到任务,则跳出runWorker
的循环,回到run中执行finally,最后调用deregisterWorker
注销工做线程。
任务窃取——scan
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; // 根据随机数r定位一个任务队列 if ((q = ws[k]) != null) { // 获取workQueue if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && // 取base位置任务 q.base == b) { // 成功获取到任务 if (ss >= 0) { if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; // 更新base位 if (n < -1) signalWork(ws, q); // 建立或唤醒工做线程来运行任务 return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int) c], AC_UNIT); // 唤醒栈顶工做线程 } // base位置任务为空或base位置偏移,随机移位从新扫描 if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } if ((k = (k + 1) & m) == origin) { // continue until stable // 运行到这里说明已经扫描了所有的 workQueues,但并未扫描到任务 if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; // 对当前WorkQueue进行灭活操做 int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int) c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
扫描并尝试偷取一个任务。随机选择一个WorkQueue,获取base
位的 ForkJoinTask,成功取到后更新base位并返回任务;若是取到的 WorkQueue 中任务数大于1,则调用signalWork
建立或唤醒其余工做线程。
阻塞等待——awaitWork
若是scan
方法未扫描到任务,会调用awaitWork
等待获取任务:
private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating return false; for (int pred = w.stackPred, spins = SPINS, ss; ; ) { if ((ss = w.scanState) >= 0) // 正在扫描,跳出循环 break; else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && (v = ws[j]) != null && // see if pred parking (v.parker == null || v.scanState >= 0)) spins = SPINS; // continue spinning } } else if (w.qlock < 0) // 当前workQueue已经终止,返回false recheck after spins return false; else if (!Thread.interrupted()) { // 判断线程是否被中断,并清除中断状态 long c, prevctl, parkTime, deadline; int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK); // 活跃线程数 if ((ac <= 0 && tryTerminate(false, false)) || // 无active线程,尝试终止 (runState & STOP) != 0) // pool terminating return false; if (ac <= 0 && ss == (int) c) { // is last waiter // 计算活跃线程数(高32位)并更新为下一个栈顶的scanState(低32位) prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); int t = (short) (c >>> TC_SHIFT); // shrink excess spares if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//总线程过量 return false; // else use timed wait // 计算空闲超时时间 parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else prevctl = parkTime = deadline = 0L; Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport w.parker = wt; // 设置parker,准备阻塞 if (w.scanState < 0 && ctl == c) // recheck before park U.park(false, parkTime); // 阻塞指定的时间 U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) // 正在扫描,说明等到任务,跳出循环 break; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) // 未等到任务,更新ctl,返回false return false; // shrink pool } } return true; }
任务执行——runTask
窃取到任务后,调用WorkQueue.runTask
方法执行任务:
final void runTask(ForkJoinTask<?> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); // 更新currentSteal并执行任务 U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); // 依次执行本地任务 ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); // 增长偷取任务数 scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); // 执行钩子函数 } }
1.首先调用FutureTask.deExec()
执行任务,其内部会调用FutureTask.exec()
方法,该方法为抽象方法,由子类实现。
子类实现该方法时,通常会进行fork,致使生成子任务,并最终添加到调用线程自身地任务队列中:
final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); // exec为抽象方法, 由子类实现 } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
2.除了执行窃取到的任务,工做线程还会执行本身队列中的任务,即WorkQueue.execLocalTasks
方法:
final void execLocalTasks() { int b = base, m, s; ForkJoinTask<?>[] a = array; if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { if ((config & FIFO_QUEUE) == 0) { // LIFO, 从top -> base 遍历执行任务 for (ForkJoinTask<?> t; ; ) { if ((t = (ForkJoinTask<?>) U.getAndSetObject (a, ((m & s) << ASHIFT) + ABASE, null)) == null) break; U.putOrderedInt(this, QTOP, s); t.doExec(); if (base - (s = top - 1) > 0) break; } } else // FIFO, 从base -> top 遍历执行任务 pollAndExecAll(); } }
构建线程池时的asyncMode
参数,决定了工做线程执行自身队列中的任务的方式。若是asyncMode == true
,则以 FIFO的方式执行任务;不然,以 LIFO的方式执行任务。
ForkJoinTask.join()
能够用来获取任务的执行结果。join方法的执行逻辑以下:
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
能够看到,内部先调用doJoin方法:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
doJoin方法会判断调用线程是不是工做线程:
1.若是是非工做线程调用的join,则最终调用externalAwaitDone()
阻塞等待任务的完成。
2.若是是工做线程调用的join,则存在如下状况:
关键看下ForkJoinPool.awaitJoin
等待过程当中发生了什么:
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { int s = 0; if (task != null && w != null) { ForkJoinTask<?> prevJoin = w.currentJoin; // 获取给定Worker的join任务 U.putOrderedObject(w, QCURRENTJOIN, task); // 把currentJoin替换为给定任务 // 判断是否为CountedCompleter类型的任务 CountedCompleter<?> cc = (task instanceof CountedCompleter) ? (CountedCompleter<?>) task : null; for (; ; ) { if ((s = task.status) < 0) // 已经完成|取消|异常 跳出循环 break; if (cc != null) // CountedCompleter任务由helpComplete来完成join helpComplete(w, cc, 0); else if (w.base == w.top || w.tryRemoveAndExec(task)) //尝试执行 helpStealer(w, task); // 队列为空或执行失败,任务可能被偷,帮助偷取者执行该任务 if ((s = task.status) < 0) // 已经完成|取消|异常,跳出循环 break; // 计算任务等待时间 long ms, ns; if (deadline == 0L) ms = 0L; else if ((ns = deadline - System.nanoTime()) <= 0L) break; else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) ms = 1L; if (tryCompensate(w)) { // 执行补偿操做 task.internalWait(ms); // 补偿执行成功,任务等待指定时间 U.getAndAddLong(this, CTL, AC_UNIT); // 更新活跃线程数 } } U.putOrderedObject(w, QCURRENTJOIN, prevJoin); // 循环结束,替换为原来的join任务 } return s; }
ForkJoinPool.awaitJoin
方法中有三个重要方法:
这里说下这三个方法的主要做用,不贴代码了:
tryRemoveAndExec:
当工做线程正在等待join的任务时,它会从top位开始自旋向下查找该任务:
helpStealer:
tryCompensate:
tryCompensate主要用来补偿工做线程由于阻塞而致使的算力损失,当工做线程自身的队列不为空,且还有其它空闲工做线程时,若是本身阻塞了,则在此以前会唤醒一个工做线程。
本章和上一章——Fork/Join框架(1) 原理,从思想、使用、实现等方面较完整地分析了Fork/Join框架,Fork/Join框架的使用须要根据实际状况划分子任务的大小。
理解F/J框架须要先从总体上了解框架调度任务的流程(参考本章开头的调度图),能够本身经过示例模拟一个任务的调度过程,而后根据实际运用过程当中遇到的问题,再去调试及在相应的源码中查看实现原理。