《今天面试了吗》-并发编程之AQS同步工具类

前言

上次面试中问到AQS简直不要太痛苦,全是问的源码。可是源码有时间仍是要看看的,毕竟对于提高咱们的写代码的能力仍是有帮助的。今天的面试紧接上回的AQS,内容是基于AQS实现的四大并发工具类: CyclicBarrier,CountDownLatch,Semaphore和Exchanger,简要分析实现原理,着重讲述如何使用。java

面试环节

  • 面试官:上次聊到AQS,你在开发过程当中用过AQS的几个工具类吗?好比 CyclicBarrier...
  • 我:用过, CyclicBarrier是一个同步辅助类。它容许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序里,这些线程必须不时的互相等待,此时CyclicBarrier 颇有用。由于CyclicBarrier在释放等待线程后能够重用,所以成为循环的屏障。 下面来看下CyclicBarrier的定义:
private final ReentrantLock lock = new ReentrantLock();
  private final Condition trip = lock.newCondition();   //parties变量表示拦截线程的总数量,count变量表示拦截线程的剩余须要数量  private final int parties;   //barrierCommand变量为CyclicBarrier接收的Runnable命令,用于在线程到达屏障时,优先执行barrierCommand,用于处理更加复杂的业务场景。  private final Runnable barrierCommand;   //generation变量表示CyclicBarrier的更新换代  private Generation generation = new Generation(); 复制代码

能够看出CyclicBarrier内部是使用重入锁和Condition的。它有两个构造函数:mysql

/**
 建立一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操做,该操做由最后一个进入barrier的线程执行。  */  public CyclicBarrier(int parties, Runnable barrierAction) {  if (parties <= 0) throw new IllegalArgumentException();  this.parties = parties;  this.count = parties;  this.barrierCommand = barrierAction;  }  /**  建立一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动barrier时执行预约义的操做。  */  public CyclicBarrier(int parties) {  this(parties, null);  } 复制代码
  • 面试官:那CyclicBarrier是怎么让线程到达屏障后处于等待状态的呢?
  • 我:使用await()方法,每一个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。当全部线程都到达了屏障,结束阻塞,全部线程可继续执行后续逻辑。
public int await(long timeout, TimeUnit unit)
 throws InterruptedException,  BrokenBarrierException,  TimeoutException {  return dowait(true, unit.toNanos(timeout));  }    private int dowait(boolean timed, long nanos)  throws InterruptedException, BrokenBarrierException,  TimeoutException {  //获取锁  final ReentrantLock lock = this.lock;  lock.lock();  try {  //分代  final Generation g = generation;   //当前generation已损坏,抛出BrokenBarrierException异常  if (g.broken)  throw new BrokenBarrierException();  //若是线程中断,终止CyclicBarrier  if (Thread.interrupted()) {  breakBarrier();  throw new InterruptedException();  }   //进来一个线程,count-1  int index = --count;  //若是count==0表示全部线程均已到达屏障,能够触发barrierCommand任务  if (index == 0) { // tripped  boolean ranAction = false;  try {  final Runnable command = barrierCommand;  if (command != null)  command.run();  ranAction = true;  //唤醒全部等待线程,并更新generation  nextGeneration();  return 0;  } finally {  //若是barrierCommand执行失败,终止CyclicBarrier  if (!ranAction)  breakBarrier();  }  }    for (;;) {  try {  //若是不是超时等待,则调用Condition.await()方法等待  if (!timed)  trip.await();  else if (nanos > 0L)  //若是是超时等待,则调用Condition.awaitNanos()等待  nanos = trip.awaitNanos(nanos);  } catch (InterruptedException ie) {  if (g == generation && ! g.broken) {  breakBarrier();  throw ie;  } else {  Thread.currentThread().interrupt();  }  }   if (g.broken)  throw new BrokenBarrierException();   //generation已经更新,返回Index  if (g != generation)  return index;  //超时等待而且时间已经到了,终止CyclicBarrier,并抛出超时异常  if (timed && nanos <= 0L) {  breakBarrier();  throw new TimeoutException();  }  }  } finally {  //释放锁  lock.unlock();  } 复制代码

若是该线程不是到达的最后一个线程,则它会一直处于等待状态,除非发生如下状况:
一、最后一个到达:即index=0
二、超出了等待时间。
三、其余的某个线程中断当前线程。
四、其余某个线程中断另外一个等待的线程。
五、其余某个线程在等待barrier超时。
六、其余某个线程在此barrier调用reset方法,用于将该屏障置为初始状态。web

  • 面试官:那CyclicBarrier什么场景下用呢?
  • 我:CyclicBarrier适用于多线程合并的操做,用于多线程计算数据,最后合并计算结果的应用场景。举个例子:
public class CyclicBarrierTest {
  private static CyclicBarrier cyclicBarrier;   private static final Integer THREAD_COUNT = 10;   static class CyclicBarrierThread implements Runnable {  @Override  public void run() {  System.out.println(Thread.currentThread().getName()+"到教室了");  try {  cyclicBarrier.await();  } catch (Exception e) {  e.printStackTrace();  }  }  }   public static void main(String [] args) {  cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {  @Override  public void run() {  System.out.println("同窗们都到齐了,开始上课吧...");  }  });   for (int i=0; i< THREAD_COUNT; i++) {  Thread thread = new Thread(new CyclicBarrierThread());  thread.start();  }   } }  复制代码

运行结果以下:面试

  • 面试官:有一个和CyclicBarrier相似的工具类叫CountDownLatch,你能说下吗?redis

  • 我:CyclicBarrier描述的是“容许一组线程相互等待,直到到达某个公共屏障点,才会进行后续任务”,而CountDownLatch所描述的是“在完成一组正在其余线程中执行的操做以前,它容许 一个或多个线程一直等待”。在API中是这样描述的:用给定的计数初始化CountDownLatch。因为调用了countDown方法,因此在当前计数到达零以前,await方法会一直受阻塞。以后,会释放 全部等待的线程,await的全部后续调用都将当即返回。这种现象只出现一次(计数没法被重置。若是须要重置计数,请考虑使用CyclicBarrier)
    CountDownLatch是经过一个计数器来实现的,当咱们在new一个CountDownLatch对象的时候,须要传入计数器的值,该值表示线程的数量。每当一个线程完成本身的任务后,计数器的值就会 减一。当计数器的值变为0时,就表示全部线程均已完成任务,而后就能够恢复等待的线程继续执行了。
    CountDownLatch和CyclicBarrier仍是有一点区别的:
    一、CountDownLatch的做用是容许1或多个线程等待其余线程完成执行;而CyclicBarrier则是容许多个线程互相等待。
    二、CountDownLatch的计数器没法被重置。CyclicBarrier的计数器能够被重置后使用。spring

  • 面试官:你能说下CountDownlatch是怎么实现的吗?sql

  • 我:CountDownlatch内部依赖Sync实现,而Sync继承AQS。以下图:数据库

CountDownlatch仅提供了一个构造方法,以下:编程

public CountDownLatch(int count) {
 if (count < 0) throw new IllegalArgumentException("count < 0");  this.sync = new Sync(count);  } 复制代码

再来看看Sync,是CountDownlatch的一个内部类。mybatis

private static final class Sync extends AbstractQueuedSynchronizer {
 private static final long serialVersionUID = 4982264981922014374L;   Sync(int count) {  setState(count);  }   //获取同步状态  int getCount() {  return getState();  }   //尝试获取同步状态  protected int tryAcquireShared(int acquires) {  return (getState() == 0) ? 1 : -1;  }   //尝试释放同步状态  protected boolean tryReleaseShared(int releases) {  for (;;) {  int c = getState();  if (c == 0)  return false;  int nextc = c-1;  if (compareAndSetState(c, nextc))  return nextc == 0;  }  }  } 复制代码

CountDownLatch内部经过共享锁实现:
一、在建立CountDownLatch实例时,须要传递一个int型参数:count,该参数为计数器的初始值,也能够理解为该共享锁能够获取的总次数。
二、当某个线程调用await()方法,程序首先判断count的值是否为0,若是不为0的话,则会一直等待直到为0为止。
三、当其余线程调用countDown()方法时,则执行释放共享锁状态,使count-1。
四、注意CountDownLatch不能回滚重置。

  • 面试官:那你说下CountDownLatch是怎么用的?
  • 我:
    一、CountDownlatch提供了await()方法,来使当前线程在锁存器递减倒数至0之前一直等待,除非线程被中断,当前线程能够是咱们的一个主线程。 二、CountDownlatch提供了countDown()方法,在子线程执行完后进行操做,递减锁存器的计数,若是计数到达0,则唤醒全部等待的线程(咱们的主线程)。 说完我拿起笔刷刷的写起来:
public class CountDownLatchTest {
  private static final Integer STUDENT_COUNT = 10;   private static CountDownLatch countDownLatch = new CountDownLatch(STUDENT_COUNT);   static class TeacherThread implements Runnable {  @Override  public void run() {  System.out.println("老师来了,等"+ STUDENT_COUNT+"位同窗都到教室了才开始上课");  try {  countDownLatch.await();  } catch (InterruptedException e) {  e.printStackTrace();  }  System.out.println(STUDENT_COUNT+"位同窗都到齐了,开始上课!");  }  }   static class StudentThread implements Runnable {  @Override  public void run() {  System.out.println(Thread.currentThread().getName()+"进了教室");  countDownLatch.countDown();  }  }   public static void main(String [] args) {  Thread teacher = new Thread(new TeacherThread());  teacher.start();  for (int i=0; i<STUDENT_COUNT; i++) {  Thread student = new Thread(new StudentThread());  student.start();  }  } } 复制代码
  • 面试官:很好。懂得活学活用。你了解信号量Semaphore吗?
  • 我: 信号量Semaphore是一个控制访问多个共享资源的计数器,和CountDownLatch同样,其本质上是一个“共享锁”。在API是这么介绍信号量的:一个计数信号量,从概念上讲,信号量维护了一个许可集。
    一、若有必要,在许可可用前会阻塞每个acquire,而后再获取该许可。
    二、每一个release添加一个许可,从而可能释放一个正在阻塞的获取者。可是不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采起相应的行动。
    下面以一个停车场的例子来阐述Semaphore:
    一、假设停车场有5个停车位,一开始车位都空着,而后前后来了三辆车,车位够,安排进去停车,而后又来三辆,这个时候因为只有两个车位,因此只能停两辆,有一辆须要在外面候着,直到 停车场有空位。
    二、从程序角度讲,停车场就至关于信号量Semaphore,其中许可数为5,车辆至关于线程,当来一辆车,许可数就会减1。当停车场没车位了(许可数==0),其余来的车辆必须等待。若是 有一辆车开车停车场,则许可数+1,而后放进来一辆车。

从上面的分析能够看出:信号量Semaphore是一个非负整数(>=1)。当一个线程想要访问某个共享资源时,它必须先获取Semaphore。当Semaphore>0时,获取该资源并使Semaphore-1。 若是Semaphore的值==0,则表示所有的共享资源已经被线程所有占用,新来的线程必须等待其余线程释放资源。当线程释放资源时,Semaphore则+1。

  • 面试官:你能用Semaphore实现这个停车的例子吗?
  • 我(又刷刷的写起来)
public class SemaphoreTest {
  static class Parking {   private Semaphore semaphore;   Parking(int count) {  semaphore = new Semaphore(count);  }   public void park() {  try {  //获取信号量  semaphore.acquire();  long time = (long) (Math.random()*10+1);  System.out.println(Thread.currentThread().getName()+"进入停车场停车,停车时间:"+time+"秒");  //模拟停车时间  Thread.sleep(time);  System.out.println(Thread.currentThread().getName()+"开出停车场...");  } catch (InterruptedException e) {  e.printStackTrace();  } finally {  //释放信号量(跟lock的用法差很少)  semaphore.release();  }  }  }   static class Car implements Runnable{   private Parking parking;   Car(Parking parking) {  this.parking = parking;  }   /**  * 每辆车至关于一个线程,线程的任务就是停车  */  @Override  public void run() {  parking.park();  }  }   public static void main(String [] args) {  //假设有3个停车位  Parking parking = new Parking(3);   //这时候同时来了5辆车,只有3辆车能够进去停车,其他2辆车须要等待有空余车位以后才能进去停车。  for (int i=0; i<5; i++) {  Thread thread = new Thread(new Car(parking));  thread.start();  }  } } 复制代码

运行结果:

  • 面试官:很好,那我再问你一个,Exchanger交换器知道不?
  • 我: Exchanger是一个同步器,字面上就能够看出这个类的主要做用是交换数据。Exchanger有点相似CyclicBarrier,前面说到CyclicBarrier是一个栅栏,到达栅栏的 线程须要等待必定数量的线程到达后,才能经过栅栏。Exchanger能够当作是一个双向的栅栏。线程1到达栅栏后,会首先观察有没有其余线程已经到达栅栏,若是没有就会等待。 若是已经有其余线程(好比线程2)到达了,就会以成对的方式交换各自携带的信息,所以Exchanger很是适合两个线程之间的数据交换。 以下图:
  • 面试官:那你能跟我举个例子说下Exchanger怎么用吗?
  • 我:固然能够。
public class ExchangerTest {
  static class ThreadA implements Runnable {  private Exchanger<String> exchanger;   ThreadA (Exchanger<String> exchanger) {  this.exchanger = exchanger;  }   @Override  public void run() {  try {  //模拟业务代码  Long time = (long)(Math.random()*10+1)*10;  System.out.println("线程A等待了"+time+"秒");  Thread.sleep(time);  //线程间数据交换  System.out.println("在线程A获得线程B的值:"+ exchanger.exchange("我是线程A"));  } catch (InterruptedException e) {  e.printStackTrace();  }  }  }   static class ThreadB implements Runnable {  private Exchanger<String> exchanger;   ThreadB(Exchanger<String> exchanger) {  this.exchanger = exchanger;  }   @Override  public void run() {  try {  //模拟业务代码  Long time = (long)(Math.random()*10+1)*10;  System.out.println("线程B等待了"+time+"秒");  Thread.sleep(time);  //线程间数据交换  System.out.println("在线程B获得线程A的值:"+ exchanger.exchange("我是线程B"));  } catch (InterruptedException e) {  e.printStackTrace();  }  }  }   public static void main(String [] args) {  Exchanger<String> exchanger = new Exchanger<>();  //线程A和线程B要使用同一个exchanger才有用  Thread threadA = new Thread(new ThreadA(exchanger));  Thread threadB = new Thread(new ThreadB(exchanger));  threadA.start();  threadB.start();  } } 复制代码

运行结果:

往期精彩回顾

今天面试了吗系列
Java并发编程系列:
https://juejin.im/post/5ef764985188252e7c21ad5c
https://juejin.im/post/5ee82c736fb9a047fe5c1af3
redis:
https://juejin.im/post/5dccf260f265da0bf66b626d
spring:
https://juejin.im/post/5e6d993cf265da575b1bd4af
mybatis:
https://juejin.im/post/5e80b6d76fb9a03c3f1e92a2#comment

数据库系列
mysql索引:
https://juejin.im/post/5d67702cf265da03f333664c
数据库锁:
https://juejin.im/post/5dbbc1d06fb9a0205425309e
分库分表:
https://juejin.im/post/5dc77a9451882559465e390b
数据库事务:
https://juejin.im/post/5dcb9c38f265da4d2971038c

线上问题系列
https://juejin.im/post/5ef055b7e51d45740e4275e2

java基础 https://juejin.im/post/5d5e2616f265da03b638b28a https://juejin.im/post/5d427f306fb9a06b122f1b94

相关文章
相关标签/搜索