CyclicBarrier源码分析

1、简介

          CyclicBarrier是并发包中提供的一个同步辅助类,可使必定数量的线程所有在栅栏位置处聚集,parties的线程才能继续往下执行。当线程到达栅栏位置时调用await方法,这个方法将阻塞直到全部线程都到达栅栏位置。若是全部线程都到达栅栏位置,那么栅栏将打开,此时全部的线程都将被释放,而栅栏将被重置以便下次使用。CyclicBarrier内部是基于ReentrantLock和Condition(AQS内部类)进行实现,维护一个parties的线程数。和CountDownLatch的区别是一、CyclicBarrier能够被重用。二、CountDownLatch是经过AQS的共享模式进行实现,CyclicBarrier是基于ReentrantLock和Condition,以及内部维护一个parties的线程数进行实现,对ReentrantLock不清楚的,能够看下个人另外一篇对ReentrantLock的源码分析juejin.im/post/5d1b0d…。基于Java8。java

2、属性

//独占锁,属性generation、 count不是线程安全须要在独占锁加锁操做的前提下,对ReentrantLock不清楚的能够看下个人另外一篇https://juejin.im/post/5d1b0da8f265da1bc5527d36
private final ReentrantLock lock = new ReentrantLock();
//独占锁的条件变量,等待全部线程都到达栅栏位置的全部线程,都是在条件队列中等待全部线程都到达栅栏位置
private final Condition trip = lock.newCondition();
//还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,从新赋值给count
private final int parties;
//最后一个到达栅栏的线程须要执行的任务
private final Runnable barrierCommand;
//CyclicBarrier栅栏的代数,有一代被打破,CyclicBarrier就不能再使用了,即等待在栅栏的其中一个线程在等待的过程当中,此属性不是线程安全的
private Generation generation = new Generation();
//还未到达栅栏位置的线程数量,有一个线程到达栅栏count就作减1操做,此属性的操做须要在ReentrantLock独占锁的条件下
private int count;复制代码

3、内部类

//栅栏的代数,有一代被打破,CyclicBarrier就不能再使用了,即等待在栅栏的其中一个线程在等待的过程当中
private static class Generation {
        //若是栅栏被打破,broken就会置为true
        boolean broken = false;
}复制代码

4、构造函数

/** * 传入parties和barrierAction构造CyclicBarrier实例 * * @param parties 还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,从新赋值给count * @param barrierAction 最后一个到达栅栏的线程须要执行的任务 */
public CyclicBarrier(int parties, Runnable barrierAction) {
        //若是传入进来的parties小于等于0,抛出IllegalArgumentException异常 
        if (parties <= 0) throw new IllegalArgumentException();
        //将传入进来的parties赋值给属性parties
        this.parties = parties;
        //将传入进来的parties赋值给属性count
        this.count = parties;
        //将传入进来的barrierAction任务赋值给属性barrierAction 
        this.barrierCommand = barrierAction;
}

/** * 只传入parties构造CyclicBarrier实例 * * @param parties 还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,从新赋值给count */
public CyclicBarrier(int parties) {
        //调用上面介绍的CyclicBarrier构造函数,barrierAction任务传入空
        this(parties, null);
}
复制代码

5、等待全部线程到达

/** * 返回当前线程第几个到达栅栏,当前线程等待其余全部线程到达栅栏,不然会阻塞到其余线程都到达栅栏被唤醒 * * @return 当前线程到达栅栏的指数,即第几个到达栅栏 * @throws InterruptedException 若是当前线程在等待其余全部的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常 * @throws BrokenBarrierException 若是CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常 */
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            //等待其余全部线程都到达栅栏,会一直阻塞到全部线程到达栅栏被唤醒,或者其中一个线程被中断破坏了栅栏,也会被唤醒,不支持超时的调用下面介绍的dowait方法 
            return dowait(false, 0L);
        } catch (TimeoutException toe) {//因为是一直阻塞到被唤醒,为此不会抛出超时异常,因为TimeoutException是编译异常,为此须要对其进行捕获
            //不会发生,若是发生直接将超时异常封装成Error
            throw new Error(toe); 
        }
}

/** * 返回当前线程第几个到达栅栏,当前线程超时的等待其余全部线程到达栅栏,不然会超时的等待到其余线程都到达栅栏被唤醒,若是超时其余线程还都没有所有到达栅栏,会抛出TimeoutException异常 * * @return 当前线程到达栅栏的指数,即第几个到达栅栏 * @throws InterruptedException 若是当前线程在等待其余全部的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常 * @throws BrokenBarrierException 若是CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常 * @throws TimeoutException 若是超时其余线程还都没有所有到达栅栏,会抛出TimeoutException异常 */
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
        //超时的调用下面介绍的dowait方法 
        return dowait(true, unit.toNanos(timeout));
}

/** * 返回当前线程第几个到达栅栏,当前线程等待其余全部线程到达栅栏,不然会阻塞到其余线程都到达栅栏被唤醒,支持当前线程超时的等待和非超时的等待 * * @param timed 是否超时等待其余线程到达栅栏 * @param nanos 超时的时间参数 * @return 当前线程到达栅栏的指数,即第几个到达栅栏 * @throws InterruptedException 若是当前线程在等待其余全部的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常 * @throws BrokenBarrierException 若是CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常 */
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        //获取独占锁,因为generation、count属性都不是线程安全的,为此须要在独占锁下才能操做
        final ReentrantLock lock = this.lock;
        //加独占锁
        lock.lock();
        try {
            //获取CyclicBarrier栅栏的代信息属性generation
            final Generation g = generation;
            
            //判断栅栏是否被打破,即generation的broken属性,若是CyclicBarrier栅栏被打破,broken属性为true,CyclicBarrier栅栏不能再被使用 
            if (g.broken)
                //若是CyclicBarrier栅栏被打破,抛出BrokenBarrierException异常
                throw new BrokenBarrierException();
            //判断当前线程是否被中断,若是被中断,调用下面介绍的breakBarrier方法
            if (Thread.interrupted()) {
                //调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程 
                breakBarrier();
                //抛出InterruptedException异常
                throw new InterruptedException();
            }
            
            //将count先作减一操做,再赋值给index,表示当前线程第几个到达栅栏,在当前线程被唤醒时,作为返回值返回
            int index = --count;
            //若是当前线程是最后一个到达CyclicBarrier栅栏的线程
            if (index == 0) {  
                //最后一个到达栅栏的线程执行任务是否成功标识,再finally中须要作对应的处理,唤醒其余对待的线程,有可能传入进来的任务执行抛出异常
                boolean ranAction = false;
                try {
                    //获取到须要最后一个到达栅栏的线程执行的任务,可能为空 
                    final Runnable command = barrierCommand;
                    //若是任务不为空
                    if (command != null)
                        //当前最后一个到达CyclicBarrier栅栏的线程执行任务,调用任务的run方法
                        command.run();
                    //若是任务不为空,执行成功,ranAction标识为置为true
                    ranAction = true;
                    //调用下面介绍的nextGeneration方法,唤醒全部等待其余线程都到达栅栏的线程
                    nextGeneration();
                    //返回0,由于是最后一个到达栅栏的线程
                    return 0;
                } finally {
                    //若是最后一个到达CyclicBarrier栅栏的线程执行不为空的任务失败
                    if (!ranAction)
                        //调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
                        breakBarrier();
                }
            }
            
            //循环直到其余线程都到达栅栏,或者栅栏不可用,或者等待的线程被中断,或者等待其余线程都到达栅栏超时 
            for (;;) {
                try {
                    //若是不支持超时 
                    if (!timed)
                        //调用Condition的await方法,Condition不清楚的能够看个人另外一篇AQS源码对Condition的分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
                        trip.await();
                    //若是支持超时,而且超时时间nanos小于0,调用await支持超时会抛出超时异常 
                    else if (nanos > 0L)
                        //若是支持超时timed为true,而且nanos大于0,调用Condition的await的超时方法
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {//若是线程在等待其余全部线程到大栅栏的过程当中,被中断
                    //若是栅栏的代信息对象没有改变,而且代信息对象的broken属性为false,栅栏没有被打破,为何代信息对象会改变,由于调用下面介绍的reset的方法,会改变代信息
                    if (g == generation && ! g.broken) {
                        //调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程 
                        breakBarrier();
                        //抛出线程被中断异常 
                        throw ie;
                    } else {
                        //若是上面的栅栏的代信息已经改变,或者栅栏已经被打破,即generation的属性broken为true,重置线程的中断标志位,由于异常线程的中断标志位会被重置
                        Thread.currentThread().interrupt();
                    }
                }
                //若是CyclicBarrier栅栏已经被打破,即generation的属性broken为true
                if (g.broken)
                    //抛出BrokenBarrierException异常
                    throw new BrokenBarrierException();
                //若是栅栏的代信息已经改变,主动调用reset方法,会改变栅栏的代信息
                if (g != generation)
                    //返回线程第几个到达栅栏
                    return index;
                //若是超时的调用await方法,即timed为true,而且nanos小于等于0
                if (timed && nanos <= 0L) {
                    //调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程 
                    breakBarrier();
                    //抛出超时异常
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放独占锁
            lock.unlock();
        }
}

//重置CyclicBarrier栅栏的代信息,唤醒全部等待其余线程都到达栅栏的线程,以及将count重置为parties
private void nextGeneration() {
        //唤醒全部等待其余线程都到达栅栏的线程
        trip.signalAll();
        //将count重置为parties 
        count = parties;
        //重置CyclicBarrier栅栏的代信息 
        generation = new Generation();
}

//将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
private void breakBarrier() { 
        //将generation的broken属性置为true 
        generation.broken = true;
        //将count重置为parties
        count = parties;
        //唤醒全部等待其余线程都到达栅栏的线程
        trip.signalAll();
}
复制代码

6、其余方法

//判断CyclicBarrier栅栏是否被打破,即CyclicBarrier栅栏的代信息generation的属性broken是否为true
public boolean isBroken() {
        //获取独占锁,因为generation属性不是线程安全的,为此须要在独占锁下才能操做
        final ReentrantLock lock = this.lock;
        //加独占锁
        lock.lock();
        try {
            //返回CyclicBarrier栅栏的代信息generation的属性broken,若是为true代表栅栏被打破
            return generation.broken;
        } finally {
            //释放独占锁
            lock.unlock();
        }
}

//重置CyclicBarrier栅栏
public void reset() {
        //获取独占锁,因为generation、count属性都不是线程安全的,为此须要在独占锁下才能操做
        final ReentrantLock lock = this.lock;
        //加独占锁
        lock.lock();
        try {
            //调用上面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程 
            breakBarrier();   // break the current generation
            //重置CyclicBarrier栅栏的代信息,唤醒全部等待其余线程都到达栅栏的线程,以及将count重置为parties
            nextGeneration(); // start a new generation
        } finally {
            //释放独占锁
            lock.unlock();
        }
}

//获取有多少线程在等待其余线程到达栅栏
public int getNumberWaiting() {
        //获取独占锁,因为count属性都不是线程安全的,为此须要在独占锁下才能操做
        final ReentrantLock lock = this.lock;
        //加独占锁
        lock.lock();
        try {
            //使用还未到达栅栏位置的起始线程数量parties减去目前还未到达栅栏位置的线程数量,获取到有多少线程在等待其余线程到达栅栏 
            return parties - count;
        } finally {
            //释放独占锁
            lock.unlock();
        }
}
复制代码
相关文章
相关标签/搜索