原文:慕课网高并发实战(七)- J.U.C之AQShtml
在【并发编程】【JDK源码】AQS (AbstractQueuedSynchronizer)(1/2)中简要介绍了AQS的概念和基本原理,下面继续对AQS进行分析。java
一、首先 AQS内部维护了一个CLH队列,来管理锁。
二、线程尝试获取锁,若是获取失败,则将等待信息等包装成一个Node结点,加入到同步队列Sync queue里。
三、不断从新尝试获取锁(当前结点为head的直接后继才会 尝试),若是获取失败,则会阻塞本身,直到被唤醒。
四、当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。数据库
下面几个主要同步组件:编程
CountDownLatch
Semaphore
CyclicBarrier安全ReentrantLock
Condition
FutureTask数据结构
同步阻塞类,能够完成阻塞线程的功能
多线程
程序执行须要等待某个条件完成后,才能进行后面的操做。好比父任务等待全部子任务都完成的时候,再继续往下进行。
实例一并发
@Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { // 为防止出现异常,放在finally更保险一些 countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
实例二
好比有多个线程完成一个任务,可是这个任务只想给他一个指定的时间,超过这个任务就不继续等待了。完成多少算多少。框架
@Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; // 放在这里没有用的,由于这时候仍是在主线程中阻塞,阻塞完之后才开始执行下面的await // Thread.sleep(1); exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } // 等待指定的时间 参数1:等待时间 参数2:时间单位 countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); // 并非第一时间内销毁掉全部线程,而是先让正在执行线程执行完 exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }
仅能提供有限访问的资源:好比数据库的链接数最大只有20,而上层的并发数远远大于20,这时候若是不作限制,可能会因为没法获取链接而致使并发异常,这时候可使用Semaphore来进行控制,当信号量设置为1的时候,就和单线程很类似了。高并发
实例一:每次获取1个许可
@Slf4j public class SemaphoreExample1 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); // 获取一个许可 test(threadNum); semaphore.release(); // 释放一个许可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
实例2:一次性获取多个许可
@Slf4j public class SemaphoreExample2 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(3); // 获取多个许可 test(threadNum); semaphore.release(3); // 释放多个许可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
实例三:并发很高,想要超过容许的并发数以后,就抛弃
@Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try{ if (semaphore.tryAcquire()) { // 尝试获取一个许可 // 本例中只有一个三个线程能够执行到这里 test(threadNum); semaphore.release(); // 释放一个许可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
下面是Semaphore的方法列表
尝试获取获取许可的时候等一段时间
尝试获取获取许可的次数以及超时时间均可以设置
@Slf4j public class SemaphoreExample4 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可 test(threadNum); semaphore.release(); // 释放一个许可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
同步辅助类,容许一组线程相互等待,知道全部线程都准备就绪后,才能继续操做,当某个线程调用了await方法以后,就会进入等待状态,并将计数器-1,直到全部线程调用await方法使计数器为0,才能够继续执行,因为计数器能够重复使用,因此咱们又叫他循环屏障。
使用场景
多线程计算数据,最后合并计算结果的应用场景,好比用Excel保存了用户的银行流水,每一页保存了一个用户近一年的每一笔银行流水,如今须要统计用户的日均银行流水,这时候咱们就能够用多线程处理每一页里的银行流水,都执行完之后,获得每个页的日均银行流水,以后经过CyclicBarrier的action,利用这些线程的计算结果,计算出整个excel的日均流水。
CyclicBarrier与CountDownLatch区别
一、CyclicBarrier能够重复使用(使用reset方法),CountDownLatch只能用一次
二、CountDownLatch主要用于实现一个或n个线程须要等待其余线程完成某项操做以后,才能继续往下执行,描述的是一个或n个线程等待其余线程的关系,而CyclicBarrier是多个线程相互等待,知道知足条件之后再一块儿往下执行。描述的是多个线程相互等待的场景
实例一:能够设置等待时间
@Slf4j public class CyclicBarrierExample1 { // 1.给定一个值,说明有多少个线程同步等待 private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; // 延迟1秒,方便观察 Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); // 2.使用await方法进行等待 barrier.await(); log.info("{} continue", threadNum); } }
实例二
@Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { // 因为状态可能会改变,因此会抛出BarrierException异常,若是想继续往下执行,须要加上try-catch barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
实例三
@Slf4j public class CyclicBarrierExample3 { private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { // 当线程所有到达屏障时,优先执行这里的runable log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
Java一共分为两类锁,一类是由synchornized修饰的锁,还有一种是JUC里提供的锁,核心就是ReentrantLock
synchornized与ReentrantLock的区别对比:
对比维度 | synchornized | ReentrantLock |
---|---|---|
可重入性(进入锁的时候计数器自增1) | 可重入 | 可重入 |
锁的实现 | JVM实现,很难操做源码,获得实现 | JDK实现 |
性能 | 在引入轻量级锁后性能大大提高,建议均可以选择的时候选择synchornized | - |
功能区别 | 方便简洁,由编译器负责加锁和释放锁 | 手工操做 |
粗粒度,不灵活 | 细粒度,可灵活控制 | - |
能否指定公平所 | 不能够 | 能够 |
能否放弃锁 | 不能够 | 能够 |
ReentrantLock独有的功能
能够指定是公平锁仍是非公平锁
提供了一个Condition类,能够分组唤醒须要唤醒的线程
提供可以中断等待锁的线程的机制,lock.lockInterruptibly()
ReentrantLock实现:自旋锁,循环调用CAS操做来实现加锁,避免了使线程进入内核态的阻塞状态。想办法组织线程进入内核态的阻塞状态,是咱们分析和理解锁的关键钥匙。
基本用法
@Slf4j @ThreadSafe public class LockExample2 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private final static Lock lock = new ReentrantLock(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
源码分析
默认使用非公平锁,能够传入true和false来使用公平所仍是非公平锁。
tryLock,能够设置等待时间,或者直接返回
在没有任何读写锁的时候才能取得写入的锁,可用于实现悲观读取,读多写少的场景下可能会出现线程饥饿。
@Slf4j public class LockExample3 { private final Map<String, Data> map = new TreeMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Data get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public Set<String> getAllKeys() { readLock.lock(); try { return map.keySet(); } finally { readLock.unlock(); } } // 在没有任何读写锁的时候才能够进行写入操做 public Data put(String key, Data value) { writeLock.lock(); try { return map.put(key, value); } finally { readLock.unlock(); } } class Data { } }
StempedLock控制锁有三种形式,分别是写,读,和乐观读,重点在乐观锁。一个StempedLock,状态是由版本和模式两个部分组成;锁获取的方法返回的是一个数字做为票据(Stempe),他用相应的锁状态来表示并控制相关的访问,数字0表示没有写锁被受权访问;在读锁上分为悲观读和乐观读;
乐观读:若是读的操做不少,写操做不多的状况下,咱们能够乐观的认为,读写同时发生的概率很小,所以不悲观的使用读取锁定很小,程序能够在查看相关的状态以后,判断有没有写操做的变动,再采起相应的措施,这一小小的改进,能够大大提高执行效率。
源码案例解释
import java.util.concurrent.locks.StampedLock; public class LockExample4 { class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // an exclusively locked method long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } //下面看看乐观读锁案例 double distanceFromOrigin() { // A read-only method long stamp = sl.tryOptimisticRead(); //得到一个乐观读锁 double currentX = x, currentY = y; //将两个字段读入本地局部变量 if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其余写锁发生? stamp = sl.readLock(); //若是没有,咱们再次得到一个读悲观锁 try { currentX = x; // 将两个字段读入本地局部变量 currentY = y; // 将两个字段读入本地局部变量 } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } //下面是悲观读锁案例 void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合 long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁 if (ws != 0L) { //这是确认转为写锁是否成功 stamp = ws; //若是成功 替换票据 x = newX; //进行状态改变 y = newY; //进行状态改变 break; } else { //若是不能成功转换为写锁 sl.unlockRead(stamp); //咱们显式释放读锁 stamp = sl.writeLock(); //显式直接进行写锁 而后再经过循环再试 } } } finally { sl.unlock(stamp); //释放读锁或写锁 } } } }
简单使用
@Slf4j @ThreadSafe public class LockExample5 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private final static StampedLock lock = new StampedLock(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { // 会返回一个stamp的值 long stamp = lock.writeLock(); try { count++; } finally { //释放的时候要释放 lock.unlock(stamp); } } }
总结关于锁的几个类:
synchronized:JVM实现,不但能够经过一些监控工具监控,并且在出现未知异常的时候JVM也会自动帮咱们释放锁
ReentrantLock、ReentrantRead/WriteLock、StempedLock 他们都是对象层面的锁定,要想保证锁必定被释放,要放到finally里面,才会更安全一些;StempedLock对性能有很大的改进,特别是在读线程愈来愈多的状况下,StempedLock有一个复杂的API。要注意使用
如何使用:
1.在只有少许竞争者的时候,synchronized是一个很好的锁的实现
2.竞争者很多,可是增加量是能够竞争的,ReentrantLock是一个很好的锁的实现(适合本身的才是最好的,不是越高级越好)
@Slf4j public class LockExample6 { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); // 从reentrantLock实例里获取了condition Condition condition = reentrantLock.newCondition(); new Thread(() -> { try { // 线程1调用了lock方法,加入到了AQS的等待队里里面去 reentrantLock.lock(); log.info("wait signal"); // 1 等待信号 // 调用await方法后,从AQS队列里移除了,进入到了condition队列里面去,等待一个信号 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("get signal"); // 4 获得信号 // 线程1释放锁 reentrantLock.unlock(); }).start(); new Thread(() -> { // 线程1await释放锁之后,这里就获取了锁,加入到了AQS等待队列中 reentrantLock.lock(); log.info("get lock"); // 2 获取锁 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //调用signalAll发送信号的方法,Condition节点的线程1节点元素被取出,放在了AQS等待队列里(注意并无被唤醒) condition.signalAll(); log.info("send signal ~ "); // 3 发送信号 // 线程2释放锁,这时候AQS队列中只剩下线程1,线程1开始执行 reentrantLock.unlock(); }).start(); } }