Phaser源码分析

1、简介

          Phaser是java7中引入的,是并发包中提供的线程同步辅助工具类,是个阶段屏障, 全部parties线程需等待此阶段屏障的全部线程都到达,才能进入下一阶段屏障持续执行。 CyclicBarrier、CountDownLatch、Phaser 三个都是线程同步辅助工具类,同步辅助三剑客。CountDownLatch不能重用,CyclicBarrier、Phaser均可以重用,而且Phaser 更加灵活能够在运行期间随时加入(register)新的parties,也能够在运行期间随时退出(deregister),支持层级,根root Phaser,父Phaser把每一个子的 Phaser(每一个子的内部也有多个parties线程,也有多阶段) 当作父Phaser的一个parties,父Phaser等待全部的parties都到达父的阶段屏障, 即子Phaser的全部阶段都执行完,即子Phaser都到达父的阶段屏障。唤醒全部的子Phaser的parties线程继续执行下一阶段。 CyclicBarrier也能够作为阶段屏障使用,每一个线程重复作为CyclicBarrier的parties,可是没办法想Phaser那样支持层级。 例如比赛,一个比赛分为3个阶段(phase): 初赛、复赛和决赛,规定全部运动员都完成上一个阶段的比赛才能够进行下一阶段的比赛,而且比赛的过程当中容许退赛(deregister),这个场景就很适合Phaser,这个例子的话CyclicBarrier也能够实现,若是是更复杂如奥运闭幕(父Phaser),需等待各类比赛结束,如跳远(子Phaser,也有多个阶段,初赛、复赛和决赛),田径(子Phaser,也有多个阶段,初赛、复赛和决赛) 等,Phaser就更加适合,你也能够把重造轮子写个CyclicBarrier子类,改形成相似Phaser支持层级的功能。CyclicBarrier能够看个人另外一篇源码分析 juejin.im/post/5d3bf8…,CountDownLatch能够看个人另外一篇juejin.im/post/5d3593…。  java

2、属性

/** * Phaser中的状态,64位的属性state不一样位被用来存放不一样的值,低16位存放unarrived,低32位中的高16位存放parties,高32位的低31位存放phase,最高位存放terminated,即Phaser是否关闭 * * unarrived -- 还没到达阶段屏障的参与者数量,能够在运行期间增长,属性state的低16位存放 (bits 0-15) * parties -- Phaser中的参与者,能够在运行期间增长,低32位中的高16位存放parties,每次进入下一阶段unarrived会被从新赋值为parties值 (bits 16-31) * phase -- 记录Phaser的当前阶段,只有全部的参与者都到达阶段屏障,才能进入下一阶段屏蔽,高32位的低31位存放phase(bits 32-62) * terminated -- 属性state最高位存放terminated,即Phaser是否关闭 (bit 63 / sign) */
private volatile long state;
//Phaser中的最大参与者值
private static final int  MAX_PARTIES     = 0xffff;
//Phaser中的最大阶段值,属性state的高32位的低31位存放phase记录Phaser的当前阶段的最大值private static final int MAX_PHASE = Integer.MAX_VALUE;
//属性state的低32位中的高16位存放parties,即Phaser中的参与者,在下面获取parties时,须要用到的移位操做值
private static final int  PARTIES_SHIFT   = 16;
//属性state的高32位的低31位存放记录Phaser的当前阶段值,在下面获取phase时,须要用到的移位操做值
private static final int  PHASE_SHIFT     = 32;
//属性state的低32位中的低16位存放未抵达当前阶段屏障的参与者个数,与UNARRIVED_MASK值&得未抵达当前阶段屏障的参与者个数
private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
//属性state的低32位中的高16位存放parties,即Phaser中的参与者,与PARTIES_MASK值&得屏障的参与者个数
private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
//属性state的低32位中的低16位存放未抵达当前阶段屏障的参与者个数,低32位中的高16位存放parties,即Phaser中的参与者,与COUNTS_MASK值&得unarrived和parties两部分值 
private static final long COUNTS_MASK     = 0xffffffffL;
//属性state最高位存放terminated,即Phaser是否关闭,若是想关闭屏蔽state属性值或(|)上TERMINATION_BIT值
private static final long TERMINATION_BIT = 1L << 63;

//阶段屏障的一个参与者到达,即unarrived值减1
private static final int  ONE_ARRIVAL     = 1;
//一个参与者,属性state的低32位的高16位存放parties,只在ONE_DEREGISTER中使用到 
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
//屏障的一个参与者取消注册,即一个参与者退出屏障,即属性state值的低32位的高16位parties值减1,低16位unarrived值减1
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;

//若是在构造Phaser时,传入进来的参与者个数parties等于0,属性state的初始值
private static final int  EMPTY           = 1;

//Phaser支持层级,父Phaser把每一个子的Phaser(每一个子的内部也有多个parties线程,也有多阶段)当作父Phaser的一个parties,父Phaser等待全部的parties都到达父的阶段屏障, 即子Phaser的全部阶段都执行完,即子Phaser都到达父的阶段屏障。唤醒全部的子Phaser的parties线程继续执行下一阶段
private final Phaser parent;

//不论是有层级仍是无层级的Phaser的根Phaser root都是同一个,没有层级是自身,有层级就是父的root
private final Phaser root;
//偶链表,Phaser屏障是有多个阶段,为了防止竞争,偶数的阶段采用偶链表,一个参与者在到达阶段屏障时,还有其余参与者还未到达,自旋一段时间,其他参与者还未到达,将其封装成节点加入链表中
private final AtomicReference<QNode> evenQ;
//奇链表
private final AtomicReference<QNode> oddQ;
//得到可用的处理器个数
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//一个参与者在到达阶段屏障时,还有其余参与者还未到达,参与者线程不是直接进入等待状态而是先自旋一段时间,自旋值根据处理器个数
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
//UnSafe的使用,若是不清楚的话,能够看https://juejin.im/user/5bd8718051882528382d8728/shares中的UnSafe介绍
private static final sun.misc.Unsafe UNSAFE;
//属性state的相对偏移量,相对Phaser实例的起始内存位置的相对偏移量,定义成静态的缘由是,属性的相对实例的偏移量都是相等的
private static final long stateOffset;
static {
        try {
            //获取UnSafe实例
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            //Phaser的Class信息
            Class<?> k = Phaser.class;
            //使用UnSafe实例获取Phaser类的属性state的相对偏移量
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
        } catch (Exception e) {
            throw new Error(e);
        }
}复制代码

3、内部类

//一个参与者在到达阶段屏障时,还有其余参与者还未到达,自旋一段时间,其他参与者还未到达,将其封装成节点QNode加入链表中
static final class QNode implements ForkJoinPool.ManagedBlocker {
        //Phaser实例,需拿到state的阶段值phaser,因为state用volatile修饰,java内存模型加上happen-before,保证state的写对后续的读可见
        final Phaser phaser;
        //当前节点所处的屏障Phaser阶段
        final int phase;
        //当前等待节点线程是否可被中断
        final boolean interruptible;
        //当前等待节点线程是否超时等待其余未到达阶段屏障参与者线程到达阶段屏障 
        final boolean timed;
        //当前等待节点线程在等待的过程当中是否被中断
        boolean wasInterrupted;
        //当前等待线程若是支持超时等待,等待的超时时间,单位纳秒
        long nanos;
        //当前等待节点等待的超时的时间点
        final long deadline;
        //当前等待节点的线程,可能为空,当前节点被取消等待
        volatile Thread thread; // nulled to cancel wait
        //因为QNode是个链表,当前等待节点的下一等待节点
        QNode next;
        
        //QNode构造函数,传入phaser实例,phase当前等待节点所处的阶段,interruptible当前节点线程是否可被中断,timed当前等待节点线程是否超时等待,nanos当前等待线程若是支持超时等待,等待的超时时间
        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.interruptible = interruptible;
            this.nanos = nanos;
            this.timed = timed;
            this.deadline = timed ? System.nanoTime() + nanos : 0L;
            thread = Thread.currentThread();
        }
        
        //当前等待节点是否被释放
        public boolean isReleasable() {
            //若是等待节点的线程为空,代表当前等待节点被释放
            if (thread == null)
                //返回true,代表当前等待节点被释放
                return true;
            //若是当前等待节点的屏障阶段和当前的phaser的阶段不一致,代表当前等待节点被释放 
            if (phaser.getPhase() != phase) {
                //将等待节点对应的线程置为空
                thread = null;
                //返回true,代表当前等待节点被释放 
                return true;
            }
            //若是节点对应的线程被中断
            if (Thread.interrupted())
                //wasInterrupted置为true,代表节点对应线程被中断
                wasInterrupted = true;
            //若是节点对应线程被中断,而且当前节点支持线程可中断 
            if (wasInterrupted && interruptible) {
                //将等待节点对应的线程置为空
                thread = null;
                //返回true,代表当前等待节点被释放
                return true;
            }
            //当前等待节点线程超时等待其余未到达阶段屏障参与者线程到达阶段屏障 
            if (timed) {
                //属性nanos超时时间大于0
                if (nanos > 0L) {
                    //节点的超时时间点减去当前时间,得到新的超时时间
                    nanos = deadline - System.nanoTime();
                }
                //若是属性nanos超时时间小于0,代表当前等待节点线程等待其他参与者线程到达阶段屏障超时
                if (nanos <= 0L) {
                    //将等待节点对应的线程置为空
                    thread = null;
                    //返回true,代表当前等待节点被释放
                    return true;
                }
            }
            //返回false,代表当前等待节点还未被释放
            return false;
        }
        
        //阻塞节点线程,线程被唤醒时,判断节点是否被释放
        public boolean block() {
            //调用上面的isReleasable判断节点是否被释放 
            if (isReleasable())
                //返回true,节点被释放
                return true;
            //若是节点线程不支持超时等待其他参与者线程到达阶段屏障,LockSupport.park阻塞节点线程,直到调用unpark唤醒
            else if (!timed)
                //调用LockSupport阻塞节点线程
                LockSupport.park(this);
            //若是节点支持超时等待其他参与者线程到达阶段屏障,而且超时时间大于0
            else if (nanos > 0L)
                //调用LockSupport.parkNanos超时的等待其他参与者线程到达阶段屏障,直到超时,或者调用unpark唤醒
                LockSupport.parkNanos(this, nanos);
            //调用上面的isReleasable判断节点是否被释放 
            return isReleasable();
        }
}复制代码

4、构造函数

/** * 建立新的屏障Phaser实例,无参构造Phaser实例,没有注册parties参与者,而且没有父Phaser,初始屏障阶段数字0 */
public Phaser() {
        //调用下面Phaser(Phaser parent, int parties)构造函数,父Phaser为null,parties参与者初始化为0
        this(null, 0);
}

/** * 建立新的屏障Phaser实例,注册parties参与者数parties,没有父Phaser,初始屏障阶段数字0 * * @param parties 初始参与者数 */
public Phaser(int parties) {
        //调用下面Phaser(Phaser parent, int parties)构造函数,父Phaser为null,parties参与者初始化为parties
        this(null, parties);
}

/** * 建立新的屏障Phaser实例,没有注册parties参与者,传入父Phaser,初始屏障阶段数字0 * * @param parent 父Phaser */
public Phaser(Phaser parent) {
        //调用下面Phaser(Phaser parent, int parties)构造函数,父Phaser为parent,parties参与者初始化为0 
        this(parent, 0);
}

/** * 建立新的屏障Phaser实例,注册parties参与者,传入父Phaser,初始屏障阶段数字0 */
public Phaser(Phaser parent, int parties) {
        //若是传入parties是负数,或者值超过MAX_PARTIES,都会抛出IllegalArgumentException异常 
        if (parties >>> PARTIES_SHIFT != 0)
            //抛出IllegalArgumentException异常
            throw new IllegalArgumentException("Illegal number of parties");
        //初始屏障阶段为0
        int phase = 0;
        //属性值parent赋值为传入进来的父Phaser parent
        this.parent = parent;
        //若是传入的父Phaser不为空,代表当前Phaser实例支持层级
        if (parent != null) {
            //获取父的root phaser
            final Phaser root = parent.root;
            //将当前Phaser实例根root phaser赋值为父Phaser的root phaser
            this.root = root;
            //将当前Phaser实例偶链表赋值为根root phaser的偶链表 
            this.evenQ = root.evenQ;
            //将当前Phaser实例奇链表赋值为根root phaser的奇链表
            this.oddQ = root.oddQ;
            //注册parties参与者不等于0,代表有注册parties参与者 
            if (parties != 0)
                //父Phaser把每一个子的 Phaser(每一个子的内部也有多个parties线程,也有多阶段)当作父Phaser的一个parties,为此须要调用父的doRegister方法将当前子Phaser当作一个parties注册到父Phaser中,doRegister方法在下面进行介绍
                phase = parent.doRegister(1);
        }
        //若是传入的父Phaser为空,代表当前Phaser实例不支持层级
        else {
            //将当前Phaser实例根root phaser赋值为自身
            this.root = this;
            //初始化一个新的偶链表
            this.evenQ = new AtomicReference<QNode>();
            //初始化一个新的奇链表 
            this.oddQ = new AtomicReference<QNode>();
        }
        //初始化属性state,若是没有注册的parties参与者,将属性state值赋值为EMPTY
        this.state = (parties == 0) ? (long)EMPTY :
            //不然将phaser左移32位,或(|)上 ,将parties左移16位,或上parties值,即unarrived初始为parties
            ((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties);
}复制代码

5、注册参与者

/** * 在Phaser运行期间注册一个参与者 */
public int register() {
        //调用下面介绍的doRegister方法注册一个参与者到Phaser实例中 
        return doRegister(1);
}

//@param registrations 注册参与者数目
private int doRegister(int registrations) {
        // adjustment to state unarrived
        //因为注册参与者,须要调整parties和unarrived的值,为此registrations需左移16位获得parties值,或(|)上registrations(unarrived值) 
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        //当前Phaser实例获取父Phaser实例
        final Phaser parent = this.parent;
        //记录当前屏障处于第几个阶段
        int phase;
        //循环,直到注册参与者成功 
        for (;;) {
            //若是当前Phaser实例没有父Phaser,获取当前Phaser实例的属性state值(terminated、phase、parties、unarrived),不然调用reconcileState()方法,reconcileState是调整当前Phaser实例的属性state中的phase值与root的一致
            long s = (parent == null) ? state : reconcileState();
            //获取state属性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值 
            int counts = (int)s;
            //counts高16位存放parties值,低16位存放unarrived值,无符号右移16位获取到parties值
            int parties = counts >>> PARTIES_SHIFT;
            //counts低16位存放unarrived值,counts与(&)上UNARRIVED_MASK(0xffff),获取到低16位存放unarrived值 
            int unarrived = counts & UNARRIVED_MASK;
            //传入进来要注册参与者数目是否大于Phaser实例容许注册的参与者数目,即传入进来要注册的参与者数目加上已经注册的参与者数目大于Phaser实例容许的最大参与者数目MAX_PARTIES
            if (registrations > MAX_PARTIES - parties)
                //传入进来要注册参与者数目大于Phaser实例容许注册的参与者数目,抛出IllegalStateException异常,badRegister方法看下面介绍
                throw new IllegalStateException(badRegister(s));
            //属性state高32位的低31位存放phase(Phaser实例的阶段值),属性state值无符号右移32位获取到phase值
            phase = (int)(s >>> PHASE_SHIFT);
            //若是phase值小于0,表示属性state的最高位为1,属性state最高位存放terminated,即Phaser是否关闭,若是terminated为1,Phaser关闭
            if (phase < 0)
                //若是Phaser关闭,直接退出循环
                break;
            //若是counts(高16位存放parties值,低16位存放unarrived值)不等于EMPTY,即当前Phaser实例不是注册第一个参与者
            if (counts != EMPTY) {                  // not 1st registration
                //若是当前Phaser实例不为空,或者属性state值没有改变
                if (parent == null || reconcileState() == s) {
                    //若是当前阶段屏障Phaser实例的unarrived等于0,代表当前全部参与者都到达阶段屏障,只是还没使用UnSafe的compareAndSwapLong将unarrived更新成parties,即还没使用CAS将属性state值更新成新的state值,调用internalAwaitAdvance方法先自旋一段等待Phaser实例的进入到下一阶段,而后将当前注册的参与者注册到下一阶段中,不然的话进入等待状态直到Phaser实例进入到下一阶段,internalAwaitAdvance方法能够看下面的介绍
                    if (unarrived == 0)             // wait out advance
                        //使用根root Phaser实例调用下面介绍的internalAwaitAdvance方法,传入当前Phaser实例阶段和节点为null
                        root.internalAwaitAdvance(phase, null);
                    //若是当前Phaser实例的unarrived不等于0,使用UnSafe的cas将传入进来的参与者数目加到属性state中
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adjust))
                        //退出当前循环
                        break;
                }
            }
            //若是counts等于EMPTY,而且当前Phaser实例没有父Phaser,代表还没注册参与者,当前注册的参与者是第一批
            else if (parent == null) {              // 1st root registration
                //使用phase(阶段值)移位32位或(|)上adjust(高16位存放parties值,低16位存放unarrived值)的调整值
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                //使用UnSafe的cas将属性state值更新成新的值next 
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    //退出循环
                    break;
            }
            //若是counts等于EMPTY,而且当前Phaser实例有父Phaser,代表还没注册参与者,当前注册的参与者是第一批,当前Phaser实例作为父Phaser实例的一个parties
            else {
                //加锁,由于存在并发操做,当前Phaser实例作为锁对象
                synchronized (this) {               // 1st sub registration
                    //可能多个线程同时进入到else中,为此须要从新检查下属性state值,由于当前Phaser实例只能被当作一次父Phaser的一个parties参与者
                    if (state == s) {               // recheck under lock
                        //将当前Phaser实例作为父Phaser实例的一个parties参与者
                        phase = parent.doRegister(1);
                        //若是父Phaser的阶段值phase小于0,直接退出循环
                        if (phase < 0)
                            //退出循环
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        //循环直到cas将注册进来的参与者注册到当前Phaser实例
                        while (!UNSAFE.compareAndSwapLong
                               (this, stateOffset, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            //使用CAS更新当前Phaser实例的属性state为新的值失败,从新获取新的属性state值
                            s = state;
                            //从新从属性state值中获取到phase值,从新循环
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        //退出循环
                        break;
                    }
                }
            }
        }
        //返回当前Phaser实例的phase阶段值 
        return phase;
}

//调整当前Phaser实例的属性state
private long reconcileState() {
        //获取当前实例Phaser的根root Phaser实例 
        final Phaser root = this.root;
        //获取当前实例Phaser的属性state值 
        long s = state;
        //若是当前实例Phaser有层级,即root Phaser不是自身实例 
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived
            //若是当前Phaser实例的阶段值phase和根root phase的阶段值不一致,使用CAS将当前Phaser实例属性state替换成新的state值,即当前Phaser实例phase阶段值和root phaser的阶段值一致
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !UNSAFE.compareAndSwapLong
                   (this, stateOffset, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                //从新获取当前Phaser实例的属性state值
                s = state;
        }
        //返回调整完的当前Phaser实例的属性state值
        return s;
}

//传入进来要注册参与者数目大于Phaser实例容许注册的参与者数目
private String badRegister(long s) {
        return "Attempt to register more than " +
            MAX_PARTIES + " parties for " + stateToString(s);
}

/** * 在Phaser实例运行期间批量注册参与者 */
public int bulkRegister(int parties) {
        //若是批量注册参与者数目小于0,抛出IllegalArgumentException异常
        if (parties < 0)
            throw new IllegalArgumentException();
        //若是批量注册参与者数目等于0
        if (parties == 0)
            //返回根root Phaser实例的阶段值,即根root Phaser实例的phase值
            return getPhase();
        //调用上面介绍的doRegister方法注册批量参与者到Phaser实例中 
        return doRegister(parties);
}

/** * 获取Phaser实例的阶段值,若是不支持层级root就是自身Phaser实例 */
public final int getPhase() {
        //root Phaser实例state属性高32位的低31位存放phase,为此须要右移32位获得phase值
        return (int)(root.state >>> PHASE_SHIFT);
}
复制代码

6、线程到达阶段屏障

/** * 一个参与者到达阶段屏障 */
public int arrive() {
        //调用下面介绍的doArrive方法,将属性state中的unarrived作减1操做,若是是最后一个到达还需唤醒其余等待其余参与者都到达阶段屏障的参与者线程
        return doArrive(ONE_ARRIVAL);
}

//传入adjust调整值,多是ONE_ARRIVAL或者ONE_DEREGISTER
private int doArrive(int adjust) {
        //获取根root Phaser实例 
        final Phaser root = this.root;
        //循环,直到属性state中的unarrived值作减1操做成功,若是adjust是ONE_DEREGISTER,直到属性state中的unarrived和parties都作减1操做成功 
        for (;;) {
            //若是根root Phaser实例就是自身Phaser实例,即当前Phaser实例没有父Phaser,获取当前Phaser实例的属性state值(terminated、phase、parties、unarrived),不然调用reconcileState()方法,reconcileState是调整当前Phaser实例的属性state中的phase值与root的一致
            long s = (root == this) ? state : reconcileState();
            //属性state高32位的低31位存放phase(Phaser实例的阶段值),属性state值无符号右移32位获取到phase值
            int phase = (int)(s >>> PHASE_SHIFT);
            //若是phase值小于0,表示属性state的最高位为1,属性state最高位存放terminated,即Phaser是否关闭,若是terminated为1,Phaser关闭
            if (phase < 0)
                //若是Phaser关闭,直接返回phase
                return phase;
            //获取state属性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值
            int counts = (int)s;
            //若是counts(高16位存放parties值,低16位存放unarrived值)等于EMPTY,即当前Phaser实例还未注册参与者,不然的话counts & UNARRIVED_MASK获取到低16位存放unarrived值 
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            //若是unarrived小于等于0,代表还没注册一个参与者,就有参与者到达阶段屏障,抛出IllegalStateException异常,badArrive看下面介绍
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            //使用CAS更新当前Phaser实例的属性state值,即属性state值更改成减去调整值的新值
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                //若是当前参与者是最后一个到达阶段屏障
                if (unarrived == 1) {
                    //获取到属性state的parties值,作为unarrived的值
                    long n = s & PARTIES_MASK;  // base of next state
                    //将parties值左移16位作为下一阶段的unarrived的值
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    //若是当前Phaser实例没有层级,即根root Phaser实例为自身Phaser实例 
                    if (root == this) {
                        //调用onAdvance方法,判断当前Phaser实例是否须要关闭,即当前Phaser实例阶段是否都执行完,这个onAdvance方法应该被子类重写,内部能够自定义各个不一样阶段执行的方法,好比switch{0:阶段0,1:阶段1,2:阶段2},若是返回true代表当前Phaser实例须要关闭
                        if (onAdvance(phase, nextUnarrived))
                            //若是onAdvance返回true将属性state值的最高位置为1
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            //若是下一阶段的unarrived的值为0,将属性state值的低32位置为EMPTY
                            n |= EMPTY;
                        else
                            //不然的话,将属性state值的低16位unarrived置为nextUnarrived 
                            n |= nextUnarrived;
                        //将phase阶段值加1作为下一阶段值
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        //从新计算属性state值,将下一阶段值左移32位,或(|)上低32位的parties值和unarrived值
                        n |= (long)nextPhase << PHASE_SHIFT;
                        //使用UnSafe的cas更新当前Phaser实例的属性state 
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        //唤醒全部其余等待其余参与者都到达阶段屏障的参与者线程,releaseWaiters方法看下面介绍
                        releaseWaiters(phase);
                    }
                    //当前Phaser实例有层级,即有父Phaser,若是nextUnarrived等于0,即当前Phaser实例没有参与者
                    else if (nextUnarrived == 0) { // propagate deregistration
                        //因为当前Phaser实例作为父Phaser实例的一个parties参与者,当前Phaser实例没有参与者,需将当前Phaser实例作为参与者从父Phaser实例中注销掉
                        phase = parent.doArrive(ONE_DEREGISTER);
                        //使用UnSafe的cas将属性state值的低32位置为EMPTY
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    //当前Phaser实例有层级,即有父Phaser,而且nextUnarrived不等于0,即当前Phaser实例有参与者,因为当前Phaser实例作为父Phaser实例的一个parties参与者,为此当前Phaser实例的一个阶段屏障参与者都到达,当前Phaser实例作为父Phaser实例的一个参与者到达
                    else
                        //当前Phaser实例作为父Phaser的一个参与者到达
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                //返回当前所处的阶段值 
                return phase;
            }
        }
}

//若是unarrived小于等于0,代表还没注册一个参与者,就有参与者到达阶段屏障
private String badArrive(long s) {
        return "Attempted arrival of unregistered party for " +
            stateToString(s);
}

//控制Phaser实例是否继续执行下一阶段,能够自定义子类继承Phaser控制Phaser须要执行几个阶段,不然registeredParties不等于0,Phaser会一直执行下去
protected boolean onAdvance(int phase, int registeredParties) {
        //若是registeredParties等于0,Phaser实例须要关闭
        return registeredParties == 0;
}

/** * 唤醒全部其余等待其余参与者都到达阶段屏障的节点(参与者)线程 * * @param phase Phaser实例阶段值 */
private void releaseWaiters(int phase) {
        //等待节点链表的第一个节点元素 
        QNode q;   // first element of queue
        //节点对应的线程
        Thread t;  // its thread
        //根据传入进来的阶段值获取操做的是偶链表仍是奇链表
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        //循环的获取头节点,头节点不为空而且节点的阶段值和根的阶段值不相等,代表全部的参与者都到达传入进来的阶段屏障
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            //CAS将头节点设置为头节点的下一节点,若是头节点设置为下一节点成功,而且头节点的线程不为空
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                //将头节点的线程置为空
                q.thread = null;
                //唤醒头节点线程,从新循环,直到链表为空或节点的阶段值和根的阶段值相等
                LockSupport.unpark(t);
            }
        }
}

/** * 一个参与者到达阶段屏障而且从屏障中注销移除,属性state中的parties和unarrived值都须要减1,若是是最后一个到达还需唤醒其余等待其余参与者都到达阶段屏障的参与者线程,若是一个参与者从屏障移除,而且屏障没有其余参与者即属性state值的parties等于0,将属性state的低32位置成EMPTY */
public int arriveAndDeregister() {
        //调用上面介绍的doArrive方法,将属性state中的unarrived作减1操做,若是是最后一个到达还需唤醒其余等待其余参与者都到达阶段屏障的参与者线程,而且屏障没有其余参与者即属性state值的parties等于0,将属性state的低32位置成EMPTY
        return doArrive(ONE_DEREGISTER);
}
复制代码

7、等待阶段屏障其他参与者线程所有到达

/** * 一个参与者到达阶段屏障而且等待其余参与者到达阶段屏障 */
public int arriveAndAwaitAdvance() {
        //获取根root Phaser实例
        final Phaser root = this.root;
        //循环直到其余参与者都到达阶段屏障
        for (;;) {
            //若是根root Phaser实例就是自身Phaser实例,即当前Phaser实例不支持层级没有父Phaser,获取当前Phaser实例的属性state值(terminated、phase、parties、unarrived),不然调用reconcileState()方法,reconcileState是调整当前Phaser实例的属性state中的phase值与root的一致 
            long s = (root == this) ? state : reconcileState();
            //属性state高32位的低31位存放phase(Phaser实例的阶段值),属性state值无符号右移32位获取到phase值 
            int phase = (int)(s >>> PHASE_SHIFT);
            //若是phase值小于0,表示属性state的最高位为1,属性state最高位存放terminated,即Phaser是否关闭,若是terminated为1,Phaser关闭
            if (phase < 0)
                //若是Phaser关闭,直接返回phase
                return phase;
            //获取state属性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值
            int counts = (int)s;
            //若是counts(高16位存放parties值,低16位存放unarrived值)等于EMPTY,即当前Phaser实例还未注册参与者,不然的话counts & UNARRIVED_MASK获取到低16位存放unarrived值 
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            //若是unarrived小于等于0,代表还没注册一个参与者,就有参与者到达阶段屏障,抛出IllegalStateException异常,badArrive看上面介绍
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            //使用UnSafe的cas将属性state更新成新的值,即属性state中的unarrived值-1,若是更新失败从新循环 
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                //若是unarrived大于1,代表还有其余参与者还未到达阶段屏障
                if (unarrived > 1)
                    //根root Phser实例(若是没有层级root就是自身Phaser实例)调用下面的internalAwaitAdvance方法,让当前线程先自旋一段时间,若是自旋一段时间其他参与者还未到达阶段屏障,封装成节点,加入到等待链表中,等待被唤醒,能够看下面对此方法的介绍
                    return root.internalAwaitAdvance(phase, null);
                //若是根root Phaser不是自身Phaser实例,代表当前Phaser实例支持层级,当前Phaser实例作为父Phaser实例的一个参与者注册进去,为此当前Phaser实例(每一个子的内部也有多个parties线程,也有多阶段))的一个阶段执行完成,作为父Phaser的一个参与者,代表当前参与者到达父的一个阶段屏障
                if (root != this)
                    //父Phaser把每一个子的Phaser(每一个子的内部也有多个parties线程,也有多阶段)当作父Phaser的一个parties参与者,为此当子Phaser实例的一个阶段执行完成代表子Phaser实例参与者也到达父Phaser实例的一个阶段
                    return parent.arriveAndAwaitAdvance();
                //获取到属性state的parties值,作为unarrived的值 
                long n = s & PARTIES_MASK;  // base of next state
                //将parties值左移16位作为下一阶段的unarrived的值
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                //调用onAdvance方法,判断当前Phaser实例是否须要关闭,即当前Phaser实例阶段是否都执行完,这个onAdvance方法应该被子类重写,内部能够自定义各个不一样阶段执行的方法,好比switch{0:阶段0,1:阶段1,2:阶段2},若是返回true代表当前Phaser实例须要关闭
                if (onAdvance(phase, nextUnarrived))
                    //若是onAdvance返回true将属性state值的最高位置为1,关闭Phaser实例
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    //若是下一阶段的unarrived的值为0,将属性state值的低32位置为EMPTY 
                    n |= EMPTY;
                else
                    //不然的话,将属性state值的低16位unarrived置为nextUnarrived 
                    n |= nextUnarrived;
                //将phase阶段值加1作为下一阶段值
                int nextPhase = (phase + 1) & MAX_PHASE;
                //从新计算属性state值,将下一阶段值左移32位,或(|)上低32位的parties值和unarrived值 
                n |= (long)nextPhase << PHASE_SHIFT;
                //使用UnSafe的cas更新当前Phaser实例的属性state,若是更新失败,代表有其余线程调用关闭Phaser实例,由于CAS更新属性state值只有最后一个参与者到达阶段屏障才会执行
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                //唤醒全部其余等待其余参与者都到达阶段屏障的参与者线程,releaseWaiters方法看上面介绍 
                releaseWaiters(phase);
                //返回下一阶段值 
                return nextPhase;
            }
        }
}

//让当前线程先自旋一段时间,若是自旋一段时间其他参与者还未到达阶段屏障,封装成节点,加入到等待链表中,等待被唤醒
private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        ////唤醒全部其余等待其余参与者都到达传入阶段的上一阶段屏障的参与者线程
        releaseWaiters(phase-1);          // ensure old queue clean
        //节点是否加入到链表的标志位,true为节点已加入到链表中
        boolean queued = false;           // true when node is enqueued
        //记录最近的unarrived值,即最近还有几个参与者还未到达阶段屏障值
        int lastUnarrived = 0;            // to increase spins upon change
        //线程自旋的次数
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        //循环,直到传入进来的阶段值和当前Phaser实例的阶段值不相等,代表最后一个参与者已经到达阶段屏障,其他等待参与者线程需被唤醒 
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            //若是传入的节点为空
            if (node == null) {           // spinning in noninterruptible mode
                //从属性state中获取到低16位的unarrived值
                int unarrived = (int)s & UNARRIVED_MASK;
                //若是unarrived值和lastUnarrived值不相等,代表又有参与者到达阶段屏障
                if (unarrived != lastUnarrived &&
                    //若是unarrived小于NCPU,即还未到达的阶段屏障的参与者数目小于可用的处理器个数
                    (lastUnarrived = unarrived) < NCPU)
                    //把自旋值再加上SPINS_PER_ARRIVAL
                    spins += SPINS_PER_ARRIVAL;
                //获取当前线程是否被中断 
                boolean interrupted = Thread.interrupted();
                //若是线程被中断,或者已经自旋完
                if (interrupted || --spins < 0) { // need node to record intr
                    //建立新的节点,QNode能够看上面的介绍
                    node = new QNode(this, phase, false, false, 0L);
                    //将线程是否被中断标志位赋值给节点
                    node.wasInterrupted = interrupted;
                }
            }
            //判断节点是否被释放,如节点线程被中断,或者等待其余参与者到达超时,节点的isReleasable方法能够看上面介绍
            else if (node.isReleasable()) // done or aborted
                //若是节点被释放,退出循环
                break;
            //若是节点还未加入到链表中,将新建节点加入到链表中 
            else if (!queued) {           // push onto queue
                //根据传入进来的阶段值获取操做的是偶链表仍是奇链表
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                //获取头节点,作为新建节点的下一节点,新建节点作为链表新的头节点
                QNode q = node.next = head.get();
                //若是头节点为空空链表,或者节点的阶段值和传入进来的阶段值相等,而且传入进来的阶段值和当前Phaser实例的阶段值相等
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    //使用CAS将新建节点作为头节点
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    //使用ForkJoinPool的managedBlock方法让节点node对应的线程进入等待状态 
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    //节点线程在等待其他参与者到达阶段屏障的过程当中被其余线程中断,将节点标识线程是否被中断wasInterrupted 置为true
                    node.wasInterrupted = true;
                }
            }
        }
        
        //若是节点不为空 
        if (node != null) {
            //若是节点线程不为空,代表线程不是被正常唤醒,有多是被中断唤醒
            if (node.thread != null)
                //将当前节点线程置为空 
                node.thread = null;       // avoid need for unpark()
            //节点线程在等待其他参与者到达阶段屏障的过程当中被其余线程中断,而且节点不支持线程中断,因为线程在被其余线程中断唤醒,抛出InterruptedException异常,线程的中断标志位被重置
            if (node.wasInterrupted && !node.interruptible)
                //线程在被其余线程中断唤醒,抛出InterruptedException异常,线程的中断标志位被重置,为此当前线程须要再次调用interrupt()保留中断标志位
                Thread.currentThread().interrupt();
            //若是线程不是被最后一个参与者到达阶段屏障唤醒,即当前Phaser实例的阶段值和传入的阶段值相等
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                //将唤醒和当前Phaser实例阶段值不相等的节点,abortWait看下面方法的介绍
                return abortWait(phase); // possibly clean up on abort
        }
        //唤醒全部其余等待其余参与者都到达阶段屏障的参与者线程,releaseWaiters方法看上面介绍 
        releaseWaiters(phase);
        //返回传入的阶段值 
        return p;
}

//ForkJoinPool的managedBlock方法,阻塞线程,直到线程被唤醒
public static void managedBlock(ManagedBlocker blocker) throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        Thread t = Thread.currentThread();
        //if这部分等分析ForkJoinPool再解析
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
            WorkQueue w = wt.workQueue;
            while (!blocker.isReleasable()) {
                if (p.tryCompensate(w)) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        U.getAndAddLong(p, CTL, AC_UNIT);
                    }
                    break;
                }
            }
        }
        else {
            //循环调用节点QNode的isReleasable判断节点是否被释放,QNode的block方法让线程进入等待状态,不清楚的能够看QNode内部的这两个方法介绍 
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
}

//传入阶段值,唤醒和当前Phaser实例阶段值不相等的节点
private int abortWait(int phase) {
        //根据传入进来的阶段值获取操做的是偶链表仍是奇链表 
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        //循环,直到唤醒全部等待在传入进来的phase阶段值的节点线程 
        for (;;) {
            Thread t;
            //获取链表头节点 
            QNode q = head.get();
            //root Phaser实例state属性高32位的低31位存放phase,为此须要右移32位获得phase值
            int p = (int)(root.state >>> PHASE_SHIFT);
            //若是链表中没有等待节点,或者头节点线程不为空和节点阶段值和当前Phaser实例阶段值相等,直接返回+
            if (q == null || ((t = q.thread) != null && q.phase == p))
                return p;
            if (head.compareAndSet(q, q.next) && t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
}

public int awaitAdvance(int phase) {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase)
            return root.internalAwaitAdvance(phase, null);
        return p;
}

public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, false, 0L);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
        }
        return p;
}

public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, true, nanos);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
            else if (p == phase)
                throw new TimeoutException();
        }
        return p;
}复制代码

8、关闭屏障

public void forceTermination() {
        // Only need to change root state
        final Phaser root = this.root;
        long s;
        while ((s = root.state) >= 0) {
            if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                          s, s | TERMINATION_BIT)) {
                // signal all threads
                releaseWaiters(0); // Waiters on evenQ
                releaseWaiters(1); // Waiters on oddQ
                return;
            }
        }
}复制代码

9、其余方法

public final int getPhase() {
        return (int)(root.state >>> PHASE_SHIFT);
}

public int getRegisteredParties() {
        return partiesOf(state);
}

public int getArrivedParties() {
        return arrivedOf(reconcileState());
}

public int getUnarrivedParties() {
        return unarrivedOf(reconcileState());
}

public Phaser getParent() {
        return parent;
}

public Phaser getRoot() {
        return root;
}

public boolean isTerminated() {
        return root.state < 0L;
}

public String toString() {
        return stateToString(reconcileState());
}复制代码
相关文章
相关标签/搜索