在前面的三篇文章中前后介绍了ForkJoin框架的任务组件(ForkJoinTask体系,CountedCompleter体系)源码,并简单介绍了目前的并行流应用场景.ForkJoin框架本质上是对Executor-Runnable/Callable-Future/FutureTask的扩展,它依旧支持经典的Executor使用方式,即任务+池的配合,向池中提交任务,并异步地等待结果.java
毫无疑问,前面的文章已经解释了ForkJoin框架的新颖性,初步了解了工做窃取依托的数据结构,ForkJoinTask/CountedCompleter在执行期的行为,也提到它们必定要在ForkJoinPool中进行运行和调度,这也是本文力求解决的问题.算法
ForkJoinPool源码是ForkJoin框架中最复杂,最难理解的部分,且由于交叉依赖ForkJoinTask,CountedCompleter,ForkJoinWorkerThread,做者在前面单独用两篇文章分析了它们,之前两篇文章为基础,重复部分本文再也不详述.编程
首先看类签名.api
//禁止伪共享 @sun.misc.Contended //继承自AbstractExecutorService public class ForkJoinPool extends AbstractExecutorService
前面的几篇文章不止一次强调过ForkJoin框架的"轻量线程,轻量任务"等概念,也提到少许线程-多数计算,资源空闲时窃取任务.并介绍了基于status状态的调度(ForkJoinTask系列),不基于status而由子任务触发完成的调度(CountedCompleter系列),显然它们的共性就是让线程在正常调度的前提下尽可能少的空闲,最大幅度利用cpu资源,伪共享/缓存行的问题在ForkJoin框架中显然会是一个更大的性能大杀器.在1.8以前,通常经过补位的方式解决伪共享问题,1.8以后,官方使用@Contended注解,令虚拟机尽可能注解标注的字段(字段的状况)或成员字段放置在不一样的缓存行,从而规避了伪共享问题.数组
创建ForkJoinPool能够直接new,也能够使用Executors的入口方法.缓存
//Executors方法,显然ForkJoinPool被称做工做窃取线程池.参数指定了并行度. public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, //默认线程工厂,前文中已提过默认的ForkJoinWorkerThread ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //不提供并行度. public static ExecutorService newWorkStealingPool() { return new ForkJoinPool //使用全部可用的处理器 (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //对应的,ForkJoinPool的构造器们. //不指定任何参数. public ForkJoinPool() { //并行度取MAX_CAP和可用处理器数的最小值. this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), //默认的线程工厂.无异常处理器,非异步模式. defaultForkJoinWorkerThreadFactory, null, false); } //同上,只是使用参数中的并行度. public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { //并行度须要校验 this(checkParallelism(parallelism), //校验线程工厂 checkFactory(factory), //参数指定的未捕获异常处理器. handler, //前面的几处代码asyncMode都是false,会选用LIFO队列,是true是会选用FIFO队列,后面详述. asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //线程名前缀 "ForkJoinPool-" + nextPoolId() + "-worker-"); //检查许可,不关心. checkPermission(); } //检查方法很简单. //并行度不能大于MAX_CAP不能不大于0. private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; } //线程工厂非空便可. private static ForkJoinWorkerThreadFactory checkFactory (ForkJoinWorkerThreadFactory factory) { if (factory == null) throw new NullPointerException(); return factory; } //最终构造器,私有.待介绍完一些基础字段后再述. private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; //config初始化值,用并行度与mode取或,显然mode是FIFO时,将有一个第17位的1. this.config = (parallelism & SMASK) | mode; //np保存并行度(正数)的相反数(补码). long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
了解其余线程池源码的朋友能够去回忆其余线程池的构建,不管是调度线程池仍是普通的线程池或者缓存池,他们其实都设置了核心线程数和最大线程数.固然这要看定义"线程池分类"的视角,以Executors入口的api分类,或许能够分类成固定线程池,缓冲池,单线程池,调度池,工做窃取池;但以真正的实现分类,其实只有ThreadPoolExecutor系列(固定线程池,单线程池都直接是ThreadPoolExecutor,调度池是它的子类,缓冲池也是ThreadPoolExecutor,只是阻塞队列限定为SynchronizedQueue)和ForkJoinPool系列(工做窃取池).安全
做者更倾向于用实现的方式区分,也间接参照Executors的api使用用途的区分方式.若是不使用Executors的入口api,不论哪一种ThreadPoolExecutor系列,咱们均可以提供线程池的大小配置,阻塞队列,线程空闲存活时间及单位,池满拒绝策略,线程工厂等,而所谓的缓存池和固定池的区别只是队列的区别.数据结构
调度池的构造参数与ThreadPoolExecutor无异,只是内限了阻塞队列的类型,它虽然是ThreadPoolExecutor的扩展,却不只没有拓充参数,反而减小了两个参数:阻塞队列和最大线程数.阻塞队列被默认设置为内部类DelayQueue,它实现了BlockingQueue,最大线程数则为整数上限,同时新增的对任务的延时或重试等属性则是依托于内部维护的一个FutureTask的扩展,并未增长到构造参数.并发
而到了ForkJoinPool,咱们看到的是大相径庭于ThreadPoolExecutor系列的构建方式.首先根本没有提供核心线程和最大线程数,线程空闲存活时间的参数和阻塞队列以及池满拒绝策略;线程工厂也仅能提供生产ForkJoinWorkerThread的工厂bean;还具有一些ThreadPoolExecutor没有的参数,如未捕获异常处理器,同步异步模式,工做线程前缀(其实别的类型的线程工厂也能够提供线程前缀,默认就是常见的pool-前缀)等.框架
显然从参数看即可猜想出若干不一样于其余线程池的功能.但咱们更关心其中的一些参数设置.
通常的参数都能见名知义,仅有config和ctl难以理解,此处也不详细介绍,只说他们的初值的初始化.
config是并行度与SMASK取与运算再与mode取或,这里并行度最大是15位整数(MAX_CAP=0x7FFF),而SMASK做用于整数后16位,mode在FIFO为1<<16,LIFO是0.很好计算.
ctl实际上是一个控制信号,咱们后面会在具体源码就地解释,它的计算先经过了一个局部变量np.
np的计算方法是将并行度的相反数(补码)转换为长整型.前面简单分析,并行度不会大于MAX_CAP,所以np至少前49位所有是1.
计算ctl时,将np左移AC_SHIFT即为取后16位,将np左移TC_SHIFT即取它的后32位,分别与AC_MASK和TC_SHIFT,表示取np的后16位分别放置于ctl的前16位和33至48位.而ctl的后32位初值为0.
由于生成的ctl前16位和后16位相等,若是仔细用数学验证,能够发现,对前16位和后16位的末位同时加1,当添加了parallel次后,ctl将归0.这也是添加worker限制的重要数理依据.
前面列举了获取ForkJoinPool实例的几种方法,初步展现了构造一个ForkJoinPool的属性,也暴露了一些实现细节,而这些细节依赖于一些字段和成员函数,咱们先从它们开始.
//ForkJoinWorkerThread的线程工厂. public static interface ForkJoinWorkerThreadFactory { //建立新线程要实现的方法. public ForkJoinWorkerThread newThread(ForkJoinPool pool); } //前面看到的默认线程工厂. static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } //建立InnocuousForkJoinWorkerThread的线程工厂,上一文已经介绍过. static final class InnocuousForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { private static final AccessControlContext innocuousAcc; static { Permissions innocuousPerms = new Permissions(); innocuousPerms.add(modifyThreadPermission); innocuousPerms.add(new RuntimePermission( "enableContextClassLoaderOverride")); innocuousPerms.add(new RuntimePermission( "modifyThreadGroup")); innocuousAcc = new AccessControlContext(new ProtectionDomain[] { new ProtectionDomain(null, innocuousPerms) }); } public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread) java.security.AccessController.doPrivileged( new java.security.PrivilegedAction<ForkJoinWorkerThread>() { public ForkJoinWorkerThread run() { return new ForkJoinWorkerThread. InnocuousForkJoinWorkerThread(pool); }}, innocuousAcc); } } //空任务 static final class EmptyTask extends ForkJoinTask<Void> { private static final long serialVersionUID = -7721805057305804111L; EmptyTask() { status = ForkJoinTask.NORMAL; } //状态直接是已正常完成. public final Void getRawResult() { return null; } public final void setRawResult(Void x) {} public final boolean exec() { return true; } }
以上是线程工厂和一个默认的EmptyTask.接下来看一些跨池和工做队列的公用常量.
// 与边界有关的常量 static final int SMASK = 0xffff; // 后16位. static final int MAX_CAP = 0x7fff; // 前面在定并行度时参考的最大容量. static final int EVENMASK = 0xfffe; // 后16位验偶数 static final int SQMASK = 0x007e; // 最大64个偶数槽,从第2位至7位共6位,2的6次方. // 与WorkQueue有关 static final int SCANNING = 1; // 对WorkQueue正在运行任务的标记 static final int INACTIVE = 1 << 31; // 标记负数 static final int SS_SEQ = 1 << 16; // 版本号使用,第17位1 // ForkJoinPool和WorkQueue的config有关常量. static final int MODE_MASK = 0xffff << 16; // 能滤取前16位. static final int LIFO_QUEUE = 0;//前面提到过的,非async模式(false),值取0. static final int FIFO_QUEUE = 1 << 16;//async模式(true),值取1. static final int SHARED_QUEUE = 1 << 31; // 共享队列标识,符号位表示负.
以上的字段含义只是粗略的描述,先有一个印象,后面看到时天然理解其含义.
接下来看核心的WorkQueue内部类.
//前面的文章说过,它是一个支持工做窃取和外部提交任务的队列.显然,它的实例对内存部局十分敏感, //WorkQueue自己的实例,或者内部数组元素均应避免共享同一缓存行. @sun.misc.Contended static final class WorkQueue { //队列内部数组的初始容量,默认是2的12次方,它必须是2的几回方,且不能小于4. //但它应该设置一个较大的值来减小队列间的缓存行共享. //在前面的java运行时和54篇java官方文档术语中曾提到,jvm一般会将 //数组放在可以共享gc标记(如卡片标记)的位置,这样每一次写都会形成严重内存竞态. static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大内部数组容量,默认64M,也必须是2的平方,但不大于1<<(31-数组元素项宽度), //根据官方注释,这能够确保无需计算索引归纳,但定义一个略小于此的值有助于用户在 //系统饱合前捕获失控的程序. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // unsafe机制有关的字段. private static final sun.misc.Unsafe U; private static final int ABASE; private static final int ASHIFT; private static final long QTOP; private static final long QLOCK; private static final long QCURRENTSTEAL; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> wk = WorkQueue.class; Class<?> ak = ForkJoinTask[].class; //top字段的句柄. QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); //qlock字段的句柄. QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); //currentSteal的句柄 QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); //ABASE是ForkJoinTask数组的首地址. ABASE = U.arrayBaseOffset(ak); //scale表明数组元素的索引大小.它必须是2的平方. int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); //计算ASHIFT,它是31与scale的高位0位数量的差值.由于上一步约定了scale必定是一个正的2的几回方, //ASHIFT的结果必定会大于1.能够理解ASHIFT是数组索引大小的有效位数. ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } //插曲,在Integer类的numberOfLeadingZeros方法,果真一流的程序是数学. public static int numberOfLeadingZeros(int i) { // HD, Figure 5-6 if (i == 0) //i自己已经是0,毫无疑问地返回32.本例中i是2起,因此不会. return 32; //先将n初始化1.最后会减掉首位1. int n = 1; //i的前16位不存在非零值,则将n加上16并移除i的前16位.将i转换为一个以原i后16位开头的新值. if (i >>> 16 == 0) { n += 16; i <<= 16; } //不论前一步结果如何,若此时i的前8位不存在非零值,则n加上8,i移除前8位.将i转换为原i的后24位开头的新值. if (i >>> 24 == 0) { n += 8; i <<= 8; } //不论前一步结果如何,若此时i的前4位不存在非零值,则n加上4,i移除前4位.将i转换为原i的后28位开头的新值. if (i >>> 28 == 0) { n += 4; i <<= 4; } //不论前一步结果如何,若此时i的前2位不存在非零值,则n加上2,i移除前2位.将i转换为原i的后30位开头的新值. if (i >>> 30 == 0) { n += 2; i <<= 2; } //通过前面的运算,i的前30位的非零值数量已经记入n, //在前一步的基础上,此时i的前1位若存在非零值,则n-1,不然n保留原值. n -= i >>> 31; return n; } //回到WorkQueue // 实例字段 volatile int scanState; // 版本号,小于0表明不活跃,注释解释奇数表明正在扫描,但从代码语义上看正好相反. int stackPred; // 前一个池栈控制信号(ctl),它保有前一个栈顶记录. int nsteals; // 偷盗的任务数 int hint; // 一个随机数,用于决定偷取任务的索引. int config; // 配置,表示池的索引和模式 volatile int qlock; // 队列锁,1表示锁了,小于0表示终止,其余状况是0. volatile int base; // 底,表示下一个poll操做的插槽索引 int top; // 顶,表示下一个push操做的插槽索引 ForkJoinTask<?>[] array; // 存听任务元素的数组,初始不分配,首扩容会分配. final ForkJoinPool pool; // 包含该队列的池,可能在某些时刻是null. final ForkJoinWorkerThread owner; // 持有该队列的线程,若是队列是共享的,owner是null. volatile Thread parker; // 在调用park阻塞的owner,非阻塞时为null volatile ForkJoinTask<?> currentJoin; // 被在awaitJoin中join的task. volatile ForkJoinTask<?> currentSteal; // 字面意思当前偷的任务,主要用来helpStealer方法使用. //工做队列构造器,只初始化线程池,owner等字段. WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) //base和top初始均为INITIAL_QUEUE_CAPACITY的一半,也就是2的11次方. base = top = INITIAL_QUEUE_CAPACITY >>> 1; } //返回本队列在池中的索引,使用config的2至4位表示.由于config的最后一位是奇偶位,忽略. final int getPoolIndex() { return (config & 0xffff) >>> 1; } //返回队列中的任务数. final int queueSize() { //非owner的调用者必须先读base,用base-top,获得的结果小于0则取相反数,不然取0. //忽略即时的负数,它并不严格准确. int n = base - top; return (n >= 0) ? 0 : -n; } //判断队列是否为空队.本方法较为精确,对于近空队列,要检查是否有至少一个未被占有的任务. final boolean isEmpty() { ForkJoinTask<?>[] a; int n, m, s; //base大于等于top,说明空了. return ((n = base - (s = top)) >= 0 || //有容量,且刚好计算为1,可能只有一个任务. (n == -1 && //计算为1,再验数组是否是空的. ((a = array) == null || (m = a.length - 1) < 0 || //取该位置元素的值判空,空则说明isEmpty. //取值的方式是取ForkJoinTask.class首地址加上偏移量(数组长度减一(最后一个元素位置,经典案例32-1)与运算top减一左移ASHIFT(索引大小有效位数)位)的值. U.getObject (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); } //将一个任务压入队列,前文提过的fork最终就会压队.但此方法只能由非共享队列的持有者调用. //当使用线程池的"外部压入"externalPush方法时,压入共享队列. final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; //保存当时的base top. int b = base, s = top, n; //若是数组被移除则忽略. if ((a = array) != null) { //数组最后一个下标.如长度32,则m取31这个质数.此时保存一个m,对于保存后其余push操做至关于打了屏障. int m = a.length - 1; //向数组中的指定位置压入该任务.位置包含上面的m和s进行与运算(数组中的位置),结果左移索引有效长度位(索引长度),再加上数组首索引偏移量(起始地址). U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //将top加1. U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { //计算旧的任务数量,发现不大于1个,说明原来极可能工做线程正在阻塞等待新的任务.须要唤醒它. if ((p = pool) != null) //signalWork会根据状况,添加新的工做线程或唤醒等待任务的线程. p.signalWork(p.workQueues, this); } else if (n >= m)//2. //任务数量超出了,对数组扩容. growArray(); } } //添加任务过程主流程无锁,包括可能出现的growArray.当原队列为空时,它会初始化一个数组,不然扩容一倍. //持有者调用时,不须要加锁,但当其余线程调用时,须要持有锁.在resize过程当中,base能够移动,但top否则. final ForkJoinTask<?>[] growArray() { //记录老数组. ForkJoinTask<?>[] oldA = array; //根据老数组决定新容量,老数组空则INITIAL_QUEUE_CAPACITY不然国倍. int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) //新大小大于最大数组大小则拒绝. throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; //直接将原来的数组引用替换成新的. ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; //若是是初次分配,就此打住返回a,是扩容,且老数组非空则进入下面的循环拷贝. if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { //根据前面的运算,size必定是2的幂,减一用来哈希,这是经典处理办法. int mask = size - 1; do { ForkJoinTask<?> x; //老数组base自增过若干次的获得b,它表明的元素对应的索引. int oldj = ((b & oldMask) << ASHIFT) + ABASE; //用b在新数组中找出索引. int j = ((b & mask) << ASHIFT) + ABASE; //老数组中用索引取出元素. x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); if (x != null && //老数组置空,放入新数组. U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); //每处理完一个task,就将base自增1,直到top为止. } while (++b != t); } //返回新数组. return a; } //存在下一个任务,弹出,顺序是后进先出.此方法仅限非共享队列的owner调用. final ForkJoinTask<?> pop() { ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m; //还有元素. if ((a = array) != null && (m = a.length - 1) >= 0) { //1.top至少比base大一.注意,每次循环都会读出新的top,它是volatile修饰的. for (int s; (s = top - 1) - base >= 0;) { //top对应的索引. long j = ((m & s) << ASHIFT) + ABASE; //2.该索引没有元素,break,返回null.并且就表明这个位置的确是null,与竞态无关. //由于此方法仅owner线程使用,不会出现另外一个线程计算了一样的j,且先执行了3的状况. //出现这种状况,则是此位置的任务当先被执行并出栈,或者就从未设置过任务,后续分析这种极端状况. //故若是出现某个任务在数组的中间,提早被执行并置空(非pop或poll方式),那么再对WorkQueue进行pop时将会中断, //留下一部分null以后的任务不能出栈,因此能够容许任务非pop或poll方式查出并执行,但为了能pop出全部任务,不能中间置null. if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) break; //3.有元素,将该索引位置置null.若cas失败,说明元素被取出了, //但下次循环即便在2处break并返回null,也不是由于竞态,由于每次循环到1都会读取新的top, //也就有新的j. if (U.compareAndSwapObject(a, j, t, null)) { //数组位置置null的同时top减1. U.putOrderedInt(this, QTOP, s); return t; } } } //循环退出,说明top位置没有元素,也至关于说明数组为空.显然此方法的另外一个做用是将队列压缩,空队列会将top先降到base+1,再循环最后一次将top降到base. return null; } //若是b是base,使用FIFO的次序尝试无竞态取底部的任务.它会在ForkJoinPool的scan和helpStealer中使用. final ForkJoinTask<?> pollAt(int b) { ForkJoinTask<?> t; ForkJoinTask<?>[] a; if ((a = array) != null) { //和前面同样的的方式计算b对应的索引j int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null && //j对应位置有task且当前base==b,尝试将task出队. base == b && U.compareAndSwapObject(a, j, t, null)) { //出队成功base增1.不须要额外的同步,由于两个线程不可能同时在上面的cas成功. //当一切条件匹配(b就是base且j位置有元素),pollAt同一个b只会有一个返回非空的t. //若是多个线程传入的b不相等,在同一时刻只有一个会等于base. base = b + 1; return t; } } return null; } //用FIFO的次序取下一个任务. final ForkJoinTask<?> poll() { ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; //1.循环从base取任务,当base增加到top或其余操做重置array为null则终止循环. while ((b = base) - top < 0 && (a = array) != null) { //前面已叙述过取索引的逻辑,使用一个top到base间的数与数组长度-1与运算并左移索引长度位再加上数组基准偏移量.后面再也不缀述. int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //取出task t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); //2.若是发生竞态,base已经不是b,直接开启下一轮循环把新的base读给b. if (base == b) { if (t != null) { //3.当前t是base任务,用cas置空,base+1,返回t. //若是此处发生竞态,则只有一个线程能够成功返回t并重置base(4). //不成功的线程会开启下一轮循环,此时成功线程可能将来的及执行4更新base, //也可能已经更新base,则致使先前失败的线程在2处经过,经5种或判队列空返回,或非空再次循环,而 //在当前成功线程执行4成功后,全部前面失败的线程能够在1处读到新的base,这些线程 //在下一次循环中依旧只会有一个成功弹出t并重置base,直到全部线程执行完毕. if (U.compareAndSwapObject(a, j, t, null)) { //4重置加返回 base = b + 1; return t; } } //5.t取出的是空,发现此时临时变量b(其余成功线程在此轮循环前置的base)已增至top-1,且当前线程又没能成功的弹出t,说明必定会有一个线程 //将t弹出并更新base到top的值,当前线程不必再开下一个循环了,直接break并返回null. //t取出的是空,可是没到top,说明只是被提早执行并置空了,那么继续读取新的base并循环,且若没有其余线程去更改base,array的长度,或者把top降到 //base,则当前线程就永远死循环下去了,由于每次循环都是125且每一个变量都不变.所以为避免循环,每一个任务能够提早执行,但必定不能提早离队(置null). //也就是说:只能用poll或pop方式弹出任务,其余方式得到任务并执行是容许的,但不能在执行后置null,留待后续源码验证一下. else if (b + 1 == top) // now empty break; } } //从循环退出来有两种状况,多是在5处知足退出条件,或者在2处发现b已是脏数据,下轮循环不知足循环条件所致.两种都应该返回null. return null; } //根据mode来取下一个本队列元素.根据模式. final ForkJoinTask<?> nextLocalTask() { //当前WorkQueue的配置了FIFO,则poll,不然pop. //尽管还未看到注册worker的源码,在此提早透露下,ForkJoinPool也有一个config(前面讲构造函数提过) //该config保存了mode信息,并原样赋给了WorkQueue的mode.注意,相应的任务会出队. return (config & FIFO_QUEUE) == 0 ? pop() : poll(); } //根据模式取出下一个任务,可是不出队. final ForkJoinTask<?> peek() { ForkJoinTask<?>[] a = array; int m; //空队,返回null. if (a == null || (m = a.length - 1) < 0) return null; //根据mode定位要取的索引j. int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base; int j = ((i & m) << ASHIFT) + ABASE; //返回读出的值,不出队. return (ForkJoinTask<?>)U.getObjectVolatile(a, j); } //若是参数t是当前队的top,则弹出. final boolean tryUnpush(ForkJoinTask<?> t) { ForkJoinTask<?>[] a; int s; if ((a = array) != null && (s = top) != base && //1.知足非空条件.尝试用t去当看成计算出的索引位置的原任务的值并cas为null来出队. U.compareAndSwapObject (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { //cas成功,说明t确实是top,将top减一返回true. U.putOrderedInt(this, QTOP, s); return true; } //2.cas失败或不知足1的条件,返回false. return false; } //移除并取消队列中全部已知的任务,忽略异常. final void cancelAll() { ForkJoinTask<?> t; if ((t = currentJoin) != null) { //有currentJoin,引用置空,取消并忽略异常. currentJoin = null; ForkJoinTask.cancelIgnoringExceptions(t); } if ((t = currentSteal) != null) { //有currentSteal,引用置空,取消并忽略异常. currentSteal = null; ForkJoinTask.cancelIgnoringExceptions(t); } //除了上面两个,就只剩下数组中的任务了.按LILO的顺序弹出并依次取消,忽略全部异常. while ((t = poll()) != null) ForkJoinTask.cancelIgnoringExceptions(t); } // 如下是执行方法. //按FIFO顺序从队首弹出任务并执行全部非空任务. final void pollAndExecAll() { for (ForkJoinTask<?> t; (t = poll()) != null;) //很明显,若是未按严格顺序执行,先执行中间的一个任务, //再调用本方法,则会半路停止. t.doExec(); } //移除并执行完全部本队列的任务,若是是先进先出,则执行前面的pollAndExecAll方法. //不然pop循环执行到空为止.按前面的分析,只要坚持只能pop或poll弹出,其余方式执行任务但不能置空的原则, //能够保证pop或poll出现空的状况只能是竞态发生的状况. final void execLocalTasks() { int b = base, m, s; ForkJoinTask<?>[] a = array; //初始知足条件,top至少比base大1.队列非空. if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { //不是FIFO模式. if ((config & FIFO_QUEUE) == 0) { for (ForkJoinTask<?> t;;) { //原子getAndSet,查出并弹出本来的task if ((t = (ForkJoinTask<?>)U.getAndSetObject (a, ((m & s) << ASHIFT) + ABASE, null)) == null) //弹出的task是空,break.说明整个工做流程中,若是未保证严格有序, //如先从中间的某个任务开始执行而且出队了,再调用execLocalTasks,会致使中间停顿. //只执行不出队,则至少不会中断.出现t是null的状况只能是竞态或末尾. break; //top减一,执行任务. U.putOrderedInt(this, QTOP, s); t.doExec(); //若是base大于等于top,则停止. if (base - (s = top - 1) > 0) break; } } //是FIFO模式,pollAndExecAll. else pollAndExecAll(); } } //重点入口方法来了,前面留下诸多关于执行任务是否出队的讨论,下面来分析入口方法. //该方法的入口是每一个工做线程的run方法,所以只有一个线程. final void runTask(ForkJoinTask<?> task) { //传入task是空直接不理会. if (task != null) { //标记成忙.scanState是WorkQueue的成员变量,每一个WorkQueue只有一个值, //前面说过,通常状况下,每一个线程会有一个WorkQueue,因此某种状况来说也能够标记为 //当前ForkJoinWorkerThread繁忙. //SCANNING常量值是1,这个操做实质上就是将scanState变量的个位置0,也就是变成了偶数并标记它要忙了. //显然偶数才表示忙碌,这也是为何前面以为官方注释scanState是奇数表示"正在扫描"很奇怪. scanState &= ~SCANNING; //将currentSteal设置为传入的任务,并运行该任务,若该任务内部进行了分叉,则进入相应的入队逻辑. (currentSteal = task).doExec(); //执行完该任务后,将currentSteal置空.将该task释放掉,帮助gc. U.putOrderedObject(this, QCURRENTSTEAL, null); //调用前面提到的,根据mode选择依次pop或poll的方式将本身的工做队列内的任务出队并执行的方法. execLocalTasks(); //到此,本身队列中的全部任务都已经完成.包含偷来的任务fork后又入队到本身队列的子任务. //取出owner线程.处理偷取任务有关的一些信息. ForkJoinWorkerThread thread = owner; if (++nsteals < 0) //发现当前WorkQueue偷来的任务数即将溢出了,将它转到线程池. transferStealCount(pool); //取消忙碌标记. scanState |= SCANNING; if (thread != null) //执行afterTopLevelExec勾子方法,上一节中介绍ForkJoinWorkerThread时已介绍. thread.afterTopLevelExec(); } //方法结束,注意,没有任何操做将task从所在的数组中移除,不论这个task是哪一个WorkQueue中的元素. //同时,此方法原则上讲能够屡次调用(尽管事实上就一次调用),入口处和出口处分别用忙碌标记来标记scanState,但重复标记显然不影响执行. } //若是线程池中已经初始化了用于记录的stealCounter,则用它加上当前WorkQueue的nsteals/或最大整数(发生溢出时). //并初始化当前WorkQueue的nsteals. final void transferStealCount(ForkJoinPool p) { AtomicLong sc; if (p != null && (sc = p.stealCounter) != null) { //线程池中存放了stealCounter,它是一个原子整数. int s = nsteals; nsteals = 0; //恢复0. //若nsteals是负,增长最大整数,不然增长nsteal sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); } } //若是task存在,则将它从队列中移除并执行,发现有位于顶部的取消任务,则移除之,只用于awaitJoin. //若是队列空而且任务不知道完成了,则返回true. final boolean tryRemoveAndExec(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; int m, s, b, n; //进入if的条件,存在非空任务数组,参数task非空. if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) { //循环条件,队列非空.从s开始遍历到b,也就是从顶到底.后进先出. while ((n = (s = top) - (b = base)) > 0) { //1.内层循环. for (ForkJoinTask<?> t;;) { //2.从顶开始的索引j,每次向下找一个. long j = ((--s & m) << ASHIFT) + ABASE; if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) //3.取出的是空,返回值取决于top是否是内层循环是第一次运行,外循环每次会将s更新为新top, //内循环则会每次将s减一.内循环只跑了一次的状况,显然会返回true. //显然这种状况下top也没有被其余线程更新,内循环又是第一次跑,那么将足以说明当前队列为空,该为false. //true的状况,向下遍历了几个元素打到了底,未进入46 10这三种要重开启一轮外循环的状况,也没找到task. //无论怎样,发现空任务就返回. return s + 1 == top;// 比预期短,第一个或第n个出现了空值,但循环条件未false else if (t == task) { //找到的任务t不是空,且是目标任务. boolean removed = false; if (s + 1 == top) { //4.发现是首轮内循环,s+1==top成立,进行pop操做,将task弹出并将top减一. //显然,task是最顶任务,能够用pop方式,将它置空. if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); //5.置removed为true. removed = true; } } //6.不是首轮循环,并且base没有在处理期间发生改变. else if (base == b) //7.尝试将task替换成一个EmptyTask实例.成功则removed是true, //这样虽然该任务出了队,但在队上还有一个空的任务,而不会出现前面担忧的中间null //的状况,也不改变top或base的值. removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); if (removed) //8.只要任务成功出队(不管是4仍是7,则执行. task.doExec(); //9.只要找到任务,退出内循环,回到外循环重置相应的条件. break; } //10.本轮内循环没找到匹配task的任务. else if (t.status < 0 && s + 1 == top) {//官方注释是取消. //11.若t是完成的任务且是首轮内循环且top未变更,将该任务出队并令top减一. if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); //12.只要进入此分支就退出内循环. break; } if (--n == 0) //13.内循环每执行到此一次,就说明有一次没找到目标任务,减小n(开始时的base top差值).达0时返回false中止循环. //即每一个内循环都只能执行n次,进入外循环时重置n. return false; } //14.结束了任何一轮内循环时,发现目标task已经完成,则中止外循环返回false. if (task.status < 0) return false; } } //15.task参数传空,或者当前WorkQueue没有任务,直接返回true. return true; } //简单梳理一下tryRemoveAndExec的执行流程和生命周期. //a.显然,一上来就判队列的空和参数的空,若是第一个if都进不去,按约定返回true. //b.通过1初始化一个内层循环,并初始化了n,它决定内循环最多跑n次,若是内循环一直不break(9找到任务或12发现顶部任务是完成态),也假定通常碰不到14(发现目标任务完成了) //也没有出现几种return(3查出null,14某轮内循环目标task发现被完成了),那么最终只会耗尽次数,遍历到底,在13处return false(肯定此轮循环task不在队列) //c.若是出现了几种break(9,12),9其实表明查到任务,12表明顶部任务已完成(官方说取消),那就会中止内循环,从新开启一轮外循环,初始化n,继续重新的top到base遍历(b). //但此时,可能找不到task了(它已经在上一轮内循环出队或被替换成代理),但也可能实际上未出队(该task不是top,即4,base也发生了改变形成7未执行),那么可能在本轮循环 //找到任务,在b中进入相应的break,而且成功移除并会进入d,也可能没进入break而是再重复一次b. //d.若是某一次break成功删除了任务,那么外循环更新了n,base,top,重启了一次内循环,可是全部找到task的分支不会再有了,若是接下来再也不碰到被完成(取消)的顶部任务11-12, //一样也没发现目标task完成了(不进14),那么最终的结果就是n次内循环后n下降到0,直接return false. //e.从b-d任何一次内循环在最后发现了task结束,当即返回false.不然,它可能在某一次内循环中弹出并执行了该任务,却可能一直在等待它完成,所以这个机制可让等待task完成前, //帮助当前WorkQueue清理顶部无效任务等操做. //此方法适用于不论共享或者独有的模式,只在helpComplete时使用. //它会弹出和task相同的CountedCompleter,在前一节讲解CountedCompleter时已介绍过此方法. //父Completer仅能在栈链上找到它的父和祖先completer并帮助减挂起任务数或完成root,但在此处 //它能够帮助栈链上的前置(子任务),前提是要popCC弹出. final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) { int s; ForkJoinTask<?>[] a; Object o; //当前队列有元素. if (base - (s = top) < 0 && (a = array) != null) { //老逻辑从顶部肯定j. long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; if ((o = U.getObjectVolatile(a, j)) != null && (o instanceof CountedCompleter)) { //当前队列中存在类型为CountedCompleter的元素.对该completer栈链开启一个循环. CountedCompleter<?> t = (CountedCompleter<?>)o; for (CountedCompleter<?> r = t;;) { //对该CountedCompleter及它的completer栈元素进行遍历,每个遍历到的临时存放r. //找到r==task,说明有一个completer位于task的执行路径. if (r == task) { //mode小于0,这个mode其实有误解性,它的调用者实际上是将一个WorkQueue的config传给了这个mode. //而config只有两处初始化,一是将线程注册到池的时候,初始化WorkQueue, //二是外部提交的任务,使用externalSubmit时新建的WorkQueue,config会是负值且没有owner. //它也说明是共享队列,须要有锁定机制.. if (mode < 0) { //另外一个字段qlock派上了用场,将它置为1表示加锁. if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { //加锁成功,在top和array这过程当中未发生变更的状况下,尝试 //将t出队,此时t是栈顶上的元素,它的completer栈链前方有task. if (top == s && array == a && U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); U.putOrderedInt(this, QLOCK, 0); return t; } //不论出队成功仍是失败,解锁. U.compareAndSwapInt(this, QLOCK, 1, 0); } } //非共享队列,直接将t出列. else if (U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); return t; } //只要找到,哪怕两处cas出现不成功的状况,也是竞态失败,break终止循环. break; } //r不等于task,找出r的父并开始下轮循环,直到root或找到task为止. else if ((r = r.completer) == null) // try parent break; } } } //空队列,顶部不是Completer或者不是task的子任务,返回null. return null; } //尝试在无竞态下偷取此WorkQueue中与给定task处于同一个completer栈链上的任务并运行它, //若不成功,返回一个校验合/控制信号给调用它的helpComplete方法. //返回规则,成功偷取则返回1;返回2表明可重试(被其余小偷击败),若是队列非空但未找到匹配task,返回-1, //其余状况返回一个强制负的基准索引. final int pollAndExecCC(CountedCompleter<?> task) { int b, h; ForkJoinTask<?>[] a; Object o; if ((b = base) - top >= 0 || (a = array) == null) //空队列,与最小整数(负值)取或做为信号h h = b | Integer.MIN_VALUE; // to sense movement on re-poll else { //从底部取索引j long j = (((a.length - 1) & b) << ASHIFT) + ABASE; //用该索引取task取出null,说明被捷足先登了,信号置为可重试. if ((o = U.getObjectVolatile(a, j)) == null) h = 2; // retryable //取出的非空任务类型不是CountedCompleter.说明不匹配,信号-1 else if (!(o instanceof CountedCompleter)) h = -1; // unmatchable else { //是CountedCompleter类型 CountedCompleter<?> t = (CountedCompleter<?>)o; for (CountedCompleter<?> r = t;;) { //基本同上个方法的逻辑,只是上个方法t取的是top,这里取base. //r从t开始找它的父,直到它自己或它的父等于task.将它从底端出队. if (r == task) { if (base == b && U.compareAndSwapObject(a, j, t, null)) { //出队成功,由于咱们找的是base,且竞态成功,直接更新base便可. base = b + 1; //出队后执行该出队的任务.返回1表明成功. t.doExec(); h = 1; // success } //base被其余线程修改了,或者cas竞态失败.(实际上是一个状况),信号2,能够重新的base开始重试. else h = 2; // lost CAS //只要找到task的子任务就break,返回竞态成功或可重试的信号. break; } //迭代函数,当前r不是task,将r指向它的父,直到某一个r的父是task或者是null进入else if. else if ((r = r.completer) == null) { //可以进来,说明r已经指向了root,却没有找到整条链上有这个task,返回信号为未匹配到. h = -1; // unmatched break; } } } } return h; } //若是当前线程拥有此队列且明显未被锁定,返回true. final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; //前面提过的scanState会在一上来runTask时和1的反码取与运算,直到运行完任务才会反向运算. //这个过程,scanState的最后一位会置0,但这与此判断条件关系不大. //前面对scanState有所注释,小于0表明不活跃. return (scanState >= 0 && //队列处于活跃态且当前线程的状态不是阻塞,不是等待,不是定时等待,则返回true. (wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } }
到此终于WorkQueue内部类的代码告一段落.
这一段介绍了WorkQueue的内部实现机制,以及与上一节有关的提到的CountedCompleter在帮助子任务时处于WorkQueue的实现细节(彷佛默认状况下即asnycMode传true时只会从当前工做线程队列取顶部元素,从其余随机队列的底部开取,有可能能够重复取,具体细节到ForkJoinPool的helpComplete相关源码再说),以及构建好的WorkQueue会有哪些可能的状态和相应的字段,以及若干模式(同步异步或者LIFO,FIFO等),出队入队的操做,还提出了队列中元素为何中间不能为空,若是出现要将中间元素出队怎么办?别忘了答案是换成一个EmptyTask.
不妨小结一下WorkQueue的大体结构.
1.它规避了伪共享.
2.它用scanState表示运行状态,版本号,小于0表明不活跃维护了忙碌标记,也用scanState在runTask入口开始运行任务时标记为忙碌(偶数),结束后再取消忙碌状态(奇数).注释解释奇数表明正在扫描,但从代码语义上看正好相反
3.它维护了一个能够扩容的数组,也维护了足够大的top和base,[base,top)或许能够形象地表示它的集合,pop是从top-1开始,poll从base开始,当任务压入队成功后,检查若top-base达到了数组长度,也就是集合[base,top)的元素数达到或者超过了队列数组长度,将对数组进行扩容,因使用数组长度-1与哈希值的方式,扩容先后原数组元素索引不变,新压入队列的元素将在此基础上无缝添加,所以扩容也规避了出现中间任务null的状况.初始容量在runWorker时分配.
4.它维护了偷取任务的记录和个数,并在溢出等状况及时累加给池.它也维护了阻塞者线程和主人线程.
5.它可能没有主人线程(共享队列),或有主人线程(非共享,注册入池时生成)
6.它维护了队列锁qlock,但目前仅在popCC且当前为共享队列状况下使用,保证争抢的同步.
7.其余一些字段如config,currentJoin,hint,parker等,须要在后续的线程池自身代码中结合前面的源码继续了解,包含stackPred听说保持前一个池栈的运行信号.
WorkQueue本质也是一个内部类,它虽然定义了一系列实现,但这些实现方法的调度仍是由ForkJoinPool来实现,因此咱们仍是要回归到ForkJoinPool自身的方法和公有api上,遇到使用上面WorkQueue定义好的工具方法时,咱们再来回顾.
前面已经看了一些影响WorkQueue的位于ForkJoinPool的常量,再来继续看其余的ForkJoinPool中的一些常量.
//默认线程工厂.前面提过两个实现 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; //是否容许启动者在方法中杀死线程的许可,咱们忽略这方面的内容. private static final RuntimePermission modifyThreadPermission; //静态的common池 static final ForkJoinPool common; common池的并行度. static final int commonParallelism; //tryComensate方法中对构造备用线程的创造. private static int commonMaxSpares; //池顺序号,建立工做线程会拼接在名称上. private static int poolNumberSequence; //同步方法同,递增的池id. private static final synchronized int nextPoolId() { return ++poolNumberSequence; } // 如下为一些静态配置常量. //IDLE_TIMEOUT表明了一个初始的纳秒单位的超时时间,默认为2s,它用于线程触发静止停顿以等待新的任务. //一旦超过了这个 时长,线程将会尝试收缩worker数量.为了不某些如长gc等停顿的影响,这个值应该足够大 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec //为应对定时器下冲设置的空闲超时容忍度. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms //它是commonMaxSpares静态初始化时的初值,这个值远超普通的须要,但距离 //MAX_CAP和通常的操做系统线程限制要差很远,这也使得jvm可以在资源耗尽前 //捕获资源的滥用. private static final int DEFAULT_COMMON_MAX_SPARES = 256; //在block以前自旋等待的次数,它在awaitRunStateLock方法和awaitWork方法中使用, //但它事实上是0,所以这两个方法其实在用随机的自旋次数,设置为0也减小了cpu的使用. //若是将它的值改成大于0的值,那么必须设置为2的幂,至少4.这个值设置达到2048已经能够 //耗费通常上下文切换时间的一小部分. private static final int SPINS = 0; //种子生成器的默认增量.注册新worker时详述. private static final int SEED_INCREMENT = 0x9e3779b9;
上面都是一些常量的声明定义,下面看一些与线程池config和ctl有关的常量,以及前面构造器提过的变量.
// 高低位 private static final long SP_MASK = 0xffffffffL;//long型低32位. private static final long UC_MASK = ~SP_MASK;//long型高32位. // 活跃数. private static final int AC_SHIFT = 48;//移位偏移量,如左移到49位开始. private static final long AC_UNIT = 0x0001L << AC_SHIFT;//1<<48表明一个活跃数单位. private static final long AC_MASK = 0xffffL << AC_SHIFT;//long型高16位(49-64) // 总数量 private static final int TC_SHIFT = 32;//移位偏移量,33位开始. private static final long TC_UNIT = 0x0001L << TC_SHIFT;//1<<32表明一个总数量 private static final long TC_MASK = 0xffffL << TC_SHIFT;//33-48位 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //第48位 //与运行状态有关的位,显而后面的runState是个int型,这些移位数也明显是int的范围. //SHUTDOWN显然必定是负值,其余值也都是2的幂. private static final int RSLOCK = 1;//run state锁,简单来讲就是奇偶位. private static final int RSIGNAL = 1 << 1;//2 运行状态的唤醒. private static final int STARTED = 1 << 2;//4,启动 private static final int STOP = 1 << 29;//30位,表明停. private static final int TERMINATED = 1 << 30;//31位表明终止. private static final int SHUTDOWN = 1 << 31;//32位表明关闭. //实例字段. volatile long ctl; // 表明池的主要控制信号,long型 volatile int runState; // 能够锁的运行状态 final int config; // 同时保存了并行度和模式(开篇的构造函数) int indexSeed; // 索引种子,生成worker的索引 volatile WorkQueue[] workQueues; // 工做队列的注册数组. final ForkJoinWorkerThreadFactory factory;//线程工厂 final UncaughtExceptionHandler ueh; // 每个worker线程的未捕获异常处理器. final String workerNamePrefix; // 工做线程名称前缀. volatile AtomicLong stealCounter; // 表明偷取任务数量,前面提过,官方注释说也用做同步监视器
仅仅看这些字段的简单描述是没法完全搞清楚它们的含义的,仍是要到应用的代码来看,咱们继续向下看ForkJoinPool中的一些方法.
//尝试对当前的runState加锁标志位,并返回一个runState,这个runState多是原值(无竞态)或新值(竞态且成功). //不太准确的语言能够说是"锁住"runState这个字段,其实不是,从代码上下文看, //该标志位被设置为1的期间,尝试去lock的线程能够去更改runState的其余位,好比信号位. //而lockRunState成功的线程则是紧接着去更改ctl控制信号,工做队列等运行时数据,故能够称runState在锁标志这一块 //能够理解为运行状态锁. private int lockRunState() { int rs; //runState已是奇数,表示已经锁上了,awaitRunState return ((((rs = runState) & RSLOCK) != 0 || //发现本来没锁,尝试将原rs置为rs+1,即变为奇数. !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? //原来锁了或者尝试竞态加锁不成功,等待加锁成功,不然直接返回rs. awaitRunStateLock() : rs); } //自旋或阻塞等待runstate锁可用,这与上面的runState字段有关.也是上一个方法的自旋+阻塞实现. private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //每轮循环重读rs. if (((rs = runState) & RSLOCK) == 0) { //1.发现rs仍是偶数,尝试将它置为奇数.(锁) if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { //2,锁成功后发现扰动了,则扰动当前线程,catch住不符合安全策略的状况. if (wasInterrupted) { try { //2.1扰动.它将影响到后面awaitWork方法的使用. Thread.currentThread().interrupt(); } catch (SecurityException ignore) { } } //2.2返回的是新的runStatus,至关于原+1,是个奇数. //注意,此方法中只有此处一个出口,也就是说必需要锁到结果. return ns; } } //在1中发现被锁了或者2处争锁竞态失败. else if (r == 0) //3.全部循环中只会执行一次,若是简单去看,nextSecondarySeed是一个生成 //伪随机数的代码,它不会返回0值.r的初值是0. r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { //4.有自旋次数,则将r的值进行一些转换并开启下轮循环.默认spins是0,不会有自旋次数. //从源码来看,自旋的惟一做用就是改变r的值,使之可能从新进入3,也会根据r的结果决定是否减 //少一次自旋. //r的算法,将当前r的后6位保留,用r的后26位与前26位异或被保存为r的前26位(a). //再将(a)的结果处理,r的前21位保持不变,后11位与前11位异或并保存为r的后11位(b). //再将(b)的结果处理,r的后7位保持不变,用前25位与后25位异或并保存为r的前25位(c) //个中数学原理,有兴趣的研究一下吧. //显然,自旋次数并非循环次数,它只能决定进入6中锁代码块前要运行至少几轮循环. r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) //通过上面的折腾r还不小于0的,减小一个自旋次数. //自旋次数不是每次循环都减一,但减到0以后不表明方法中止循环,而是进入2(成功)或者6(阻塞). --spins; } //某一次循环,r不为0,不能进入3,自旋次数也不剩余,不能进入4.则到此. else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) //5.线程池的runState表示还未开启,或者还未初始化偷锁(stealCounter),说明 //还没完成初始化,此处是初始化时的竞态,直接让出当前线程的执行权.等到从新获取执行权时, //从新循环,读取新的runState并进行. Thread.yield(); // initialization race else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {//可重入 //6.没能对runState加锁,也不是5中的初始化时竞态的状况,尝试加上信号位,以stealCounter进行加锁. //显然,这种加信号位的加法不会由于信号位而失败,而会由于runState的其余字段好比锁标识位失败,这时 //从新开始循环便可. synchronized (lock) { //明显的double check if ((runState & RSIGNAL) != 0) { //6.1当前pool的runState有信号位的值,说明没有线程去释放信号位. try { //6.2runState期间没有被去除信号位,等待. lock.wait(); } catch (InterruptedException ie) { //6.3等待过程当中发生异常,且不是记录一个标记,在2处会因它中断当前线程. if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else //6.4当前runState没有信号位的值,说明被释放了,顺便唤醒等待同步块的线程.让他们继续转圈. lock.notifyAll(); } } } } //解锁runState,前面解释过,这个锁能够理解为对runState的锁标志位进行设定,而设定成功的结果就是能够改信号量ctl. //它会解锁runState,并会用新的runState替换. private void unlockRunState(int oldRunState, int newRunState) { //首先尝试cas.cas成功可能会致使上一个方法中进入同步块的线程改走6.4唤醒阻塞线程. if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { //cas不成功,直接强制更改. Object lock = stealCounter; runState = newRunState;// 这一步可能清除掉信号位.使上一个方法中已进入同步块的线程改走6.4 if (lock != null) //强制更换为新的运行状态后,唤醒全部等待lock的线程. synchronized (lock) { lock.notifyAll(); } } }
上面的几个方法是对runState字段进行操做的,并利用了信号位,锁标识位,运行状态位.
显然,虽然能够不精确地说加锁解锁是对runState的锁标识位进行设置,严格来讲,这倒是为ctl/工做队列等运行时数据服务的(后面再述),显然精确说是对运行时数据的修改权限加锁.
一样的,加锁过程采用自旋+阻塞的方式,整个循环中同时兼容了线程池还在初始化(处理方式让出执行权),设定了自旋次数(处理方式,随机数判断要不要减小自旋次数,自旋次数降0前不会阻塞)这两种状况,也顺便在阻塞被扰动的状况下暂时忽略扰动,只在成功设置锁标识位后顺手负责扰动当前线程.
简单剥离这三种状况,加锁过程是一轮轮的循环,会尝试设置锁标识位,成功则返回新标识,不成功则去设置信号位(可能已经有其余线程设置过了),设置信号位也是使用原子更改,即便其余线程设置过信号位,原子更改也会成功,惟一能形成失败的是runState的其余位发生变动,并且极可能是由于锁标识位被释放的缘由.
unlock操做也并不复杂,若是传入的newRunState参数依旧表明了锁,那么不会有任何效果.这里只考虑有效果的状况,即取消了锁(注释说必须取消锁标识位,unlockRunState的方法确实都这么作的),若没有其余线程竞态修改runState,则解锁直接经过一个cas成功,也不须要去唤醒其余线程.不然在解锁操做尝试去用cas释放锁标识位的时候没有成功,说明在unlock操做的线程读取到runState后又有其余线程对它进行了更改,那么直接暴力重置为newState并唤醒阻塞线程.
这彷佛存在一个bug.若是加锁时出现了竞态,若干个加锁线程被阻塞,此时信号位和锁位都有值,过了一会,有一个线程去解锁,在解锁前,它先读取了如今的runState,而后用cas去修改,由于此时没有竞态,解锁线程在读取到runState到cas期间,没有任何线程去改过runState,那么解锁线程直接就会cas成功,而不会去唤醒前面阻塞的线程.
不过还有一个解锁的地方,去尝试加锁的线程,在同步代码块内发现runState的信号位被释放,就当即唤醒全部阻塞的线程,但若是此后没有新的加锁线程进入呢?或者没有人将runState的信号位取消呢?
因而做者仔细查看了runState的设置,没有任何一个地方显式地将信号位释放,所以一度判断在awaitRunStateLock方法中不会有任何线程去唤醒其余等待加锁的线程.
因此在上述方法内部这个底层的层面是不能解决线程锁死的.但在加锁lockRunState和解锁unlockRunState两个方法的调用者处来看,每个线程都是先加锁后解锁,而且在加锁后记录当时的rs,解锁时尝试用该rs去cas,若能成功,说明本身加锁到解锁期间没有任何线程尝试加锁(尝试加锁不成会修改runState致使cas失败).所以直接cas释放就好了,但若是发生了其余线程在它解锁前的加锁阻塞,则前一个线程在解锁时会cas失败,所以将强制转换为本身加锁时记录的rs去除锁标识位的结果(思考:这个结果会不会包含信号位?)并唤醒等待线程.
涉及到信号位的状况有明显的逻辑暗坑,做者在注释中提出大量问题(好比是否是一旦设置了信号位就不会取消,确实代码中没有明显的信号位取消代码),若是不看接下来的这一段,极可能会认为道格大神终于写出了bug.这一块要费大脑分析,前面说过,没有看到任何地方去明确释放信号位,好比runState=rs&~RSIGNAL或者相应的unsafe机制.并且这个机制的安全性不能自我保证,它取决于调用者的实现,此处还未讲到相关代码,鉴于读者可能与做者有一样的急于寻得这个问题答案的心情,咱们先不去罗列更冗长的加解锁使用代码,而先用语言来形容这个过程,原来这也是一件实现很巧妙的事:
开始状态,假定有多个线程去尝试加锁,操做完再解锁,咱们来分析一波,看看信号位是否会重置.
1.首先,第一个线程毫无疑问会读取到原来的runState,并把它赋给一个临时变量rs(有关代码后面会贴),而后在一系列操做后,将rs去除锁标识位(哪怕它根本没置过锁标识位),做为newState调用unlock.
2.若是在第一个线程加锁到解锁期间,rs从未变过,那么该线程会直接cas成功.
3.若是第一个线程加锁到解锁期间,有n个线程尝试加锁,那么他们会阻塞在lockRunState方法的同步块(忽略前面提到的自旋等操做),而且会更改runState到新的值(加信号位),此时第一个线程记录的rs会成为脏数据.因此当第一个线程去解锁时(由于做者看到全部加解锁都是成对出现的,不会有后续的线程在第一个线程解锁前去尝试解锁),cas会失败,它会将本身原来脏数据记录的rs去除锁标识位后强制设置为runState的新值,而后唤醒阻塞的线程.由于它没有阻塞过,设置原来的rs不会包含信号位,至关于清除了阻塞线程设置的信号位.
4.由于刚才cas失败,阻塞的线程在同步块中被唤醒,阻塞在synchronized外的线程进代码块double check发现rs中已经没有信号位了,也帮助唤醒其余线程.
5.阻塞的线程被唤醒后(4中已经进入if后的wait线程)开启了新的循环,有的竞态成功并返回了ns(它与第一个线程返回值的差距是锁标识位是1,但它一样没有信号位),其余线程再次竞态失败阻塞,并更改了runState(加上了标识位).
6.此时可能有(4)被唤醒的某些线程(4中lockRunState进同步块判断)发现runState仍是有信号位的(由于早于它唤醒的至少两个线程,一个成功,一个失败再次设置信号位),它进入if并wait.
7.不论后方阻塞了多少个线程,5中成功的线程再次尝试使用本身在lock成功时返回的ns(rs)去除锁标识位去cas掉当前runState,由于后面有竞态,显然它也会失败,那么强制使用本身前面保存的,干净地未受信号位污染的rs在去除锁标识位后替换了已经被阻塞者设置的runState.而后再次进入唤醒操做.
显然,加锁解锁必须成对出现,仅有加了锁成功的线程才能够解锁,本身加锁本身解,只解本身的锁.万古定律在此也要坚持.
道格大神逻辑强大,做者却不善描述,寥寥几行代码,区区一个信号位问题,却用了上面一整大段来论述.感叹一下差距.
接下来终于能够看工做线程的建立,注册入池和从池中卸载的过程.
//建立工做线程的代码.尝试工做线程的构建和开启,它假定已经有别处维护了预留的增长总数, //建立和启动过程当中出现任何异常,就执行工做线程的卸载.建立线程成功返回true. private boolean createWorker() { //前面构造器传入的factory. ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { //建立线程成功 if (fac != null && (wt = fac.newThread(this)) != null) { //启动该线程. wt.start(); //启动也成功,返回true return true; } } catch (Throwable rex) { //出现异常,保存 ex = rex; } //用前面的异常卸载. deregisterWorker(wt, ex); //返回false. return false; } //createWorker的调用者tryAddWorker //尝试添加一个worker,并在完成前增长ctl里面记录的数量(还记得前面的AC参数吗?),但这个 //增长过程是否进行决定于createWorker返回的true仍是false.参数c是一个进入控制信号ctl. //它的总计数为负且没有空闲worker,cas(增长ctl)失败时,若ctl未改变,则能够刷新重试. //不然说明被添加了一个worker,那么它也就不须要再继续了.从方法的实现上看,c彷佛能够传任何值, //若是c传入的值不等于当前ctl,则会多一次循环重读ctl到c. private void tryAddWorker(long c) { boolean add = false;//1.标记add成功与否. do { //2.根据参数c生成一个新的ctrl/nc.c源自参数或者某一次循环读取的ctl. //nc的值计算结果:c加上一个活跃单位1<<48,并对结果保留前16位, //c加上一个总数单位1<<32,并对结果保留第二个16位(33到48位), //nc等于上两步的结果和.显然,nc的后32位所有是0. long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); //3.上述计算过程当中ctl未改变. if (ctl == c) { //4.阻塞加锁并判断是否已在终止. int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) //5.加锁成功且未终止,尝试cas掉ctl. add = U.compareAndSwapLong(this, CTL, c, nc); //6.加锁成功,不论cas ctl是否成功,解锁. unlockRunState(rs, rs & ~RSLOCK); //7.若是已stop,break退出添加worker的步骤. if (stop != 0) break; if (add) { //8.加锁成功,cas也成功,线程池未进入终止流程,建立worker. createWorker(); //建立成功当即break. break; } } //9.在2到3中间计算nc过程当中ctl改变,或4处未终止且5处未成功cas,则c读取新的ctl,并判断是否还要添加worker(新ctl存在ADD_WORKER位)且新ctl的后32位不存在数值. //这时为前32位发生了变化,只能在新一轮循环处理.注:ADD_WORKER位是第48位,前面已提到.它是TC_MASK能负责的最高位. } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); }
前面是尝试建立worker和决定建立worker的调度流程.
显然,createWorker没有什么可冗余介绍的,只须要了解上一文中的工做线程和线程工厂相关知识,很明显,它不维护线程数量等规范,只负责在失败状况下解除建立完线程的注册.
核心的逻辑在tryAddWorker中.
显然,tryAddWorker支持传入非ctl的参数c,代价是必定会多一次循环.若是传入的c就是方法外读取的ctl,且未发生竞态,效果一是少一轮循环,效果二是不去判断while循环的条件,即不须要ctl具有ADD_WORKER标记位且ctl的后32位无值.
整体流程:
1.根据传入的c,对它加上active 数单位,总数单位,并去除整数位,获得的结果即nc.
2.判断一次ctl和c是否相等,不相等可能一是竞态,可能二是传参错误.
3.ctl和c相等,阻塞加锁(前面分析过,lockRunState必定要阻塞到加锁成功为止,所以本方法不会让加锁失败的线程去释放锁),前面说过,runState加锁这一块,本人暂时描述其锁定的对象为"运行时状态",可见,工做线程也是运行时状态之一.
4.加锁成功且加锁过程当中没有开始终止,尝试将ctl用cas设置成nc,这样至关于同时增长了一个活跃单位和一个总数单位,并忽略掉整数位.
5.若是加锁期间(3)发生了线程池终止则退出,若是加锁并设置nc成功,则建立线程.
6.在2判断条件不成立时,或4中cas失败,重初始化ctl并验证是否符合循环开始条件,即ctl知足后32位为0且存在ADD_WORKER信号位.若不知足这个条件则直接退出循环,终止添加尝试.
显然,此方法容许传入一个非法的c,只要符合循环条件,它会在白白计算一次nc后开始第二轮循环(2条件失败),while条件后置,而6处的while条件比2处要更加严格,显然,当传入的c就是ctl且在计算nc期间未发生改变,则有机会成功cas掉旧的ctl并建立工做线程,即便不知足ADD_WORKER条件和整数位全0的条件.这是为第一次循环单独开的绿灯,也是do while循环的缘由.那么这个"第一次"的绿灯究意为什么而开?咱们稍后分析.
还有6中的循环条件的32位整数问题,咱们也稍后分析,循环条件的另外一个特殊意义:除第一次循环参数传入c的状况外,其余若干次循环可以成功添加worker(突破2的验证)的条件是前32位不变,显然这取决于其余线程是否成功改了ctl到它计算的nc(这个操做会同时改变前16位和前17-32位).
至于ADD_WORKER位,显然涉及线程个数,到此须要回忆一下最开头列出的构造器.它是第48位,默认是1,显然每添加一个worker,会在第49位和33位加一个1,添加到parallel个时,ctl将归0(重点是tc高位将由1变0),而48位也会由1变0.(没有其余方法干扰的状况)
显然,ForkJoinPool的几种公有构造器最终须要依托一个私有构造函数进行构造,而它的构造器没有显式调用父类的构造方法,构造器参数也没有coreSize之类的能决定核心线程和最大线程数的有关设置.
是否是能够理解为构造ForkJoinPool时传入的并行度值就是线程池的最大线程数量?
不严格说,能够这样理解.但咱们已经讲完tryAddWorker方法,显然,只要在方法外读出c=ctl,并调用tryAddWorker(c),只要此时没有其余线程修改ctl,也没有人终止线程池,显然必定会成功,随后我再次读取新的c=ctl,往复这个过程,每一次都会成功,而实际上的线程数早已大于并发度.
到此终于能够简单理解ADD_WORKER和TC_UNIT,AC_UNIT了.ac字面意思是活跃线程数,TC则是总数,所以用tc的最高位(48)表示,当加入的超过总数,它会溢出.
接下来看线程注册入池的方法.
//将线程注册入池,同时返回一个WorkQueue,前一篇文章中提到过此方法,工做线程会在内部记录这个队列. final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; //设置成守护线程,这样保证用户线程都已释放的状况下关闭虚拟机. wt.setDaemon(true); if ((handler = ueh) != null) //构造器提供了异常处理器 wt.setUncaughtExceptionHandler(handler); //构建工做队列. WorkQueue w = new WorkQueue(this, wt); int i = 0; //取出池config的17-32位保存为mode, //前面提过,config在构造时由并行度(后15位)和模式(第17位)表示,根据17位是否有值决定FIFO或LIFO. //这个与运算进行后,至关于滤掉了并行度信息. int mode = config & MODE_MASK; //建立完队列以后要加锁,尤为后面涉及到可能的数组扩容拷贝,以及一些判断和重设随机数等. int rs = lockRunState(); try { WorkQueue[] ws; int n; //前面构造器咱们看过,没有初始化workQueues,因此若是一个线程此时来注册是被忽略的. //显然,使用它们的方法必定作了相应的保证.咱们后续再看. if ((ws = workQueues) != null && (n = ws.length) > 0) { //队列非空,递增种子. //indexSeed值是0,SEED_INCREMENT是每次相加的增量,它的值默认是0x9e3779b9(2654435769) //这是一个特殊的值,它的使用不只此一处,后面稍微介绍.这样减小碰撞可能性. int s = indexSeed += SEED_INCREMENT; int m = n - 1;//ws数组长度-1,数组长度必定是偶数(后面介绍). i = ((s << 1) | 1) & m;//奇数位i. if (ws[i] != null) { //知足这个条件就是发生碰撞了,i已被占用.初始化probes为0 int probes = 0; //定义步长,数组长度不大于4,步长2,不然取n一半的第2至16位的结果(偶数)再加上2做为步长. int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; //开启循环,每次对i加上步长并与m求与运算,直到无碰撞为止. while (ws[i = (i + step) & m] != null) { //每次循环增长probes,表示对同一个数组最多只循环n次,达到次数要进行扩容重试. if (++probes >= n) { //当前数组已经尝试n次,尚未找到无碰撞点,扩容数组一倍,原位置拷贝. //此处没有任何加锁动做,与循环以外建立好队列以后的代码共享一个锁,也是lockRunState //可见只有指派索引相关的动做才须要加锁. workQueues = ws = Arrays.copyOf(ws, n <<= 1); //重置条件. m = n - 1; probes = 0; } } } //s值保存给队列的hint做为随机数种子.可见,此处至少可说明每一个注册线程时建立的队列都会有不一样的hint,它也算是一个标识. w.hint = s; //队列的配置,i与mode取或,mode只能是0或1<<16 //这个结果是将mode可能存放在队列config的17位,从而和池中的config在模式这一块保持一致. //i必定是一个不大于m(n-1)的奇数,而n必定不超事后16位(后面叙述),它和mode互不影响. //故队列的config至关于同时保存了在池的workQueues数组的索引和所属池的FIFO或LIFO. w.config = i | mode; //初始化scanState,以奇数i(索引)当值.至关于发布了初始屏障. //(不理解?参考runState方法,一上来就将它的末位置0成偶数) w.scanState = i; //新建的队列置于i处. ws[i] = w; } } finally { //解锁. unlockRunState(rs, rs & ~RSLOCK); } //线程名,这里看到一个有趣的事,无符号右移,i必定是个奇数,假定右移后的值是j,则2*j=i. wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); //返回队列给线程. return w; }
如今回过头来梳理一下registerWorker方法.
方法自己并不难理解,代码也较检查,难处在于要串起来前面全部有关内容.包含ForkJoinPool的构造器,包含ForkJoinPool中的config究竟是如何存放的,它进一步影响了新建队列的config初始化.
咱们总结一下有关信息:
1.把线程注册入ForkJoinPool,首先在整个方法的周期内,有一些咱们不须要重点费时间的杂项.如守护线程,异常处理器等设置,紧接着它当即为线程初始化了一个工做队列.
2.在1完成以前,不会进行加锁操做,但后续涉及到将1初始化的工做队列加入到ForkJoinPool内部维护的工做队列数组(workQueues)中,须要为它计算一个索引,而且应对可能的扩容操做,而这些步骤均须要加锁进行.
3.在加锁状态下,也能过了池内队列数组的非空验证,注册方法会尝试使用全局的indexSeed 递增 SEED_INCREMENT来肯定一个变量s,并将2s+1与数组长度减1进行与运算肯定索引值.其中,每一个线程都只会为indexSeed加上一次的SEED_INCREMENT,这也是为了减小冲突,而与workQueues数组长度(必定偶数)减1的与运算结果必定会是一个限于后16位的奇数,这说明,当前线程注册入队时的队列只会放入到ws数组的奇索引上,注册方法最后的代码在对线程取名时,也将线程的号进行了索引位的无符号右移,这也侧面说明了这一点.
关于SEED_INCREMENT,简单说一下,最简单的说法就是just math,纯粹的数学了,用它是减小冲突的一种办法,毫无疑问,s必定是n倍的SEED_INCREMENT,而2s+1与一个奇数进行与运算必定很难冲突,可是个中数学原理,做者倒是无从理解,也但愿有路过的大牛帮助解释.另外SEED_INCREMENT并不是使用一处,至少在咱们曾经数次一瞥的工具ThreadLocalRandom中有所使用,咱们曾屡次见到使用ThreadLocalRandom生成随机数,好比这段代码.
//前面看过此方法. static final int nextSecondarySeed() { int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } else { //原来secondary是0,localInit一个值. localInit(); if ((r = (int)UNSAFE.getLong(t, SEED)) == 0) r = 1; // avoid zero } UNSAFE.putInt(t, SECONDARY, r); return r; } //localInit static final void localInit() { //生成器是一个AtomicLong,从0开始,每次加入PROBE_INCREMENT. int p = probeGenerator.addAndGet(PROBE_INCREMENT); int probe = (p == 0) ? 1 : p; // skip 0 long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT)); Thread t = Thread.currentThread(); UNSAFE.putLong(t, SEED, seed); UNSAFE.putInt(t, PROBE, probe); } //初值 private static final int PROBE_INCREMENT = 0x9e3779b9
显然,它也出现了一个从0开始递加的初值,用于后续规避冲突,而这些数是数学中的魔幻,PROBE_INCREMENT与SEED_INCREMENT是同一个值:0x9e3779b9.
做者对这个值十分陌生,也没法从数学概念上对它作出过深理解,它为何能保证n倍的结果乘2+1后可用于在与运算中规避冲突?只能强行理解了.
顺嘴一提,高并发下的两大神器:LongAdder与DoubleAdder,它们会用到一个PROBE,这也与它有关,在做者所在公司的分布式主键生成器中,其实底层也是使用了它.继续回到注册线程.
4.当发生碰撞时,i的递增步step也有神奇的算法,把n超过16位的那一半拿出来加上2,再去除到奇数属性,就是步长了,这意味着步长至少是2(在n不超过16位的状况,已经够大了,步长只有多是2),显然,n在足够大(大于16位)前步长恒定为2,大到16位以上时,步长会随着n变大而变大.仿佛像中学时代的分段方程,或许这样描述更好.
step={n<16位 2;n>16位,前16位的值+2去除奇偶位}
在计算好步长后,紧接着i=((i+step)&(n-1)),显然,step大到16位以上,那么它的前16位将会在新的索引i上有所体现.这个体现也许有什么美妙的数学价值.
5.每当出现索引冲突,依旧重复34且记录循环次数,当循环次数达到n时,由于加了运行状态锁,线程能够放心地操做ForkJoinPool的状态,故能够扩容.当不冲突时,将i(一个奇数)赋给队列的scanState,意味着它初始没有人标记为正在扫描,将队列的config保存i和mode.最后解锁并返回队列给线程.创建相互引用.
注册入池的简短方法至此就简单分析完毕了.它留给咱们一些疑问:
1.为何注册入池的线程生成的WorkQueue只占用ForkJoinPool的workQueues的奇数位?这与工做窃取是否有关?
2.scanState的初始化终于明了,是一个奇数,且它也与前面的SEED_INCREMENT有关,它是否在后面是否也发挥了做用?
3.前面设置的异常处理器的做用?
4.此处解决碰撞,计算队列位于数组的索引等都依赖于workQueues的初始大小(每次扩一倍),它的初始大小又是如何肯定的?建立时机?显然在它未初始化的前提下,队列入数组会静默入池失败,可是队列却成功建立并返回给线程了.
这些问题须要暂时记在大脑,当看到相应的方法时再予解释.
//解除注册操做.它是一个要终止的工做线程的最终回调,或者建立失败时也会回调.在介绍ForkJoinWorkerThread和前面createWorker时提过. //这会从数组中移除worker记录,调整数量.若是池已经处在关闭进行中,尝试帮助完成池的关闭. //参数wt是工做线程,构建失败会是null,ex是形成失败的异常.它也能够是null. final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { //1.处理队列从池的队列数组中的移除. WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { //存在工做线程且该工做线程有队列的逻辑. WorkQueue[] ws; //前面说过,队列的config后16位表示索引,第17位表示mode. int idx = w.config & SMASK; //加运行时状态锁. int rs = lockRunState(); if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) //简单的置空操做. ws[idx] = null; //解锁. unlockRunState(rs, rs & ~RSLOCK); } //2.处理控制信号中保存的数量. long c; //循环直到减数成功. 哪怕有别的线程在竞态减小,当前方法也要在新的ctl中减小数量. do {} while (!U.compareAndSwapLong //第49位减1. (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | //第33位减1. (TC_MASK & (c - TC_UNIT)) | //保留后32位. (SP_MASK & c)))); //3.该工做线程有队列,且已经在1出了数组. if (w != null) { //把队列锁设定负数. w.qlock = -1; //把队列中记录的偷取任务数加到池中.前面已论述过此方法. w.transferStealCount(this); //取消队列中全部存活的任务. w.cancelAll(); } //4.进入循环尝试帮助关闭池或释放阻塞线程,补偿线程等. for (;;) { WorkQueue[] ws; int m, sp; //4.1 tryTerminate后面介绍,第一个参数true表明无条件当即结束,第二个参数true //表明下次tryTerminate将能够结束. if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) //进入if,说明正在结束或已结束,没什么可作的了. break; //4.2控制信号后32位有数值,进尝试释放逻辑.这不是第一次看到ctl的后32了.对于ctl的前32位, //咱们已经经过构造函数和前面的代码说明,它初始化时与并行度有关,并在后面存放了添加worker数量 //的值(但不能说存放了并行度,由于添加worker会改变相应的位),后32位的真相也开始浮出水面, //在前面的tryAddWorker中,第二轮及之后的循环条件要求后32位不能存在值.并且添加成功也会 //将后32位置0,故tryAddWorker的第一轮循环会清空后32位,与此有所影响. if ((sp = (int)(c = ctl)) != 0) { //后32位有值,尝试release,tryRealease方法会将activeCount数量添加第三个参数的值, //若是第二个参数表明的队列是空闲worker的栈顶,则释放其内的阻塞者. if (tryRelease(c, ws[sp & m], AC_UNIT)) break; //仅有此处释放失败的状况下,开启下一轮循环,其余分支均会退出循环. } else if (ex != null && (c & ADD_WORKER) != 0L) { //5这次解除注册是由于异常,且当前添加worker信号依旧知足,则添加一个worker代替原来并退出. tryAddWorker(c); break; } else // 6.不须要添加补偿worker,退 break; } if (ex == null) //前面记录的异常不存在,帮助清理脏异常节点. ForkJoinTask.helpExpungeStaleExceptions(); else //存在异常,重抛. ForkJoinTask.rethrow(ex); }
deregisterWorker逻辑并不复杂,把队列移出池,减小count,清理queue中的任务,此处又见到了WorkQueue中的另外一个属性的使用,qlock.显然qlock值取-1时,表明队列已经失效了.
队列移除后,方法还会尝试作一些非本职工做.如尝试终结线程池,知足条件则退出循环(显然每一个线程的卸载都尝试触发池的终结);线程池未进入终结过程,则尝试释放parker的逻辑(若是有),尝试成功也会退出循环,此两种状况(tryTerminate或tryRelease)会形成忽略了异常等信息,只有在二者均未成功的前提下,前去考虑参数中的异常.异常的处理逻辑很简单,存在即重抛.
//若是当前活跃线程数过少,尝试去建立或活化一个worker. //参数ws是想要找到被唤醒者的队列数组(也就是任何一个ForkJoinPool的成员变量), //参数q是个非空的队列,则方法只尝试一次,不会重试. final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { //1.添加worker步骤 //ctl小于0,表示active的太少.但彷佛也只能最多加上并行度的数量. if ((sp = (int)c) == 0) { //取ctl的后32位,终于,终于看明白了,这里有一个注释,sp==0表明无闲置worker. //但不表明后32位所有与闲置worker有关. if ((c & ADD_WORKER) != 0L) //ADD_WORKER位有值,说明总worker数量未达到. //通过三重关,添加worker. tryAddWorker(c); //知足添加worker的第一个条件,无闲置worker,不论有没有成功建立新的worker,就都必定会退出循环. break; } //2.不存在空闲worker,验证不知足唤醒流程的状况. if (ws == null) //2.1队列数组都还没初始化,显然池不知足条件. break; if (ws.length <= (i = sp & SMASK))// // 2.2队列数组长度不大于ctl后16位.说明已进入终止态.退出(多像数组的length必定要大于索引) //又一次大揭密,ctl后16位彷佛与队列数组的长度有关,并且存放的是一个索引. //此处隐含条件,ctl后32位不是0,将它的后16位取出来当索引i,要结合1处的条件. break; if ((v = ws[i]) == null) //2.3队列数组长度正常, 使用索引(ctl的后15位)从ws中取不出队列. //说明正在终止,退出. break; //3.知足了唤醒流程. //3.1计算数据,第一个为下一个scanState,在前面的addWorker流程,咱们看到 //scanState的第一个值是在队列数组中的索引.显然索引不能乱变. //新的scanState值计算,老ctl的整数位在17位加1(SS_SEQ)再取它的后31位.显然每次被唤醒都会走一次这个逻辑. int vs = (sp + SS_SEQ) & ~INACTIVE; int d = sp - v.scanState; //屏蔽没必要要的cas. //3.2计算nc,它用老的ctl加一个活跃位(48位),而后只取出前32位,对后32位取出队列v在上次扫描时存放的值(也是当时ctl的后32位). //这里咱们又见到一个熟人:stackPred,接下来会有重要的方法使用它. long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); //3.3d是0说明ctl的后32位相对于原来v中存放的scanState没有变化,那么也就不须要cas. //d不是0,须要cas,用nc替换掉c. if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { //把v的scanDate置换成vs,激活了v. v.scanState = vs; if ((p = v.parker) != null) //有线程阻塞,唤醒. U.unpark(p); //激活成功,退出循环. break; } //4.上述过程没有成功,看q是否提供,若是提供了不循环第二次. if (q != null && q.base == q.top) break; } }
signal这个方法自己逻辑并不复杂,重点在于咱们的几点意外发现.
1.ctl的后16位原来能够表示非终止状态下线程池中的一个WorkQueue的索引.
2.scanState每次singal知足唤醒流程都会被尝试置换(新值取决于sp,会在它的17位加1,并只取后31位.),前提是此时ctl的后32位(sp)与v中的scanState一致(差值d为0)或可以替换ctl为新值.置换新的scanState前会根据d来决定是否更换ctl,如有改变则cas掉ctl,将它替换成新值(原值增长了一个活跃数,并将后32位置为v中上次scan保存的ctl后32位),d是0或d不是0且置换ctl成功,方才去将v的scanState换为vs.这一步成功,发现有阻塞线程则唤醒.
3.参数q全程酱油,惟一做用是信号,上述过程出现失败,如cas不成功等,它非空则不循环第二次.
4.此处也是首次发现只增长活跃数不增长线程总数的状况
此方法的执行只有三种结果:第一是知足活跃数/总数未达到最大,且无闲置数,则建立worker;第二是前一个不知足,且未在终止周期,知足尝试唤醒的条件,会尝试唤醒一个阻塞线程(它是ctl控制信号后32位索引去ws中取的w);第三种状况,不知足前两个条件,也不知足进入下个循环的条件,至关于什么也没作.
下面先来看tryRelease方法和runWorker方法,逐个分析未知字段的含义.
//唤醒并释放worker v(队列),若是它处于空闲worker栈的顶部,此方法是当至少有一个空闲worker //时的一个快速唤醒方式. //它的参数,c应当传入此前读取的ctl,v是一个工做队列,若是不传空,应当传一个worker过来,inc表明活跃数的增长数. //若是成功释放,则返回true. private boolean tryRelease(long c, WorkQueue v, long inc) { //相似上面的signalWork方法的计算方式,sp保存ctl的后32位,vs为队列v的下一个scanState. //值依旧是sp在17位加1并只取结果的31位. int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p; //判断是否知足条件,v存在且v的scanState是sp //(言外之意sp保存的是一个v的scanState,别急,咱们离真相愈来愈近了,注释说此条件表明v是栈顶) if (v != null && v.scanState == sp) { //知足了前述的条件,v是当前"栈顶",这个栈顶的含义有些奇怪,没有栈,何来栈顶?别急. //计算新的ctl,算法同上,老ctl加上inc的结果的前32位给nc的前32位,v保存的stackPred做为nc的后32位. //在前面deregisterWorker中,tryRelease方法传入的inc为一个AC_UNIT.至关于增长一个活跃数. long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred); //尝试用前面计算的结果更新为新值. if (U.compareAndSwapLong(this, CTL, c, nc)) { //控制信号成功更新为nc,则将v的scanState保存为vs. v.scanState = vs; if ((p = v.parker) != null) //存在parker,唤醒. U.unpark(p); return true; } } return false; }
在deregisterWorker方法中咱们简单提过tryRelease方法(有调用,在完成),回忆一下该方法在这一块的逻辑,在尝试解除worker注册时,会在完成解注册自己操做后尝试tryTerminate,若是执行后未进入terminate流程,则进入tryRelease,若是抛去中间这些杂在其内的步骤,能够粗放理解为释放一个worker会尝试一次释放,并且条件是sp(即ctl的后32位)非0,表明有"空闲"线程须要释放,并且会用sp做索引调用tryRelease,试图释放掉栈点worker并增长一个活跃数.
显然,tryRelease方法有一个重大的不一样,它彻底是一个"try",对于释放操做只尝试一次,真正重要的操做是否进行彻底取决于一个对ctl的cas,只有它成功,才会进行scanState的更新和线程的唤醒.所以该方法是一个不须要加锁更改ctl的方法.
结合前面总结过的若干方法,咱们再次理解ctl,scanState等字段.
1.ctl的前32位与线程池中的线程数有关,而在这前32位中,前16位是活跃数,后16位记录总数.
2.ctl的后32位稍有些复杂,第32位是表明不活跃的字段(INACTIVE&它不是0).同时它的后16位又能够表明某个队列在队列数组中的索引(signalWork方法中尝试取出它的后16位并从队列数组中取值),奇特的是它的后32位彷佛也能够表明某个队列在队列数组中的索引,并且这个还应该是"栈顶",在deregisterWorker方法中直接用了它的后32位做为索引.
可见,ctl的真相愈来愈近了,但还不够近.
3.队列的scanState也愈来愈明显了,它的初值绝对是在当前队列在池中队列数组的索引(前面register方法中直接将i给了v.scanState),每次的释放的更新则是直接忽略掉sp的首位(不活跃位,即32位置0)并在17位加1.那么此时它还能表明索引吗?
关键点在于,scanState的后16位会不会增长,其实最终的答案将在externalSubmit方法中获得一个重要的补充,你会发现,线程池中的workQueues大小初值绝对不会超过16位,同时在上面的registWorker方法中每当出现冲突碰撞,会尝试对workQueues扩容一倍,并且它并未作出限制,可是registerWork方法须要经tryAddWorker方法建立线程,再由线程对象调用它注册入池,而后你会发现tryAddWorker只能由前面已介绍的deregistWorker与signalWorker调用,后者限定它可以成功的前提是均是sp=(int)c==0,也就是说,只有后ctl的后32位无值才能够添加worker(添加worker时后32位直接被无视,结果直接置0)前者限定为不满后者的条件时,必须是异常的状况且知足ADD_WORKER位未被重置,那么它或许有一次机会添加worker,一旦失败,在tryWorker的循环第二次将不成功.
绕了这么半天,仍是在上一段的开头,就是为了说明scanState既然初始值是一个只有16位的奇数(add worker成功时初始化,非worker的队列还未介绍到),尽管在每次release都在17位尝试加1,目前来看,并不影响它保存本身的索引(后16位),问题就在于这后16位是否足够,每次添加worker,确定要占用一个新的索引,而添加worker完成并在注册时发生冲突必定次数后会扩容(tryAddworker→registWorker),前面论述过,externalSubmit方法将workQueues数组初始化一个坚定不大于short型长度的数组(后面论述externalSubmit会简单再提),而从signalWorker调用tryAddWorker时,ctl的后32位必须为0,从deregisterWorker中虽然容许第一次不判断int(c)==0,但它实际上已经卸载了一个worker.那么至少从目前来看,在tryRelease和signalWork方法中使用ctl的后32位去计算新的scanState,隐含的意思彷佛是ctl的后32位此刻包含了"类"原来的scanState,并且新的scanState只用ctl的后32位的首位和17位,潜台词彷佛就是此时的ctl的后16位绝对能表示v的索引,不然咱们没法用这个新的scanState(vs)去从数组中找到队列.
提示:必定要结合上面signalWork源码注释中的(2.2隐藏条件),综合tryRelease的注释要求参数v是"栈顶",调用它的deregisterWork方法传入的v是vs中索引为sp&(n-1)所得,所以大胆的推测已经成熟.
1.ctl的后16位若是有值,将会是"栈顶"队列在ws中的索引.它被用于释放栈顶资源(如parker)时找到queue,以及对queue中的scanState若干步骤的从新设置,而从新设置时也不会影响后16位.所以索引在注册依始就不会再变,扩容也不会改变它的位置.
2.scanState字面意思是扫描状态,ForkJoinPool还有scan方法未介绍.
那么,就用后面的代码来验证这个绕脑的推理吧.
//runWorker方法是线程运行的最顶层方法,它由ForkJoinWorkerThread在注册成功后调用,也是所有生命周期. final void runWorker(WorkQueue w) { //1.一上来初始化数组,前面说过,WorkQueue内部的任务数组初始化是null. w.growArray(); //2.用seed保留构建时的hint随机数,在registerWorker方法中曾介绍过, //会有一个随机数s是保证每一个队列不一样的,且其中有一个每次增长一个值的成份,该值是个数学中很奇异的数字. //而hint的初值即这个s,它同时也被用于肯定队列在ws中的索引,间接决定是否扩容. int seed = w.hint; //3.初始化r,并避免异或时出现0. int r = (seed == 0) ? 1 : seed; //4.循环. for (ForkJoinTask<?> t;;) { //5.尝试"scan"(终于出现了有没有)队列w,使用随机数r if ((t = scan(w, r)) != null) //6.scan到了一个任务t,则运行它.这是进入一个任务处理的主流程,前面已介绍过WorkQueue的runTask方法. //回忆一下,它会在过程当中把scanState标记为忙碌. w.runTask(t); else if (!awaitWork(w, r)) //7.scan不到,尝试等待任务,若是等待过一段时间还未等待,进入8重置r,继续下轮循环scan.若awaitWork返回false表明应break结束worker. //关于awaitWork的返回咱们后面详解. break; //8.或许只能说just math. r ^= r << 13; r ^= r >>> 17; r ^= r << 5; } }
runWorker是线程run方法中直接调用的,进入业务主逻辑,也结合了前面的runTask方法,初始化数组等方法和将要介绍的与scan有关的逻辑,还再一次用到了那个魔法数字.
传给scan方法的r每一次循环(scan成功并运行,又await到了新的任务)都会从新赋个新值,做者看不懂新值的算法,但在咱们即将去了解的scan方法中使用了r来计算索引,所以做者更关心它的奇偶性.
很明显,r不论初始是奇是偶,新计算的r值能够是奇数也能够是偶数.也就是说,使用r&(n-1)取出的ws中的一个WorkQueue,多是线程注册时生成的一半之一,也可能不是.
接下来介绍scan方法.
//尝试扫描队列,并偷取一个"顶级"的任务.扫描开始于一个随机位置(与r有关),若是在扫描过程当中发生了 //竞态,则移动到一个随机的位置继续,不然线性地扫描,这个过程持续到在全部相同校验和 //(校验和的计算会采样每一个队列的base索引,而base索引会在每次偷的时候移动)的队列上有两次 //连续的空传递,此时worker会尝试对队列进行灭活并从新扫描,若是能找到一个task,则尝试从新 //激活(从新激活能够由别的线程完成),若是找不到task,则返回null用于等待任务.扫描过程应当减小内 //存使用,以及与其余正在扫描的线程的冲突. //参数w为目标队列,r是前面传递的种子,返回task或null. private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; //当前工做线程必须是已经完成注册的,即存在工做队列,且r&m能取得它的队列,不然直接返回null. if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { //1.scan方法是在runWorker的循环中调用的,初次调用时,scanState的值是i(前面说过),是个非负值. int ss = w.scanState; //scan方法内部开始循环. 用r&m,即w的索引给origin和k,初始化oldSum和checkSum为0. for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; //2.选择队列q存在的逻辑. if ((q = ws[k]) != null) { //2.1 目标队列q非空(自己base到top间至少存在1个,任务数组非空. if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { //计算任务数组的base索引(参考WorkQueue源码). long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //2.2数组中取出base对应task存在,base未改变的逻辑. if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { //2.3 初始记录的scanState不小于0,表明存活的逻辑. if (ss >= 0) { //2.4尝试cas掉base处的任务,注意,必定只能从base开始,不会将任务数组中间的元素置空. if (U.compareAndSwapObject(a, i, t, null)) { //cas成功,更新base. q.base = b + 1; if (n < -1) //2.5发现队列q的base到top间不止一个任务元素,则唤醒它可能存在的parker. //重温一下signalWork的简要逻辑,ctl后32位0且知足加worker条件,tryAddWorker, //条件不知足(忽略终止等判断逻辑),则计算新的scanState(使用到原ctl的后32位)和ctl(使用原ctl的前32位和q的stackPred), //在cas为新的ctl成功的前提下,换掉新的scanState. signalWork(ws, q); //2.6 只要2.4成功,返回弹出的任务. return t; } } //2.7 从scanState看已是inactive的状况.尝试活化. else if (oldSum == 0 && w.scanState < 0) //tryRelease前面已介绍过.尝试释放掉栈顶,显然ws[m&(int)c]被视为栈顶,即ctl的后32位(严格来讲彷佛是后16位)表明栈顶的索引. //释放时对ctl的增量是一个AC_UNIT. tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } //2.8 只要没有进入2.4->2.6,重置origin,k,r,校验和等参数并开启下轮,但整个2工做线用不到,进入3工做线才有用. if (ss < 0) // refresh //可能会有其余抢到同一个队列的worker在2.5/2.7处重活化了scanState,所以当它是inactive的状况,重刷新一次. ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } //2.9校验和增长b checkSum += b; } //3.持续迭代到稳定的逻辑. //这个表达式大概能够理解,线性的增长k,每次加1,直到发现已经从一个origin转满了一圈或n圈. if ((k = (k + 1) & m) == origin) { //条件:scanState表示活跃,或者知足当前线程工做队列w的ss未改变,oldSum依旧等于最新的checkSum(校验和未改变) if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive //3.1知足前面注释的条件,且w已经inactive,终止循环,返回null. break; //3.2又是这一段计算和替换的逻辑,只不过ns(new scanState)要加上非active标记. int ns = ss | INACTIVE; // try to inactivate //3.3尝试计算用来替换的ctl,它的后32位为ss加上非活跃标记,前32位减去一个活跃数单元.(终于到这了,参考前面分析的ctl前32后32位,验证了) long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); //原来ctl的后32位存给队列的stackPred. //注意,此时w.stackPred和新的ctl的后32位都有一个共性,那就是它们的后31位均可以用来运算并计算得w在ws的索引. w.stackPred = (int)c; // hold prev stack top //3.4先把w的scanState换成ns,再用cas换ctl为nc. U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) //替换ctl成功,ss直接指向ns,省去一次volatile读. ss = ns; else //3.5替换失败,再把w的scanState设置回ss. w.scanState = ss; // back out } //3.6每发现回了一轮,校验和置0. checkSum = 0; } } } return null; }
scan方法是个比较重要的方法,结合前面提过的runWorker,再次提示scan方法是在runWorker中循环调用的,固然每一次都伴随着完成一波任务和等待新任务的到来.
单轮scan中也是处在循环的,没有竞态也找到了非空队列的状况下,显然会很容易从base处出队一个合法的任务,即从1->2->2.1->2.2->2.3->2.4(->2.5可选)→2.6,会返回正常查找的任务.
进入2表明某一轮循环开始时找到了队列,这以后主要有三条线,其中一条线是前面说的1->2->2.1->2.2->2.3->2.4(→2.5可选)→2.6的正常线.
第二条线,1->2->2.1->2.3->2.4->2.7(在2.4的cas时发生竞态失败),或1->2→2.1→2.2→2.7(在2.2判断任务为空或者已经有其余线程捷足先登地取走了base),最终结果是执行2.7的归置操做(出现竞态,随机改变索引).
第三条线,1->2->2.1->2.9(2.1处发现目标WorkQueue已是空队列),则增长一次校验和.
这三条线都可以忽略起点(1),由于能够在某次循环时从2开始.
任何一次循环到达3相应的有两处入线,第一条1->2->3(在2处发现ws数组的k索引处尚且没有队列元素,这说明连工做线程注册都没有完成;第二条线是基于前面的第三条线,即1->2->2.1->2.9→(即2.1失败,前面2的步骤,只要进入到2.1,就必定是return或者continue开启下一轮).
单次循环到达3的两条路线的区别是第二种状况是校验和会增长最初获取到该队列时读取的base值(2.1).
从到达3开始分析,有两条大分支,将取ws元素的下标k进行增1操做,判断是否已经完成了一轮(等于上轮记录的origin),未完成一轮,直接再开启一次循环(由于一开始就没读出WorkQueue,因此无竞态,线性加1),从上次记录origin到如今已完成一轮的状况进入3内的分支.
进入3,最后都会将校验和归0(3.6),也就是说每查完一整轮就会让校验和复位0.可能会根据scanState决定是否进入3.1,此处也有分线.
线路1,若当前队列q已是非存活态(scanState是负数且这轮循环未更新,且校验和在本轮循环中未改变),或发现队列的qlock已经被锁定负(前面讲过要灭活),直接break,执行路径3→3.1.
线路2,当q是存活态,由于已经找了一轮,没有意义再去找了,将q灭活,相应的计算新的ctl和scanState的逻辑与前面tryRelease/signalWork的方式正好相反,scanState直接加上灭活标记位并存为ns,且ns将交给新的ctl的后32位,新ctl的前32位则减去一个活跃单元,并把ctl原来后32位的状态存给q的stackPred,这也就是ctl后32位能表示当前"栈顶"的缘由.并且这一过程当中,涉及q在ws中的索引有关的值不受影响,依旧能够用ctl的后32位来找到它.前面说过,当进行release等操做时,能够将栈顶(就是用ctl后32来取)的stackPred取出复位,正是由于这个原理(灭活时存放了此前的ctl后32位).
形象一点理解:
1.灭活一个q,则将它的scanState加上非存活标记位(不影响后面的索引标识,它是32最高位),将ctl的后32位变化后存给它的stackPred,将ctl设置新值,前32位进行减活跃数的逻辑,后32位用新的ns来替换.
2.再灭活一个新的w,重复上一个逻辑,则新的w是"灭活栈"的栈顶,新w的索引会保存在新的ctl里,原ctl中存放的上一个q的索引被置为当前w的stackPred.
3.release或者signal,对栈顶元素有一个相应的操做,将它从新激活,会将它的stackPred通过反向算法交给ctl,而它本身的scanState又简单恢复成包含索引的合法结果(ctl后32位加一个标识位并去除非活跃位的1).release以后的ctl依旧可能存在非零的后32位(这取决于刚出的栈顶是否是栈底),一样signal中的tryAddWorker,只有在ctl后32位干净时才会调用,也说明了这一点.
注意,3.4对ss进行了加上INACTIVE标记位的操做,即令ss变成负值,但在3中并不会在此退出循环,下一轮循环中可能再次进入3.1知足了break条件并退出循环返回null,也可能进入2.9增长校验和,或者在下一次循环中进入2.8从新刷新runState,这时若是此前已经有别的线程在2.7进行了当前worker的释放或者tryRelease/signal等操做(共同的要求:当前worker此时是栈顶),会所以令下一次循环有机会从2.4返回.
终于串了起来,终于搞懂了WorkQueue之间这个又是数组又是栈的结构了.
显然,这种栈的实现方式真是够少见...它确实是个栈的结构,不过栈元素自身维护的数据须要不停地和外界(池)中的ctl进行后面位的交换.
到此,前面的难点与疑问终于清楚了.
同时也能够发现,scan方法,runWorker方法和前面WorkQueue的runTask等方法共同组成了"工做窃取"的调度机制,明显一个线程在注册入池后启动,每一轮大循环都会先从scan到一个task开始,获取不到直接awaitWork,获取到,则先执行task,再执行本身localTask,由于咱们提早介绍了ForkJoinTask和它的若干子类以及doug在官方文档中给咱们的用例,所以你们很容易理解这一点,一个任务在运行过程当中,极可能会有新的任务由它而生并入池,如今没有看到入池的源码,但在前面介绍过,ForkJoinWorkerThread来fork出的任务入池时是入本身的队列,外部线程提交入池的任务则是externalSubmit一类,这两部分的源码都会在后面介绍.
显然一个线程刚刚启动时,它的workQueue彻底是空的,相应的另外一个线程在scan时若获取了它的队列必然会忽略,当前线程也必须先从scan开始,从随机的队列中按必定的规则(冲突时重置一个随机的索引位置,不冲突但发现未注册worker等状况时直接索引加1)去偷取一个原始的任务,并且从base(先入任务)开偷,而后先运行这个偷来的task,再运行本身的本地任务,在运行该task时,可能就会fork出多个子任务入了本身的任务数组,所以再运行本身的本地任务时才有活可干,完成全部本地任务后,runWorker进入下一轮循环,继续scan->waitScan→runTask的流程.
接下来继续看awaitWork方法.
//字面意思,等待有工做可作. //它其实可能会阻塞一个worker偷取任务的过程,若是worker应当关闭则直接返回false. //若是worker已经处于非活睡在态,且引发了线程池的静寂,则检查线程池的终结态,只要当前worker //不是惟一一个worker就等待一段时间.若是等待超时后ctl未改变(前32位的数量信息未变,后32位的栈信息也未变), //则终止当前worker,它可能会唤醒另外一个可能重复这个过程的worker //参数w,调用者worker,r是一个自旋用的随机数种子,若是worker应当关闭,返回false. private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating //一个线程从注册入池起就有队列,若是它为空或者qlock被置为负(-1),应当终结. //前面提过,在deregisterWorker或tryTerminate时会将qlock置-1. return false; //初始化相关值,保留队列中保存的前一个栈,取出队列的ss,赋值自旋数.SPINS在前面分析 //运行状态加锁时介绍过,它的值当前就是0,参考awaitRunState方法,在等待runState锁的时候,也能够根据它先自旋. for (int pred = w.stackPred, spins = SPINS, ss;;) { if ((ss = w.scanState) >= 0) //1.队列的scanState大于0,回忆一下,前面介绍tryRelease和signal中计算vs的方法,其中一步是与~INACTIVE,而INACTIVE是1<<31 //在前面的scan方法中已经遍历一轮且未找到task又未出现竞态未更改校验和的状况,会将scanState加上INACTIVE. //所以此处scanState忽然不小于0,说明是经历过相似tryRelease或signal的释放唤醒动做,退出循环等待. break; else if (spins > 0) { //2.当前未被活化,依旧处于INACTIVE态,则首先尝试自旋.使用r这个随机数来决定是否对自旋次数减1. r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins //2.1自旋次数达到0时作了勾子操做. WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && (v = ws[j]) != null && // see if pred parking (v.parker == null || v.scanState >= 0)) //2.2自旋次数降到0时,若知足几个条件: //当前队列保存的栈下一个队列的索引(pred)存在,线程池队列非空,pred未溢出队列数组, //取出pred对应的ws的队列(它实际上是当前w在栈向栈底前进一个的元素,它存在说明当前w不是栈底. //若是该元素存在,且它没有阻塞者或它还保持active,则重置自旋次数,继续自旋. spins = SPINS; // continue spinning } } else if (w.qlock < 0) // recheck after spins //3.自旋结束后,再次检查w的队列锁,看它是否是已经被终止了.(deregisterWorker或tryTerminate). return false; else if (!Thread.interrupted()) { //4.若是当前线程还未被扰动. //目前咱们只在一个地方看到过线程扰动的状况:awaitRunStateLock,即当一个线程尝试去修改池的运行时状态,它会去获取一个runState锁, //获取失败,发生竞态,也通过自旋等辅助策略无效的阶段,则会尝试使用stealCounter来看成锁加锁,unlock时也会在确认竞态的状况下去用它唤醒. //而在awaitRunStateLock中阻塞的线程若是正在进行stealCounter.wait时,wait操做被中断,则会扰动当前线程,这将去除进入此分支的可能. //此外,tryTerminate自己也有扰动其余工做线程的步骤.若是用户不在相应的实现代码(如ForkJoinTask的exec函数或CountedCompleter的compute函数) //中手动去扰动当前工做线程,能够理解awaitRunStateLock的扰动事件可能与tryTerminate有关. long c, prevctl, parkTime, deadline; //计算新的活跃数,它是原ctl的前16位(负)加上并行度. int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // pool terminating //5.发现活跃数已降至0,尝试调用tryTerminate,方法返回true代表已终止或正在终止;或发现runState已经进入终结程序. //这两种状况直接返回false,线程执行完毕终止. return false; if (ac <= 0 && ss == (int)c) { // is last waiter //6.前面分析scan方法时讨论过,栈顶元素的scanState体如今ctl的最新后32位,它的stackPred则是ctl以前的后32位值. //进入6,说明当前worker是栈顶,即最后一个等待者. //用pred计算出以前的ctl. prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); //取ctl的17-32位,即worker总数. int t = (short)(c >>> TC_SHIFT); // shrink excess spares if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) //6.1若是发现线程总数大于2,将ctl回滚,返回false让线程终止. return false; // else use timed wait //6.2计算deadLine和parkTime,用于后续的定时等待,暂不终结当前线程,而是做为parker. //IDLE_TIMEOUT最开始说过,它就是起这个做用的一个时间单位,把gc时间也考虑在内,默认为2秒. parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else //7.存在active的worker或当前w不是栈顶. prevctl = parkTime = deadline = 0L; //8.作线程停段的工做. Thread wt = Thread.currentThread(); //把当前线程池做为parker设置给线程,当使用LockSupport.park时,它将被看成一个参数传递(参考Thread类注释,在java方法签名处看不出来). U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport //设置parker. w.parker = wt; if (w.scanState < 0 && ctl == c) // recheck before park //8.1从新检查非active.合格则停顿. U.park(false, parkTime); //归置. U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) //8.2停顿(或未停顿)重检查发现w被从新active,则退出循环返回true(非false表明不能终结当前线程). break; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) //8.3发现没有时间了,ctl也未在等待的时间发生变化,将ctl设置为w入栈前的结果,返回false让终结此线程(相似出栈). return false; // shrink pool } } return true; }
awaitWork方法稍长一些,可是内容大可能是前面已经介绍过的字段,若是前面的有关方法和字段较熟悉,这一块不难理解.
同时,它也验证了我前面对runState,ctl和stackPred的猜想正确.
它的返回值也值得注意,从返回值来讲,它的做用也不止是简单的"等待操做",它返回false会形成线程的终结,而返回true时,runWorker方法会重开一轮,再一次尝试获取任务,
而返回true只能发生在两个break(1和8.2)检查scanState时,这说明w被活化.
接下来看一些与join有关的操做,这些操做大可能是由外部(工做线程以外,甚至线程池以外的线程)调用,也能由其余类(非ForkJoinPool,如已经介绍过的ForkJoinTask和CountedCompleter)进行调用和调度.
//帮助完成,调用者能够是一个池中的工做线程,也能够是池外的.在JDK8版本中,有三处调用: //1.CountedCompleter::helpComplete,该方法的调用由咱们决定. //2.ForkJoinPool::awaitJoin,等待结果的同时能够尝试帮助完成,只由池中线程调用,传入的队列是该线程的队列.该方法由ForkJoinTask的join/invoke/get调用. //3.ForkJoinPool::externalHelpComplete,用于外部线程操做,前面在CountedCompleter的文章已粗略介绍,传入的w为ws中用一个随机数与n-1和0x007e取与运算 //的结果,很明显,即便w不是null,也只能是一个偶数位的元素,这意味着w不会是registerWoker时生成的带有工做线程的WorkQueue.也就是不能帮助池中线程完成本身的队列. //本方法会尝试从当前的计算目标以内偷取一个任务,它使用顶层算法的变种,限制偷出来的任务必须是给定任务的后代,可是也有一些细节要注意. //首先,它会尝试从本身的工做队列中找合格的任务(用前面讲过的WorkQueue::popCC),若不能找到则扫描其余队列,当发生竞态时随机移动指针,依照校验和机制决定是否放弃 //帮助执行(这取决于前面介绍的pollAndExecCC的返回码).参数maxTasks是对外部使用的支持参数,内部调用它会传入0,容许无界的次数(外部调用时,捕获非法的非正数). //参数w,队列,在内部调用的状况下能够理解为当前线程的工做队列,参数maxTasks若是非0,指代最大的可运行的其余任务.退出时方法返回任务状态. final int helpComplete(WorkQueue w, CountedCompleter<?> task, int maxTasks) { WorkQueue[] ws; int s = 0, m; //变量初始化和验证,队列和参数w必须非空才能进入if. if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && task != null && w != null) { //介绍popCC时曾专门强调过这个mode实际上是config. int mode = w.config; // for popCC int r = w.hint ^ w.top; // arbitrary seed for origin int origin = r & m; // first queue to scan //初始时赋h为1,在每一轮循环中,它取1表明正在正常运行,大于1表明发生了竞态,小于0将增长到校验和,表明pollAndExecCC达到了根元素. //详细参考前面论述过的pollAndExecCC. int h = 1; // 1:ran, >1:contended, <0:hash //初始化条件循环条件,记录origin的值,初始化oldSum和checkSum for (int k = origin, oldSum = 0, checkSum = 0;;) { CountedCompleter<?> p; WorkQueue q; //1.传入的任务已是完成的,break返回s(负). if ((s = task.status) < 0) break; //2.h未通过更改或经历过若干次更改,但在上一轮循环表明了pollAndExecCC成功执行task(h取1),则 //在当轮循环尝试对w进行popCC,并根据mode决定从base仍是top出队. if (h == 1 && (p = w.popCC(task, mode)) != null) { //2.1本队列有知足条件的任务,执行之. p.doExec(); // run local task if (maxTasks != 0 && --maxTasks == 0) //减小maxTask并在它降到0时break.(前提是传入了正数的maxTasks). break; //2.2没降到0,把origin和校验和参数重设为循环初始化的值. origin = k; // reset oldSum = checkSum = 0; } //3.某轮循环h表明出现竞态等问题或不能使用popCC方式从本地队列出任务执行.尝试从其余队列poll执行. else { // poll other queues //3.1 找不出任务,h置0,这将使它在随后的循环中不会再进入2 if ((q = ws[k]) == null) h = 0; //3.2尝试从q的base处poll并执行task.返回-1表明不匹配,对校验和增长h(负数). else if ((h = q.pollAndExecCC(task)) < 0) checkSum += h; //3.3 h大于0,多是等于1但popCC未成功的状况.也多是pollAndExecCC成功了一次或cas失败. if (h > 0) { if (h == 1 && maxTasks != 0 && --maxTasks == 0) //h是1减maxTask,当它达到0终止循环.(前提是没传了正数的maxTasks) break; //h不等于1,通常是poll时cas失败,重置r,origin,checkSum等,开下一轮循环. r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift origin = k = r & m; // move and restart oldSum = checkSum = 0; } //3.4前面见过相似的代码,发现已经转完了一轮,校验和未改变过(任何一个队列都未进3.2/3.3,也就是查找任何一个下标ws[k]都是null),break. else if ((k = (k + 1) & m) == origin) { if (oldSum == (oldSum = checkSum)) break; //发现校验和有变动,说明有一轮循环未进入3.1,再次循环. checkSum = 0; } } } } //返架退出循环是task的status. return s; }
显然,task在整个help过程当中不会被执行,一旦某一轮循环发现task已经完成了,那么当即结束循环.此方法可让进行join/get等操做的线程帮助完成一些有关(子任务)的任务.
只要存在非空队列,task未完成,当前线程未能帮助完成maxTask个任务(或初始就指定了0),当前线程就会一直循环去找任务,直到发现task完成了为止.
简单分析一下一轮循环的工做流程.
显然首轮循环(不考虑一上来task的status就小于0的状况)必然能从2进入,在若干轮后某一轮未能pop成功而进入了3,3中若干轮后poll成功,则h从新被置为1,形成下一轮循环又能够进入2的流程.
直到:某一轮task完成了;某一轮maxTask完成了(指定的状况);某一轮再次发现h为0且发现已经对ws全部队列转满了一圈.
仍是再逻辑一下该方法的使用.
1.外部使用ForkJoinTask的get/join等方法时,引用到ForkJoinPool::externalHelpComplete,它调用helpComplete传入的队列必定是偶数索引的队列.非工做线程维护的队列;或引用到ForkJoinPool::awaitJoin,调用helpComplete传入maxTasks是0,意味着可能循环到直到task完成为止.
2.内部线程可能在CountedCompleter::helpComplete中使用此方法,这种状况下,须要咱们在compute方法中进行调用.
接下来看helpStealer方法.
//字面意思:尝试帮助一个"小偷". //本方法会尝试定位到task的偷盗者,并尝试执行偷盗者(可能偷盗者的偷盗者)的任务.它会追踪currentSteal(前面runTask时提过,会将参数task置为currentSteal)-> //currentJoin(当前队列等待的任务,后面会介绍awaitJoin方法),这样追寻一个线程在给定的task的后续工做,它会使用非空队列偷回和执行任务.方法的第一次从等待join调用 //一般意味着scan搜索,由于joiner没有什么更适合作的,这种作法也是ok的.本方法会在worker中留下hint标识来加速后续的调用. //参数w表明caller的队列,task是要join的任务. //方法共分三层循环,最外层是一个do-while循环,其内是两个for循环. private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { //初始化和进入if的条件,没什么可说的. WorkQueue[] ws = workQueues; int oldSum = 0, checkSum, m; if (ws != null && (m = ws.length - 1) >= 0 && w != null && task != null) { //循环一:方法最外层的while循环.它的条件是task未完成且校验和未发生变化. do { // restart point //每次循环一的起点,校验和重置为0.用j保存w. checkSum = 0; // for stability check ForkJoinTask<?> subtask; WorkQueue j = w, v; // v is subtask stealer //循环二:外部for循环,subtask初始指向参数task,循环条件是subtask未完成. //在每次循环四中会校验当前小偷的队列是否空了,若是空了则换它的小偷继续偷(交给subtask指向). descent: for (subtask = task; subtask.status >= 0; ) { //循环三:内部第一个for循环,初始化变量h,用hint加上奇数位,保证从奇数索引取队列.k初始为0,每次循环结束加2. for (int h = j.hint | 1, k = 0, i; ; k += 2) { //1.循环三内的逻辑. //1.1发现k已经递增到大于最大索引m了,直接终止循环二,若发现task还未完成,校验和也未更改,则进行上面的重置操做并从新开始循环二. if (k > m) // can't find stealer break descent; //1.2i位置标记为h+k的结果与运算m,由于k每次增2,h又是奇数,故保证只取有线程主的队列. if ((v = ws[i = (h + k) & m]) != null) { if (v.currentSteal == subtask) { //发现偷取currentSteal的worker v,将它的索引i交给j(初始为w,在2内会更改成"等待队列"的元素,在while循环中会重置为w)的hint, //方便下一次再进入循环三时的查找.并终止循环三,进入循环四的判断入口. j.hint = i; break; } //1.3存在非空v,可是v未偷取subtask,将v的base加给校验和.这将影响到循环一的判真条件.显然从循环三退出循环二,或后续循环四退出循环二 //将致使循环一也一并因while条件不满而退出. checkSum += v.base; } } //循环四:迭代subtask的循环,它必须经循环三中的1.2走出. //2.到达循环四,必定已经在循环三中找到了一个v,此处会尝试帮助v或者它的产生的"后裔". for (;;) { // help v or descend ForkJoinTask<?>[] a; int b; //2.1相似1.3的逻辑,校验和增长v.base,初始化next,增长校验和意味着,只要从循环四退出了循环二,则最外的循环一的while条件将不知足. checkSum += (b = v.base); //next取v的currentJoin. ForkJoinTask<?> next = v.currentJoin; if (subtask.status < 0 || j.currentJoin != subtask || v.currentSteal != subtask) // stale //2.2若是subtask已经是完成态,或发现竞态等状况形成数据已脏,如发现本轮循环中j的当前join已不是当前subtask, //或v的当前steal不是subtask,说明出现了脏数据,直接终止循环二,从新进入while循环重初始化jv. break descent; //2.3发现队列v已空的逻辑. if (b - v.top >= 0 || (a = v.array) == null) { if ((subtask = next) == null) //2.3.1 v已空,且不存在next,即"等待队列"已空,退出循环二,从新while断定循环条件,重初始化jv. break descent; //2.3.2 还有next,将subtask指向next的同时,用v替换掉j.这是明显的迭代语句. //在前面的代码中能够看出,循环一就是为subtask找出小偷v的,关系是v.currentSteal=subtask.同时j.currentJoin=subtask. //由于next=v.currentJoin,将v赋给j后,仍旧知足j.currentJoin=next=subtask,此时break掉循环四,从新开启循环二的新一轮 //正好对v进行从新初始化,而找到v的条件又是v.currentSteal=subtask,也即等于j.currentJoin. //此处break掉的循环四将致使循环二的下轮将在循环三处从新为新的j找到v(v.currentSteal==subtask). j = v; break; } //2.4未进入2.3.1/2.3.2的状况,显然进入这二者会break到循环一或二. //取出base索引位置i和相应的任务元素. int i = (((a.length - 1) & b) << ASHIFT) + ABASE; ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i)); //2.5,接2.4,判断竞态,v.base!=b说明已经被别的线程将base元素出队.这种状况下直接进入下一轮的循环二. if (v.base == b) { //2.5.1 取出任务t,发现空为脏数据,从while循环从新初始化. if (t == null) // stale break descent; //2.5.2,将t出队并进行后续流程. if (U.compareAndSwapObject(a, i, t, null)) { //2.5.3首先将v的base增1. v.base = b + 1; //2.5.4取出w(方法参数,当前worker)的currentSteal保存到ps. ForkJoinTask<?> ps = w.currentSteal; int top = w.top; do { //2.5.5此循环不和循环一至四一块罗列,由于它本质上只是任务的出队与执行. //首先会尝试将w队列的currentSteal置为刚刚从v的任务数组中出队的t U.putOrderedObject(w, QCURRENTSTEAL, t); //执行t.执行后顺带循环处理本身刚压入队列w的任务.执行后,也跳出当前while循环的状况下会在下次从新判断2.3, //非空继续找base(i),为空则迭代v为next(2.3.2). t.doExec(); // clear local tasks too //2.5.6循环条件,只要参数task还未完成,w新压入了任务,则依次尝试从w中pop元素,和前面的t同样按序执行(此处顺带执行本身的任务). } while (task.status >= 0 && w.top != top && (t = w.pop()) != null); //2.5.7偷了小偷v的base任务并执行成功,则恢复w的currentSteal. U.putOrderedObject(w, QCURRENTSTEAL, ps); if (w.base != w.top) //2.5.8偷完并执行完当前v的base任务或者某一轮的等待队列上的元素v的base任务后,发现本身的队列非空了,就再也不帮助对方,方法return. //能够参考awaitJoin方法,由于helpStealer只在awaitJoin中调用,调用的前提就是w.base==w.top. //这显然与2.5.6有所纠结(尽管一个判断top,一个判断top和base的相等),只要到了2.5.8,队列非空将返回. return; // can't further help } //出队失败同2.5.1同样,竞态失败从新循环二,但在下一轮循环中会在2.5.1break回while循环. } } } //最外层的while循环条件,task未完成,校验和未发生更改. } while (task.status >= 0 && oldSum != (oldSum = checkSum)); } }
helpStealer方法不短,内容和信息也很多,但鉴于前面已经不停地渗透与它有关的"窃取","外部帮助"等概念,此处只再啰嗦一点细节,也解释一些注释中的疑惑.
1.回顾一下currentSteal,在scan方法中会把参数task设置为自身队列的currentSteal.并且runTask在每轮循环会先运行这个task,再运行队列的本地任务,每轮循环都会更新它.
2.若是我创建了一个ForkJoinTask,并fork出若干子任务并join之,或者在外部ForkJoinTask::join等方式,至关于造成了一个"等待队列",即任务之间彼此join,用currentJoin标识(这一块在awaitJoin方法详解).
3.仅有w.base==w.top时才能执行此方法,若是执行过程当中发生条件变化,则在执行完当前小偷v的某个任务后进行检查会发现, 就会回归到本身的任务队列.
4.同1,其实helpStealer方法至关于沿着currentJoin队列进行帮助,首先找到本身w的小偷,帮他执行完剩下的任务,而后顺着它们join的任务去执行.对于等待队列的逐个迭代过程,依靠currentJoin和currentSteal二者的配置,经过currentJoin找到next,也即下一个subtask,再遍历ForkJoinPool中的ws找到currentSteal是subtask的worker,如此迭代并重复上面的全部过程.
5.玩笑式的聊一下这个"反窃取",甚至有点"赔偿"的概念了,一个worker发现/或线程池外的线程去帮助偷了我任务的工做线程(worker,或WorkQueue,即ws的奇数索引位元素)从base处执行,直到执行干净,再找它currentJoin的任务所属于的队列,继续这个完成过程,直到发现本身的任务被完成了为止.可见,我发现你是我这个任务的小偷,我不但要偷你的所有身家,还要偷走偷了你的任务的小偷的所有身家,若是他也被偷过,那我再找他的小偷去偷,直到找回失主(我丢的任务)为止.
由于一个任务的完成要先从出队开始,所以不会出现两次执行的状况,能够放心大胆的窃取.
上面的45是比较简单的形象例子,但也不妨再加上一小段伪代码.
好比我先建立了一个ForkJoinTask子类并new一个对象,在它的exec方法中我fork了多个任务,那么当我去submit到一个ForkJoinPool后,我使用get方法去获取结果,此时美妙的事情就发生了.
1.我提交它入池,它进入了队列,并被一个工做线程(小偷)偷走.
2.它被偷走后,我才去get结果,此时发现task还未结束,我须要等.可是我就这样干等吗?不我要找小偷,我要报复.
3.开始报复,可是我只能偷他的钱库(array),小偷对本身偷来的任务很是重视,它放在currentSteal里面了,我偷不到,只好把他的钱库偷光.
4.偷完他的钱库,我发现个人task(失物)尚未完成,我仍是不能闲着,一不作二不休,我发现小偷也有join的任务,这个被join的任务不在他的队列,也被其余小偷偷走了,那么我找到新的小偷,再偷光它的财产.
5.若是个人task仍是未执行完毕,我再找新的小偷;不然返回便可.或者我每偷走一个小偷的任务时,忽然发现个人仓库提交了新任务,那我就不能再去偷了.
这是外部线程的执行结果.但若是帮助者自己是一个工做线程,那么流程也类似,读者自行捊顺吧.
//字面意思:尝试补偿. //方法会尝试减小活跃数(有时是隐式的)并可能会因阻塞释放或建立一个补偿worker. //在出现竞态,发现脏数据,不稳定,终止的状况下返回false,并可重试.参数w表明调用者. //方法实现比较简单,为简单的if else模式,只有一个分支能够执行. private boolean tryCompensate(WorkQueue w) { //canBlock为返回值. boolean canBlock; WorkQueue[] ws; long c; int m, pc, sp; //1.发现调用者终止了,线程池队列数组为空,或者禁用了并行度,则返回false. if (w == null || w.qlock < 0 || // caller terminating (ws = workQueues) == null || (m = ws.length - 1) <= 0 || (pc = config & SMASK) == 0) // parallelism disabled canBlock = false; //2.发现当前ctl表示有worker正等待任务(空闲,位于scan),则尝试释放它,让它回来工做. else if ((sp = (int)(c = ctl)) != 0) // release idle worker canBlock = tryRelease(c, ws[sp & m], 0L); else { //3.当前全部worker都在忙碌. //3.1计算活跃数,总数,计算方法前面已经论述屡次. int ac = (int)(c >> AC_SHIFT) + pc; int tc = (short)(c >> TC_SHIFT) + pc; //记录nbusy,注释表示用于验证饱合度. int nbusy = 0; // validate saturation for (int i = 0; i <= m; ++i) { // two passes of odd indices WorkQueue v; //3.2nbusy的计算方法,遍历线程池的队列数组(每次增1),验证则以1-3-5这个顺序开始,发现有处于SCANNING态的,就停掉循环,不然加1. if ((v = ws[((i << 1) | 1) & m]) != null) { if ((v.scanState & SCANNING) != 0) break; ++nbusy; } } //3.3若是非稳态(饱合度不是tc的2倍),或者ctl脏读,则返回false. if (nbusy != (tc << 1) || ctl != c) canBlock = false; // unstable or stale else if (tc >= pc && ac > 1 && w.isEmpty()) { //3.4处于稳态且ctl还有效,总worker数大于并行度且活跃数大于1并且当前w又是空的.尝试将ctl减去一个活跃位. long nc = ((AC_MASK & (c - AC_UNIT)) | (~AC_MASK & c)); // uncompensated 反补偿,初看莫名其妙,调用者会在以后增长ac. //返回值为cas是否成功. canBlock = U.compareAndSwapLong(this, CTL, c, nc); } else if (tc >= MAX_CAP || (this == common && tc >= pc + commonMaxSpares)) //3.5普通ForkJoinPool,总worker数达到MAX_CAP,或common池,总worker数量达到并行度+commonMaxSpares(默认256),抛出拒绝异常. throw new RejectedExecutionException( "Thread limit exceeded replacing blocked worker"); else { // similar to tryAddWorker boolean add = false; int rs; // CAS within lock //3.6.计算新的ctl,增长一个总worker数. long nc = ((AC_MASK & c) | (TC_MASK & (c + TC_UNIT))); //加运行状态锁,池未进入终止态的状况下,进行cas,随后解锁. if (((rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); //cas成功,则建立worker canBlock = add && createWorker(); // throws on exception } } return canBlock; }
此方法相对简单许多,只是根据不一样的当前线程池和参数队列的状态进行不一样的操做.
1.调用者终止或线程池空态,返回false结束.
2.发现当前有worker正在空闲(阻塞等待新任务),释放等待栈顶(前面已经论述并欣赏过这种奇怪的"栈").
3.线程池未进入稳态或者进入时读取的ctl失效,返回false.
4.存在活跃worker且总worker数大于tc,调用者队列实际又是空的,则减去一个活跃位.
5.总线程数超限,抛出异常.
6.其余状况,增长一个总worker数并建立worker.
//前面提过它不少次了,awaitJoin方法会在指定任务完成或者超时前尝试帮助或阻塞自身. //参数w表明调用者,task为目标任务,参数deadline是超时目标(非0).它会返回退出时的任务状态. final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { //返回值. int s = 0; if (task != null && w != null) { //1.若未进入if必然返回0,进入条件是提供了task和w. //保存currentJoin ForkJoinTask<?> prevJoin = w.currentJoin; //将w的currentJoin暂时设置为task. U.putOrderedObject(w, QCURRENTJOIN, task); //若是task是CountedCompleter类型,转化并存放到cc. CountedCompleter<?> cc = (task instanceof CountedCompleter) ? (CountedCompleter<?>)task : null; //2.循环. for (;;) { if ((s = task.status) < 0) //2.1目标task已完成,返回task的status. break; if (cc != null) //2.2目标task是CountedCompleter,调用前面介绍过的helpComplete方法,maxTasks不限(0). helpComplete(w, cc, 0); //2.3不然发现队列w已空,或者非空,则尝试从w中移除并执行task,若出现队列w是空且任务不知道是否完成的状况(t.doExec只是执行,不等结果), //此处也会拿到一个true,则调用前面介绍过的helpStealer去帮助小偷. else if (w.base == w.top || w.tryRemoveAndExec(task)) helpStealer(w, task); //2.4.帮助须要时间,double check,同2.1. if ((s = task.status) < 0) break; //2.5计算deadline有关的停顿时间ms. long ms, ns; if (deadline == 0L) ms = 0L;//未指定deadline,ms为0 else if ((ns = deadline - System.nanoTime()) <= 0L) break;//要指定的deadline已经早于当前时间了,break返回上面的status else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) ms = 1L;//用上面的ns计算ms发现负数,重置ms为1 //2.6,调用上面提到过的tryCompensate方法,传入当前worker,若是获得true的返回值,等待超时, //超时结束增长一个活跃位(前面提到tryCompensate方法最后加增长tc并建立worker,不增长ac,或者莫名其妙地减去了一个ac). if (tryCompensate(w)) { task.internalWait(ms); U.getAndAddLong(this, CTL, AC_UNIT); } } //3.最后恢复原来的currentJoin. U.putOrderedObject(w, QCURRENTJOIN, prevJoin); } return s; }
await方法只会在2.1,2.4,2.5三处结束,前两处为发现task结束,后一处是超时.返回的结果必定是返回时task的状态.
接下来看一些专门针对扫描的方法.
//简单的方法,尝试找一个非空的偷盗队列.使用相似简化的scan的方式查取,可能返回null. //若是调用者想要尝试使用队列,必须在获得空后屡次尝试. private WorkQueue findNonEmptyStealQueue() { WorkQueue[] ws; int m; //随机数r int r = ThreadLocalRandom.nextSecondarySeed(); //线程池不具有队列,直接返回null. if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { //循环开始. for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; int b; if ((q = ws[k]) != null) { if ((b = q.base) - q.top < 0) //查到q这个WorkQueue,而且q非空,则将q返回. return q; //查到了空队列q,则校验和加上q的base. checkSum += b; } //前面屡次见过的判断进入第二轮的办法.发现ws从头至尾都是null则break返回null,不然出现非null的空队列则将校验和置0继续循环. if ((k = (k + 1) & m) == origin) { if (oldSum == (oldSum = checkSum)) break; checkSum = 0; } } } return null; } //运行任务,直到isQuiescent,本方法顺带维护ctl中的活跃数,可是全过程不会在任务不能找到的状况下 //进行阻塞,而是进行从新扫描,直到全部其余worker的队列中都不能找出任务为止. final void helpQuiescePool(WorkQueue w) { //保存当前偷取的任务. ForkJoinTask<?> ps = w.currentSteal; // save context //循环开始,active置true. for (boolean active = true;;) { long c; WorkQueue q; ForkJoinTask<?> t; int b; //1.先把本地任务执行完毕(每次循环扫描). w.execLocalTasks(); //2.查找到非空队列的状况,除了2之外的34在cas成功的状况下都会终止循环. if ((q = findNonEmptyStealQueue()) != null) { //2.1通过3中成功减小了活跃数的状况,下一次循环又扫描到了新的非空队列,须要重激活. if (!active) { active = true; //活跃数从新加1. U.getAndAddLong(this, CTL, AC_UNIT); } //2.2再次判断队列非空,并从队列内部数组的base起取出task if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { //将task置于currentSteal U.putOrderedObject(w, QCURRENTSTEAL, t); //执行task t.doExec(); //若是w的偷取任务数溢出,转到池中. if (++w.nsteals < 0) w.transferStealCount(this); } } //3.仍旧保持active的状况. else if (active) { //3.1能进到这里,确定本轮循环未能进入2,说明未能发现非空队列,计算新的ctl即nc,它是原ctl减去一个活跃单位. long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c); if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0) //3.2新的活跃数加上并行度还不大于0,即不能溢出,说明没有活跃数,不进行cas了,直接break. //很明显,上面计算nc的方法,首先ctl正常自己是负,若上面表达式为正,惟一的解释是线程池有活跃线程(前面讲过,活跃一个加一个活跃单元,直到并行度为止) //由于两个表达式分别是前16位(在前面再补上16个1)和后16位求和. break; //3.3未能从3.2退出,说明nc表示当前有活跃数存在,进行cas,成功后active置false,不退出循环. //若下轮循环发现新的非空队列,会在2.1处增长回来.若未能发现,会在4处加回来. if (U.compareAndSwapLong(this, CTL, c, nc)) active = false; } //4.前一轮循环进了3.3,当前循环未能进入2.1的状况,判断当前ctl活跃数加上并行度是非正,说明再建立并行度个数的worker也不能溢出.则再加回一个活跃数. else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 && U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) break; } //5.从新恢复currentSteal U.putOrderedObject(w, QCURRENTSTEAL, ps); } //获取并移除一个本地或偷来的任务. final ForkJoinTask<?> nextTaskFor(WorkQueue w) { for (ForkJoinTask<?> t;;) { WorkQueue q; int b; //首先尝试nextLocalTask本地任务. if ((t = w.nextLocalTask()) != null) return t; //获取不到本地任务,尝试从其余队列获取非空队列,获取不到非空队列,返回null. if ((q = findNonEmptyStealQueue()) == null) return null; //获取到了非空队列,从base处取任务,非空则返回,为空则重复循环. if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) return t; } }
上面是一些针对扫描的方法,有前面的基础,理解实现并不困难,再也不缀述.
接下来关注与终止有关的函数.
//前面不止一次提到过tryTerminate,说过它会尝试终止或完成终止. //参数now若是设置为true,则表示在runState进入SHUTDOWN关闭态(负)时无条件终止,不然须要在进入SHUTDOWN同时没有work也没有活跃worker的状况下终止. //若是设置enable为true,则下次调用时runState为负,可直接进入关闭流程(若是有now为true,则当即关). //若是当前线程池进入终止流程或已终止,返回true. private boolean tryTerminate(boolean now, boolean enable) { int rs; //common池不可关. if (this == common) // cannot shut down return false; //1.runState拦截和处理. if ((rs = runState) >= 0) { if (!enable) //1.1对于当前runState非负的状况,若是没有指定enable,返回false. return false; //1.2若是指定了enable,将加运行状态锁并更新runState的首位为1,即runState下次进入时为负.再也不进入1的拦截处理流程. rs = lockRunState(); unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN); } //2.终止的处理流程. if ((rs & STOP) == 0) { //2.1.没有指定即刻关闭,检查是否线程池已进入静寂态. if (!now) { //循环重复直到稳态.初始化校验和机制. for (long oldSum = 0L;;) { WorkQueue[] ws; WorkQueue w; int m, b; long c; long checkSum = ctl;//校验和取ctl if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0) //2.1.1当前线程池还有活跃的worker(前面解释过).此时应返回false return false; if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) //2.1.2线程池已经没有队列,直接break进入后续流程. break; //2.1.3从0开始遍历到ws的最后一个队列. for (int i = 0; i <= m; ++i) { if ((w = ws[i]) != null) { if ((b = w.base) != w.top || w.scanState >= 0 || w.currentSteal != null) { //只要发现任何一个队列非空,或队列未进入非活跃态(负)或队列仍有偷来的任务未完成. //尝试释放栈顶worker并增长一个活跃数.并返回false,能够据此从新检查. tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); return false; } //队列非null可是空队列,给校验和增长base. checkSum += b; if ((i & 1) == 0) //发现非worker的队列,直接让外部禁用. w.qlock = -1; } } //2.1.4校验和一轮不变,break掉进入后置流程.即2.1.3中每一次取ws[i]都是null. if (oldSum == (oldSum = checkSum)) break; } } //2.2.到这一步,已经保证了全部的关闭条件.若尚未给运行状态锁加上stop标记, //则给它加上标记.此时再有其余线程去尝试关闭,会进不来2这个分支. if ((runState & STOP) == 0) { rs = lockRunState(); // enter STOP phase unlockRunState(rs, (rs & ~RSLOCK) | STOP); } } //3.通过前面的阶段,已完成预处理或now检查,可进入后置流程. int pass = 0; // 3 passes to help terminate for (long oldSum = 0L;;) { // or until done or stable WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m; long checkSum = ctl; //3.1前面解释过这个状态表示当前无活跃worker. if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || (ws = workQueues) == null || (m = ws.length - 1) <= 0) { if ((runState & TERMINATED) == 0) { //在确保无worker活跃的状况,直接将线程池置为TERMINATED.并唤醒全部等待终结的线程. rs = lockRunState(); unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); synchronized (this) { notifyAll(); } } //到此必定是终结态了,退出循环,结束方法返回true. break; } //3.2内循环处理存在活跃worker的状况.从第一个队列开始遍历. for (int i = 0; i <= m; ++i) { if ((w = ws[i]) != null) { //3.2.1对每一个非null队列,增长一次校验和并禁用队列. checkSum += w.base; w.qlock = -1; if (pass > 0) { //3.2.2内循环初次pass为0不能进入. //pass大于0,取消队列上的全部任务,清理队列. w.cancelAll(); if (pass > 1 && (wt = w.owner) != null) { //3.2.3 pass大于1而且队列当前存在owner,扰动它. if (!wt.isInterrupted()) { try { wt.interrupt(); } catch (Throwable ignore) { } } if (w.scanState < 0) //3.2.4若是w表明的worker正在等待任务,让它取消停顿,进入结束流程. U.unpark(wt); } } } } //3.3若是校验和在几轮(最大为3或m的最大值)循环中改变过,说明并未进入稳态.将oldSum赋值为新的checkSum并重置pass为0. if (checkSum != oldSum) { oldSum = checkSum; pass = 0; } //3.4pass从未被归置为0,稳态增长到大于3且大于m的状况,不能再帮助了,退出循环返回true. else if (pass > 3 && pass > m) break; //3.5pass未到临界值,加1. else if (++pass > 1) { long c; int j = 0, sp; //每一次进入3.5都会执行一次循环.若是ctl表示有worker正在scan,最多m次尝试release掉栈顶worker. //由于最多只有m个worker在栈中阻塞.所以3.4是合理的. while (j++ <= m && (sp = (int)(c = ctl)) != 0) tryRelease(c, ws[sp & m], AC_UNIT); } } return true; }
tryTerminate方法的实现并不复杂,不过这里有一点须要注意的地方:从方法中返回true,至少能够理解为进入了终止流程,但不必定表明已终止(即便是now的状况),由于仅看方法的后半点,返回true时,线程池必定已经进入stop(从3.4break),或完成了terminated(从3.1break).
显然,线程池的关闭必然会先经历STOP,而后再TERMINATED,故前面全部的使用线程池的方法都是直接先判断stop,由于若是线程池terminated了,那么必定先stop.一样,还有一个shutdown标记位来标记runState是否已进入负值,它小于0时(SHUTDOWN是最高位),则不能再接收新的任务.
其实从调用者能够看出来它的几种执行状况.
显然,在对线程进行解除注册时,等待任务时和提交任务时,now和enable均会传入false,若是没有其余地方提交调用了shutdown将runState的首位置1,这三个方法没法经过注释(1)处的代码拦截.
shutdown会用enable的方式,将当前还没有将runState置负的状态置负,使得下一次调用deregisterWorker,awaitWork,externalSubmit,shutdown四个方法均能走后置的逻辑.
shutdownNow则两个参数均会置true,会走完上面的全部逻辑.
下面来看externalSubmit等外部操做的方法.
//字面意思,从外面提交一任务入池.有前面的基础后,此方法很容易理解. //此方法会处理一些不常见的case,好比辅助进行池的一些初始化过程(首次提交任务), //若是发现是首次外部线程提交任务,在ws的目标索引位置为空或者出现竞态,它会尝试建立新的共享队列. //参数task是目标任务,调用者必须保证非空. private void externalSubmit(ForkJoinTask<?> task) { int r; //初始化一个用于定位的随机数r,前面曾简单介绍过它和localInit,许多公司的分布式id也是有它的成份. //而这个随机数与线程至关于绑定在了一块儿,所以,能够使用它表示一个线程特有的东西. if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } //循环尝试压入任务. for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; if ((rs = runState) < 0) { //1.发现此时线程池的运行状态已经进入SHUTDOWN,帮助终止线程池,并抛出拒绝异常. tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } //2.发现线程池还未初始化,辅助初始化.STARTED为第三位. else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; //加锁. rs = lockRunState(); try { //double check并尝试初台化stealCounter.它在awaitRunStateLock(尝试加锁)的时候会用来wait, //同时也处理了初始化阶段的竞态,还记得在awaitRunStateLock方法中发现stealCounter为null时的注释(初始化竞态)吗? if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); //建立workQueues数组,大小是2的幂.并保证至少有两个插槽(语义理解,若是是4个的话,两个share两个独有). int p = config & SMASK; //下面这个比较好理解,总之,最小的状况下,n也会是(1+1)<<1=4,这样保证有两个位置给SHARE两个位置给工做线程. //n的初始值取决于并行度. int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; //新的运行状态. ns = STARTED; } } finally { //完成了辅助初始化,则解锁,并置runState加上STARTED标识. unlockRunState(rs, (rs & ~RSLOCK) | ns); } } //3.某轮循环发现早已完成初始化,使用本线程的随机数r计算索引,发现ws[k]存在.说明已被别的线程在此初始化了一个队列. //注意索引k的值的计算,它与m进行与运算,保证不大于m,同时与SQMASK,即share-queue mask,它的值是0X007e,前面说过, //很明显,它是整数的2至7位,保证了共享队列只能放在ws的偶数位. else if ((q = ws[k = r & m & SQMASK]) != null) { //3.1对队列加锁. if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //取队列的数组和top ForkJoinTask<?>[] a = q.array; int s = q.top; //初始化提交或者扩容. boolean submitted = false; try { // locked version of push //||左边的语句指符合添加元素的条件,右边表示若是不符合添加条件,则进行扩容. if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { //符合添加条件或扩容成功,取top对应的索引j. int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //向top放入task. U.putOrderedObject(a, j, task); //给top加1. U.putOrderedInt(q, QTOP, s + 1); //标记为已提交. submitted = true; } } finally { //释放qlock. U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { //若是提交成功,则尝试唤醒top或建立一个worker(若是太少).并返回. signalWork(ws, q); return; } } //竞态失败,标记move move = true; // move on failure } //3.2计算出的位置没有queue,且runState未锁,建立一个新的. else if (((rs = runState) & RSLOCK) == 0) { // create new queue //共享队列没有owner. q = new WorkQueue(this, null); //随机数就用线程的随机数r. q.hint = r; //config的第32位置1表示共享队列 q.config = k | SHARED_QUEUE; //队列的scanState直接置为INACTIVE,很明显,参考前面的描述, //它没有工做线程,也不会参与活化和scan阻塞的过程,也不会将本身的scanState压入ctl后32位作栈元素. q.scanState = INACTIVE; //加锁. rs = lockRunState(); if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) //仍旧符合添加条件,池未终结,将q赋给ws[k],不然的话,可能在下一轮循环进入1帮助终止, //也可能进入2用现成的队列内的任务数组添加元素到top.也可能在4处发现竞态,并最终致使5处重初始化r并从新循环找索引. ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } //4.标记繁忙. else move = true; // move if busy //5.本轮循环经历2的竞态失败或4的繁忙,从新初始化一个r供下轮循环使用. if (move) r = ThreadLocalRandom.advanceProbe(r); } }
这个方法的逻辑相对简单,用到的方法和字段基本都是前面说过的.
它的最终结果只有两个:
1.任务提交入池,并唤醒正在scan的栈顶worker或建立一个新的worker(空闲太多).
2.终止了线程池并抛出拒绝异常.
看一看有关的几个简短方法.
//尝试将给定的task添加到一个提交者当前的队列中,若是还须要额外的初始化操做等,使用上面的externalSubmit. //咱们知道,绝大多数的状况下,不须要初始化线程池的任务数组(整个线程池就一次),不须要初始化一个工做队列(每一个ws一个位置只一次). //所以它至关于先尝试用最简单直接的办法将任务压入队列,若是ws存在而队列须要初始化,或者池自己就没有完成初始化,再使用externalSubmit. //参数task是要提交的任务,调用者自己必须保证它非空. final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; //老办法,初始的随机数.运行状态. int r = ThreadLocalRandom.getProbe(); int rs = runState; //快速压入的代码分支,条件是队列数组ws已分配,非空,且根据r计算出来索引位取出的队列 //存在且已完成初始化,线程池未进入SHUTDOWN,而且可以对队列进行加锁. if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //快速入队. ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && //第二个条件是没有到扩容条件. (am = a.length - 1) > (n = (s = q.top) - q.base)) { //计算出top的索引j,并将当前任务放入,将top加1 int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) //发现原来的队列长度很短,有可能有worker正在scan,尝试唤醒一个worker或添加一个worker signalWork(ws, q); //只要成功压入,返回. return; } //最后解锁. U.compareAndSwapInt(q, QLOCK, 1, 0); } //未能成功压栈,缘由多是线程池未初始化,工做队列未初始化,队列达到扩容阈值等.使用externalSubmit进行. externalSubmit(task); } //尝试弹出外部提交者的任务,找到队列,非空时加锁,最后调整top,每次进行都会检查失败,尽管不多失败. //在前面ForkJoinTask和CountedCompleter等文章中曾引用过相关方法,此方法能够令等待任务的线程 //自行将任务出队并执行,而不是在池内线程还忙碌的状况下干等.可是该队列可能被其余外部线程放置了新的栈顶 //且看内部方法实现,当且仅当task是栈顶才有用. final boolean tryExternalUnpush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s; //当前线程生成随机数r. int r = ThreadLocalRandom.getProbe(); if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && (w = ws[m & r & SQMASK]) != null && (a = w.array) != null && (s = w.top) != w.base) { //进入if的条件.线程池已初始化,ws存在,w存在且w非空队列.注意,仍旧取的偶数索引. //计算当前最顶部元素的索引j long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; //尝试加锁qlock,加锁成功进入. if (U.compareAndSwapInt(w, QLOCK, 0, 1)) { //进一步check队列w的top和w的array未变. if (w.top == s && w.array == a && //队列w的顶部元素就是参数task U.getObject(a, j) == task && //成功将task出队 U.compareAndSwapObject(a, j, task, null)) { //将top减1并释放锁,返回true. U.putOrderedInt(w, QTOP, s - 1); U.putOrderedInt(w, QLOCK, 0); return true; } //加锁前已有更改或者task自己就不是顶部任务,直接解锁.返回false. U.compareAndSwapInt(w, QLOCK, 1, 0); } } //默认返回fasle return false; } //外部提交者helpComplete.介绍CountedCompleter提过此方法. //当目标任务task是CountedCompleter类型时能够手动调用CountedCompleter::helpComplete,它会调用此处,ForkJoinTask::get也有调用. //此方法能够令外部线程在等待task时帮助completer栈链上它的子孙任务完成,从而加速task的完成. final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { WorkQueue[] ws; int n; int r = ThreadLocalRandom.getProbe(); return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 : //ws未初始化,返回0,不然返回helpComplete的结果,取w的方式不变. helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks); }
终于能够看对外公有的api了,咱们使用ForkJoinPool的公有方法:
//invoke方法会尝试将task压入池,但也会当即join等待,压入池的方法即前面介绍过的externalPush,一样join方法也可能会 //致使当前线程自身完成了任务(池中工做线程忙碌而当前线程当即从队列中获取了该任务). //执行结束后返回该任务的执行结果,当出现异常时,直接从新抛出.但也可能抛出拒绝异常(拒绝入队). public <T> T invoke(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task.join(); } //安排给定任务的执行,异步进行. public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task); } //继承自AbstractExecutorService的方法列表 //execute方法,传入runnable,使用前面文章介绍的ForkJoinTask.RunnableExecuteAction适配器. public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.RunnableExecuteAction(task); externalPush(job); } //submit一个task,返回它自己. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } //对callable的适配,前面也提过. public <T> ForkJoinTask<T> submit(Callable<T> task) { ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); externalPush(job); return job; } //对task和result的适配. public <T> ForkJoinTask<T> submit(Runnable task, T result) { ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); externalPush(job); return job; } //对submit一个Runnable的适配.避免重复包装.由于ForkJoinTask也能够实现runnable. //典型的场景,先submit一个runnable,获得返回的job,再将job给submit进去. public ForkJoinTask<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.AdaptedRunnableAction(task); externalPush(job); return job; } //执行全部任务.同样先入队再执行,可能出现本外部线程又偷回来执行的状况. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); //标记是否有异常. boolean done = false; try { for (Callable<T> t : tasks) { ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); //把全部任务包成适配器并加入futures列表. futures.add(f); //压入池. externalPush(f); } for (int i = 0, size = futures.size(); i < size; i++) //对每个任务进行静默等待. ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); //上面的循环成功退出,置true返回. done = true; return futures; } finally { if (!done) //发现是异常退出,则依次取消任务. for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(false); } }
截止到此,ForkJoinPool中的主体难点方法已所有介绍完毕,下面选看一些周边的有助于理解的简单方法.
//估计当前线程池中正在运行(偷任务或运行任务)的线程,也就是未阻塞等待任务的线程.它会过分估计正在运行的线程数. public int getRunningThreadCount() { int rc = 0; WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { for (int i = 1; i < ws.length; i += 2) { if ((w = ws[i]) != null && w.isApparentlyUnblocked()) //只取ws奇数索引的worker,只要它isApparentlyUnblocked,即未进入waiting,blocking,wating_timed. ++rc; } } return rc; } //估计当前正在进行偷取或执行任务的线程(未阻塞等待任务),此方法也会过分估计. public int getActiveThreadCount() { //很明显,根据前面咱们研究了很久的逻辑,每release/signal的worker都会增长一个活跃数单元, //初始添加的worker也会增长一个活跃数单元和总数,显然只要有active的,那么r必然是一个溢出的正数. int r = (config & SMASK) + (int)(ctl >> AC_SHIFT); return (r <= 0) ? 0 : r; //忽略负值. } //判断线程池此刻是否已经进入静寂态,所谓的静寂态是指当前线程池中全部worker都已经阻塞在等待任务了, //由于没有任何任务可供他们偷取或执行,也没有任何挂起的提交入池的任务.此方法相对保守,并非全部线程都空闲的状况下 //当即会返回true,只有在他们减小了活跃数以后.(也就是保持空闲一段时间) public boolean isQuiescent() { //前面分析过,显然这个表达式不大于0即为不溢出的状况,回忆前面关于scan时,终止时等的下降活跃数. return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0; }
与关闭有关的方法.
//此方法的执行参数,注意,now传false,enable传true. //它的执行结果很简单(能够参考前面的tryTerminate). //1.此前已经调用过tryTerminate并enable,或者调过shutdown,那会致使一次终结. //2.初次调用,前面提交过的任务继续执行,但不会接受新的任务(由于runState首位置1了). //3.commonPool不准关. //4.已关的,调用了也没什么效果.但第二次调用时,已经在过程当中的任务可能受此影响取消. public void shutdown() { checkPermission(); tryTerminate(false, true); } //它会尝试当即取消和中止全部的任务,拒绝后续提交的任务.若是是common池则无效果. //若是已经关闭,再调用无影响.正在被提交入池或正在执行的任务(在调用此方法执行时)可能会被取消,也可能不会(取决于时机,可能早于取消过程而完成执行). //它会取消掉已存在的任务或未执行的任务.方法总会返回一个空的list.(与其余executor不一样) public List<Runnable> shutdownNow() { checkPermission(); tryTerminate(true, true); return Collections.emptyList(); } //很好理解,runState的31位是1,而仅有在shutdown方法中全部worker都已闲置或ws为空才会加上此位.显然此时全部任务都已完成. public boolean isTerminated() { return (runState & TERMINATED) != 0; }
//正在关闭中,必定有STOP标记位,没有TERMINATED位.
public boolean isTerminating() { int rs = runState; return (rs & STOP) != 0 && (rs & TERMINATED) == 0; } //已SHUTDOWN,首位标记,显然只要shutdown方法调用并传enable为true必定会有此结果. public boolean isShutdown() { return (runState & SHUTDOWN) != 0; } //等待一个shutdown请求后全部的任务完成或者发生超时,或者当前线程被扰动(第一优先级). //由于common池永远不会随程序调用shutdown而终止,所以使用commonPool调用此方法时, //会直接等效于awaitQuiescence,并且永远会返回false. //返回true,表明当前线程池终止了,false表明超时了. public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { if (Thread.interrupted()) //1.当前线程中断,抛出异常. throw new InterruptedException(); if (this == common) { //2.common池等效于awaitQuiescence并返回false. awaitQuiescence(timeout, unit); return false; } long nanos = unit.toNanos(timeout); if (isTerminated()) //3.发现全部任务已完成返回true. return true; if (nanos <= 0L) //4.已超时返回false. return false; //5.计算deadline并进入循环等待逻辑. long deadline = System.nanoTime() + nanos; synchronized (this) { for (;;) { if (isTerminated()) //5.1循环中发现已达到完成态,返回true. return true; if (nanos <= 0L) //5.2循环时发现超时,false. return false; //5.3循环时减小时间并等待. long millis = TimeUnit.NANOSECONDS.toMillis(nanos); wait(millis > 0L ? millis : 1L); nanos = deadline - System.nanoTime(); } } } //等待静寂.若是当前线程是池内线程,等效于ForkJoinTask::helpQuiesce方法,不然只是等待. public boolean awaitQuiescence(long timeout, TimeUnit unit) { long nanos = unit.toNanos(timeout); ForkJoinWorkerThread wt; Thread thread = Thread.currentThread(); if ((thread instanceof ForkJoinWorkerThread) && (wt = (ForkJoinWorkerThread)thread).pool == this) { //1.线程是ForkJoinWorkerThread,帮助静寂.返回true. helpQuiescePool(wt.workQueue); return true; } //2.不是池内线程,准备计时,初始化若干变量. long startTime = System.nanoTime(); WorkQueue[] ws; int r = 0, m; boolean found = true;//表明发现任务. //3.循环等待静寂或超时. while (!isQuiescent() && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { //3.1有趣的地方,只有本轮没找到任务才会进行超时判断. if (!found) { //3.1.1判断超时了,返回false. if ((System.nanoTime() - startTime) > nanos) return false; //3.1.2没超时,放弃执行权一段时间,不能阻塞在此. Thread.yield(); // cannot block } //改成false. found = false; //4.内循环从数组中间开始,一直递减到0. for (int j = (m + 1) << 2; j >= 0; --j) { ForkJoinTask<?> t; WorkQueue q; int b, k; //4.1取队列从0开始,只要取出了ws的非空队列成员,进入逻辑. if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null && (b = q.base) - q.top < 0) { //4.2 found标记true found = true; if ((t = q.pollAt(b)) != null) //4.3尝试从底部取出任务并执行.至关于帮助静寂 t.doExec(); //进入4.1,即break掉内循环,可能凑巧,执行完一个任务就静寂了. break; } } } //能从while循环break出来或者循环条件为假退出,说明达到静寂. return true; }
上面的代码自己没有什么问题,可是已经涉及到了外部api,ForkJoinTask::helpQuiesce,此api是由咱们决定调用时机的,显然,咱们能够在任何一个入池的ForkJoinTask中执行此方法来帮助ForkJoinPool进入静寂态,帮助执行全部待执行的任务,参考helpQuiescePool方法(会先执行本地任务,再偷其余队的任务执行).
到此,源码只剩下一个blocker了.
//MangedBlocker接口.它是一个为运行在ForkJoinPool中的任务维护并行度的接口 //咱们能够经过拓展它来实如今ForkJoinPool中运行的任务的并行度管理.它只有两个方法. //isReleasable方法会在没有必要阻塞时必定返回true,block方法会在必要时阻塞当前线程, //它内部能够调用isReleasable.而这个调度须要使用ForkJoinPool#managedBlock(ManagedBlocker) //它会尝试去调度,避免长期的阻塞,它容许更灵活的内部处理. public static interface ManagedBlocker { //可能会阻塞一个线程,好比等待监视器,当返回true时表示认为当前没有必要继续block. boolean block() throws InterruptedException; //返回true表示认为没有必要block. boolean isReleasable(); } //运行给定的阻塞任务,当在ForkJoinPool运行ForkJoinTask时,此方法在当前线程阻塞的状况下(调用blocker.block), //认为须要保持必要并行度时安排一个备用线程,方法内重复调用blocker.isReleasable和blocker.block.且前者必在后者前, //它返回false时才会有后者.若是没运行在ForkJoinPool内,那么方法的行为等效于下面这段代码: //while(!blocker.isReleasable()){if(blocker.block() break;} //参数blocker是上面的接口的实现类,在前面的文章CompletableFuture和响应式编程中曾见到一个实现类. public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { ForkJoinPool p; ForkJoinWorkerThread wt; Thread t = Thread.currentThread(); //1.当前是ForkJoinPool池内线程时的逻辑. if ((t instanceof ForkJoinWorkerThread) && (p = (wt = (ForkJoinWorkerThread)t).pool) != null) { WorkQueue w = wt.workQueue; //1.1取出工做队列,进行循环,blocker.isReleasable判断当前并不是没有必要加锁时进入. while (!blocker.isReleasable()) { //1.2要加锁,尝试补偿,它会在此时唤醒一个空闲的线程或建立一个新的线程来补偿当前线程的阻塞. if (p.tryCompensate(w)) { try { //1.3当前线程阻塞等待. do {} while (!blocker.isReleasable() && !blocker.block()); } finally { U.getAndAddLong(p, CTL, AC_UNIT); } break; } } } //2.非池内线程的逻辑.同上面的阻塞逻辑. else { do {} while (!blocker.isReleasable() && !blocker.block()); } }
关于blocker咱们并不陌生,在CompletableFuture和响应式编程一文中,咱们提到了CompletableFuture中内部实现了一个blocker,并使用ForkJoinPool的managedBlock方法管理.还记得这方面的实现吗?
CompletableFuture内部维护了一个相似栈的结构,用内部类Completion和它的子类们实现,而Completion自己是ForkJoinTask的子类.
一样,使用CompletableFuture时,咱们能够在入口方法入包含runAsyc之类的方法,该方法默认会提供一个线程池,而此线程池会由可用核数来决定,会选定一个ForkJoinPool或一个low逼的一任务一线程的线程池.
若是选择了ForkJoinPool,显然能及时补偿一个工做线程的阻塞是很是有必要的,这也是提高性能之举.
到此为止,ForkJoinPool的源码分析完毕.
这篇文章是ForkJoin框架系列的最后一篇,前面分析了ForkJoinPool的代码,它是ForkJoin框架的核心,代码较为复杂,做者我的以为它也是全部线程池中最复杂的一个,下面咱们来总结一下ForkJoinPool和整个ForkJoin框架。
ForkJoinTask是运行在ForkJoinPool的task,它定义了任务自身的入口api,维护了任务的status字段和result,结合ForkJoinPool来实现调度。ForkJoinTask必定会运行在一个ForkJoinPool中,若是没有显式地交它提交到ForkJoinPool,会使用一个common池(全进程共享)来执行任务。
ForkJoinTask支持fork和join,fork就是将当前task入池,join就是等待此task的结束并获取结果。
CountedCompleter是一个另类的ForkJoinTask,它在ForkJoinTask基础上维护了一个栈链,其实在某些视角上即像栈,又像一个不保存子节点的树。同时它也不保存运行结果,使用它去getRawResult只能获得null,可是任务的status会进行维护(委托给父类ForkJoinTask)。并行流是基于它来实现的,调度交由CountedCompleter完成,而原集的分割,结果的合并则由并行流的逻辑实现。
ForkJoinWorkerThread是运行在ForkJoinPool中的线程,它内部会维护一个存放ForkJoinTask的WorkQueue队列,而WorkQueue是ForkJoinPool的内部类。
ForkJoinPool是框架的核心,不一样于其余线程池,它的构建不须要提供核心线程数,最大线程数,阻塞队列等,还增长了未捕获异常处理器,而该处理器会交给工做线程,由该线程处理,这样的好处在于当一个线程的工做队列上的某个任务出现异常时,不至于结束掉线程,而是让它继续运行队列上的其余任务。它会依托于并行度(或默认根据核数计算)来决定最大线程数,它内部维护了WorkQueue数组ws取代了阻塞队列,ws中下标为奇数的为工做线程的所属队列,偶数的为共享队列,虽然名称有所区分,但重要的区别只有一点:共享队列不存在工做线程。
关于工做窃取,线程池外的提交者在join一个任务或get结果时,若是发现没有完成,它不会干等着工做线程,而是尝试自行执行,当执行方法结束,任务尚未完成的状况,它能够帮助工做线程作一些其余工做,好比当任务是CountedCompleter类型时,帮助完成位于栈链前方的子任务,而这个子任务先从当前worker队列的top找,后从其余队列的base找;线程池中的工做线程会在任务入队时被尝试唤醒,会循环执行,每轮循环都会先尝试随机scan到一个任务(该任务可能属于其余线程),执行它,再执行本地任务,如此往复,scan的过程能够理解为一种窃取,当不能窃取时则会inactive;工做窃取时从队列的base开始,工做压入时从pop进入,执行本身队列的任务时,依托于FIFO仍是LIFO的模式。此外,反向帮助小偷(helpStealer)也是一个“反弹式”的工做窃取,它与helpComplete一并属于工做窃取的一部分。
ForkJoinPool维护了一个ctl控制信号,前16位表示活跃worker数,33至48位表示worker总数,后32位能够粗略理解用于表示worker等待队列的栈顶。ForkJoinPool利用这个ctl,WorkQueue的scanState和stackPred以及ws的索引算法维护了一个相似队列(或者叫栈更贴切一些)的数据结构。每当有一个线程偷不到任务,就会存放此前的ctl后置标记位到pred,并将本身的索引交给ctl做为栈顶。相应的唤醒操做则由栈顶起。相应的方法在进行尝试添加worker时,会综合当前是否有阻塞等待任务的线程。
当全部线程都不能窃取到新的任务,进入等待队列时,称之为“静寂态”。
ForkJoinPool对全局全状的修改须要加锁进行,这些操做如修改ctl(改变栈顶,增删活跃数或总数等),处理ws中的元素,扩容ws,关闭线程池,初始化(包含ws的初始化),注册线程入池等。而这个锁就是runState,它除了当锁,也间接表示了运行状态,相应的线程池的SHUTDOWN,STOP,TERMINATED等状态均与其相应的位有关。
线程池的并行度保存在config字段的后16位,config的第17位决定了是FIFO仍是LIFO。而这个并行度也经过间接地取反并计入到ctl的前32位,线程池中判断是否当前有活跃的线程,或者是否已进入寂静态,都是用保存在config的并行度和保存在ctl前32位的活跃数与并行度的运算结果进行相加,判断是否会溢出(正数)来决定的。
ForkJoinPool还提供了补偿机制,用于在线程将要阻塞在执行过程当中前释放掉一个正在空闲的工做线程或建立一个新的工做线程,从而保证了并行度。第一篇文章中提到的CompletableFuture就是将Completion栈(ForkJoinTask)交给ForkJoinPool(取决于并行度)去执行并用它来进行调度。
ForkJoinPool的关闭则可能有多种场景:当一个worker被解除注册时,尝试一次,并不强关,也不指定enable,只有在线程池已经收到关闭信号并处在过程当中时,它才会帮助关闭;工做线程由于scan不到work而不得不进行await,当它发现当前线程池已处于静寂态,也尝试同上的关闭线程池,一样不强行关闭也不指定enable,只有线程池已经收到关闭信号并处在过程当中时,它才会帮助关闭;线程池外提交任务时,发现线程池已收到关闭信号,尝试帮助关闭;手动传入关闭信号,即调用shutdown时,会指定非now,enable,则线程池将收到关闭信号,记录该信号,并进行关闭流程,当下一次再有前述三种状况调用时,必然能够进入到关闭流程;即刻关闭,shutdownNow,它会要求即刻进入关闭,不会进入非now的状况下的release以及等待静寂等操做。
完。