public class CopyOnWriteArrayList<E> extends Object implements List<E>, RandomAccess, Cloneable, Serializable
在不少应用场景中,读操做可能会远远大于写操做。因为读操做根本不会修改原有的数据,所以对于每次读取都进行加锁实际上是一种资源浪费。咱们应该容许多个线程同时访问List的内部数据,毕竟读取操做是安全的。java
这和咱们以前在多线程章节讲过 ReentrantReadWriteLock
读写锁的思想很是相似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK中提供了 CopyOnWriteArrayList
类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList
读取是彻底不用加锁的,而且更厉害的是:写入也不会阻塞读取操做。只有写入和写入之间须要进行同步等待。这样一来,读操做的性能就会大幅度提高。那它是怎么作的呢?面试
CopyOnWriteArrayList
类的全部可变操做(add,set等等)都是经过建立底层数组的新副原本实现的。当 List 须要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完以后,再将修改完的副本替换原来的数据,这样就能够保证写操做不会影响读操做了。算法
从 CopyOnWriteArrayList
的名字就能看出CopyOnWriteArrayList
是知足CopyOnWrite
的ArrayList,所谓CopyOnWrite
也就是说:在计算机,若是你想要对一块内存进行修改时,咱们不在原有内存块中进行写操做,而是将内存拷贝一份,在新的内存中进行写操做,写完以后呢,就将指向原来内存指针指向新的内存,原来的内存就能够被回收掉了。数组
为了引出ConcurrentSkipListMap,先带着你们简单理解一下跳表。安全
对于一个单链表,即便链表是有序的,若是咱们想要在其中查找某个数据,也只能从头至尾遍历链表,这样效率天然就会很低,跳表就不同了。跳表是一种能够用来快速查找的数据结构,有点相似于平衡树。它们均可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除每每极可能致使平衡树进行一次全局的调整。而对跳表的插入和删除只须要对整个数据结构的局部进行操做便可。这样带来的好处是:在高并发的状况下,你会须要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只须要部分锁便可。这样,在高并发环境下,你就能够拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 因此在并发数据结构中,JDK 使用跳表来实现一个 Map。数据结构
跳表的本质是同时维护了多个链表,而且链表是分层的,多线程
最低层的链表维护了跳表内全部的元素,每上面一层链表都是下面一层的子集。并发
跳表内的全部链表的元素都是排序的。查找时,能够从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程当中,搜索是跳跃式的。如上图所示,在跳表中查找元素18。框架
查找18 的时候原来须要遍历 18 次,如今只须要 7 次便可。针对链表长度比较大的时候,构建索引查找效率的提高就会很是明显。dom
从上面很容易看出,跳表是一种利用空间换时间的算法。
使用跳表实现Map 和使用哈希算法实现Map的另一个不一样之处是:哈希并不会保存元素的顺序,而跳表内全部的元素都是排序的。所以在对跳表进行遍历时,你会获得一个有序的结果。因此,若是你的应用须要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是ConcurrentSkipListMap。
AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用普遍的大量的同步器,好比咱们提到的ReentrantLock,Semaphore,其余的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。固然,咱们本身也能利用AQS很是轻松容易地构造出符合咱们本身需求的同步器。
AQS核心思想是,若是被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工做线程,而且将共享资源设置为锁定状态。若是被请求的共享资源被占用,那么就须要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
synchronized 和 ReentrantLock 都是一次只容许一个线程访问某个资源,Semaphore(信号量)能够指定多个线程同时访问某个资源。 示例代码以下:
/** * * @author Snailclimb * @date 2018年9月30日 * @Description: 须要一次性拿一个许可的状况 */ public class SemaphoreExample1 { // 请求的数量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一个具备固定线程数量的线程池对象(若是这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); // 一次只能容许执行的线程数量。 final Semaphore semaphore = new Semaphore(20); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表达式的运用 try { semaphore.acquire();// 获取一个许可,因此可运行线程数量为20/1=20 test(threadnum); semaphore.release();// 释放一个许可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模拟请求的耗时操做 } }
除了 acquire
方法以外,另外一个比较经常使用的与之对应的方法是tryAcquire
方法,该方法若是获取不到许可就当即返回false。
CountDownLatch是一个同步工具类,它容许一个或多个线程一直等待,直到其余线程的操做执行完后再执行。在Java并发中,countdownlatch的概念是一个常见的面试题,因此必定要确保你很好的理解了它。
①某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n)
,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown()
,当计数器的值变为0时,在CountDownLatch上 await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程须要等待多个组件加载完毕,以后再继续执行。
②实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。相似于赛跑,将多个线程放到起点,等待发令枪响,而后同时开跑。作法是初始化一个共享的 CountDownLatch
对象,将其计数器初始化为 1 :new CountDownLatch(1)
,多个线程在开始执行任务前首先 coundownlatch.await()
,当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
③死锁检测:一个很是方便的使用场景是,你可使用n个线程访问共享资源,在每次测试阶段的线程数目是不一样的,并尝试产生死锁。
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: CountDownLatch 使用方法示例 */ public class CountDownLatchExample1 { // 请求的数量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一个具备固定线程数量的线程池对象(若是这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表达式的运用 try { test(threadnum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { countDownLatch.countDown();// 表示一个请求已经被完成 } }); } countDownLatch.await(); threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模拟请求的耗时操做 } }
上面的代码中,咱们定义了请求的数量为550,当这550个请求被处理完成以后,才会执行System.out.println("finish");
。
与CountDownLatch的第一次交互是主线程等待其余线程。主线程必须在启动其余线程后当即调用CountDownLatch.await()方法。这样主线程的操做就会在这个方法上阻塞,直到其余线程完成各自的任务。
其余N个线程必须引用闭锁对象,由于他们须要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是经过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。因此当N个线程都调 用了这个方法,count的值等于0,而后主线程就能经过await()方法,恢复执行本身的任务。
CyclicBarrier 和 CountDownLatch 很是相似,它也能够实现线程间的技术等待,可是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 相似。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每一个线程调用await
方法告诉 CyclicBarrier 我已经到达了屏障,而后当前线程被阻塞。
CyclicBarrier 能够用于多线程计算数据,最后合并计算结果的应用场景。好比咱们用一个Excel保存了用户全部银行流水,每一个Sheet保存一个账户近一年的每笔银行流水,如今须要统计用户的日均银行流水,先用多线程处理每一个sheet里的银行流水,都执行完以后,获得每一个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
/** * * @author Snailclimb * @date 2018年10月1日 * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法 */ public class CyclicBarrierExample2 { // 请求的数量 private static final int threadCount = 550; // 须要同步的线程数量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { // 建立线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); try { /**等待60秒,保证子线程彻底执行结束*/ cyclicBarrier.await(60, TimeUnit.SECONDS); } catch (Exception e) { System.out.println("-----CyclicBarrierException------"); } System.out.println("threadnum:" + threadnum + "is finish"); } }
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
能够看到当线程数量也就是请求数量达到咱们定义的 5 个的时候, await
方法以后的方法才被执行。
另外,CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达屏障时,优先执行barrierAction
,方便处理更复杂的业务场景。示例代码以下:
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable */ public class CyclicBarrierExample3 { // 请求的数量 private static final int threadCount = 550; // 须要同步的线程数量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("------当线程数达到以后,优先执行------"); }); public static void main(String[] args) throws InterruptedException { // 建立线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); cyclicBarrier.await(); System.out.println("threadnum:" + threadnum + "is finish"); } }
threadnum:0is ready threadnum:1is ready threadnum:2is ready threadnum:3is ready threadnum:4is ready ------当线程数达到以后,优先执行------ threadnum:4is finish threadnum:0is finish threadnum:2is finish threadnum:1is finish threadnum:3is finish threadnum:5is ready threadnum:6is ready threadnum:7is ready threadnum:8is ready threadnum:9is ready ------当线程数达到以后,优先执行------ threadnum:9is finish threadnum:5is finish threadnum:6is finish threadnum:8is finish threadnum:7is finish ......
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其余多个线程完成某件事情以后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一块儿执行。)