疯狂创客圈 经典图书 : 《Netty Zookeeper Redis 高并发实战》 面试必备 + 面试必备 + 面试必备 【博客园总入口 】html
疯狂创客圈 经典图书 : 《SpringCloud、Nginx高并发核心编程》 大厂必备 + 大厂必备 + 大厂必备 【博客园总入口 】java
入大厂+涨工资必备: 高并发【 亿级流量IM实战】 实战系列 【 SpringCloud Nginx秒杀】 实战系列 【博客园总入口 】面试
从字面上的意思能够知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。编程
它的做用就是会让全部线程都等待完成后才会继续下一步行动。多线程
现实生活中咱们常常会遇到这样的情景,在进行某个活动前须要等待人所有都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等所有人都到齐了才出发,比赛时要等运动员都上场后才开始。并发
在JUC包中为咱们提供了一个同步工具类可以很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类能够实现一组线程相互等待,当全部线程都到达某个屏障点后再进行后续的操做。下图演示了这一过程。ide
CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来讲,要简单不少,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。高并发
看以下示意图,CyclicBarrier 和 CountDownLatch 是否是很像,只是 CyclicBarrier 能够有不止一个栅栏,由于它的栅栏(Barrier)能够重复使用(Cyclic)。工具
public CyclicBarrier(int parties) public CyclicBarrier(int parties, Runnable barrierAction)
解析:this
parties 是参与线程的个数
第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要作的任务
public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
解析:
线程调用 await() 表示本身已经到达栅栏
BrokenBarrierException 表示栅栏已经被破坏,破坏的缘由多是其中一个线程 await() 时被中断或者超时
2.3.1 需求
一个线程组的线程须要等待全部线程完成任务后再继续执行下一次任务
2.3.2 代码实现
public class CyclicBarrierDemo {
static class TaskThread extends Thread { CyclicBarrier barrier; public TaskThread(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { Thread.sleep(1000); System.out.println(getName() + " 到达栅栏 A"); barrier.await(); System.out.println(getName() + " 冲破栅栏 A"); Thread.sleep(2000); System.out.println(getName() + " 到达栅栏 B"); barrier.await(); System.out.println(getName() + " 冲破栅栏 B"); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { int threadNum = 5; CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 完成最后任务"); } }); for(int i = 0; i < threadNum; i++) { new TaskThread(barrier).start(); } }
}
打印结果:
Thread-1 到达栅栏 A Thread-3 到达栅栏 A Thread-0 到达栅栏 A Thread-4 到达栅栏 A Thread-2 到达栅栏 A Thread-2 完成最后任务 Thread-2 冲破栅栏 A Thread-1 冲破栅栏 A Thread-3 冲破栅栏 A Thread-4 冲破栅栏 A Thread-0 冲破栅栏 A Thread-4 到达栅栏 B Thread-0 到达栅栏 B Thread-3 到达栅栏 B Thread-2 到达栅栏 B Thread-1 到达栅栏 B Thread-1 完成最后任务 Thread-1 冲破栅栏 B Thread-0 冲破栅栏 B Thread-4 冲破栅栏 B Thread-2 冲破栅栏 B Thread-3 冲破栅栏 B
从打印结果能够看出,全部线程会等待所有线程到达栅栏以后才会继续执行,而且最后到达的线程会完成 Runnable 的任务。
能够用于多线程计算数据,最后合并计算结果的场景。
而 CyclicBarrier 基于 Condition 来实现的。由于 CyclicBarrier 的源码相对来讲简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
在CyclicBarrier类的内部有一个计数器,每一个线程在到达屏障点的时候都会调用await方法将本身阻塞,此时计数器会减1,当计数器减为0的时候全部因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面咱们先看看CyclicBarrier有哪些成员变量。
//同步操做锁 private final ReentrantLock lock = new ReentrantLock(); //线程拦截器 private final Condition trip = lock.newCondition(); //每次拦截的线程数 private final int parties; //换代前执行的任务 private final Runnable barrierCommand; //表示栅栏的当前代 private Generation generation = new Generation(); //计数器 private int count; //静态内部类Generation private static class Generation { boolean broken = false; }
上面贴出了CyclicBarrier全部的成员变量,能够看到CyclicBarrier内部是经过条件队列trip来对线程进行阻塞的,而且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。count是内部计数器,它的初始值和parties相同,之后随着每次await方法的调用而减1,直到减为0就将全部线程唤醒。CyclicBarrier有一个静态内部类Generation,该类的对象表明栅栏的当前代,就像玩游戏时表明的本局游戏,利用它能够实现循环等待。barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,须要转到下一局。在转到下一局游戏以前会将全部阻塞的线程唤醒,在唤醒全部线程以前你能够经过指定barrierCommand来执行本身的任务。我用一图来描绘下 CyclicBarrier 里面的一些概念:
接下来咱们看看它的构造器。
//构造器1 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //构造器2 public CyclicBarrier(int parties) { this(parties, null); }
CyclicBarrier有两个构造器,其中构造器1是它的核心构造器,在这里你能够指定本局游戏的参与者数量(要拦截的线程数)以及本局结束时要执行的任务,还能够看到计数器count的初始值被设置为parties。
CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。
//非定时等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } //定时等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
能够看到不论是定时等待仍是非定时等待,它们都调用了dowait方法,只不过是传入的参数不一样而已。下面咱们就来看看dowait方法都作了些什么。
//核心等待方法 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //检查当前栅栏是否被打翻 if (g.broken) { throw new BrokenBarrierException(); } //检查当前线程是否被中断 if (Thread.interrupted()) { //若是当前线程被中断会作如下三件事 //1.打翻当前栅栏 //2.唤醒拦截的全部线程 //3.抛出中断异常 breakBarrier(); throw new InterruptedException(); } //每次都将计数器的值减1 int index = --count; //计数器的值减为0则需唤醒全部线程并转换到下一代 if (index == 0) { boolean ranAction = false; try { //唤醒全部线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; //唤醒全部线程并转到下一代 nextGeneration(); return 0; } finally { //确保在任务未成功执行时能将全部线程唤醒 if (!ranAction) { breakBarrier(); } } } //若是计数器不为0则执行此循环 for (;;) { try { //根据传入的参数来决定是定时等待仍是非定时等待 if (!timed) { trip.await(); }else if (nanos > 0L) { nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { //若当前线程在等待期间被中断则打翻栅栏唤醒其余线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操做 Thread.currentThread().interrupt(); } } //若是线程由于打翻栅栏操做而被唤醒则抛出异常 if (g.broken) { throw new BrokenBarrierException(); } //若是线程由于换代操做而被唤醒则返回计数器的值 if (g != generation) { return index; } //若是线程由于时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } ``` } finally { lock.unlock(); } }
上面贴出的代码中注释都比较详细,咱们只挑一些重要的来说。能够看到在dowait方法中每次都将count减1,减完后立马进行判断看看是否等于0,若是等于0的话就会先去执行以前指定好的任务,执行完以后再调用nextGeneration方法将栅栏转到下一代,在该方法中会将全部线程唤醒,将计数器的值从新设为parties,最后会从新设置栅栏代次,在执行完nextGeneration方法以后就意味着游戏进入下一局。若是计数器此时还不等于0的话就进入for循环,根据参数来决定是调用trip.awaitNanos(nanos)仍是trip.await()方法,这两方法对应着定时和非定时等待。若是在等待过程当中当前线程被中断就会执行breakBarrier方法,该方法叫作打破栅栏,意味着游戏在中途被掐断,设置generation的broken状态为true并唤醒全部线程。同时这也说明在等待过程当中有一个线程被中断整盘游戏就结束,全部以前被阻塞的线程都会被唤醒。线程醒来后会执行下面三个判断,看看是否由于调用breakBarrier方法而被唤醒,若是是则抛出异常;看看是不是正常的换代操做而被唤醒,若是是则返回计数器的值;看看是否由于超时而被唤醒,若是是的话就调用breakBarrier打破栅栏并抛出异常。这里还须要注意的是,若是其中有一个线程由于等待超时而退出,那么整盘游戏也会结束,其余线程都会被唤醒。下面贴出nextGeneration方法和breakBarrier方法的具体代码。
最后,咱们来看看怎么重置一个栅栏:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
咱们设想一下,若是初始化时,指定了线程 parties = 4,前面有 3 个线程调用了 await 等待,在第 4 个线程调用 await 以前,咱们调用 reset 方法,那么会发生什么?
首先,打破栅栏,那意味着全部等待的线程(3个等待的线程)会唤醒,await 方法会经过抛出 BrokenBarrierException 异常返回。而后开启新的一代,重置了 count 和 generation,至关于一切归零了。
CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
CountDownLatch 参与的线程的职责是不同的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是同样的。
CyclicBarrier 的源码实现和 CountDownLatch 截然不同,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现的。由于 CyclicBarrier 的源码相对来讲简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
疯狂创客圈 - Java高并发研习社群,为你们开启大厂之门