AQS 是用来构建锁和同步工具的基本框架。本文主要基于 AQS 做者 Doug Lea 的论文 The java.util.concurrent Synchronizer Framework 和 JDK 1.8 的文档。java
这篇文章也同时发布在个人博客中。node
不过英文好的话,仍是直接看论文吧。编程
同步(维持变量在各个线程间状态的一致性)至少须要两种操做:设计模式
同时支持两种模式:bash
同时框架须要一些高级功能:并发
tryLock
和lock
)同步器的基本思想很直接简洁,用伪代码表示以下:框架
acquireide
while (syncronization state does not allow acquire) {
enqueue current thread if not already queued;
possiblly block current thread;
}
复制代码
release工具
update synchronization state;
if (state may premit a blocked thread acuire) {
unblock one or more queued thread;
}
复制代码
要实现这两个操做,须要三个基本模块的配合:优化
AQS 使用一个 32 位整数(int)来表明共享资源,也就是同步状态。
该整数能够表现任何状态。好比,
Semaphore
用它来表现剩余的许可数,ReentrantLock
用它来表现拥有它的线程已经请求了多少次锁;FutureTask
用它来表现任务的状态 (还没有开始、运行、完成和取消)
AQS 使用 JUC 包下LockSupport
中的pack()
和unpack()
方法来阻塞和唤醒进程。最终会调用Unsafe.park()
和Unsafe.unpack()
两个 native 方法,最终的阻塞线程和唤醒线程具体实现仍是由操做系统来实现的。
AQS 维护一个 FIFO 的队列,来管理阻塞的线程,能够实现公平性(也能够不公平),也就是同时支持公平锁和非公平锁两种模式。内部使用 CLH Lock,可是作了不少优化,好比CLH 锁不是自旋的而是阻塞的。
AQS 中的 CLH lock 和原汁原味的 CLH lock 相比,主要有两点不一样:
不使用自旋锁而是阻塞锁,调用pack()
和unpack()
实现。
节点有显式的后继节点next
,原来的 CLH lock 不须要显式的链表由于当前一个节点为释放锁时,后一个节点在一直轮询,因此它可以拿到锁。而 AQS 的锁是阻塞的,须要调用unpack(Thread)
来唤醒请求锁的线程,因此须要知道它的后继节点。
AQS 同时还设置了一个
signal bit
来避免没必要要的pack()
和unpack()
调用。在调用pack()
以前,首先设置signal bit
为 true,而后再次检查节点状态,若是还不能拿到锁,就调用pack()
阻塞线程。
这样就能够用更加详细的伪代码来描述acquire
和release
,这里只考虑exclusive mode、不可中断的、没有超时功能的状况:
acquire:
if (!tryAcquire(arg)) {
node = create and enqueue new node;
pred = node's effective predecessor; while (pred is not head node || !tryAcquire(arg)) { if (pred's signal bit is set)
park();
else
compareAndSet pred's signal bit to ture pred = node's effective predecessor;
}
head = node;
}
复制代码
release:
if (tryRelease(arg) && head node's signal bit is set) { compareAndSet head's signal bit to false;
unpack head's successor, if one exists }s 复制代码
实现一个同步器须要实现下面的方法:
tryAcquire()
tryRelease()
tryAcquireShared()
tryReleaseShared()
isHeldExclusively()
复制代码
以上方法不须要所有实现,根据获取的锁的种类能够选择实现不一样的方法.
tryAcquire
、 tryRelease
、isHeldExclusively
tryAcquireShared
、tryReleaseShared
、isHeldExclusively
ReentrantReadWriteLock
实现一个同步器最好的设计模式是把功能委托给一个AQS的私有内部子类,而不是直接继承 AQS 来实现(这样会破坏同步器的简洁性,调用者可能会调用 AQS 的其余方法破坏同步状态)。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/** * @author leer * Created at 4/25/19 6:24 PM * 一个不可重入的互斥锁 */
public class Mutex {
static final class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int ignore) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int ignore) {
setState(0);
return true;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(0);
}
public void unlock() {
sync.release(0);
}
}
复制代码
非公平锁的tryAcquire
实现:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
ReentrantReadWriteLock 使用 同步状态的 16 位来存放读锁计数,另外的 16 位存放写锁计数。
ReentrantLock
相似Semaphore
使用同步状态来保存当前可用许可数量。它重写tryAcquireShared
来减小计数来模拟获取资源,若是计数小于 0 则会阻塞线程;重写tryReleaseShared
来模拟释放资源。同时它也有公平模式和非公平模式。
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
复制代码
和Semaphore
相似,同步状态保存当前的计数值。countDown()
调用releaseShared()
,await()
方法调用acquireShared()
,等待计数器到零。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
Only when JDK version < 1.7
FutureTask
使用同步状态保存Future
任务的状态(initial、running、cancelled、done)。
设置和取消一个任务将调用release()
,调用Future.get()
等待结果将会调用acquire()
。
Doug Lea 的论文: The java.util.concurrent Synchronizer Framework
《Java并发编程实战》第14章