在前面的几篇文章中详述了ForkJoin框架的若干组分,在相应的官方文档中总会不时地提起"Phaser",一样的,也提到Phaser能够用于帮助运行在ForkJoinPool中的ForkJoinTask运行时保持有效的执行并行度(其实特指其余task都在等待一个phase的前进时).node
熟悉JUC的朋友都知道它的大概组成部分包含:Containers(支持并发的容器),Synchronizers(同步器),Executors(线程池),BlockingQueue(阻塞队列),Atomic(原子类),Lock and Condition(锁).而Phaser和CyclicBarrier,Semaphore等同样是一个同步器.算法
本文主要介绍Phaser的内部实现,粗略介绍使用,它的源码相比于线程池较为简单,但最好能对比其余同步器来了解,读者最好拥有juc其余同步器,原子类,部分ForkJoin框架的基础.编程
同时,本文也会再次提到ForkJoinPool::managedBlock(blocker),以前在ForkJoinPool一文提到了实现和接口,而在CompletableFuture中见到了一个blocker的实现.并发
首先来看一些与Phaser状态有关的简单的常量.框架
//64位整数表示Phaser的状态. private volatile long state; private static final int MAX_PARTIES = 0xffff;//最大parties,后16位表示. private static final int MAX_PHASE = Integer.MAX_VALUE;//最大phase,最大整数值. private static final int PARTIES_SHIFT = 16;//取parties使用的移位数,16 private static final int PHASE_SHIFT = 32;//取phase的移位数,32 private static final int UNARRIVED_MASK = 0xffff; //未到的,取后16位. private static final long PARTIES_MASK = 0xffff0000L; //参加者,17-32位. private static final long COUNTS_MASK = 0xffffffffL; //数量,后32位. private static final long TERMINATION_BIT = 1L << 63;//终止态,首位. // 特殊值. private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;//第1位和17位.显然,它表示了一个ONE_ARRIVAL信息和PARTY信息. private static final int EMPTY = 1; //对一个state s计算unarrived的count, private static int unarrivedOf(long s) { //直接取整数位,若是等于EMPTY(1)则返回0,不然取后16位. int counts = (int)s; return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); } //对一个state,取出parties信息,直接取state的17至32位. private static int partiesOf(long s) { return (int)s >>> PARTIES_SHIFT; } //对于一个state,取出phase信息,直接取前32位. private static int phaseOf(long s) { return (int)(s >>> PHASE_SHIFT); } //对于一个state,取出arrived信息 private static int arrivedOf(long s) { int counts = (int)s; //state的后32位等于1(EMPTY)返回0,不然返回parties(state的17至32位,参考上面的partiesOf方法)和UNARRIVED(state的后16位)的差. return (counts == EMPTY) ? 0 : (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK); }
上面都是一些常量,没什么可分析的,简单来个总结.函数
Phaser用一个long型的state保存状态信息.工具
state的前32位表示phase,后16位表示unarrivied,17至32位表示parties,parties减去unarrived即arrived.单元测试
下面咱们看一些成员变量和有关函数.测试
//this的父,能够是null表示none private final Phaser parent; //phaser显然是个树的结果,root表明根,若是当前phaser不在树内,则root==this private final Phaser root; //偶数队列和奇数队列.它们存放等待线程栈的头,为了减小当添加线程与释放线程的竞态, //这里使用了两个队列并互相切换,子phaser共享root的队列以加快释放. private final AtomicReference<QNode> evenQ; private final AtomicReference<QNode> oddQ; //决定某个phase的等待线程队列. private AtomicReference<QNode> queueFor(int phase) { //选择队列的方法,若是参数phase是偶数,使用evenQ,不然oddQ. return ((phase & 1) == 0) ? evenQ : oddQ; } //出现arrive事件时的边界异常信息. private String badArrive(long s) { return "Attempted arrival of unregistered party for " + stateToString(s); } //注册时的边界异常信息. private String badRegister(long s) { return "Attempt to register more than " + MAX_PARTIES + " parties for " + stateToString(s); } //他们都用到的stateToString(s),计算参数s对应的phase,parties,arrived. private String stateToString(long s) { return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]"; }
为了便于理解,先来看队列的实现.ui
//表示等待队列的QNode,实现了ManagedBlocker static final class QNode implements ForkJoinPool.ManagedBlocker { //存放所属phaser final Phaser phaser; //所属phase final int phase; //是否可扰动 final boolean interruptible; //是否认时 final boolean timed; //是否已扰动 boolean wasInterrupted; //计时相关 long nanos; final long deadline; //关联线程,当是null时,取消等待. volatile Thread thread; //下一个QNode QNode next; QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos) { this.phaser = phaser; this.phase = phase; this.interruptible = interruptible; this.nanos = nanos; this.timed = timed; this.deadline = timed ? System.nanoTime() + nanos : 0L; //取当前线程. thread = Thread.currentThread(); } //isReleasable方法 public boolean isReleasable() { if (thread == null) //1.线程已置空(如2),返回true释放. return true; if (phaser.getPhase() != phase) { //2.发现phaser所处的phase不是构建QNode时的phase了,就置线程为空,返回true. thread = null; return true; } if (Thread.interrupted()) //3.若是当前线程扰动了. wasInterrupted = true; if (wasInterrupted && interruptible) { //4.发现扰动标记,而且QNode配置为可扰动,则置线程null并返回true thread = null; return true; } if (timed) { //5.定时逻辑,还有nanos,计算新的时长. if (nanos > 0L) { nanos = deadline - System.nanoTime(); } if (nanos <= 0L) { //已经到时间,返回true,线程置空. thread = null; return true; } } return false; } //block逻辑 public boolean block() { if (isReleasable()) return true; else if (!timed) //不定时的park LockSupport.park(this); else if (nanos > 0L) //定时的状况. LockSupport.parkNanos(this, nanos); //老规矩 return isReleasable(); } }
前面介绍过CompletableFuture的Singnaller,以及ForkJoinPool中的managedBlock,这一块的逻辑显然得心应手.
很明显,若是咱们在ForkJoinPool中使用它做为blocker,并在相应的ForkJoinTask的exec或CountedCompleter的compute方法中使用ForkJoinPool::managedBlock(blocker),将每一个ForkJoinWorkerThread在阻塞前构建一个QNode进入Phaser的等待队列(虽然尚未讲到相关内容,可是Phaser显然不用咱们直接操做内部类QNode),那么它将依照上述逻辑进行补偿,保障有效的并行度.
前面完成了承前启后,预热到此结束,开始看Phaser的核心方法.
//doArrive方法 //它是arrive和arriveAndDeregister方法的主要实现.手动调用这些方法能够加速经过和最小化竞态窗口期. //参数表明要从当前state中减去的调整数值,它的单位依托于业务,当为arrive时减去的单位为ONE_ARRIVAL, //当为arriveAndDeregister时减去的单位为ONE_DEREGISTER. private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { //1.变量s初始化,取决因而否当前Phaser是root.不是root将试图从root同步滞后的state. long s = (root == this) ? state : reconcileState(); //计算phase,前32位. int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //2.负数直接返回.说明原来的state首位就是1,前面的TERMINATE_BIT就是64位置1. return phase; //取count,后32位. int counts = (int)s; //计算unarrived,和前面同样的逻辑. int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0)//2.1 //没有unarrived了,说明不该该调用此方法,抛出异常,信息就是前面介绍过的badArrive throw new IllegalStateException(badArrive(s)); //3.尝试将state减去adjust数. if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { //3.1cas成功后,unarrived余1,则前进一个phase if (unarrived == 1) { //3.1.1取出parties做为下一个state的基础. long n = s & PARTIES_MASK; //3.1.2 下一个unarrived,数值上等于parties. int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { //3.1.3当前Phaser是root,onAdvance返回true,则加上终止信号. if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) //3.1.4 onAdvance返回false,而计算得出的nextUnarrived是0,即没有parties,n加上一个empty(1) n |= EMPTY; else //3.1.5nextUnArrived不是0,加到n上. n |= nextUnarrived; //3.1.6前面的流程完成了state的后32位(parties和unarrived),接下来处理前32位. //限定在MAX_PHASE以内,对当前phase加1. int nextPhase = (phase + 1) & MAX_PHASE; //将nextPhase的值加到n的前32位.并用n去cas掉原来的state,由于有3处入口的cas,此处必定能成功 n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); //更新到新的phase,唤醒等待的waiter. releaseWaiters(phase); } //3.1.7当前Phaser不是root,当nextUnarrived计算得0时,像父传递解除注册,参数ONE_DEREGISTER //会同时减去一个unarrived和一个parties.下轮循环正常应进入3.1.8 else if (nextUnarrived == 0) { phase = parent.doArrive(ONE_DEREGISTER); //完成传递后,将本身的state置empty. UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else //3.1.8,当前Phaser不是root,计算的nextUnarrived非0,像父传递一个arrive事件,减去一个unarrived. phase = parent.doArrive(ONE_ARRIVAL); } //3.2返回当前phase,多是已进入3.1递增的.仅有此处可退出循环. return phase; } } }
关于该方法的执行流程,咱们结合几个周边方法一并分析,先来看注册方法和onAdvance勾子.
//注册和批量注册.参数表明parties和unarrived字段的增长数,它必须大于0. private int doRegister(int registrations) { // 1.用参数计算一个adjust,同时包含parties和arrive. long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; //循环尝试更改. for (;;) { //2.存在parent,则用root的phase调整this的state. long s = (parent == null) ? state : reconcileState(); //取出当前state中保存的counts,parties,unarrived信息. int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) //要注册的数量大于了余量,抛出异常. throw new IllegalStateException(badRegister(s)); //3.计算出phase phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //phase为负说明state为负,即终止态,终止. break; //4.当前state表示的参与数非空的逻辑,当前注册非首次注册. if (counts != EMPTY) { if (parent == null || reconcileState() == s) { //this是root或者从root同步的state不变,继续执行,不然从新循环. if (unarrived == 0) //4.1本轮循环经过原state计算的unarrived为0,说明应等待下一phase,使用root等待 root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) //4.2本轮循环未发现应等待下一phase,尝试原子更新,增长adjust到state上. break; } } //5.当前不存在counts,且自身就是root,表明root的首次注册. else if (parent == null) { //5.1计算下一个state,由于没有参与数,使用phase初始化前32位,并使用adjust作后32位. long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) //5.2 cas成功,退出,不成功,下轮循环. break; } //6.是首次注册,但也不是root的逻辑.表明非root的Phaser的首次注册. else { //6.1对当前Phaser加锁并double check,避免同时调用.加锁失败的线程将在后续进入2的逻辑. synchronized (this) { //double check state未发生改变. if (state == s) { //6.2首先向父Phaser注册1. phase = parent.doRegister(1); if (phase < 0) //发现进入终止态,直接中止. break; //6.3向父Phaser注册成功,循环尝试cas掉老的state,新state的算法同上,phase加adjust. //在整个while循环中,再也不考虑phase进入终止态的状况,由于这些操做处于同一个"事务"中, //且因竞态等缘由,若某次cas时计入了负数的phase,方法返回后也能够及时发现. while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { //若是cas不成功,则读取s为新的state,计算新的phase并从新循环. s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } //6.4cas成功后退出循环. break; } //若是if(state==s)判断失败,说明有别的线程有当前线程进入synchronized块前已经加锁并执行了内部的逻辑且稍后释放了锁, //这样当前线程加锁成功,但if判断失败,它会当即释放锁并返回到2. } } } return phase; } //使用root的phase调整this的state,更新滞后的结果.这通常发生在root前进了phase可是 //子phaser尚未作到这一步,这种状况下,子phaser必须完成这个前进的步骤,这一过程当中,phase将 //被置为root的phase,unarrived则会重置为parties,若parties为0,则置为EMPTY.返回结果state. private long reconcileState() { final Phaser root = this.root; long s = state; //不是root才进行下一步. if (root != this) { int phase, p; //cas,phase采用root,parties不变,unarrived重置为parties或EMPTY. while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && //phase滞后于root //尝试cas. !UNSAFE.compareAndSwapLong (this, stateOffset, s, //肯定新state的前32位,使用root的phase. s = (((long)phase << PHASE_SHIFT) | //新phase<0,后32位直接取this的state表示的counts. ((phase < 0) ? (s & COUNTS_MASK) : //phase有效,this的state表示的parties为0,则后32位使用empty (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : //不然,后32位使用parties. ((s & PARTIES_MASK) | p)))))) s = state; } return s; } //onAdvance勾子方法,参数为当前phase和注册的parties数. //默认实现为parties数为0,方法返回true时,调用者会尝试终止Phaser.(参考前面的doArrive).随后调用isTerminated方法将返回true. //执行此方法时抛出的运行时异常或Error将直接上抛给尝试advance相应的phase的线程,这种状况下不会发生phase的advance. //方法的入参表示的是Phaser当前的state(未advance前),所以若在onAdvance方法中执行arrive,regist,waiting这三种操做的行为是不肯定的也不可靠的. //若是当前Phaser是一个级联的成员,那么onAdvance只会由root在每次advance时调用. //方法的默认实现返回true的场景目前只能是通过数次arriveAndDeregister调用形成parties归零的结果.咱们继承Phaser能够轻易地重写此行为, //好比简单粗暴地返回false,那么将永远容许新的注册. protected boolean onAdvance(int phase, int registeredParties) { return registeredParties == 0; }
通过前面的代码分析,已经对Phaser的核心函数doRegister,doArrive有了全面的了解.
二者都会在一开始同步root的phase,且若是出现落后root的状况,同步了新的phase的同时,也会从新初始化unarrived,而且使用parties的值.
doArrive方法会每次调整unarrived数量(也可包含parties数量,若是使用了解除注册),当Phaser调用自身的arrive/arriveAndDeregister时,会作出相应的减小,并根据是否为root而决定向上递归.
Phaser减小自身unarrived信号(也可能同时有parties信号)后,若发现这已是最后一个unarrived信号,则进行接下来的判断:
1.当前Phaser是root,advance并唤醒waiter.(重要的唤醒操做执行点,root一轮完成)
2.当前Phaser不是root,且它已经不具有继续下一轮的条件(计算nextUnarrived为0,即parties已经被arriveAndDeregister置0),则从父Phaser减小一个unarrived和parties.
3.当前Phaser不是root,但它仍具备parties,知足进行下一轮的条件(计算nextUnarrived不是0),则从父Phaser减小一个unarrived,但不减小parties.
显然,子Phaser的最后一个unarrived的消失必定会形成父的unarrived减小,子Phaser不能继续下一phase的register和arrive时,从父Phaser中卸载.
若不是本Phaser的最后一个unarrived信号,则直接结束,至关于只进行了上面的减小信号操做.
doRegister方法的逻辑大体相反,不一样于doArrive,它的参数registrations同时做用于parties和unarrived,即两个位上同时加上registrations参数.它的大体逻辑:
1.当前注册并不是首次注册,且出现unarrived==0,即本轮已经完成了arrive,那么本轮将不能注册,须要等待root更新到下轮.(这也是咱们碰到的第一个阻塞)
2.当前注册并不是首次注册,unarrived也不是0,则在本phase进行注册,增长相应的parties和unarrived.
3.当前注册是root的首次注册,给root的state加上相应的parties和unarrived.
4.当前注册是非root的首次注册,加锁(this),对本身的state加上相应的parties和unarrived(同上,以registrations为单位),而对parent加上一个parties和unarrived单位.
很明显,对于单Phaser的状况很是好理解,每次减小unarrived数量(先不考虑减小parties),则最终致使Phaser自身进入下一个phase,而后从新初始化unarrived到下一轮,unarrived的新值是前一轮剩下的parties数量.
当咱们同时也尝试减小parties数量,即解除parties的注册,最终致使没有parties,那么Phaser将进入终止态.
整个过程当中,只要Phaser没进入终止态,随时能够进行新的注册,并增长parties和unarrived的数量.每一个arrive能够减小unarrived的数量为任何正整数,不必定是1.
对于多Phaser的状况,有两个特殊点:
1.对任意Phaser树中的某一个Phaser调用注册操做,会令自身加上相应参数个parties和unarrived单位,仅会在该Phaser第一次注册时增长父Phaser(极端可能,仅从一个叶子节点第一个注册的状况下可一直递归到root)的parties数和unarrived数各1单位(不论参数是多少).
2.对任意Phaser树中的某一个Phaser调用arrive操做,会令自身减去相应的参数个parties和unarrived单位,同时仅当本Phaser此时是最后一个unarrived时,会减去父Phaser的一个unarrived单位(当前子Phaser仍旧有parties能够构建下一phase),或减去父Phaser一个Parties和unarrived单位.(极端状况下,每一级都是最后一个unarrived时,减小叶子节点的最后一个unarrived会递归到root).
每新增一个子Phaser,父Phaser就会增长一个要完成触发phase的advance前必需要等到arrive的单位;每个子Phaser中全部的arrive完成,父Phaser都将减小一个要等待advance所必需触发的arrive.
目前没有看到await方法,但能够提早说明,等待操做彻底依赖于root是否完成本轮.也就是全部子Phaser都完成了同一轮(arrive打满),才能让父Phaser自己减去一个全部arrive单位,再触发父Phaser本轮的完成,此时对任何已完成的Phaser进入注册,都会进入上述的root.internalAwaitAdvance(phase, null)方法等待root进入下一phase.若是对已经完成全部arrive的Phaser继续进行arrive操做,由于unarrived已是0,则会抛出异常.
因此对于使用子Phaser的场景,若是发生很巧妙的状况,Phaser树上当前子Phaser的arrive结束条件知足了,使得新来的注册只能等待下一轮次,而其余分支的子Phaser又恰恰不能完成本轮次,那么新的phaser.doRegister方法将阻塞在此.
好在咱们使用Phaser可能会相似CyclicBarrier的使用方式,可对每一轮(phase)进行注册并等待(也许只等一轮,那么arrive就要带上deregister),每一轮最后一个线程arrive了,就会中止全部线程的等待,让全部线程继续执行,同时开启了下一轮次,这些线程此时又能够不经注册直接在新的轮次中进行等待,直到最后一个arrive了,再次唤醒全部线程并继续执行,同时Phaser再前进一轮,如此往复.中间使用arrive并deregister的线程会从本轮起减小一个unarrive数量(由于parties也减小了,因此再下一轮初始化unarrive数量时也会减小一次).咱们可让这些线程参与任意的轮次,但要注意的是,若是有线程中途不参加了,必定要解除注册,不然由于每轮初始化时,要等待arrive的数量都是上一轮剩下的parties数量,有线程中止了执行,却不减小parties数,那么下轮全部等待的线程将永远等不到phaser知足唤醒的条件.
上述的过程当中能够明显的看出,目前已介绍的两个重要核心函数:注册和arrive并无直接记录和操做线程的操做,相应的操做在等待方法和唤醒方法中(前面提到过release),咱们稍后介绍.
如今假设一个特殊的使用场景,也能够区别于CyclicBarrie和CountDownLatch的使用.仍是上面的例子,可是咱们准备的线程数与Phaser的parties数/unarrived数不一样(通常前者要多些),会发生什么事?
首先建立了Phaser,不指定最初parties数,并用每一个线程去注册(我甚至能够用一个线程去重复注册,每次的参数registrations还能够不一样,注册的做用并非将当前线程压入队列,而是为本phase设置一个unarrive数量,以控制到达下个phase前必须有多少次arrive的发生),则parties数和unarrived的初值彻底与此有关,是一个依托于咱们随意注册而产生的随意值.那么假定咱们的线程数量大于这个parties数(假定调用注册方法的线程和arrive及等待的线程无关),并令有的线程执行arrive(彻底能够一次arrive减去多个信号量,甚至一个线程屡次arrive),有的线程执行await等待信号advance到下一个phase(一个线程在一个周期只能调用一次),有的线程执行了arrive也等待phase前进(这种状况一个线程一周期也只能一次.其实这些分别对应了还未介绍的arrive,waitAdvance,arriveAndWaitAdvance等方法),单独进行await操做的线程能够是任意数量,执行arrive方法的线程加上执行arrive并wait的操做的线程和必须超过unarrived,这才能唤醒等待线程.
目前这些还比较抽象,等到咱们看过相应的几个方法便了然了.
onAdvance的方法默认实现就是判断本阶段注册的parties数量,若是已是0则说明没有parties了,Phaser应该结束.可是咱们其实能够从新实现,好比参数中同时传入了当前的phase,我能够规定上面的例子中phase最多只有3轮次,那么不论何时arrive,发现了当前phase已进入3轮,Phaser就被终止.固然,这一过程是由root执行的,可是子Phaser的phase会在每次注册和arrive发生时同步root,所以本例中对于phase数的判断能够粗放到全部Phaser,对于parties数则只能做用于root(事实上调用onAdvance的必定是root).
接下来看全量构造方法和若干和上面有关的公有函数.
//初始化一个Phaser,指定parent,指定未到来的参与者数(unarrived parties),但这只是一个初值, //当咱们在任什么时候候调用注册方法时,还会相应的增长. public Phaser(Phaser parent, int parties) { if (parties >>> PARTIES_SHIFT != 0) //太大了,超过了后16位能表示的整数. throw new IllegalArgumentException("Illegal number of parties"); //初始phase为0. int phase = 0; this.parent = parent; if (parent != null) { //1.有parent的状况,共享parent的root,队列,并向parent中注册1个parties和unarrived, //同时同步一次phase(表面上是同步了parent的,实际上前面已经看过,会同步root). final Phaser root = parent.root; this.root = root; this.evenQ = root.evenQ; this.oddQ = root.oddQ; if (parties != 0) phase = parent.doRegister(1); } else { //2.无parent的状况,root就是this,并初始化奇偶等待队列.它使用原子引用的形式存放一个QNode,而QNode咱们前面已介绍. this.root = this; this.evenQ = new AtomicReference<QNode>(); this.oddQ = new AtomicReference<QNode>(); } //统一初始化state,后32位的决定依托于parties,若是parties是0则给予EMPTY,直接无论高32位. //不为0则给予phase设置为前32位,parties设置parties位和unarrived位. this.state = (parties == 0) ? (long)EMPTY : ((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties); } //注册方法,就是调用doRegister,参数1. //它会向this添加一个unarrived的party,若是正巧root正在进行advance,它须要等待下个phase. //若是this有parent,且它以前没有过注册的parties,则首次注册会触发自身向parent的注册. //若是this已经终止了,那么尝试注册将会无效并返回负值.若是注册的数量大于了最大支持parties(后16位整数), //会抛出IllegalStateException public int register() { return doRegister(1); } //批量注册指定的信号量,并返回最新的phase.规则基本同上. public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) //参数0直接查询最新的phase返回 return getPhase(); return doRegister(parties); } //arrive一个信号,不等待其余arrive事件,返回最新phase(终止态为负). //当前Phaser的arrive事件已满,则对parent来讲也会触发一个arrive.(若是有parent) public int arrive() { return doArrive(ONE_ARRIVAL); } //arrive并解除一个注册parties,也不阻塞等待其余arrive.若是当前Phaser的解除注册操做 //将parties减至0,且this有parent,这将致使parent也减小一个parties(本phaser解除在parent的注册). public int arriveAndDeregister() { return doArrive(ONE_DEREGISTER); }
接下来要看上面已经作足了铺垫的等待方法了,并结合前面的队列一块看.
//令当前线程"到达"此phaser并等待其余parties,它等效于awaitAdvance(arrive()). //注意,按照道格的注释,若是你在一个未进行注册(调用register)的线程里调用此方法实际上是一个使用错误, //可是从本方法和前面以及后面有关的方法来看,全部记录线程的方法均只与arrive和等待有关,与注册无关. //所以Phaser自己没法规避这种使用错误,咱们彻底可使用另外一个线程去注册,而当前线程去arrive,将两个动做分开. //方法会返回arrive时最新的phase号.终止时会是负值. public int arriveAndAwaitAdvance() { //记录root,开始循环. final Phaser root = this.root; for (;;) { //1.预计算,首先同步state long s = (root == this) ? state : reconcileState(); //计算phase int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //已终结直接返回最终phase. return phase; //计算counts,unarrived int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) //已经没有空余的unarrived信号了,不能再调用arrive,抛出异常. throw new IllegalStateException(badArrive(s)); //2.减余arrive的有关逻辑.尝试cas减去一个arrive if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) //2.1当前要减的信号不是本Phaser的最后一个信号量,调用root的等待方法.参数2是node,传空. return root.internalAwaitAdvance(phase, null); if (root != this) //2.2当前要减的信号量是非root的Phaser的最后一个,递归给parent(虽然用了return,可是parent也可能在进入2.1后阻塞). return parent.arriveAndAwaitAdvance(); //2.3当前要减的信号量是root的最后一个. //2.3.1准备计算下一个状态,先取出state的parties信息. long n = s & PARTIES_MASK; //计算nextUnarrived,它是如今的parties. int nextUnarrived = (int)n >>> PARTIES_SHIFT; //2.3.2前进phase逻辑. if (onAdvance(phase, nextUnarrived)) //须要终止,给新state的计算基石n加上终止标记. n |= TERMINATION_BIT; else if (nextUnarrived == 0) //计算的nextUnarrived是0,即没有parties,加上空标记位. n |= EMPTY; else //下一轮能正常进行,加上nextUnarrived位. n |= nextUnarrived; //2.3.3给n加上下一个phase. int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) //用n进行cas不成功,将新的phase返回. //说明一下,由于方法执行到此前已经执行过2的入口cas,减去了最后一个unarrived,所以在2到此的过程当中如有新的注册, //它内部会读到0个unarrived,就会等待下一个phase(参考前面介绍过的注册方法),所以cas失败不会是由于2以后有新的注册. //在整个arrive系列的方法中,最后一次arrive发生后,本Phaser不可能有其余线程再去执行相似2处的减余的状况. //故出现这种状况的缘由目前来看有二,一是还未介绍的强制关闭Phaser的方法,此时也会突兀地改掉state形成cas恰巧失败,二是 //出现一些用户作出的奇葩行为,好比重写了其余公有方法.咱们天然忽略第二种状况,doug大神也是简单注释了一个"terminated". return (int)(state >>> PHASE_SHIFT); // terminated //cas成功,释放等待队列中的线程,返回下一个phase(由于在此过程当中的register会等到advance,此时的phase已是nextPhase了). releaseWaiters(phase); return nextPhase; } //3.减余失败说明出现竞态,直接开启下轮循环从新减余. } } //等待当前Phaser从给定的phase前进结束,若是当前phase不等于给定的phase,或者Phaser已终止当即返回. //1.传入phase为负,返回它自己. //2.传入的phase不是最新的phase,返回最新的. //3.传入了最新的phase,等待到advance并返回advance后的phase. public int awaitAdvance(int phase) { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) //匹配成功,等root前进.参数node为null return root.internalAwaitAdvance(phase, null); return p; } //参考前面的几个方法,区别是可扰动. public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) //1.参数phase小于0直接返回它自己. return phase; if (p == phase) { //2.参数phase匹配,回忆一个前面介绍的QNode,匹配当前Phaser和phase,配置为可扰动且不计时. QNode node = new QNode(this, phase, true, false, 0L); //3.放入root的等待队列阻塞. p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) //4.等待结束,判断是不是扰动形成的结束,前面介绍过QNode的相关逻辑, //它实现了ForkJoinPool.ManagedBlocker,所以在managedBlock方法进行时, //会循环调用问询是否能release,当咱们配置了可扰动且扰动了,就会标记这个wasInterrupted,释放线程引用并返回. //发现此种状况抛出异常. //同时,当发现等待成功,也会结束,释放线程引用并返回,但不带有扰动标记. throw new InterruptedException(); } //5.返回1处以前读取的phase或3处获得的最新phase值. return p; } //同上方法,但带有计时. public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long nanos = unit.toNanos(timeout); final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) { //不一样于上面方法的地方,创建的QNode带有计时和等待时长. QNode node = new QNode(this, phase, true, true, nanos); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) //被扰动的状况. throw new InterruptedException(); else if (p == phase) //时间到了phase没有前进,超时. throw new TimeoutException(); } return p; }
前面的几个核心方法粗略过完,补充一些重要内容.
首先在前面曾分析过有线程阻塞等待下一个phase的状况,并无加上定时等待的考虑.在超时的状况下,阻塞的线程可能会收到异常并退出.
创建QNode能够限定是否认时和可扰动,这取决于咱们使用哪一个方法去await.
除最后一个线程arrive外,全部线程调用这些方法都会减小一个arrive并加入等待队列,直到(1)配置了定时且超时,(2)当前是可扰动等待且使用了Thread.interrupt(),(3)最后一个线程使用上述方法或arrive方法,使得Phaser前进了一个轮次,internalWaitAdvance结束.其中(1)(2)均会迁成arrive线程抛出异常,只有(3)才是正常的状况.
QNode前面已介绍,它是一个blocker,须要调用ForkJoinPool::managedBlock才会起做用(显然root的internalAwaitAdvance必然与此方法有关联).固然这个做用与任务是否运行在ForkJoinPool无关,若是等待phaser前进的线程是运行在ForkJoinPool中的ForkJoinWorkerThread,显然会在internalAwaitAdvance期间进行补偿.这一块可参考前面的"CompletableFuture与响应式编程"和"ForkJoin框架之ForkJoinPool"两篇文章.
另外,这些代码也再次说明了root的做用: (1)对一切非root的Phaser进行等待都会用root的internalAwaitAdvance;(2)每次注册或arrive必定会同步root的最新phase.
其中(1)也间接说明了为何构建Phaser时只有root建立等待队列,全部子Phaser共享.
上面还保留了一个疑问,提到了"强制关闭Phaser"形成arriveAndAwaitAdvance出现cas失败的问题,doug大神直接注释了一个terminated,咱们立刻来看这一块,以及一些周边的公共函数,加深理解,而后再来解决关于等待队列最后的一些问题.
//强制关闭Phaser,让Phaser进入终止态,可是这个过程不影响它已注册的parties,若是此Phaser是 //一个Phaser树中的成员,那么全部phaser集中的Phaser都会关闭,若是它已经关闭,此方法无效.此方法能够 //用于若干任务出现意料以外异常的状况下的协调恢复. public void forceTermination() { // Only need to change root state final Phaser root = this.root; long s; //已经是终止态直接忽略. while ((s = root.state) >= 0) { //直接尝试给root的state加上终止位.显然加上了它,子Phaser在注册和arrive等方法同步回新的phase就是个负数, //所以更改root的phase为负至关于判了全部Phaser的死刑.惟一须要解决的是已经阻塞在root.internalAwaitAdvandce的线程. if (UNSAFE.compareAndSwapLong(root, stateOffset, s, s | TERMINATION_BIT)) { // 加上终止位成功,前后唤醒偶数等待队列和奇数等待队列. releaseWaiters(0); // Waiters on evenQ releaseWaiters(1); // Waiters on oddQ //返回 return; } } } //返回当前phase,直接用root的state去取. public final int getPhase() { return (int)(root.state >>> PHASE_SHIFT); } //查询注册的parties数量.调用前面介绍过的partiesOf public int getRegisteredParties() { return partiesOf(state); }
//查询已经arrived的parties数量.调用介绍过的arriveOf
public int getArrivedParties() { return arrivedOf(reconcileState()); } //查询未arrive的parties数量,调用前面介绍过的unarrivedOf public int getUnarrivedParties() { return unarrivedOf(reconcileState()); } //返回parent public Phaser getParent() { return parent; } //返回root public Phaser getRoot() { return root; } //判断当前Phaser是否终止,直接取root的state是否为负,可见,终止态彻底取决于root. public boolean isTerminated() { return root.state < 0L; }
这些方法都比较简单,只有forceTermination须要再强调一翻,前面介绍arrayAndAwaitAdvance时曾提过在减去最后一个unarrived信号后去cas到下一个phase失败的状况,doug大神简单注释了一句terminated,直接返回了当前的phase(显然只能是负),在周边方法重重加锁的前提下,那一次cas的失败惟一一处就是强制关闭,由于它只改关闭标记位,至关于动了phase,而没有动unarrived标记位和parties标记位.因此重写Phaser的方法要谨慎,极可能不当心打破了这个封装.
从上面的有关方法能够看出,子Phaser的终止态严重依赖于root,目前能够肯定的是root的phase一旦表现出终止态,全部新来的注册,arrive,arrive并await将会当即返回,惟一须要关注的就是root被设置了终止标记后,正陷入等待的线程怎么办的问题.
咱们下面就来看Phaser的等待机制,这里面又能见到道格大神很是有趣的玩法.
//工具方法,移除某个phase的等待者. private void releaseWaiters(int phase) { QNode q; //保存队列中的队首 Thread t; // 保存线程引用. //取队列,用phase的奇偶决定,phase是偶数就取偶数队列,不然取奇数队列.而这个phase其实只用来取队列了,后续的操做与它无关. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; //循环,找出全部phase不等于root的phase的(其实root是最大的,因此就是找出非最新phase加入进来的waiter QNode) while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { //找出了,利用原子引用将head指向next. if (head.compareAndSet(q, q.next) && (t = q.thread) != null) { //发现阻塞者,唤醒线程.回忆下前面实现blocker方法中的isReleaseble和block方法都有将线程置空的操做.(三种状况,唤醒扰动超时都会置空) //可是那些方法并无将表明该阻塞线程的QNode移除队列,所以可能会发现thread已是null(表明无阻塞者)的状况,只须要移除队列便可. q.thread = null; LockSupport.unpark(t); } } } //上面releaseWaiters方法的一个变种,但它只会处理遍历过程当中位于头部的元素,出现正常的等待节点就会当即返回. //此方法在这一块能够有效的减小内存的占用.退出时返回当前的phase. private int abortWait(int phase) { //一样,参数phase只是用来选择要处理的队列. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; for (;;) { Thread t; QNode q = head.get(); //计算最新phase的值p int p = (int)(root.state >>> PHASE_SHIFT); if (q == null || ((t = q.thread) != null && q.phase == p)) //1.出现q为null表明整队列元素已出队,直接返回p; //或者在出队过程当中head(q)记录的线程引用还在,说明未超时或扰动,且是本phase的等待节点,终止循环并返回最新phase. return p; if (head.compareAndSet(q, q.next) && t != null) { //进入条件,参考1的条件,由于1会直接返回.故进入2的条件实际上是q非空且处于旧的phase.只有这种状况才能够出队. //2.将q出队,置空线程引用,释放线程. q.thread = null; LockSupport.unpark(t); } } } //计算有效cpu,控制自旋. private static final int NCPU = Runtime.getRuntime().availableProcessors(); //常量,每轮arrive等待的字旋数,取决于NCPU,小于2则取1,不小于2取2的8次幂. static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; //珊珊来迟的内部等待方法.它可能会一直阻塞到phase的advance发生(除非取消了等待). //此方法仅限root调用.参数phase表示当前的phase,参数node表示等待节点,用于追踪节点的扰动或超时. //若是是null,表示是一次不可扰动的等待.返回值为当前最新的phase. private int internalAwaitAdvance(int phase, QNode node) { // 1.调用releaseWaiters,传入参数phase的前一个phase,显然这只是决定释放哪个队列.参数绝对实时准确的状况下会先将老的队列释放掉. releaseWaiters(phase-1); //节点入队标记,入队了就会变为true boolean queued = false; //记录每一轮循环的unarrived数量,用于决定是否扩增自旋等待次数. int lastUnarrived = 0; //自旋数,参考上面的计算逻辑. int spins = SPINS_PER_ARRIVAL; long s; int p; //开启循环,直到phase前进为止或内部判断已取消等待. while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { //2.传入node是null,即非可扰动的模式逻辑.只有非可扰动模式才有自旋. if (node == null) { //2.1每轮自读进入都会尝试计算新的unarrived,若是发现出现了变更(变大或者变小), //会将它保存到前面的lastUnarrived. int unarrived = (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) //发现新变化的unarrived<NCPU,扩增自旋次数,继续自旋. //unarrived的变化,若没有大量新的parties注册,会在自旋过程当中变小,反之大量加入注册,大于了NCPU则放弃增长自旋次数. spins += SPINS_PER_ARRIVAL; //2.2,未发现本轮循环unarrived发生变化,或者增长了大量注册,形成大于NCPU的逻辑,首先记录此时的线程扰动状态. boolean interrupted = Thread.interrupted(); //2.3接2.2,若是发现了线程被扰动了,或者经若干次自旋减小次数,自旋次数并未能在2.1进行增长,直至减为0,进入if. if (interrupted || --spins < 0) { // need node to record intr //2.4,知足2.3进入if的条件,再也不继续自旋了,由于参数没有提供node,此处初始化一个node,不定时,不可扰动,并保存扰动状态. //下轮循环将没法进入2. node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } //3.参数传入了node,或者在2.4进入了node的初始化,每一轮循环到此都先判断是否可释放(若能够,内部会置thread为null). else if (node.isReleasable()) //发现node所处的phase已经达到或者取消了,则break掉循环. break; //4.未能在非扰动模式下自旋解决(2)或提早发现node的扰动且未将node入队的状况下,将node入队. else if (!queued) { //选择当前phase表明的队列. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); //这一行不起眼的if条件代码真的是一个悄无声息解决了一个大暗坑的地方,后面说. if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) //double check避免脏入队,入队条件是(1)无头,(2)或者头元素的phase等于参数phase(由于相邻的两个phase绝对不会入同一个队). //知足(1)(2)的同时,还要知足(3),参数phase就是当前的state表示的phase(由于此方法只能root使用,故为root表示的最新phase). //条件知足,入队,取代原来的head,原来head表明的node成为node的next.而条件不知足进入下一循环,极可能while条件就不知足了退出循环. queued = head.compareAndSet(q, node); } //5.已经在某一轮循环入队了,使用ForkJoinPool的managedBlock管理block,其间可能会释放线程引用. else { try { //5.1它内部也有循环,且会调用前面看到过的isReleasable和block实现,显然它一旦结束(包含扰动),必定会形成下轮外循环终止于3处. ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { //5.2出现扰动异常catch住,并保存.下轮循环也会终止在3处. node.wasInterrupted = true; } } } //6.走出上面的while循环,多是root已经advance到下一个phase(2前的循环),也多是传入node的状况下出现了扰动或超时(5)形成(3)知足 if (node != null) { //6.1node存在表明可能已经压入队列,结果要么是已出现扰动或超时(方法结束后会抛出异常),要么是已正常完成. //显然,代码执行到此处就要返回了,阻塞的线程会抛出异常结束(超时或扰动)或继续执行(正常advance), //没有必要去尝试唤醒能执行出前面while循环到达6立刻要返回的线程. if (node.thread != null) //6.2取消node中的线程引用,避免外面的线程尝试唤醒. node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) //6.3若是node自己设置了不可被扰动,但5.2处判断线程自己抛出了扰动异常,却被catch住了,此处扰动本线程. Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) //6.4发现phase并未前进.仍是参数传入的pahse,说明必定是扰动或超时的结果,abortWait对本phase使用的队列进行清理, //而清理的目标前面已论述过,是本队列头部开始的早于本phase的元素.(发现一个不知足条件的就中止了清理). return abortWait(phase); // possibly clean up on abort } //7.退出上面的while循环必定会到此帮助释放早于最新阶段的waiter.注意,是早于最新phase的,参数phase只是决定了选哪一个队列(奇偶). //若是是6.4表明的那种扰动超时状况,此处其实释放的是旧的结果.被唤醒的线程其实通常是执行在5.1处阻塞的.当前线程能运行到此绝对不须要唤醒. releaseWaiters(phase); return p; }
到此Phaser的代码解析已完毕,咱们来分析关于队列,等待和唤醒的问题.
1.Phaser维护了两个"队列",不论加入等待队列仍是弹出等待队列,都是从头部进行,新加入的成员会成功队列的新头,原来的头会成为它的next,弹出时next成为新头.因此至关于一个对头部的"后进先出",考虑官方起名和注释,咱们依旧保持队列这个称呼.
2.唤醒时,会从队列的头部依次弹出node 的phase早于root的最新phase的node,.
3.等待时,入队的node成为新的头.
4.当轮次增长时,会使用和本轮不一样的队列增长元素,同时也会唤醒本轮中等待的node.
由于唤醒和等待同时进行,且各自操做各自的队列(不一样的phase),所以彼此之间没有竞态(尽管一个是头入一个是头出),能够说设计巧妙,下面咱们来脑洞大开,思考一个极端状况.
咱们假设一种极端的phase切换场景,奇数phase大量等待入队,偶数phase则迅速完成.假设当前phase对应的队列是奇数对列,轮次提高完成后,它去释放当前的队列元素,结果未等这个释放操做执行完毕,偶数队列的轮次很快执行完,奇数队列中积压了成千上万个node未能释放,轮次却又切回到了奇数队列,会出现什么事?
显然奇数队列若是一直保持这种极端场景,它会愈来愈庞大,逼近撑爆内存的同时,大量线程也会得不到释放,甚至于老一轮的线程须要等待新一轮的线程去释放.为何老一轮的线程会去等待新一轮的线程释放呢?
releaseWaiter的方法咱们已经看出,它只会释放phase早于最新的node,此时最新压入的元素属于当前最新的phase,显然不知足条件,那么会形成奇数队列中两轮前压入的元素不能获得清除,两轮前就在释放当时积压node的线程(那一轮最后一个arrive)发现不符合清理条件,就直接return并终止了,只能等待本轮最后一个arrive出现后继续进行释放.若是本轮最后一个arrive出现很晚,在下一轮依旧保持如此极端,往返数轮,确实会致使奇数队列中积压大量node,且第一轮就在等待该轮次结束的线程早就知足了释放条件(升到了2轮),事实上多是第n轮才获得释放,这还符合Phaser的定义吗?咱们使用它,就是要保证每一轮单独使用,每一轮次达到条件,线程释放并执行,下一轮次是下一轮次.
然而doug的代码就是这个样子,想遍各类极端,以为可能找到了bug,那么就须要仔细思考了.做者来简述一下这个趟坑的分析过程.
这个问题确实已经获得了极大的规避了,毕竟是个极端状况.
1.线程的唤醒真的很快,尽管此处除了唤醒还包含了原子引用的更新(每次出队都要cas).
2.若是没有注册,显然就没有arrive相关的状况,尽管能够单独调用,但必须保证在arrive时传入的数量此时已经注册了,所以每一轮次(phase)中可能积压等待唤醒的线程的操做必定是在注册以后,可是咱们回忆一下,注册方法的第一步就是要等待完成advance,并且传给internalAwaitAdvance的node会是null,即不能扰动和超时,因此当本轮次阻塞了必定数量的线程后,若是不去arrive,也不考虑超时和扰动的状况,那么线程将一直阻塞.咱们不可能在轮次advance前进行注册,也就不可能在advance以前进行新一phase的arrive.
3.当本轮次的最后一个arrive线程触发了轮次的更新后,才能够开启注册以及新轮次的arrive,可是此时使用了另外一个等待队列,而触发了轮次更新的上一轮的arrive线程将会当即进行前一个队列中积压的线程的唤醒操做.只有该唤醒操做足够慢,且新的轮次极快就完成了的状况,才可能形成在原arrive线程未能及时释放奇数队列的状况下,新一轮次再次向其中添加元素.
4.最重要的还在上面的internalAwaitAdvance方法,那一段被做者标上了入队条件的注释处,要想入队,必须if ((q == null || q.phase == phase) &&加上后面的条件,而这两个条件的限定已经很明显,要想入队,必须知足该等待队列没有元素或者队首是本轮的元素,而该方法又是下一轮首次注册时必须等待完成的,下一轮的arrive又必须发生在下一轮的首次注册以后,所以根本不会出现本轮wait的线程还要等下一轮甚至下N轮的线程去释放的极端状况,哪怕真的去作一个极端测试:让奇数轮大量积压线程,让偶数轮快速切换,而后测试第一轮压入的线程究竟是不是本轮释放的.(做者差点就要写单元测试去作这个极端测试了!)
这一段不经意的if,一个小小的条件,若是不注意真的忽略了,小代码大功效,谁能想到,这么深的暗坑就这样被规避了.
前面已经详述了Phaser的源码以及若干趟坑辛路.其实已经没什么好总结的了,就在此顺便对比常见同步器CyclicBarrier,CountDownLatch,Semaphore的特征和实现.
从使用特征上看:
1.CountDownLatch是一次性的,只能初始化决定parties数量,等待者能够是多个,每次释放都会减小一个信号量,直到归0时为止,最后一个释放者将唤醒其余等待的线程.它也不能继续使用.
2.CyclicBarrier是可重用的,分代的,每一代之间彼此独立,可是每一代的初始parties是相同的,不可在运行期内动态调整,每一代最后一个线程会去开启一下代,并能够在此时运行一个用户指定的action,与此同时唤醒其余线程继续执行.它能够在运行完一代后继续被使用.而且它还支持重置.
3.Semaphore是一个资源量的典型,若是说CountDownLatch和CyclicBarrier或者Phaser都是等到"人够了"再放行,Semaphore倒是起到限流的做用,它控制了有限的令牌数,这个数量不能够动态地更改,在不能acquire到足够的令牌数时,线程将阻塞,直到其余线程释放了足量的令牌数并唤醒它为止.每个持有了令牌的线程均可以唤醒阻塞等待获取的线程.
4.Phaser的功能上不一样很明显,首先它的参与者数量几乎时刻可变(除了正在进入下一phase期间),随时能够增长减小parties数量,每一phase等待者能够是多个,每一phase中,每一个能从internalAwaitAdvance方法中走出循环的线程均可以帮助唤醒,固然最终能进入唤醒操做仍是要归功于最后一个arrive的线程(尽管它arrive后其余线程醒来后也会帮助唤醒).Phaser的唤醒者不必定是参与者.
从实现来看:
1.CountDownLatch借助了aqs来实现parties的释放,它使用cas+park的方式,不使用Lock.
2.CyclicBarrier须要借助重入锁和condition,每个await的线程都要全局加锁,阻塞时await在condition上.
3.Semaphore在实现上相似CountDownLatch,也是基于aqs,只不过它容许获取和释放,对state有增有减,总量不变.也是cas+park的方式阻塞,也不使用Lock
4.Phaser由于功能的要求,不基于AQS(它不能有构建时就固定的state,尽管能够初始化一个state,但它必须支持改变),它依托于原子引用实现了一个内部的队列,相应的等待/入队/唤醒等操做经过cas自旋+park的方式,一样不使用Lock.并利用双队列的方式规避了前一轮的释放和后一轮的响醒的阻塞.
此外还有两点结合前面的推理和自测验证的结论:
1.Phaser中的每个phase是保证了可见性的,经做者自测,在任何使用Phaser的代码中await先后,不会出现串phase读出的乱序状况(侧面说明每一个phase不会依赖后一个或几个phase的释放).
2.Phaser须要对await的线程进行阻塞时,是将它打包成一个node(blocker),利用ForkJoinPool来block的.若是使用Phaser同步的任务是运行在ForkJoinPool中的,它将会利用到相应的补偿机制,经做者自测,这将保证Phaser中block的每个任务必然获得执行,每个阻塞的线程必然获得释放.