CyclicBarrier字面意思是循环屏障,它能够实现线程间的计数等待。当线程到达屏障点时会依次进入等待状态,直到最后一个线程进入屏障点时会唤醒等待的线程继续运行。ide
CyclicBarrier和CountDownLatch相似,区别在于CountDownLatch只能使用一次,当计数器归零后,CountDownLatch的await等方法都会直接返回。而CyclicBarrier是能够重复使用的,当计数器归零后,计数器和CyclicBarrier状态都会被重置。this
CyclicBarrier(int parties):建立CyclicBarrier,指定计数器值(等待线程数量)。线程
CyclicBarrier(int parties, Runnable barrierAction):建立CyclicBarrier,指定计数器值(等待线程数量)和计数器归零后(最后一个线程到达)要执行的任务。游戏
await():阻塞当前线程,直到计数器归零被唤醒或者线程被中断。ip
await(long timeout, TimeUnit unit):阻塞当前线程,直到计数器归零被唤醒、线程被中断或者超时返回。get
等待全部玩家准备就绪,游戏才开始,每一轮游戏的开始意味着CyclicBarrier已经重置,能够开始新一轮的计数。同步
public class Demo { public static void main(String[] args) { //建立CyclicBarrier并指定计数器值为5,以及计数器为0后要执行的任务 CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("---游戏开始---"); System.out.println("---五票同意,游戏结束---"); }); Runnable runnable = () -> { //重复使用CyclicBarrier5次 for(int i = 0; i < 5; i++){ System.out.println(Thread.currentThread().getName() + ":准备就绪"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }; Thread thread1 = new Thread(runnable, "一号玩家"); Thread thread2 = new Thread(runnable, "二号玩家"); Thread thread3 = new Thread(runnable, "三号玩家"); Thread thread4 = new Thread(runnable, "四号玩家"); Thread thread5 = new Thread(runnable, "五号玩家"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); } } /* * 循环输出5次 * 输出结果: * 一号玩家:准备就绪 * 三号玩家:准备就绪 * 二号玩家:准备就绪 * 五号玩家:准备就绪 * 四号玩家:准备就绪 * ---游戏开始--- * ---五票同意,游戏结束--- * 三号玩家:准备就绪 * 一号玩家:准备就绪 * 五号玩家:准备就绪 * ...... */
在使用CyclicBarrier中,假设总的等待线程数量为5,如今其中一个线程被中断了,被中断的线程将抛出InterruptedException异常,而其余4个线程将抛出BrokenBarrierException异常。源码
BrokenBarrierException异常表示当前的CyclicBarrier已经破损,可能不能等待全部线程到齐了,避免其余线程永久的等待。it
CyclicBarrier是基于显式锁ReentrantLock来实现的,CyclicBarrier不少方法都使用显式锁作了同步处理,await方法的等待唤醒也是经过Condition实现的。io
CyclicBarrier的成员变量:
//显式锁 private final ReentrantLock lock = new ReentrantLock(); //用于显式锁的Condition private final Condition trip = lock.newCondition(); //线程数量 private final int parties; //当全部线程到达屏障点后执行的任务 private final Runnable barrierCommand; //Generation内部有一个broken变量,用于标识CyclicBarrier是否破损 private Generation generation = new Generation(); //用于递减的线程数量,在每一轮结束后会被重置为parties private int count;
await方法里是调用的dowait方法,dowait方法源码:
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //若是CyclicBarrier已破损,则抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); //若是当前线程已经中断,则将CyclicBarrier标记为已破损并抛出InterruptedException异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //index == 0表示全部线程都到达了屏障点 if (index == 0) { // tripped boolean ranAction = false; try { //执行线程到齐后须要执行的任务 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //唤醒全部等待的线程并重置CyclicBarrier nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //线程没到齐,阻塞当前线程 for (;;) { try { //不带超时时间的等待 if (!timed) trip.await(); //带超时时间的等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
nextGeneration方法:
private void nextGeneration() { //唤醒全部等待的线程 trip.signalAll(); //重置CyclicBarrier count = parties; generation = new Generation(); }