常见问题:AQS 原理?;CountDownLatch和CyclicBarrier了解吗,二者的区别是什么?用过Semaphore吗?
本节思惟导图:html
【强烈推荐!非广告!】阿里云双11褥羊毛活动: https://m.aliyun.com/act/team1111/#/share?params=N.FF7yxCciiM.hf47liqn 差很少一折,不过仅限阿里云新人购买,不是新人的朋友本身找方法买哦!
AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。java
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用普遍的大量的同步器,好比咱们提到的ReentrantLock,Semaphore,其余的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。固然,咱们本身也能利用AQS很是轻松容易地构造出符合咱们本身需求的同步器。面试
在面试中被问到并发知识的时候,大多都会被问到“请你说一下本身对于AQS原理的理解”。下面给你们一个示例供你们参加,面试不是背题,你们必定要假如本身的思想,即便加入不了本身的思想也要保证本身可以通俗的讲出来而不是背出来。
下面大部份内容其实在AQS类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话能够看看源码。编程
AQS核心思想是,若是被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工做线程,而且将共享资源设置为锁定状态。若是被请求的共享资源被占用,那么就须要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。设计模式
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
看个AQS(AbstractQueuedSynchronizer)原理图:安全
AQS使用一个int成员变量来表示同步状态,经过内置的FIFO队列来完成获取资源线程的排队工做。AQS使用CAS对该同步状态进行原子操做实现对其值的修改。多线程
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息经过procted类型的getState,setState,compareAndSetState进行操做并发
//返回同步状态的当前值 protected final int getState() { return state; } // 设置同步状态的值 protected final void setState(int newState) { state = newState; } //原子地(CAS操做)将同步状态值设置为给定值update若是当前同步状态的值等于expect(指望值) protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS定义两种资源共享方式框架
Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:ide
ReentrantReadWriteLock 能够当作是组合式,由于ReentrantReadWriteLock也就是读写锁容许多个线程同时对某一资源进行读。
不一样的自定义同步器争用共享资源的方式也不一样。自定义同步器在实现时只须要实现共享资源 state 的获取与释放方式便可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮咱们实现好了。
同步器的设计是基于模板方法模式的,若是须要自定义同步器通常的方式是这样(模板方法模式很经典的一个应用):
这和咱们以往经过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用,下面简单的给你们介绍一下模板方法模式,模板方法模式是一个很容易理解的设计模式之一。
模板方法模式是基于”继承“的,主要是为了在不改变模板结构的前提下在子类中从新定义模板中的内容以实现复用代码。举个很简单的例子假如咱们要去一个地方的步骤是:购票buyTicket()
->安检securityCheck()
->乘坐某某工具回家ride()
->到达目的地arrive()
。咱们可能乘坐不一样的交通工具回家好比飞机或者火车,因此除了ride()
方法,其余方法的实现几乎相同。咱们能够定义一个包含了这些方法的抽象类,而后用户根据本身的须要继承该抽象类而后修改ride()
方法。
AQS使用了模板方法模式,自定义同步器时须要重写下面几个AQS提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才须要去实现它。 tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。 tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。 tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认状况下,每一个方法都抛出 UnsupportedOperationException
。 这些方法的实现必须是内部线程安全的,而且一般应该简短而不是阻塞。AQS类中的其余方法都是final ,因此没法被其余类使用,只有这几个方法能够被其余类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其余线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。固然,释放锁以前,A线程本身是能够重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每一个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到全部子线程都执行完后(即state=0),会unpark()主调用线程,而后主调用线程就会从await()函数返回,继续后余动做。
通常来讲,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一种便可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。
推荐两篇 AQS 原理和相关源码分析的文章:
synchronized 和 ReentrantLock 都是一次只容许一个线程访问某个资源,Semaphore(信号量)能够指定多个线程同时访问某个资源。示例代码以下:
/** * * @author Snailclimb * @date 2018年9月30日 * @Description: 须要一次性拿一个许可的状况 */ public class SemaphoreExample1 { // 请求的数量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一个具备固定线程数量的线程池对象(若是这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); // 一次只能容许执行的线程数量。 final Semaphore semaphore = new Semaphore(20); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表达式的运用 try { semaphore.acquire();// 获取一个许可,因此可运行线程数量为20/1=20 test(threadnum); semaphore.release();// 释放一个许可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模拟请求的耗时操做 } }
执行 acquire
方法阻塞,直到有一个许可证能够得到而后拿走一个许可证;每一个 release
方法增长一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并无实际的许可证这个对象,Semaphore只是维持了一个可得到许可证的数量。 Semaphore常常用于限制获取某种资源的线程数量。
固然一次也能够一次拿取和释放多个许可,不过通常没有必要这样作:
semaphore.acquire(5);// 获取5个许可,因此可运行线程数量为20/5=4 test(threadnum); semaphore.release(5);// 获取5个许可,因此可运行线程数量为20/5=4
除了 acquire
方法以外,另外一个比较经常使用的与之对应的方法是tryAcquire
方法,该方法若是获取不到许可就当即返回false。
Semaphore 有两种模式,公平模式和非公平模式。
Semaphore 对应的两个构造方法以下:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
这两个构造方法,都必须提供许可的数量,第二个构造方法能够指定是公平模式仍是非公平模式,默认非公平模式。
因为篇幅问题,若是对 Semaphore 源码感兴趣的朋友能够看下面这篇文章:
CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具一般用来控制线程等待,它可让某一个线程等待直到倒计时结束,再开始执行。
①某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n)
,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown()
,当计数器的值变为0时,在CountDownLatch上 await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程须要等待多个组件加载完毕,以后再继续执行。
②实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。相似于赛跑,将多个线程放到起点,等待发令枪响,而后同时开跑。作法是初始化一个共享的 CountDownLatch
对象,将其计数器初始化为 1 :new CountDownLatch(1)
,多个线程在开始执行任务前首先 coundownlatch.await()
,当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: CountDownLatch 使用方法示例 */ public class CountDownLatchExample1 { // 请求的数量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一个具备固定线程数量的线程池对象(若是这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表达式的运用 try { test(threadnum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { countDownLatch.countDown();// 表示一个请求已经被完成 } }); } countDownLatch.await(); threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模拟请求的耗时操做 } }
上面的代码中,咱们定义了请求的数量为550,当这550个请求被处理完成以后,才会执行System.out.println("finish");
。
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,以后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
CyclicBarrier 和 CountDownLatch 很是相似,它也能够实现线程间的技术等待,可是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 相似。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每一个线程调用await
方法告诉 CyclicBarrier 我已经到达了屏障,而后当前线程被阻塞。
CyclicBarrier 能够用于多线程计算数据,最后合并计算结果的应用场景。好比咱们用一个Excel保存了用户全部银行流水,每一个Sheet保存一个账户近一年的每笔银行流水,如今须要统计用户的日均银行流水,先用多线程处理每一个sheet里的银行流水,都执行完以后,获得每一个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
示例1:
/** * * @author Snailclimb * @date 2018年10月1日 * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法 */ public class CyclicBarrierExample2 { // 请求的数量 private static final int threadCount = 550; // 须要同步的线程数量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { // 建立线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { System.out.println("-----CyclicBarrierException------"); } System.out.println("threadnum:" + threadnum + "is finish"); } }
运行结果,以下:
threadnum:0is ready threadnum:1is ready threadnum:2is ready threadnum:3is ready threadnum:4is ready threadnum:4is finish threadnum:0is finish threadnum:1is finish threadnum:2is finish threadnum:3is finish threadnum:5is ready threadnum:6is ready threadnum:7is ready threadnum:8is ready threadnum:9is ready threadnum:9is finish threadnum:5is finish threadnum:8is finish threadnum:7is finish threadnum:6is finish ......
能够看到当线程数量也就是请求数量达到咱们定义的 5 个的时候, await
方法以后的方法才被执行。
另外,CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达屏障时,优先执行barrierAction
,方便处理更复杂的业务场景。示例代码以下:
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable */ public class CyclicBarrierExample3 { // 请求的数量 private static final int threadCount = 550; // 须要同步的线程数量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("------当线程数达到以后,优先执行------"); }); public static void main(String[] args) throws InterruptedException { // 建立线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); cyclicBarrier.await(); System.out.println("threadnum:" + threadnum + "is finish"); } }
运行结果,以下:
threadnum:0is ready threadnum:1is ready threadnum:2is ready threadnum:3is ready threadnum:4is ready ------当线程数达到以后,优先执行------ threadnum:4is finish threadnum:0is finish threadnum:2is finish threadnum:1is finish threadnum:3is finish threadnum:5is ready threadnum:6is ready threadnum:7is ready threadnum:8is ready threadnum:9is ready ------当线程数达到以后,优先执行------ threadnum:9is finish threadnum:5is finish threadnum:6is finish threadnum:8is finish threadnum:7is finish ......
CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,能够屡次使用。可是我不那么认为它们之间的区别仅仅就是这么简单的一点。咱们来从jdk做者设计的目的来看,javadoc是这么描述它们的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其余多个线程完成某件事情以后才能执行;)
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一块儿执行。)
对于CountDownLatch来讲,重点是“一个线程(多个线程)等待”,而其余的N个线程在完成“某件事情”以后,能够终止,也能够等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,全部的线程都必须等待。
CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,须要全部线程都到达,阀门才能打开,而后继续执行。
CyclicBarrier和CountDownLatch的区别这部份内容参考了以下两篇文章:
ReentrantLock 和 synchronized 的区别在上面已经讲过了这里就很少作讲解。另外,须要注意的是:读写锁 ReentrantReadWriteLock 能够保证多个线程能够同时读,因此在读操做远大于写操做的时候,读写锁就很是有用了。
因为篇幅问题,关于 ReentrantLock 和 ReentrantReadWriteLock 详细内容能够查看个人这篇原创文章。