CyclicBarrier是JDK1.5提供容许一组线程等待彼此都达到一个共同的障碍点的同步的工具。CyclicBarrier适用于固定大小线程池,能够设置一个Runnable任务,当各线程达到共同的障碍点时触发这个任务。java
//建立线程池 private static ExecutorService executorService = Executors.newFixedThreadPool(10); //建立屏障 static CyclicBarrier cb = new CyclicBarrier(10,new Runnable() { public void run() { System.out.println("到达屏障"); } }); public static void main(String[] args) { //提交任务 for (int i = 0; i < 10; i++) { executorService.submit(new Runnable() { @Override public void run() { try { cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } }
如上例子建立线程为10的固定线程池,建立值为10的屏障,并设置一个Runnable任务。运行main方法提交任务当任务提交到10的时候到达屏障点,会运行Runnable任务并输出"到达屏障"。app
CyclicBarrier是利用ReentrantLock和Condition对扣减屏障值操做进行加锁,加锁后释放锁而后阻塞直到屏障值为0被唤醒。ide
下面来看下CyclicBarrier的成员变量工具
private final ReentrantLock lock = new ReentrantLock();
对扣减屏障值操做进行加锁用。oop
private final Condition trip = lock.newCondition();
对扣减屏障值操做后阻塞线程用,源码分析
private final int parties;
屏障值最大值,不可修改。this
private final Runnable barrierCommand;
各线程到达屏障执行的任务。线程
private Generation generation = new Generation();
Generation类型对象,此类型里成员只有一个boolean类型的变量,做用是判断屏障是否被打破。code
private int count;
屏障值,操做扣减用。对象
await()源码以下
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
直接调用dowait(false, 0L)方法,第一个参数表示是否支持等待超时,第二个参数表示超时时长。await()不须要超时这里传了false和0L。由于await()不须要超时TimeoutException这个异常也不可能发生。
下面来看下dowait(false, 0L)的源码
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { final Generation g = generation; //1.若是屏障被打破抛出屏障打破异常 if (g.broken) throw new BrokenBarrierException(); //2.若是当前线程被中断抛出中断异常 if (Thread.interrupted()) { //3.打破屏障 breakBarrier(); throw new InterruptedException(); } //4.屏障值减一 int index = --count; //5.若是减一之后屏障值等于0,就要唤醒全部的阻塞线程 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //6.是否配置了任务,若是配置了则执行 if (command != null) command.run(); ranAction = true; //7.若是任务正常运行结束,全部的阻塞线程,并重置屏障值 nextGeneration(); return 0; } finally { //8.若是任务运行出现异常,则打破屏障 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { //9.若是不支持等待超时,则调用await()一直等待 if (!timed) trip.await(); //10.若是支持等待超时,则等待nanos时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //11.若是线程被中断。若是g == generation不成立说明当前线程已经被唤醒,这里说明还没被唤醒的中断就要打破屏障,不然就标记中断让上层处理。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } //11.若是已经打破屏障,抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); //12.g != generation成立说明已经被激活,这里正常结束 if (g != generation) return index; //13.若是超时,则打破屏障抛出超时异常。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放锁 lock.unlock(); } }
dowait(false, 0L)主要逻辑就是将屏障值count减1,而后进入等待,直到count等于0到达屏障点被唤醒。 这里须要注意的是若是一个线程打破屏障,则全部的线程都会被打破抛出BrokenBarrierException异常,而且屏障被打破后若是想继续使用必须调用reset()方法重置。
下面来看下breakBarrier()方法
private void breakBarrier() { //设置打破屏障状态 generation.broken = true; //将count设置成原来的值 count = parties; //唤醒全部其余线程 trip.signalAll(); }
breakBarrier()就是设置打破屏障状态为ture,而后唤醒因此其余阻塞线程,其余阻塞唤醒后会抛出BrokenBarrierException异常。
下面来看下nextGeneration()方法
private void nextGeneration() { // 唤醒全部其余线程 trip.signalAll(); // 将count设置成原来的值 count = parties; //初始化屏障状态 generation = new Generation(); }
nextGeneration()跟breakBarrier()相似,可是nextGeneration()是从新初始化屏障状态的,因此调用这个方法后CyclicBarrier可重用。
reset()源码以下
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //打破屏障 breakBarrier(); // break the current generation //重置CyclicBarrier状态 nextGeneration(); // start a new generation } finally { lock.unlock(); } }
reset()很简单,先打破屏障,终止各线程等待状态使其余线程抛出BrokenBarrierException异常,而后重置CyclicBarrier状态,使其可重用。这里官方推荐不要重用,从新建立一个CyclicBarrier使用,官方给的缘由也比较含糊。