AQS
全称AbstractQueuedSynchronizer
.java
AQS
底层的数据结构是双向链表,是队列的一种实现,所以也能够把它看成一个队列。数组
其中Sync queue
是同步队列,其head
节点主要用于后期调度。这里的head节点就是占用资源的线程,后面的都是等待资源的线程。安全
下面的Condition queue
是一个单项链表,它不是必须的,只有当程序中须要使用到condition
的时候才会存在,而且可能会有多个condition queue
。数据结构
AQS的设计思想:并发
Node
实现FIFO
队列,可用于构建锁或者其余同步装置的基础框架。acquire
和release
)的方法操纵状态。具体实现的大体思路:框架
AQS
内部维护了一个CLH
队列来管理锁,线程会首先尝试获取锁,若是失败就将当前线程以及等待状态等信息包成一个Node
节点,加入到同步队列Sync queue
中,接着会不断循环尝试获取锁,条件是当前节点为head
的直接后继才会尝试,若是失败就会阻塞本身,直到本身被唤醒。当持有锁的线程释放锁的时候会唤醒队列中的后继线程。jvm
AQS同步组件性能
基于这些设计思路JDK提供了不少基于AQS的子类。学习
CountDownLatch
是一个同步辅助类,经过它能够完成相似于阻塞当前线程的功能,也就是一个或多个线程一直等待直到其余线程执行完成。CountDownLatch
用了一个给定的计数器来进行初始化,该计数器的操做是原子操做,即同时只能有一个线程操做该计数器,调用该类await
方法的线程会一直处于阻塞状态,直到其余线程调用countDown
方法时计数器的值变成0,每次调用countDown
时计数器的值会减1,当计数器的值为0时全部因await
方法而处于等待状态的线程就会继续执行。这种操做至多出现一次,由于这里的计数器是不能被重置的。优化
使用场景
并行计算,处理量很大时能够将运算任务拆分红多个子任务,当全部子任务都完成以后,父任务再将全部子任务都结果进行汇总。
Coding演示
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++){
final int thradNum = i;
executorService.execute(() -> {
try {
test(thradNum);
} catch (Exception e){
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
executorService.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
复制代码
运行结果
能够看到finish
是在全部方法都执行完后才执行的,也就是计数器减到0了才能执行。
相似于操做系统里的信号量,能够控制某个资源可被同时访问的线程数。
Coding演示
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++){
final int thradNum = i;
executorService.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(thradNum);
semaphore.release(); // 释放一个许可
} catch (Exception e){
log.error("exception", e);
}
});
}
executorService.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(100);
}
}
复制代码
运行结果是以每次3个输出的,总共输出20个。
实际开发中有时须要拿到多个许可才容许执行,下面来演示一下须要获取多个许可的状况。
semaphore.acquire(3); // 获取3个许可
test(thradNum);
semaphore.release(3); // 释放3个许可
复制代码
这样运行之后就是每次只输出1条结果,由于Semaphore
总共只有3个许可,而这里一次操做须要获取3个许可。
当线程数过多时咱们可使用尝试获取许可的方法
if (semaphore.tryAcquire()){
test(thradNum);
semaphore.release(); // 释放一个许可
}
复制代码
这种状况下若是获取不到许可就会被直接丢弃。
运行结果只输出了3条结果。
咱们来看一下tryAcquire
方法的实现
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
复制代码
能够看到传入的参数为申请的许可数,也就是获取多个许可。
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
复制代码
这里的参数为超时时间和时间单位,意味着在尝试获取许可时能够等待一段时间。
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
复制代码
这里就是结合上面两种来使用的状况。
CyclicBarrier
也是一个同步辅助类,它容许一组线程相互等待直到到达某个公共的屏障点,经过它能够完成多个线程之间相互等待时,只有当每一个线程都准备就绪后才能各自继续执行后面的操做。它也是经过计数器来实现,当某个线程调用await
方法后就进入等待状态,计数器执行加一操做。当计数器的值达到了设置的初始值时等待状态的线程会被唤醒继续执行。因为CyclicBarrier
在释放等待线程后能够重用,因此能够称之为循环屏障。
注意: CyclicBarrier
的计数器能够重置。
Coding演示
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
复制代码
运行结果
在ready
状态时日志是每秒输出一条,当有5条ready
时会一次性输出5条continue
。这就是前面讲的所有线程准备就绪后同时开始执行。
在初始化CyclicBarrier
时还能够在等待线程数后指定一个runnable
,含义是当线程到达这个屏障时优先执行这里的runnable
。
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("call back is ready.");
});
复制代码
运行结果
Java中的锁主要分红两类:
ReentrantLock属于第二类,下面经过与第一类锁的对比来学习这个锁
ReentrantLock的实现是一种自旋锁,经过循环调用CAS操做来实现加锁,它性能比较好的缘由之一就是避免了线程进入内核态的阻塞状态。
@Slf4j
public class LockExample2 {
/** * 请求总数 */
public static int clientTotal = 5000;
/** * 同时并发执行线程数 */
public static int threadTotal = 200;
public static int count = 0;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add(){
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
复制代码
执行结果始终是5000。
咱们点进ReentrantLock
来看一下它作了什么事情
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */
public ReentrantLock() {
sync = new NonfairSync();
}
复制代码
这里默认给了一个不公平的锁。
也能够经过传入true
或者false
来选择公平锁仍是不公平锁
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
复制代码
下面来看tryLock
方法
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
复制代码
tryLock
的含义是仅在调用时锁定未被另外一个线程保持的状况下才获取锁定。第二个传入时间参数的含义是若是锁定在给定的等待时间内没有被另外一个线程保持且当前线程没有被中断,则获取这个锁定。使用这两个方法时必定要理解清楚这两个方法的含义。
下面来介绍JUC
的locks
包内另一个锁ReentrantReadWriteLock
。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
}
复制代码
能够看到它里面有两个锁:读锁和写锁。
它是在没有任何读写锁的状况下才能取得写入的锁。
它能够用于实现了悲观读取,即当执行中进行读取时,可能有另外一个进程要写入的需求,为了保持同步就须要ReentrantReadWriteLock
的读取锁定。可是若是读取不少,写入不多的状况下,使用ReentrantReadWriteLock
可能会使写入线程遭遇饥饿,即写入线程长期处于等待状态。
Coding演示
@Slf4j
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
}
复制代码
这里实现了对Map
的读写操做的同步,经过对内部方法的封装使外部在调用方法时不须要考虑同步问题。
须要注意的是ReentrantReadWriteLock
实现的是悲观读取,若是想得到写入锁时坚定不容许有任何读锁还保持着,即当全部读操做作完时才容许写操做,所以可能会形成写操做的线程饥饿。
StampedLock控制锁有三种方式,分别是:写,读,乐观读。
StampedLock的状态由版本和模式两个部分组成。锁获取方法返回的是一个数组做为票据(Stamp),用相应的锁状态来表示和控制相关的访问,输出0表示没有写锁被受权访问,在读锁上分为悲观锁和乐观锁。
Coding演示
@Slf4j
public class LockExample5 {
/** * 请求总数 */
public static int clientTotal = 5000;
/** * 同时并发执行线程数 */
public static int threadTotal = 200;
public static int count = 0;
private static StampedLock lock = new StampedLock();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add(){
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
}
复制代码
运行结果始终为5000,线程安全。
StampLock
对吞吐量有巨大改进,特别是在读线程愈来愈多的场景下。
注意:除了sync
,其余锁都要在使用完后释放锁。
介绍了这么多种锁,咱们来总结一下各个锁使用的场景
sync
是很好的选择。ReentrantLock
适合。sync
因为是jvm
自动解锁,因此确定不会形成死锁,而其余锁可能由于使用不当形成死锁。
Written by Autu
2019.7.20