提及 JUC,咱们经常会想起其中的线程池(ExecutorService)。然而,咱们今天来看看另外一个核心模块 AQS。php
AQS 是 AbstractQueuedSynchronizer 的简称,在 JUC 中做为各类同步器的基石。举个例子,常见的 ReentrantLock 就是由它实现的。java
咱们知道,java 有一个关键字 synchronized 来给一段代码加锁,但是这是 JVM 层面的事情。那么问题来了,如何在 java 代码层面来实现模拟一个锁?node
即实现这样一个接口:算法
package java.util.concurrent.locks;
public interface Lock {
void lock();
void unlock();
}
复制代码
一个简单的想法是:让全部线程去竞争一个变量owner,确保只有一个线程成功,并设置本身为owner,其余线程陷入死循环等待。这即是所谓的自旋锁。编程
一个简单的代码实现:api
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
public void lock() {
Thread currentThread = Thread.currentThread();
// 若是锁未被占用,则设置当前线程为锁的拥有者
while (!owner.compareAndSet(null, currentThread)) {
}
}
public void unlock() {
Thread currentThread = Thread.currentThread();
// 只有锁的拥有者才能释放锁
owner.compareAndSet(currentThread, null);
}
}
复制代码
扯这个自旋锁,主要是为了引出 AQS 背后的算法 CLH锁
。安全
关于CLH锁
更多细节能够参考篇文章:多线程
CLH锁的思想,简单的说就是:一群人去ATM取钱,头一我的拿到锁,在里面用银行卡取钱,其他的人在后面排队等待;前一我的取完钱出来,唤醒下一我的进去取钱。函数
关键部分翻译成代码就是:
AQS 使用节点为 Node 的双向链表做为同步队列。拿到锁的线程能够继续执行代码,没拿到的线程就进入这个队列排队。
public abstract class AbstractQueuedSynchronizer ... {
// 队列头
private transient volatile Node head;
// 队列尾
private transient volatile Node tail;
static final class Node {
/** 共享模式,可用于实现 CountDownLatch */
static final Node SHARED = new Node();
/** 独占模式,可用于实现 ReentrantLock */
static final Node EXCLUSIVE = null;
/** 取消 */
static final int CANCELLED = 1;
/** 意味着它的后继节点的线程在排队,等待被唤醒 */
static final int SIGNAL = -1;
/** 等待在条件上(与Condition相关,暂不解释) */
static final int CONDITION = -2;
/** * 与共享模式相关,暂不解释 */
static final int PROPAGATE = -3;
// 可取值:CANCELLED, 0, SIGNAL, CONDITION, PROPAGATE
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
复制代码
这个队列大致上长这样:图片来源
条件队列是为了支持 Lock.newCondition() 这个功能,暂时不care,先跳过。
AQS 支持独占锁(Exclusive)和共享锁(Share)两种模式:
这边咱们只看独占模式,它对外提供一套 api:
简单看一眼怎么用的 (ReentrantLock 的例子):
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {...}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
复制代码
能够看到,AQS 封装了排队、阻塞、唤醒之类的操做,使得实现一个锁变的如此简洁。
获取资源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
复制代码
这个函数很短,其中 tryAcquire(int) 为模板方法,留给子类实现。相似 Activity.onCreate()。
根据 tryAcquire(arg) 的结果,分两种状况:
这一步把当前线程(Thread.currentThread())做为一个Node节点,加入同步队列的尾部,并标记为独占模式。
固然,加入队列这个动做,要保证线程安全。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 1处尝试更快地 enq(), 成功的话直接 return。失败的话, 在2处退化为完整版的 enq(),相对更慢些
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 1
pred.next = node;
return node;
}
}
enq(node); // 2
return node;
}
private Node enq(final Node node) {
// 神奇的死循环 + CAS
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
能够看到,这边有一个死循环 + CAS的神奇操做,这是非阻塞算法的经典操做,可自行查阅相关资料。简单的说,非阻塞算法就是在多线程的状况下,不加锁同时保证某个变量(本例中为双向链表)的线程安全,并且一般比 synchronized 的效率要高。
这个函数主要作两件事:
结合这张图来看,每次出队完须要确保 head 始终指向占用资源的线程:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 这又是一个死循环 + CAS,此次CAS比较隐蔽,在 shouldParkAfterFailedAcquire()里边
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 排在队首的后面,看看能不能得到锁,成功的话本身变成队首
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 这里失败会作一些回滚操做,不分析
if (failed)
cancelAcquire(node);
}
}
复制代码
这边的 interrupted 主要是保证这样一个功能。线程在排队的时候不响应中断,直到出来之后,若是等待的过程当中被中断过,做为弥补,当即相应中断(即调用selfInterrupt())。
查看prev的waitStatus,看是否是须要阻塞。能够预见的是,通过几回死循环,所有都会变成SIGNAL状态。以后所有陷入阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 查看前驱节点的状态
if (ws == Node.SIGNAL) // SIGNAL: 能够安全的阻塞
return true;
if (ws > 0) { // CANCEL: 取消排队的节点,直接从队列中清除。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 0 or PROPAGATE: 须要变成 SIGNAL,但不能当即阻塞,须要重走外层的死循环二次确认。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
值得一提的是,阻塞和唤醒没有使用常说的 wait()/notify(),而是使用了 LockSupport.park()/unpark()。这应该是出于效率上的考虑。
释放资源
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * 唤醒next节点对应的线程,一般就是老二(直接后继)。 * 若是是null,或者是cancel状态(出现异常如线程遇到空指针挂掉了), * 那么跳过cancel节点,找到后继节点。 */
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒 node.next
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
释放的逻辑比较简单。注意一点,对于 next 节点 unpark(),至关于在把 next 节点从 acquireQueued() 中的死循环中解放出来。
回到 ATM 的例子,至关于,他取完钱,轮到后一我的取钱了。这样逻辑所有都串起来了。
这样,顺着独占锁这条线,AQS 的独占模式就分析完了。其余还有用于实现闭锁的共享模式,用于实现 Condition 的条件队列就不展开了。
Java并发编程实战(chapter_4)(AQS源码分析)
《JAVA并发编程实践》