CyclicBarrier是一个同步辅助类,它容许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。经过它能够完成多个线程之间相互等待,只有当每一个线程都准备就绪后,才能各自继续往下执行后面的操做。html
CyclicBarrier经过计数器来实现的。当某个线程调用await方法时,该线程进入等待状态,计数器加1,当计数器的值达到设置的初始值时,全部因调用await进入等待状态的线程被唤醒,继续执行后续操做。由于CycliBarrier在释放等待线程后能够重用,因此称为循环barrier。CycliBarrier支持一个可选的Runnable,在计数器的值到达设定值后(但在释放全部线程以前),该Runnable运行一次,注,Runnable在每一个屏障点只运行一个。java
CyclicBarrier与CountDownLatch本质上都是依赖于volatile和CAS实现,二者比较以下:app
|CyclicBarrier|CountDownLatch| |:|:| |一个线程(或者多个),等待另外N个线程完成某个事情以后才能执执行|N个线程相互等待,任何一个线程完成以前,全部的线程都必须等待。| |一次性的|能够重复使用| |基于AQS|基于锁和Condition|dom
赛跑时,等待全部人都准备好时,才起跑:oop
public static void main(String[] args) throws InterruptedException { //若是将参数改成4,可是下面barrier的计数器的值为3,这将永远等待下去 CyclicBarrier barrier = new CyclicBarrier(3); Runnable runnable = ()->{ try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(Thread.currentThread().getName() + " 准备好了..."); //barrier的计数器会加1而且全部当前线程会进入等待状态 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 起跑!"); }; ExecutorService executor = Executors.newFixedThreadPool(3);//将固定线程的大小设置为1或者2观察运行结果 executor.submit(new Thread(runnable)); executor.submit(new Thread(runnable)); executor.submit(new Thread(runnable)); executor.shutdown(); }
//parties表示屏障拦截的线程数量,当屏障撤销时,先执行barrierAction,而后在释放全部线程 public CyclicBarrier(int parties, Runnable barrierAction) //barrierAction默认为null public CyclicBarrier(int parties) /* * 当前线程等待直到全部线程都调用了该屏障的await()方法 * 若是当前线程不是将到达的最后一个线程,将会被阻塞。解除阻塞的状况有如下几种 * 1)最后一个线程调用await() * 2)当前线程被中断 3)其余正在该CyclicBarrier上等待的线程被中断 4)其余正在该CyclicBarrier上等待的线程超时 5)其余某个线程调用该CyclicBarrier的reset()方法 * 若是当前线程在进入此方法时已经设置了该线程的中断状态或者在等待时被中断, * 将抛出InterruptedException,而且清除当前线程的已中断状态。 * 若是在线程处于等待状态时barrier被reset()或者在调用await()时 barrier 被损坏, * 将抛出 BrokenBarrierException 异常。 * 若是任何线程在等待时被中断,则其余全部等待线程都将抛出 * BrokenBarrierException 异常,并将 barrier 置于损坏状态。 * 若是当前线程是最后一个将要到达的线程,而且构造方法中提供了一个非空的屏障操做 * (barrierAction),那么在容许其余线程继续运行以前,当前线程将运行该操做。 * 若是在执行屏障操做过程当中发生异常,则该异常将传播到当前线程中, * 并将 barrier 置于损坏状态。 * * 返回值为当前线程的索引,0表示当前线程是最后一个到达的线程 */ public int await() throws InterruptedException, BrokenBarrierException //在await()的基础上增长超时机制,若是超出指定的等待时间,则抛出 TimeoutException 异常。若是该时间小于等于零,则此方法根本不会等待。 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //将屏障重置为其初始状态。若是全部参与者目前都在屏障处等待,则它们将返回,同时抛出一个BrokenBarrierException。 public void reset()
对于失败的同步尝试,CyclicBarrier 使用了一种要么所有要么全不 (all-or-none) 的破坏模式:若是由于中断、失败或者超时等缘由,致使线程过早地离开了屏障点,那么在该屏障点等待的其余全部线程也将经过 BrokenBarrierException(若是它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。this
CyclicBarrier基于ReentrantLock和Condition机制实现。除了getParties()方法,CyclicBarrier的其余方法都须要获取锁。.net
在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用await方法时,将拦截的线程数减1,而后判断剩余拦截数是否为初始值parties,若是不是,进入Lock对象的条件队列等待。若是是,执行barrierAction对象的Runnable方法,而后将锁的条件队列中的全部线程放入锁等待队列中,这些线程会依次的获取锁、释放锁。线程
public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock();//重入锁 private final Condition trip = lock.newCondition();//等待条件 private final int parties;//拦截的线程数量 private final Runnable barrierCommand; //当屏障撤销时,须要执行的屏障操做 private Generation generation = new Generation(); //当前的Generation。每当屏障失效或者开闸以后都会自动替换掉。从而实现重置的功能 //还能阻塞的线程数(即parties-当前阻塞的线程数),当新建generation或generation被破坏时,count会被重置。由于对Count的操做都是在获取锁以后,因此不须要其余同步措施。 private int count; private static class Generation { boolean broken = false;//当前屏障是否被破坏 }
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation;//保存此时的generation if (g.broken)//若是当前屏障被破坏则抛出异常 throw new BrokenBarrierException(); //判断线程是否被中断,若是被中断,调用breakBarrier()进行屏障破坏处理,并抛出InterruptedException if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //剩余count递减,并赋值给线程索引,做为方法的返回值 int index = --count; //若是线程索引将为0,说明当前线程是最后一个到达的线程。执行可能存在的屏障操做 barrierCommand,设置下一个Generation。至关于每次开闸以后都进行了一次reset。 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run();//同步执行barrierCommand ranAction = true; nextGeneration();//执行成功设置下一个nextGeneration return 0; } finally { if (!ranAction)//若是barrierCommand执行失败,进行屏障破坏处理 breakBarrier(); } } //若是当前线程不是最后一个到达的线程 // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await();//若是没有超时调用Condition的await()方法阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//设置超时时间,调用Condition的awaitNanos()方法阻塞 } catch (InterruptedException ie) { //若是当前线程被中断,则判断是否有其余线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;不然再次中断当前线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //这种捕获了InterruptException以后调用Thread.currentThread().interrupt()是一种通用的方式。其实就是为了保存中断状态,从而让其余更高层次的代码注意到这个中断。 Thread.currentThread().interrupt(); } } //若是屏障被破坏,当前线程抛BrokenBarrierException if (g.broken) throw new BrokenBarrierException(); //若是已经换代,直接返回index(last thread已经执行的nextGeneration,但当前线程尚未执行到该语句) if (g != generation) return index; //超时,进行屏障破坏处理,并抛TimeoutException if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock();//释放锁 } }
//将当前屏障置为破坏状态、重置count、并唤醒全部被阻塞的线程。 //必须先获取锁,才能调用此方法 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
//唤醒trip上等待的全部线程,设置下一个Generation private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
//重置屏障,先进行屏障破坏处理,再设置下一代generation public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
参考地址:code