一、CyclicBarrier和CountDownLatch的区别
CountDownLatch是闭锁,只能使用一次,而CyclicBarrier的计数器会重置,可使用屡次,因此CyclicBarrier可以处理更为复杂的场景;java
CyclicBarrier还提供了一些其余有用的方法,好比getNumberWaiting()方法能够得到CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
PS:有一个线程broken了,整组broken;
CyclicBarrier是基于独占锁和阻塞队列实现的,因此并发性能在基因上就有缺陷,应对高并发场景时应谨慎考虑是否使用并发
CountDownLatch容许一个或多个线程等待一组事件完成而继续,而CyclicBarrier容许一个事件等待一个或多个线程完成而继续。
--------------------- app
二、CountDownLatch是使用AQS框架共享锁实现的同步,队列采用的sync同步FIFO队列
CyclicBarrier是使用AQS框架独占锁实现的同步,队列采用了condition阻塞block队列--详见AQS-condition阻塞队列
基于AQS框架的解读,本次正好将AQS的阻塞队列的模式补上。始于栅栏,始于源码
三、源码:框架
package com.ysma.test; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.*; /** * 在最后一个线程抵达而且其余线程也都抵达或者broken了的时候,整个阻塞就盘活了,不在阻塞 * @since 1.5 * @see CountDownLatch * @author Doug Lea 又是这哥们写的,保留这个注释 */ public class CyclicBarrier { /**一个栅栏就是一代,Generation变化一次就表明栅栏完成了一次*/ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // 唤醒通知上一代已经完成 trip.signalAll(); // 重置计数器开启新时代 count = parties; generation = new Generation(); } /** * 设置当前代中断,唤醒全部 * Called only while holding lock. */ private void breakBarrier() { generation.broken = true;//因故中断,标识一下 count = parties;//重置计数器,唤醒全部阻塞线程; trip.signalAll();//PS:并无开启新时代! } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//获取或等待获取资源,ysma-1 try { final Generation g = generation; if (g.broken)//任一broken则break全部 throw new BrokenBarrierException(); if (Thread.interrupted()) {//获取资源后发现本身被中断了 breakBarrier(); throw new InterruptedException(); } int index = --count;//获取资源,计数器减一 if (index == 0) { // 达到临界点,执行barrierCommand,nextGeneration,结束=>放行全部线程 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await();//不设置超时,wait,释放cpu else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//wait指定时间 } catch (InterruptedException ie) { if (g == generation && ! g.broken) {//发生异常,判断本身为第一个发起中断者 breakBarrier(); throw ie; } else {//发生异常,本身非第一个中断者 Thread.currentThread().interrupt(); } } if (g.broken)//被唤醒后,检测中断标志broken throw new BrokenBarrierException(); if (g != generation)//若是栅栏已经开启了下一代,结束,放行 return index; if (timed && nanos <= 0L) {//被唤醒后,发现超时了,broken中断 breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock();//解锁,释放资源 } } /**构造器,略*/ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /**构造器,略*/ public CyclicBarrier(int parties) { this(parties, null); } /**获取资源/线程数*/ public int getParties() { return parties; } /**不限制等待*/ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /**限时等待*/ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } /** * 查询栅栏是否已经broken了 * PS:重入锁方式进入查看 */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /**重置 * 重入锁方式进入,break栅栏,开启新时代 * */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /**获取还有多少资源没有就绪*/ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }