咱们知道,CountDownLatch的计数器是一次性的,它不能重置。也就是说,当count值变为0时,再调用await()方法会当即返回,不会阻塞。
本文要说的CyclicBarrier就是一种能够重置计数器的线程同步工具类。CyclicBarrier字面意思是“回环屏障”,它可让一组线程所有到达一个状态后再所有同时往下执行。之因此叫回环是由于当全部线程执行完毕,并重置CyclicBarrier的状态后它能够被重用。而之因此叫屏障是由于当某个线程调用await方法后就会被阻塞,这个阻塞点就称为屏障,等其余全部线程都调用了await方法后,这组线程就会一块儿冲破屏障,并往下执行。java
两个子任务分别执行本身的工做,等它们都执行完后,主任务汇总子任务的结果,并作一些处理,处理完成后两个子任务又继续作其余事情。示例代码:编程
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> { try { System.out.println("main task merge subtask result begin"); // simulate merge work Thread.sleep(5000); System.out.println("main task merge subtask result finished"); } catch (InterruptedException e) { // ignore } }); public static void main(String[] args) { Thread thread1 = new Thread(() -> { try { Thread.sleep(4000); System.out.println("thread1 finished its work"); cyclicBarrier.await(); System.out.println("thread1 continue work"); } catch (InterruptedException | BrokenBarrierException e) { // ignore } }); Thread thread2 = new Thread(() -> { try { Thread.sleep(5000); System.out.println("thread2 finished its work"); cyclicBarrier.await(); System.out.println("thread2 continue work"); } catch (InterruptedException | BrokenBarrierException e) { // ignore } }); thread1.start(); thread2.start(); } }
输出结果:多线程
能够看到,线程1和线程2调用await()时,会被阻塞,等主线程任务完成后,线程1和线程2就会冲破屏障,继续往下执行。这里的主线程合并工做是可选的,也就是说能够直接new CyclicBarric(int parties),这种状况下就没有到达屏障后的合并工做,会直接在所有线程到达屏障后同时冲破屏障往下执行。能够比喻成举办同窗聚会的场景。有20我的参加聚会,第1我的到达集合地点后要等其余人,第2个,第3个,...第19我的也须要等,当最后一我的到的时候,所有的20我的就能够出发去嗨皮了。并发
上面介绍的是“屏障”的应用场景,再来看个“回环”的应用场景。app
假设一个任务由阶段1,阶段2,阶段3这三个阶段组成,每一个线程都串行的依次执行阶段1,2,3。当多个线程执行任务时,必须保证等全部线程都执行完阶段1后,才能执行阶段2,一样地,也必须保证全部线程都执行完阶段2后,才能执行阶段3。示例代码:工具
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo2 { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { Thread thread1 = new Thread(() -> { try { System.out.println("thread1 step 1"); cyclicBarrier.await(); System.out.println("thread1 step 2"); cyclicBarrier.await(); System.out.println("thread1 step 3"); } catch (InterruptedException | BrokenBarrierException e) { // ignore } }); Thread thread2 = new Thread(() -> { try { System.out.println("thread2 step 1"); cyclicBarrier.await(); System.out.println("thread2 step 2"); cyclicBarrier.await(); System.out.println("thread2 step 3"); } catch (InterruptedException | BrokenBarrierException e) { // ignore } }); thread1.start(); thread2.start(); } }
输出结果以下:
能够看到,实现了这种同阶段等待的效果。oop
先来看看重要属性:this
private static class Generation { // 屏障是否被打破 boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 on each generation. * It is reset to parties on each new generation or when broken. */ private int count;
能够看到,CyclicBarrier里用了独占锁ReentrantLock实现多线程间的计数器同步,parties表示当多少个线程到达屏障后,冲破屏障往下执行,而count表示当前还剩余多少个线程还未到达屏障,当全部线程都冲破屏障后,它又会在新一轮(new generation)被重置为parties的值。也就是说,count和Generation是用来实现重置效果的。spa
再看看构造方法的属性赋值:线程
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
再来看看关键方法:
await()
public int await() throws InterruptedException, BrokenBarrierException { try { // false表示不设置超时 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
dowait()方法代码以下:
// timed:是否超时等待, nanos:超时时间 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 若是index为0,表示全部线程都已到达了屏障,此时去执行初始化时设定的barrierCommand(若是有的话) if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 唤醒其余线程,并重置进行下一轮 nextGeneration(); // 返回 return 0; } finally { if (!ranAction) breakBarrier(); } } // 不然须要等其余线程都达到屏障 // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 区分超时等待与不超时等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); // g != generation 说明被唤醒后已重置了轮次,说明全部线程均已到达线程屏障,能够返回了。 if (g != generation) return index; // 等待超时,抛出超时异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
其中,nextGeneration()方法以下:
private void nextGeneration() { // signal completion of last generation // 唤醒等待在trip条件(即屏障)上的其余全部线程 trip.signalAll(); // set up next generation // 重置count的值为初始值parties count = parties; // 重置当前轮次 generation = new Generation(); }
参考资料:《Java并发编程之美》