源码:html
package java.util.concurrent; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class CyclicBarrier { //使用ReentrantLock可重入独占锁 private final ReentrantLock lock = new ReentrantLock(); //建立一个条件队列 private final Condition trip = lock.newCondition(); //经过构造器传入的参数.表示总的等待线程的数量 private final int parties; //当屏障正常打开后运行的程序,经过最后一个调用await的线程来执行 private final Runnable barrierCommand; //当前的Generation。每当屏障失效或者开闸以后都会自动替换掉。从而实现重置的功能 private Generation generation = new Generation(); //实际仍在等待的线程数.当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties private int count; //内部类 private static class Generation { boolean broken = false;//表示当前的屏障是否被打破 } //建立一个CyclicBarrier实例,parties指定参与相互等待的线程数 public CyclicBarrier(int parties) { this(parties, null); } //建立一个CyclicBarrier实例,parties指定参与相互等待的线程数 //barrierAction指定当全部线程到达屏障点以后,首先执行的操做,该操做由最后一个进入屏障点的线程执行。 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //返回参与相互等待的线程数 public int getParties() { return parties; } //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态 //直到全部线程都到达屏障点,当前线程才会被唤醒 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态 //在timeout指定的超时时间内,等待其余参与线程到达屏障点 //若是超出指定的等待时间,则抛出TimeoutException异常,若是该时间小于等于零,则此方法根本不会等待 public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException { return dowait(true, unit.toNanos(timeout)); } //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态 //在timeout指定的超时时间内,等待其余参与线程到达屏障点 //若是超出指定的等待时间,则抛出TimeoutException异常,若是该时间小于等于零,则此方法根本不会等待 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken)//若是当前Generation是处于打破状态则传播这个BrokenBarrierExcption throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier();//若是当前线程被中断则使得当前generation处于打破状态,重置剩余count,而且唤醒状态变量.这时候其余线程会传播BrokenBarrierException throw new InterruptedException(); } int index = --count;//尝试下降当前count if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //当全部参与的线程都到达屏障点,当即去唤醒全部处于休眠状态的线程,恢复执行 nextGeneration(); return 0; } finally { if (!ranAction)//若是运行command失败也会致使当前屏障被打破 breakBarrier(); } } for (;;) { try { if (!timed) //让当前执行的线程阻塞,处于休眠状态 trip.await(); else if (nanos > 0L) //让当前执行的线程阻塞,在超时时间内处于休眠状态 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } //唤醒全部处于休眠状态的线程,恢复执行 //重置count值为parties //重置中断状态为false private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); } //唤醒全部处于休眠状态的线程,恢复执行 //重置count值为parties //重置中断状态为true private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } //判断此屏障是否处于中断状态。 //若是由于构造或最后一次重置而致使中断或超时,从而使一个或多个参与者摆脱此屏障点,或由于异常而致使某个屏障操做失败,则返回true;不然返回false public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } //将屏障重置为其初始状态 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //唤醒全部等待的线程继续执行,并设置屏障中断状态为true breakBarrier(); //唤醒全部等待的线程继续执行,并设置屏障中断状态为false nextGeneration(); } finally { lock.unlock(); } } //返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
extends Objectjava
一个同步辅助类:它容许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。api
在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 颇有用。函数
由于该 barrier 在释放等待线程后能够重用,因此称它为循环 的 barrier。this
CyclicBarrier 支持一个可选的 Runnable
命令,在一组线程中的最后一个线程到达以后(但在释放全部线程以前),该命令只在每一个屏障点运行一次。若在继续全部参与线程以前更新共享状态,此屏障操做 颇有用。spa
构造方法摘要.net
CyclicBarrier(int parties) 建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预约义的操做。 |
CyclicBarrier(int parties, Runnable barrierAction) 建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操做,该操做由最后一个进入 barrier 的线程执行。 |
方法摘要线程
int |
await() 在全部参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。 |
int |
await(long timeout, TimeUnit unit) 在全部参与者都已经在此屏障上调用 await 方法以前将一直等待,或者超出了指定的等待时间。 |
int |
getNumberWaiting() 返回当前在屏障处等待的参与者数目。 |
int |
getParties() 返回要求启动此 barrier 的参与者数目。 |
boolean |
isBroken() 查询此屏障是否处于损坏状态。 |
void |
reset() 将屏障重置为其初始状态。 |
public CyclicBarrier(int parties,Runnable barrierAction)
建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操做,该操做由最后一个进入 barrier 的线程执行。调试
参数:code
parties
- 在启动 barrier 前必须调用 await()
的线程数
barrierAction
- 在启动 barrier 时执行的命令;若是不执行任何操做,则该参数为 null
抛出:
IllegalArgumentException
- 若是 parties
小于 1
public CyclicBarrier(int parties)
建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预约义的操做。
参数:
parties
- 在启动 barrier 前必须调用 await()
的线程数
抛出:
IllegalArgumentException
- 若是 parties
小于 1
public int getParties()
返回要求启动此 barrier 的参与者数目。
返回:
要求启动此 barrier 的参与者数目
public int await() throws InterruptedException, BrokenBarrierException
在全部 参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。
若是当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生如下状况之一前,该线程将一直处于休眠状态:
reset()
。若是当前线程:
则抛出 InterruptedException
,而且清除当前线程的已中断状态。
若是在线程处于等待状态时 barrier 被 reset()
,或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException
异常。
若是任何线程在等待时被 中断,则其余全部等待线程都将抛出 BrokenBarrierException
异常,并将 barrier 置于损坏状态。
若是当前线程是最后一个将要到达的线程,而且构造方法中提供了一个非空的屏障操做,则在容许其余线程继续运行以前,当前线程将运行该操做。若是在执行屏障操做过程当中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
返回:
到达的当前线程的索引,其中,索引 getParties()
- 1 指示将到达的第一个线程,零指示最后一个到达的线程
抛出:
InterruptedException
- 若是当前线程在等待时被中断
BrokenBarrierException
- 若是 另外一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用 await
时 barrier 被损坏,抑或因为异常而致使屏障操做(若是存在)失败。
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
在全部 参与者都已经在此屏障上调用 await 方法以前将一直等待,或者超出了指定的等待时间。
若是当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生如下状况之一前,该线程将一直处于休眠状态:
reset()
。若是当前线程,在如下状况中的一种时:
则抛出 InterruptedException
,而且清除当前线程的已中断状态。
若是超出指定的等待时间,则抛出 TimeoutException
异常。若是该时间小于等于零,则此方法根本不会等待。
若是在线程处于等待状态时 barrier 被 reset()
,或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException
异常。
若是任何线程在等待时被中断,则其余全部等待线程都将抛出 BrokenBarrierException
,并将屏障置于损坏状态。
若是当前线程是最后一个将要到达的线程,而且构造方法中提供了一个非空的屏障操做,则在容许其余线程继续运行以前,当前线程将运行该操做。若是在执行屏障操做过程当中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
参数:
timeout
- 等待 barrier 的时间
unit
- 超时参数的时间单位
返回:
到达的当前线程的索引,其中,索引 getParties()
- 1 指示第一个将要到达的线程,零指示最后一个到达的线程
抛出:
InterruptedException
- 若是当前线程在等待时被中断
TimeoutException
- 若是超出了指定的超时时间
BrokenBarrierException
- 若是 另外一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者调用 await
时 barrier 被损坏,抑或因为异常而致使屏障操做(若是存在)失败。
public boolean isBroken()
查询此屏障是否处于损坏状态。
返回:
若是屡次调用构造函数或者使用重置函数reset(),在屏障等待的参与者的等待状态会被中断或超时,从而抛出异常。由于异常而致使某个屏障操做失败,则返回 true
;不然返回 false
。
public void reset()
将屏障重置为其初始状态。若是全部参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException
。
注意:在因为其余缘由形成损坏(broken)以后,实行重置可能会变得很复杂;此时须要使用其余方式从新同步线程,并选择其中一个线程来执行重置。与为后续使用建立一个新 barrier 相比,这种方法可能更好一些。
public int getNumberWaiting()
返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
返回:
当前阻塞在 await()
中的参与者数目。
package com.thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest1 extends Thread { private static int SIZE = 5; private static CyclicBarrier cb; public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); // CyclicBarrier的count减1,若count等于0,则唤醒在屏障处等待的全部线程 cb.await(); System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { cb = new CyclicBarrier(SIZE); // 新建5个任务 for (int i = 0; i < SIZE; i++) new CyclicBarrierTest1().start(); } }
运行结果:
Thread-1 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-4 continued.
Thread-3 continued.
Thread-2 continued.
Thread-1 continued.
Thread-0 continued.
package com.thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest2 extends Thread { private static int SIZE = 5; private static CyclicBarrier cb; public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); // CyclicBarrier的count减1,若count等于0,则唤醒在屏障处等待的全部线程 cb.await(); System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { cb = new CyclicBarrier(SIZE, new Runnable() { public void run() {//当其余的线程都已达到barrier,先执行当前任务,再让其余线程继续执行 System.out.println("CyclicBarrier's parties is: " + cb.getParties()); } }); // 新建5个任务 for (int i = 0; i < SIZE; i++) new CyclicBarrierTest1().start(); } }
运行结果:
Thread-0 wait for CyclicBarrier. Thread-2 wait for CyclicBarrier. Thread-1 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. CyclicBarrier's parties is: 5 Thread-3 continued. Thread-1 continued. Thread-4 continued. Thread-0 continued. Thread-2 continued.