CyclicBarrier是并发包中提供的一个同步辅助类,可使必定数量的线程所有在栅栏位置处聚集,parties的线程才能继续往下执行。当线程到达栅栏位置时调用await方法,这个方法将阻塞直到全部线程都到达栅栏位置。若是全部线程都到达栅栏位置,那么栅栏将打开,此时全部的线程都将被释放,而栅栏将被重置以便下次使用。CyclicBarrier内部是基于ReentrantLock和Condition(AQS内部类)进行实现,维护一个parties的线程数。和CountDownLatch的区别是一、CyclicBarrier能够被重用。二、CountDownLatch是经过AQS的共享模式进行实现,CyclicBarrier是基于ReentrantLock和Condition,以及内部维护一个parties的线程数进行实现,对ReentrantLock不清楚的,能够看下个人另外一篇对ReentrantLock的源码分析juejin.im/post/5d1b0d…。基于Java8。java
//独占锁,属性generation、 count不是线程安全须要在独占锁加锁操做的前提下,对ReentrantLock不清楚的能够看下个人另外一篇https://juejin.im/post/5d1b0da8f265da1bc5527d36
private final ReentrantLock lock = new ReentrantLock();
//独占锁的条件变量,等待全部线程都到达栅栏位置的全部线程,都是在条件队列中等待全部线程都到达栅栏位置
private final Condition trip = lock.newCondition();
//还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,从新赋值给count
private final int parties;
//最后一个到达栅栏的线程须要执行的任务
private final Runnable barrierCommand;
//CyclicBarrier栅栏的代数,有一代被打破,CyclicBarrier就不能再使用了,即等待在栅栏的其中一个线程在等待的过程当中,此属性不是线程安全的
private Generation generation = new Generation();
//还未到达栅栏位置的线程数量,有一个线程到达栅栏count就作减1操做,此属性的操做须要在ReentrantLock独占锁的条件下
private int count;复制代码
//栅栏的代数,有一代被打破,CyclicBarrier就不能再使用了,即等待在栅栏的其中一个线程在等待的过程当中
private static class Generation {
//若是栅栏被打破,broken就会置为true
boolean broken = false;
}复制代码
/** * 传入parties和barrierAction构造CyclicBarrier实例 * * @param parties 还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,从新赋值给count * @param barrierAction 最后一个到达栅栏的线程须要执行的任务 */
public CyclicBarrier(int parties, Runnable barrierAction) {
//若是传入进来的parties小于等于0,抛出IllegalArgumentException异常
if (parties <= 0) throw new IllegalArgumentException();
//将传入进来的parties赋值给属性parties
this.parties = parties;
//将传入进来的parties赋值给属性count
this.count = parties;
//将传入进来的barrierAction任务赋值给属性barrierAction
this.barrierCommand = barrierAction;
}
/** * 只传入parties构造CyclicBarrier实例 * * @param parties 还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,从新赋值给count */
public CyclicBarrier(int parties) {
//调用上面介绍的CyclicBarrier构造函数,barrierAction任务传入空
this(parties, null);
}
复制代码
/** * 返回当前线程第几个到达栅栏,当前线程等待其余全部线程到达栅栏,不然会阻塞到其余线程都到达栅栏被唤醒 * * @return 当前线程到达栅栏的指数,即第几个到达栅栏 * @throws InterruptedException 若是当前线程在等待其余全部的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常 * @throws BrokenBarrierException 若是CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常 */
public int await() throws InterruptedException, BrokenBarrierException {
try {
//等待其余全部线程都到达栅栏,会一直阻塞到全部线程到达栅栏被唤醒,或者其中一个线程被中断破坏了栅栏,也会被唤醒,不支持超时的调用下面介绍的dowait方法
return dowait(false, 0L);
} catch (TimeoutException toe) {//因为是一直阻塞到被唤醒,为此不会抛出超时异常,因为TimeoutException是编译异常,为此须要对其进行捕获
//不会发生,若是发生直接将超时异常封装成Error
throw new Error(toe);
}
}
/** * 返回当前线程第几个到达栅栏,当前线程超时的等待其余全部线程到达栅栏,不然会超时的等待到其余线程都到达栅栏被唤醒,若是超时其余线程还都没有所有到达栅栏,会抛出TimeoutException异常 * * @return 当前线程到达栅栏的指数,即第几个到达栅栏 * @throws InterruptedException 若是当前线程在等待其余全部的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常 * @throws BrokenBarrierException 若是CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常 * @throws TimeoutException 若是超时其余线程还都没有所有到达栅栏,会抛出TimeoutException异常 */
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
//超时的调用下面介绍的dowait方法
return dowait(true, unit.toNanos(timeout));
}
/** * 返回当前线程第几个到达栅栏,当前线程等待其余全部线程到达栅栏,不然会阻塞到其余线程都到达栅栏被唤醒,支持当前线程超时的等待和非超时的等待 * * @param timed 是否超时等待其余线程到达栅栏 * @param nanos 超时的时间参数 * @return 当前线程到达栅栏的指数,即第几个到达栅栏 * @throws InterruptedException 若是当前线程在等待其余全部的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常 * @throws BrokenBarrierException 若是CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常 */
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
//获取独占锁,因为generation、count属性都不是线程安全的,为此须要在独占锁下才能操做
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//获取CyclicBarrier栅栏的代信息属性generation
final Generation g = generation;
//判断栅栏是否被打破,即generation的broken属性,若是CyclicBarrier栅栏被打破,broken属性为true,CyclicBarrier栅栏不能再被使用
if (g.broken)
//若是CyclicBarrier栅栏被打破,抛出BrokenBarrierException异常
throw new BrokenBarrierException();
//判断当前线程是否被中断,若是被中断,调用下面介绍的breakBarrier方法
if (Thread.interrupted()) {
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
breakBarrier();
//抛出InterruptedException异常
throw new InterruptedException();
}
//将count先作减一操做,再赋值给index,表示当前线程第几个到达栅栏,在当前线程被唤醒时,作为返回值返回
int index = --count;
//若是当前线程是最后一个到达CyclicBarrier栅栏的线程
if (index == 0) {
//最后一个到达栅栏的线程执行任务是否成功标识,再finally中须要作对应的处理,唤醒其余对待的线程,有可能传入进来的任务执行抛出异常
boolean ranAction = false;
try {
//获取到须要最后一个到达栅栏的线程执行的任务,可能为空
final Runnable command = barrierCommand;
//若是任务不为空
if (command != null)
//当前最后一个到达CyclicBarrier栅栏的线程执行任务,调用任务的run方法
command.run();
//若是任务不为空,执行成功,ranAction标识为置为true
ranAction = true;
//调用下面介绍的nextGeneration方法,唤醒全部等待其余线程都到达栅栏的线程
nextGeneration();
//返回0,由于是最后一个到达栅栏的线程
return 0;
} finally {
//若是最后一个到达CyclicBarrier栅栏的线程执行不为空的任务失败
if (!ranAction)
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
breakBarrier();
}
}
//循环直到其余线程都到达栅栏,或者栅栏不可用,或者等待的线程被中断,或者等待其余线程都到达栅栏超时
for (;;) {
try {
//若是不支持超时
if (!timed)
//调用Condition的await方法,Condition不清楚的能够看个人另外一篇AQS源码对Condition的分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
trip.await();
//若是支持超时,而且超时时间nanos小于0,调用await支持超时会抛出超时异常
else if (nanos > 0L)
//若是支持超时timed为true,而且nanos大于0,调用Condition的await的超时方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {//若是线程在等待其余全部线程到大栅栏的过程当中,被中断
//若是栅栏的代信息对象没有改变,而且代信息对象的broken属性为false,栅栏没有被打破,为何代信息对象会改变,由于调用下面介绍的reset的方法,会改变代信息
if (g == generation && ! g.broken) {
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
breakBarrier();
//抛出线程被中断异常
throw ie;
} else {
//若是上面的栅栏的代信息已经改变,或者栅栏已经被打破,即generation的属性broken为true,重置线程的中断标志位,由于异常线程的中断标志位会被重置
Thread.currentThread().interrupt();
}
}
//若是CyclicBarrier栅栏已经被打破,即generation的属性broken为true
if (g.broken)
//抛出BrokenBarrierException异常
throw new BrokenBarrierException();
//若是栅栏的代信息已经改变,主动调用reset方法,会改变栅栏的代信息
if (g != generation)
//返回线程第几个到达栅栏
return index;
//若是超时的调用await方法,即timed为true,而且nanos小于等于0
if (timed && nanos <= 0L) {
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
breakBarrier();
//抛出超时异常
throw new TimeoutException();
}
}
} finally {
//释放独占锁
lock.unlock();
}
}
//重置CyclicBarrier栅栏的代信息,唤醒全部等待其余线程都到达栅栏的线程,以及将count重置为parties
private void nextGeneration() {
//唤醒全部等待其余线程都到达栅栏的线程
trip.signalAll();
//将count重置为parties
count = parties;
//重置CyclicBarrier栅栏的代信息
generation = new Generation();
}
//将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
private void breakBarrier() {
//将generation的broken属性置为true
generation.broken = true;
//将count重置为parties
count = parties;
//唤醒全部等待其余线程都到达栅栏的线程
trip.signalAll();
}
复制代码
//判断CyclicBarrier栅栏是否被打破,即CyclicBarrier栅栏的代信息generation的属性broken是否为true
public boolean isBroken() {
//获取独占锁,因为generation属性不是线程安全的,为此须要在独占锁下才能操做
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//返回CyclicBarrier栅栏的代信息generation的属性broken,若是为true代表栅栏被打破
return generation.broken;
} finally {
//释放独占锁
lock.unlock();
}
}
//重置CyclicBarrier栅栏
public void reset() {
//获取独占锁,因为generation、count属性都不是线程安全的,为此须要在独占锁下才能操做
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//调用上面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count从新赋值为parties属性值,唤醒全部等待其余线程都到达栅栏的线程
breakBarrier(); // break the current generation
//重置CyclicBarrier栅栏的代信息,唤醒全部等待其余线程都到达栅栏的线程,以及将count重置为parties
nextGeneration(); // start a new generation
} finally {
//释放独占锁
lock.unlock();
}
}
//获取有多少线程在等待其余线程到达栅栏
public int getNumberWaiting() {
//获取独占锁,因为count属性都不是线程安全的,为此须要在独占锁下才能操做
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//使用还未到达栅栏位置的起始线程数量parties减去目前还未到达栅栏位置的线程数量,获取到有多少线程在等待其余线程到达栅栏
return parties - count;
} finally {
//释放独占锁
lock.unlock();
}
}
复制代码