上次面试中问到AQS简直不要太痛苦,全是问的源码。可是源码有时间仍是要看看的,毕竟对于提高咱们的写代码的能力仍是有帮助的。今天的面试紧接上回的AQS,内容是基于AQS实现的四大并发工具类: CyclicBarrier,CountDownLatch,Semaphore和Exchanger,简要分析实现原理,着重讲述如何使用。java
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); //parties变量表示拦截线程的总数量,count变量表示拦截线程的剩余须要数量 private final int parties; //barrierCommand变量为CyclicBarrier接收的Runnable命令,用于在线程到达屏障时,优先执行barrierCommand,用于处理更加复杂的业务场景。 private final Runnable barrierCommand; //generation变量表示CyclicBarrier的更新换代 private Generation generation = new Generation(); 复制代码
能够看出CyclicBarrier内部是使用重入锁和Condition的。它有两个构造函数:mysql
/**
建立一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操做,该操做由最后一个进入barrier的线程执行。 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** 建立一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动barrier时执行预约义的操做。 */ public CyclicBarrier(int parties) { this(parties, null); } 复制代码
public int await(long timeout, TimeUnit unit)
throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { //分代 final Generation g = generation; //当前generation已损坏,抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); //若是线程中断,终止CyclicBarrier if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //进来一个线程,count-1 int index = --count; //若是count==0表示全部线程均已到达屏障,能够触发barrierCommand任务 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //唤醒全部等待线程,并更新generation nextGeneration(); return 0; } finally { //若是barrierCommand执行失败,终止CyclicBarrier if (!ranAction) breakBarrier(); } } for (;;) { try { //若是不是超时等待,则调用Condition.await()方法等待 if (!timed) trip.await(); else if (nanos > 0L) //若是是超时等待,则调用Condition.awaitNanos()等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); //generation已经更新,返回Index if (g != generation) return index; //超时等待而且时间已经到了,终止CyclicBarrier,并抛出超时异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放锁 lock.unlock(); } 复制代码
若是该线程不是到达的最后一个线程,则它会一直处于等待状态,除非发生如下状况:
一、最后一个到达:即index=0
二、超出了等待时间。
三、其余的某个线程中断当前线程。
四、其余某个线程中断另外一个等待的线程。
五、其余某个线程在等待barrier超时。
六、其余某个线程在此barrier调用reset方法,用于将该屏障置为初始状态。web
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier; private static final Integer THREAD_COUNT = 10; static class CyclicBarrierThread implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()+"到教室了"); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String [] args) { cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() { @Override public void run() { System.out.println("同窗们都到齐了,开始上课吧..."); } }); for (int i=0; i< THREAD_COUNT; i++) { Thread thread = new Thread(new CyclicBarrierThread()); thread.start(); } } } 复制代码
运行结果以下:面试
面试官:有一个和CyclicBarrier相似的工具类叫CountDownLatch,你能说下吗?redis
我:CyclicBarrier描述的是“容许一组线程相互等待,直到到达某个公共屏障点,才会进行后续任务”,而CountDownLatch所描述的是“在完成一组正在其余线程中执行的操做以前,它容许 一个或多个线程一直等待”。在API中是这样描述的:用给定的计数初始化CountDownLatch。因为调用了countDown方法,因此在当前计数到达零以前,await方法会一直受阻塞。以后,会释放 全部等待的线程,await的全部后续调用都将当即返回。这种现象只出现一次(计数没法被重置。若是须要重置计数,请考虑使用CyclicBarrier)。
CountDownLatch是经过一个计数器来实现的,当咱们在new一个CountDownLatch对象的时候,须要传入计数器的值,该值表示线程的数量。每当一个线程完成本身的任务后,计数器的值就会 减一。当计数器的值变为0时,就表示全部线程均已完成任务,而后就能够恢复等待的线程继续执行了。
CountDownLatch和CyclicBarrier仍是有一点区别的:
一、CountDownLatch的做用是容许1或多个线程等待其余线程完成执行;而CyclicBarrier则是容许多个线程互相等待。
二、CountDownLatch的计数器没法被重置。CyclicBarrier的计数器能够被重置后使用。spring
面试官:你能说下CountDownlatch是怎么实现的吗?sql
我:CountDownlatch内部依赖Sync实现,而Sync继承AQS。以下图:数据库
CountDownlatch仅提供了一个构造方法,以下:编程
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 复制代码
再来看看Sync,是CountDownlatch的一个内部类。mybatis
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } //获取同步状态 int getCount() { return getState(); } //尝试获取同步状态 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //尝试释放同步状态 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } 复制代码
CountDownLatch内部经过共享锁实现:
一、在建立CountDownLatch实例时,须要传递一个int型参数:count,该参数为计数器的初始值,也能够理解为该共享锁能够获取的总次数。
二、当某个线程调用await()方法,程序首先判断count的值是否为0,若是不为0的话,则会一直等待直到为0为止。
三、当其余线程调用countDown()方法时,则执行释放共享锁状态,使count-1。
四、注意CountDownLatch不能回滚重置。
public class CountDownLatchTest {
private static final Integer STUDENT_COUNT = 10; private static CountDownLatch countDownLatch = new CountDownLatch(STUDENT_COUNT); static class TeacherThread implements Runnable { @Override public void run() { System.out.println("老师来了,等"+ STUDENT_COUNT+"位同窗都到教室了才开始上课"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(STUDENT_COUNT+"位同窗都到齐了,开始上课!"); } } static class StudentThread implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()+"进了教室"); countDownLatch.countDown(); } } public static void main(String [] args) { Thread teacher = new Thread(new TeacherThread()); teacher.start(); for (int i=0; i<STUDENT_COUNT; i++) { Thread student = new Thread(new StudentThread()); student.start(); } } } 复制代码
从上面的分析能够看出:信号量Semaphore是一个非负整数(>=1)。当一个线程想要访问某个共享资源时,它必须先获取Semaphore。当Semaphore>0时,获取该资源并使Semaphore-1。 若是Semaphore的值==0,则表示所有的共享资源已经被线程所有占用,新来的线程必须等待其余线程释放资源。当线程释放资源时,Semaphore则+1。
public class SemaphoreTest {
static class Parking { private Semaphore semaphore; Parking(int count) { semaphore = new Semaphore(count); } public void park() { try { //获取信号量 semaphore.acquire(); long time = (long) (Math.random()*10+1); System.out.println(Thread.currentThread().getName()+"进入停车场停车,停车时间:"+time+"秒"); //模拟停车时间 Thread.sleep(time); System.out.println(Thread.currentThread().getName()+"开出停车场..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放信号量(跟lock的用法差很少) semaphore.release(); } } } static class Car implements Runnable{ private Parking parking; Car(Parking parking) { this.parking = parking; } /** * 每辆车至关于一个线程,线程的任务就是停车 */ @Override public void run() { parking.park(); } } public static void main(String [] args) { //假设有3个停车位 Parking parking = new Parking(3); //这时候同时来了5辆车,只有3辆车能够进去停车,其他2辆车须要等待有空余车位以后才能进去停车。 for (int i=0; i<5; i++) { Thread thread = new Thread(new Car(parking)); thread.start(); } } } 复制代码
运行结果:
public class ExchangerTest {
static class ThreadA implements Runnable { private Exchanger<String> exchanger; ThreadA (Exchanger<String> exchanger) { this.exchanger = exchanger; } @Override public void run() { try { //模拟业务代码 Long time = (long)(Math.random()*10+1)*10; System.out.println("线程A等待了"+time+"秒"); Thread.sleep(time); //线程间数据交换 System.out.println("在线程A获得线程B的值:"+ exchanger.exchange("我是线程A")); } catch (InterruptedException e) { e.printStackTrace(); } } } static class ThreadB implements Runnable { private Exchanger<String> exchanger; ThreadB(Exchanger<String> exchanger) { this.exchanger = exchanger; } @Override public void run() { try { //模拟业务代码 Long time = (long)(Math.random()*10+1)*10; System.out.println("线程B等待了"+time+"秒"); Thread.sleep(time); //线程间数据交换 System.out.println("在线程B获得线程A的值:"+ exchanger.exchange("我是线程B")); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String [] args) { Exchanger<String> exchanger = new Exchanger<>(); //线程A和线程B要使用同一个exchanger才有用 Thread threadA = new Thread(new ThreadA(exchanger)); Thread threadB = new Thread(new ThreadB(exchanger)); threadA.start(); threadB.start(); } } 复制代码
运行结果:
今天面试了吗系列
Java并发编程系列:
https://juejin.im/post/5ef764985188252e7c21ad5c
https://juejin.im/post/5ee82c736fb9a047fe5c1af3
redis:
https://juejin.im/post/5dccf260f265da0bf66b626d
spring:
https://juejin.im/post/5e6d993cf265da575b1bd4af
mybatis:
https://juejin.im/post/5e80b6d76fb9a03c3f1e92a2#comment
数据库系列
mysql索引:
https://juejin.im/post/5d67702cf265da03f333664c
数据库锁:
https://juejin.im/post/5dbbc1d06fb9a0205425309e
分库分表:
https://juejin.im/post/5dc77a9451882559465e390b
数据库事务:
https://juejin.im/post/5dcb9c38f265da4d2971038c
线上问题系列
https://juejin.im/post/5ef055b7e51d45740e4275e2
java基础 https://juejin.im/post/5d5e2616f265da03b638b28a https://juejin.im/post/5d427f306fb9a06b122f1b94