AQS(AbstractQueuedSynchronizer)队列同步器,是JUC中很是重要的一个组件,基于它能够简单高效地构建一些通用的锁和同步器,如ReentrantLock、Semaphore等(本文学习内容基于JDK1.8),本文主要关注AQS的源码实现及基于AQS实现的一些经常使用的同步组件html
经过使用JUC中的同步组件,能够比较简洁地进行并发编程,而在不少同步组件的实现中都出现了Sync extends AbstractQueuedSynchronizer
的身影,经过对AQS的一些方法的重写,实现了相应的组件的功能。AQS是实现锁的关键,其中锁是面向锁的使用者的,定义了锁的使用方式,而AQS是面向锁的实现者的,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操做。java
AQS采用了模板方法设计模式,支持经过子类重写相应的方法实现不一样的同步器。在AQS中,有一个state变量,表示同步状态(这里的同步状态就能够看做是一种资源,对同步状态的获取能够看做是对同步资源的竞争),AQS提供了多种获取同步状态的方式,包括独占式获取、共享式获取以及超时获取等,下面会进行具体的介绍。node
下面将结合源码从模板方法、同步状态管理、CLH锁队列、独占式获取方式、共享式获取方式、超时获取方式等方面分析AQS的原理及实现git
能够经过子类重写的方法列表以下github
方法名称 | 用途 |
---|---|
tryAcquire(int arg) | 主要用于实现独占式获取同步状态,实现该方法须要查询当前状态是否符合预期,而后进行相应的状态更新实现控制(获取成功返回true,不然返回false,成功一般是能够更新同步状态,失败则是不符合更新同步状态的条件),其中arg表示须要获取的同步状态数 |
tryRelease(int arg) | 主要用于实现独占式释放同步状态,同时更新同步状态(一般在同步状态state更新为0才会返回true,表示已经完全释放同步资源),其中arg表示须要释放的同步状态数 |
tryAcquireShared(int arg) | 主要用于实现共享式获取同步状态,同时更新同步状态 |
tryReleaseShared(int arg) | 主要用于实现共享式释放同步状态,同时更新同步状态 |
isHeldExclusively() | 通常用于判断同步器是否被当前线程独占 |
对线程进行加锁在AQS中体现为对同步状态的操做,经过的同步状态地管理,能够实现不一样的同步任务,同步状态state
是AQS很关键的一个域面试
// 由于state是volatile的,因此get、set方法均为原子操做,而compareAndSetState方法
// 使用了Unsafe类的CAS操做,因此也是原子的
// 同步状态
private volatile int state;
// 同步状态的操做包括
// 获取同步状态
protected final int getState() { return state;}
// 设置同步状态
protected final void setState(int newState) { state = newState;}
// CAS操做更新同步状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
CLH(Craig, Landin, and Hagersten)锁,是自旋锁的一种。AQS中使用了CLH锁的一个变种,实现了一个双向队列,并使用其实现阻塞的功能,经过将请求共享资源的线程封装为队列中的一个结点实现锁的分配。编程
双向队列的头结点记录工做状态下的线程,后继结点若获取不了同步状态则会进入阻塞状态,新的结点会从队尾加入队列,竞争同步状态设计模式
// 队列的数据结构以下
// 结点的数据结构
static final class Node {
// 表示该节点等待模式为共享式,一般记录于nextWaiter,
// 经过判断nextWaiter的值能够判断当前结点是否处于共享模式
static final Node SHARED = new Node();
// 表示节点处于独占式模式,与SHARED相对
static final Node EXCLUSIVE = null;
// waitStatus的不一样状态,具体内容见下文的表格
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 记录前置结点
volatile Node prev;
// 记录后置结点
volatile Node next;
// 记录当前的线程
volatile Thread thread;
// 用于记录共享模式(SHARED), 也能够用来记录CONDITION队列(见扩展分析)
Node nextWaiter;
// 经过nextWaiter的记录值判断当前结点的模式是否为共享模式
final boolean isShared() { return nextWaiter == SHARED;}
// 获取当前结点的前置结点
final Node predecessor() throws NullPointerException { ... }
// 用于初始化时建立head结点或者建立SHARED结点
Node() {}
// 在addWaiter方法中使用,用于建立一个新的结点
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 在CONDITION队列中使用该构造函数新建结点
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// 记录头结点
private transient volatile Node head;
// 记录尾结点
private transient volatile Node tail;
复制代码
Node状态表(waitStatus,初始化时默认为0)数据结构
状态名称 | 状态值 | 状态描述 |
---|---|---|
CANCELLED | 1 | 说明当前结点(即相应的线程)是由于超时或者中断取消的,进入该状态后将没法恢复 |
SIGNAL | -1 | 说明当前结点的后继结点是(或者将要)由park致使阻塞的,当结点被释放或者取消时,须要经过unpark唤醒后继结点(表现为unparkSuccessor()方法) |
CONDITION | -2 | 该状态是用于condition队列结点的,代表结点在等待队列中,结点线程等待在Condition上,当其余线程对Condition调用了signal()方法时,会将其加入到同步队列中去,关于这一部分的内容会在扩展中说起。 |
PROPAGATE | -3 | 说明下一次共享式同步状态的获取将会无条件地向后继结点传播 |
下图展现该队列的基本结构并发
独占式(EXCLUSIVE)获取需重写tryAcquire
、tryRelease
方法,并访问acquire
、release
方法实现相应的功能。
acquire的流程图以下:
上述流程图比较复杂,这里简单概述一下其中的过程
主要代码以下:
// 这里不去看tryAcquire、tryRelease方法的具体实现,只知道它们的做用分别为尝试获取同步状态、
// 尝试释放同步状态
public final void acquire(int arg) {
// 若是线程直接获取成功,或者再尝试获取成功后都是直接工做,
// 若是是从阻塞状态中唤醒开始工做的线程,将当前的线程中断
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 包装线程,新建结点并加入到同步队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 尝试入队, 成功返回
if (pred != null) {
node.prev = pred;
// CAS操做设置队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 经过CAS操做自旋完成node入队操做
enq(node);
return node;
}
// 在同步队列中等待获取同步状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 检查是否符合开始工做的条件
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 获取不到同步状态,将前置结点标为SIGNAL状态而且经过park操做将node包装的线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 若是获取失败,将node标记为CANCELLED
if (failed)
cancelAcquire(node);
}
}
复制代码
release流程图以下
release的过程比较简单,主要就是经过tryRelease更新同步状态,而后若是须要,唤醒后置结点中被阻塞的线程
主要代码以下
// release
public final boolean release(int arg) {
// 首先尝试释放并更新同步状态
if (tryRelease(arg)) {
Node h = head;
// 检查是否须要唤醒后置结点
if (h != null && h.waitStatus != 0)
// 唤醒后置结点
unparkSuccessor(h);
return true;
}
return false;
}
// 唤醒后置结点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 经过CAS操做将waitStatus更新为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 检查后置结点,若为空或者状态为CANCELLED,找到后置非CANCELLED结点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后置结点
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
共享式(SHARED)获取需重写tryAcquireShared
、tryReleaseShared
方法,并访问acquireShared
、releaseShared
方法实现相应的功能。与独占式相对,共享式支持多个线程同时获取到同步状态并进行工做
acquireShared
acquireShared过程和acquire很是类似,流程大体相同,下面简单归纳一下
//
public final void acquireShared(int arg) {
// 尝试共享式获取同步状态,若是成功获取则能够继续执行,不然执行doAcquireShared
if (tryAcquireShared(arg) < 0)
// 以共享式不停得尝试获取同步状态
doAcquireShared(arg);
}
// Acquires in shared uninterruptible mode.
private void doAcquireShared(int arg) {
// 向同步队列中新增一个共享式的结点
final Node node = addWaiter(Node.SHARED);
// 标记获取失败状态
boolean failed = true;
try {
// 标记中断状态(若在该过程当中被中断是不会响应的,须要手动中断)
boolean interrupted = false;
// 自旋
for (;;) {
// 获取前置结点
final Node p = node.predecessor();
// 若前置结点为头结点
if (p == head) {
// 尝试获取同步状态
int r = tryAcquireShared(arg);
// 若获取到同步状态。
if (r >= 0) {
// 此时,当前结点存储的线程恢复执行,须要将当前结点设置为头结点而且向后传播,
// 通知符合唤醒条件的结点一块儿恢复执行
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 须要中断,中断当前线程
if (interrupted)
selfInterrupt();
// 获取成功
failed = false;
return;
}
}
// 获取同步状态失败,须要进入阻塞状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 获取失败,CANCELL node
if (failed)
cancelAcquire(node);
}
}
// 将node设置为同步队列的头结点,而且向后通知当前结点的后置结点,完成传播
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 向后传播
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if(s == null || s.isShared())
doReleaseShared();
}
}
复制代码
releasShared
releaseShared在尝试释放同步状态成功后,会唤醒后置结点,而且保证传播性
public final boolean releaseShared(int arg) {
// 尝试释放同步状态
if (tryReleaseShared(arg)) {
// 成功后唤醒后置结点
doReleaseShared();
return true;
}
return false;
}
// 唤醒后置结点
private void doReleaseShared() {
// 循环的目的是为了防止新结点在该过程当中进入同步队列产生的影响,同时要保证CAS操做的完成
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
复制代码
超时获取使经过AQS实现的锁支持超时获取锁,这是synchronized关键字所不具有的,关于其具体的实现,和上述实现方式类似,只是在独占式、共享式获取的基础上增长了时间的约束,同时经过parkNanos()方法为阻塞定时,这里再也不过多展开。
下面列举几个经常使用的并发组件
ReentrantLock,重入锁。经过AQS独占式实现加锁、解锁操做,支持同一线程重复获取锁。主要操做为lock,unlock,其实现分别依赖acquire和release
private final Sync sync;
// 继承AQS,重写相应方法
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { ... }
protected final boolean tryRelease(int releases) { ... }
// ...略
}
static final class NonfairSync extends Sync {
final void lock() { ... }
protected final boolean tryAcquire(int acquires) { ... }
}
static final class FairSync extends Sync {
final void lock() { ... }
protected final boolean tryAcquire(int acquires) { ... }
}
复制代码
相关总结
CountDownLatch,一种同步工具,可使一个或多个线程一直等待,直到指定数量线程所有执行完毕后才再执行。经过AQS共享式实现。主要操做为await(让当前线程等待,调用了AQS的acquireSharedInterruptibly方法,能够简单将其看成acquireShared方法,实现基本相同),countDown(执行同步状态释放的操做),其实大致的思路就是,初始化CountDownLatch必定的同步状态数,执行await操做的线程需等待同步状态数彻底释放(为0)时才能够执行,而须要先完成的任务在完成后都经过countDown释放必定的同步状态数
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 检查同步状态数是否已经为0,不为0则同步状态获取失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 释放必定的同步状态数
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
复制代码
ReentrantReadWriteLock,可重入的读写锁,同时使用了AQS的独占式和共享式,当进行写操做时,锁由写线程独占,其余写线程和读线程阻塞。当进行读操做时,写线程阻塞,全部读线程能够共享锁。读写锁的实现相对复杂,这里再也不贴过多的代码,简单归纳一下其实现的方式:
在synchronized加锁的时候,能够经过Object类的方法的wait()、notify()方法实现等待通知,那么在Lock锁的过程当中,也存在相似的操做,即Condition接口,该接口提供了await()、signal()方法,具备相同的功能。
在AQS中,有一个类ConditionObject,实现了Condition接口。它一样使用了Node的数据结构,构成了一个队列(FIFO),与同步队列区别,能够叫它等待队列。获取Condition须要经过Lock接口的newCondition方法,这意味着一个Lock能够有多个等待队列,而Object监视器模型提供的一个对象仅有一个等待队列
// Condition的数据结构
static final class Node {
// next 指针
Node nextWaiter;
// ...
}
public class ConditionObject implements Condition, java.io.Serializable {
// head
private transient Node firstWaiter;
// tail
private transient Node lastWaiter;
// ...
}
复制代码
下面具体来看await和signal操做
await()
// 涉及中断的操做,暂时忽略
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 向等待队列的队尾新建一个CONDITION结点
Node node = addConditionWaiter();
// 由于要进入等待状态,因此须要释放同步状态(即释放锁),若是失败,该结点会被CANCELLED
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判读该结点是否在同步队列上,若是不在就经过park操做将其阻塞,进入等待状态
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 从等待状态恢复,进入同步队列竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 向等待队列的队尾新建一个CONDITION结点
private Node addConditionWaiter() {
Node t = lastWaiter;
// 若是最后一个结点的waitStatus并不是CONDITION,说明该结点被CANCELLED了,须要
// 从队列中清除掉
if (t != null && t.waitStatus != Node.CONDITION) {
// 将CANCELLED结点从等待队列中清除出去
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建CONDITION结点而且将其加入队尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
复制代码
关于await的操做
signal()
//
public final void signal() {
// 校验
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 唤醒等待队列的头结点
if (first != null)
doSignal(first);
}
// 执行唤醒操做
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 唤醒结点而且将其加入同步队列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将唤醒的结点加入到同步队列中竞争同步状态,恢复执行
final boolean transferForSignal(Node node) {
// 将node的状态从CONDITION恢复到默认状态,该CAS操做由外层doSignal的循环保证成功操做
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将node加入到同步队列中
Node p = enq(node);
int ws = p.waitStatus;
// 若是前置结点已经被取消或者将前置结点设置为SIGNAL失败,就经过unpark唤醒node包装的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
复制代码
关于signal的操做
将等待队列的头结点唤醒,从等待队列中移除,并将其加入到同步队列中竞争同步状态,恢复执行
还有一些操做,如signalAll()则是将等待队列中的所有结点从等待队列中移除并加入到同步队列中竞争同步状态
StampedLock是Java8中新增的一个锁,是对读写锁的改进。读写锁虽然分离了读与写的功能,可是它在处理读与写的并发上,采起的是一种悲观的策略,这就致使了,当读取的状况不少而写入的状况不多时,写入线程可能迟迟没法竞争到锁并被阻塞,遭遇饥饿问题。
StampedLock提供了3种控制锁的模式,写、读、乐观读。在加锁时能够获取一个stamp做为校验的凭证,在释放锁的时候须要校验这个凭证,若是凭证失效的话(好比在读的过程当中,写线程产生了修改),就须要从新获取凭证,而且从新获取数据。这很适合在写入操做较少,读取操做较多的情景,能够乐观地认为写入操做不会发生在读取数据的过程当中,而是在读取线程解锁前进行凭证的校验,在必要的状况下,切换成悲观读锁,完成数据的获取。这样能够大幅度提升程序的吞吐量。
StampedLock在实现上没有借助AQS,可是其不少设计的思想、方法都是参照AQS并进行了一些修改完成的。在StampedLock内部一样维护了一个CLH队列完成相关的功能。
与ReentrantReadWriteLock相比,StamptedLock的API调用相对复杂一些,因此在不少时候仍是会用ReentrantReadWriteLock。
更多的关于StampedLock的内容后续再补充。
若有问题,还请指出