CyclicBarrier能够理解为Cyclic + Barrier, 可循环使用 + 屏障嘛。java
可让一组线程所有到达一个屏障【同步点】,再所有冲破屏障,继续向下执行。编程
public class CycleBarrierTest2 { private static final CyclicBarrier cyclicBarrier = new CyclicBarrier( 2, // 计数器的初始值 new Runnable() { // 计数器值为0时须要执行的任务 @Override public void run () { System.out.println(Thread.currentThread() + " tripped ~"); } } ); public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Runnable() { @SneakyThrows @Override public void run () { Thread thread = Thread.currentThread(); System.out.println(thread + " step 1"); cyclicBarrier.await(); System.out.println(thread + " step 2"); cyclicBarrier.await(); System.out.println(thread + " step 3"); } }); executorService.submit(new Runnable() { @SneakyThrows @Override public void run () { Thread thread = Thread.currentThread(); System.out.println(thread + " step 1"); cyclicBarrier.await(); System.out.println(thread + " step 2"); cyclicBarrier.await(); System.out.println(thread + " step 3"); } }); executorService.shutdown(); } }
测试结果以下:并发
Thread[pool-1-thread-2,5,main] step 1 Thread[pool-1-thread-1,5,main] step 1 Thread[pool-1-thread-1,5,main] tripped ~ Thread[pool-1-thread-1,5,main] step 2 Thread[pool-1-thread-2,5,main] step 2 Thread[pool-1-thread-2,5,main] tripped ~ Thread[pool-1-thread-2,5,main] step 3 Thread[pool-1-thread-1,5,main] step 3
多个线程之间是相互等待的,加入当前计数器值为N,以后N-1个线程调用await方法都会达到屏障点而阻塞,只有当第N个线程调用await方法时,计数器值为0,第N个线程才会唤醒以前等待的全部线程,再一块儿向下执行。app
CyclicBarrier是可复用的,全部线程达到屏障点以后,CyclicBarrier会被重置。ide
public class CyclicBarrier { private static class Generation { boolean broken = false; } /** 独占锁保证同步 */ private final ReentrantLock lock = new ReentrantLock(); /** condition实现等待通知机制 */ private final Condition trip = lock.newCondition(); /** 记录线程个数 */ private final int parties; /* 达到屏障点执行的任务 */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * 记录仍在等待的parties数量, 每一代count都会从初始的parties递减至0 */ private int count; // 指定barrierAction, 在线程达到屏障后,优先执行barrierAction public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } // 指定parties, 但愿屏障拦截的线程数量 public CyclicBarrier(int parties) { this(parties, null); } }
CyclicBarrier是可复用的,所以使用两个变量记录线程个数,count变为0时,会将parties赋值给count,进行复用。工具
本篇文章阅读须要创建在必定独占锁,Condition条件机制的基础之上,这边推荐几篇前置文章,能够瞅一眼:oop
CyclicBarrier是可复用的,Generation用于标记更新换代。性能
// 屏障的每一次使用都会生成一个新的Generation实例: 多是 tripped or reset private static class Generation { boolean broken = false; }
更新换代: 首先标记一下当前这代不用了, 而后换一个新的。学习
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break掉当前的 nextGeneration(); // 开启一个新的 } finally { lock.unlock(); } }
标记一下broken为true,唤醒一下await等待线程,重置count。测试
private void breakBarrier() { // 标记broken 为true generation.broken = true; // 重置count count = parties; // 唤醒因await等待的线程 trip.signalAll(); }
唤醒一下await等待线程,重置count,更新为下一代。
private void nextGeneration() { // 唤醒因await等待的线程 trip.signalAll(); // 重置count,意味着下一代了 count = parties; // 下一代了 generation = new Generation(); }
当前线程调用await方法时会阻塞,除非遇到如下几种状况:
它内部调用了int dowait(boolean timed, long nanos)
,详细解析往下面翻哈。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
相比于普通的await()方法,该方法增长了超时的控制,你懂的。
增长了一项:若是超时了,返回false。
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 获取独占锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 与当前屏障点关联的Generation final Generation g = generation; // broken标志为true,则异常 if (g.broken) throw new BrokenBarrierException(); // 若是被打断,则breakBarrier,并抛出异常 if (Thread.interrupted()) { // 打破: 1 标记broken为true 2 重置count 3 唤醒await等待的线程 breakBarrier(); throw new InterruptedException(); } int index = --count; // 说明已经到达屏障点了 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 执行一下任务 if (command != null) command.run(); ranAction = true; // 更新: 1 唤醒await等待的线程 2 更新Generation nextGeneration(); return 0; } finally { // 执行失败了,可能被打断了 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 死循环, 结束的状况有:到达屏障点, broken了, 中断, 超时 for (;;) { try { // 超时控制 if (!timed) trip.await(); else if (nanos > 0L) // awaitNanos阻塞一段时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 标记broken为true breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 正常被唤醒, 再次检查当前这一代是否已经标记了broken if (g.broken) throw new BrokenBarrierException(); // 最后一个线程在等待线程醒来以前,已经经过nextGeneration将generation更新 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
以parties为N为例,咱们来看看这一流程。
线程调用dowait方法后,首先会获取独占锁lock。若是是前N-1个线程,因为index != 0
,会在条件队列中等待trip.await() or trip.awaitNanos(nanos)
,会相应释放锁。
第N个线程调用dowait以后,此时index == 0
,将会执行命令command.run()
,而后调用nextGeneration()
更新换代,同时唤醒全部条件队列中等待的N-1个线程。
第N个线程释放锁,后续被唤醒的线程移入AQS队列,陆续获取锁,释放锁。
CountDownLatch基于AQS,state表示计数器的值,在构造时指定。CyclicBarrier基于ReentrantLock独占锁与Condition条件机制实现屏障逻辑。
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器能够使用reset()方法重置,可复用性可以处理更为复杂【分段任务有序执行】的业务场景。
CyclicBarrier还提供了其余有用的方法,如getNumberWaiting
方法能够得到CyclicBarrier阻塞的线程数量。isBroken()
方法用来了解阻塞的线程是否被中断。
CyclicBarrier = Cyclic + Barrier, 可重用 + 屏障,可让一组线程所有到达一个屏障【同步点】,再所有冲破屏障,继续向下执行。
CyclicBarrier基于ReentrantLock独占锁与Condition条件机制实现屏障逻辑。
CyclicBarrier须要指定parties【N】以及可选的任务,当N - 1个线程调用await的时候,会在条件队列中阻塞,直到第N个线程调用await,执行指定的任务后,唤醒N - 1个等待的线程,并重置Generation,更新count。