AbstractQueuedSynchronizer,简称 AQS,是一个用于构建锁和同步器的框架。
JUC 包下常见的锁工具如 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 都是基于 AQS 实现的。
本文将介绍 AQS 的数据结构及独占模式的实现原理。java
本文基于 jdk1.8.0_91
AQS 全部操做都围绕着同步资源(synchronization state)来展开,解决了资源访问的互斥和同步问题。node
AQS框架将剩下的一个问题留给用户:获取、释放资源的具体方式和结果。
这实际上是一种典型的模板方法设计模式:父类(AQS框架)定义好骨架和内部操做细节,具体规则由子类去实现。segmentfault
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
AbstractQueuedSynchronizer 继承 AbstractOwnableSynchronizer,后者具备属性 exclusiveOwnerThread,用于记录独占模式下得到锁的线程。设计模式
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { /** * The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread; }
AbstractQueuedSynchronizer 具备 ConditionObject 和 Node 两个内部类。数据结构
AQS 定义了一系列模板方法以下:并发
// 独占获取(资源数) protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 独占释放(资源数) protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 共享获取(资源数) protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享获取(资源数) protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 是否排它状态 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
Java 中经常使用的锁工具都是基于 AQS 来实现的。app
锁和资源是同一个概念,是多个线程争夺的对象。
AQS 使用 state 来表示资源/锁,经过内置的等待队列来完成获取资源/锁的排队工做。
等待队列(wait queue)是严格的 FIFO 队列,是 CLH 锁队列的变种。框架
因为 state 是共享的,使用 volatile 来保证其可见性,并提供了getState/setState/compareAndSetState
三个方法来操做 state。ide
/** * The synchronization state. */ private volatile int state;// 资源/锁 /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // 原子操做 // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS 的内部实现了两个队列:同步队列和条件队列。这两种队列都使用了 Node 做为节点。工具
节点的定义主要包含三部份内容:
节点的状态
java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** waitStatus value to indicate the next acquireShared should unconditionally propagate */ static final int PROPAGATE = -3; // 等待状态:SIGNAL、CANCELLED、CONDITION、PROPAGATE、0 volatile int waitStatus; // 指向同步队列中的上一个节点 volatile Node prev; // 指向同步队列中的下一个节点 volatile Node next; volatile Thread thread; // 在同步队列中,nextWaiter用于标记节点的模式:独占、共享 // 在条件队列中,nextWaiter指向条件队列中的下一个节点 Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ // 节点模式是否为共享 final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
同步队列是等待获取锁的队列,是一个双向链表(prev/next),使用 head/tail 执行队列的首尾节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ // 等待队列的头节点,懒初始化。 // 注意,若是头节点存在,那么它的 waitStatus 必定不是 CANCELLED private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ // 等待队列的尾节点,懒初始化。 // 只能经过 enq 方法给等待队列添加新的节点。 private transient volatile Node tail; /** * The synchronization state. */ private volatile int state;
在线程尝试获取资源失败后,会进入同步队列队尾,给前继节点设置一个唤醒信号后,经过 LockSupport.park(this)
让自身进入等待状态,直到被前继节点唤醒。
当线程在同步队列中等待,获取资源成功后,经过执行 setHead(node)
将自身设为头节点。
同步队列的头节点是一个 dummy node,它的 thread 为空(某些状况下能够看作是表明了当前持有锁的线程)。
/** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
AQS 不会在初始化队列的时候构建空的头节点(dummy node),而是在第一次发生争用时构造:
第一个线程获取锁,第二个线程获取锁失败入队,此时才会初始化队列,构造空节点并将 head/tail 指向该空节点。
具体见 AbstractQueuedSynchronizer#enq。
条件队列是等待条件成立的队列,是一个单向链表(nextWaiter),使用 firstWaiter/lastWaiter 指向队列的首尾节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
/** First node of condition queue. */ private transient Node firstWaiter; // 条件队列的头节点 /** Last node of condition queue. */ private transient Node lastWaiter; // 条件队列的尾节点
当线程获取锁成功以后,执行 Conition.await(),释放锁并进入条件队列中等待,直到其余线程执行 Conition.signal 唤醒当前线程。
当前线程被唤醒后,从条件队列转移到同步队列,从新等待获取锁。
独占模式下,只要有一个线程占有锁,其余线程试图获取该锁将没法取得成功。
独占模式下获取锁/资源,无视中断,Lock#lock的内部实现
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
尝试获取资源,成功返回true。具体资源获取方式交由自定义同步器实现。
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
获取资源/锁失败后,将当前线程封装为新的节点,设置节点的模式(独占、共享),加入同步队列的尾部,返回该新节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared // 独占模式、共享模式 * @return the new node */ // 从队列尾部入队 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 设置新的尾节点 pred.next = node; return node; } } enq(node); // tail为空,入队 return node; // 返回当前的新节点 }
从同步队列的尾部入队,若是队列不存在则进行初始化。
java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { // 从同步队列的尾部入队 for (;;) { Node t = tail; if (t == null) { // Must initialize // 队列为空,则建立一个空节点,做头节点 if (compareAndSetHead(new Node())) tail = head; // 初始化完成后并无返回,而是进行下一次循环 } else { node.prev = t; if (compareAndSetTail(t, node)) { // 队列不为空,则当前节点做为新的tail // CAS失败,可能会出现尾分叉的现象,由下一次循环消除分叉 t.next = node; // 因为不是原子操做,入队操做先设置prev指针,再设置next指针,会致使并发状况下没法经过next遍历到尾节点 return t; // 返回当前节点的上一个节点(旧的尾节点) } } } }
注意:
在同步队列自旋、等待获取资源直到成功,返回等待期间的中断状态。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 上一个节点若是是头结点,说明当前节点的线程能够尝试获取锁资源 // 获取锁成功,当前节点做为新的头节点,而且清理掉当前节点中的线程信息(也就是说头节点是个dummy node) // 这里不会发生争用,不须要CAS setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 上一个节点不是头节点,或者当前节点的线程获取锁失败,须要判断是否进入阻塞: // 1. 不能进入阻塞,则重试获取锁。2. 进入阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 阻塞当前线程,当从阻塞中被唤醒时,检测当前线程是否已中断,并清除中断状态。接着继续重试获取锁。 interrupted = true; // 标记当前线程已中断(若是线程在阻塞时被中断唤醒,会重试获取锁直到成功以后,再响应中断) } } finally { if (failed) // 自旋获取锁和阻塞过程当中发生异常 cancelAcquire(node); // 取消获取锁 } }
在 acquireQueued 方法中,线程在自旋中主要进行两个判断:
具体代码流程:
当前节点获取锁失败以后,经过校验上一个节点的等待状态,判断当前节点可否进入阻塞。
返回 true,可进入阻塞;返回 false,不可进入阻塞,需重试获取锁。
java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release // 当前节点已经给它的上一个节点设置了唤醒信号 * to signal it, so it can safely park. // 当前节点能够进入阻塞 */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and // 上一个节点状态大于 0,说明是已取消状态 CANCELLED,不会通知当前节点 * indicate retry. // 则一直往前找到一个等待状态的节点,并排在它的后边 */ // 当前节点不能进入阻塞,需重试获取锁 do { node.prev = pred = pred.prev; // pred = pred.prev; node.prev = pred; // 跳过上一个节点,直到找到 waitStatus > 0 的节点 } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we // 上一个节点状态等于 0 或 PROPAGATE,说明正在等待获取锁/资源 * need a signal, but don't park yet. Caller will need to // 此时须要给上一个节点设置唤醒信号SIGNAL,但不直接阻塞 * retry to make sure it cannot acquire before parking. // 由于在阻塞前调用者须要重试来确认它确实不能获取资源 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 经过 CAS 将上一个节点的状态改成 SIGNAL } return false; }
当前节点可以进入阻塞的条件是:具备其余线程来唤醒它。
经过设置上一个节点状态为 SIGNAL,以确保上一个节点在释放锁以后,可以唤醒当前节点。
分为三种状况:
进入阻塞,阻塞结束后,检查中断状态。
java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
线程在 acquireQueued 中自旋尝试获取锁的过程当中,若是发生异常,会在 finally 代码块中执行 cancelAcquire,终止获取锁。
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // 取消获取锁 // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors // 跳过已取消的前继节点,为当前节点找出一个有效的前继节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // 写操做具备可见性(volatile),所以这里无需使用 CAS // After this atomic step, other Nodes can skip past us. // 把当前节点设为已取消以后,其余节点寻找有效前继节点时会跳过当前节点 // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { // 若是是尾节点,则出队 compareAndSetNext(pred, predNext, null); } else { // 进入这里,说明不是尾节点,或者是尾节点但出队失败,须要处理后继节点 // If successor needs signal, try to set pred's next-link // 若是后继节点须要获得通知,则尝试给它找一个新的前继节点 // so it will get one. Otherwise wake it up to propagate. // 不然把后继节点唤醒 int ws; if (pred != head && // 前继节点不是头节点 ((ws = pred.waitStatus) == Node.SIGNAL || // 前继节点的状态为SIGNAL 或者 前继节点的状态为未取消且尝试设置为SIGNAL成功 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) // 后继节点存在且未取消 compareAndSetNext(pred, predNext, next); // 给后继节点设置一个新的前继节点(即前面找的有效节点),当前节点出队 } else { unparkSuccessor(node); // 若是存在后继节点,这里说明没法给后继节点找到新的前继节点(可能前继节点是head,或者前继节点失效了),直接唤醒该后继节点 } node.next = node; // help GC } }
节点 node 取消获取锁,说明当前节点 node 状态变为已取消,成为一个无效节点。
须要考虑如何处理节点 node 的后继节点:
唤醒当前节点的后继节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
/** * Wakes up node's successor, if one exists. * * @param node the node */ // 唤醒当前节点的后继节点 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 若是当前节点的状态为已取消,则不变;若是小于0(有可能后继节点须要当前节点来唤醒),则清零。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // CAS失败也无所谓(说明后继节点的线程先一步修改了当前节点的状态),由于接下来会手动唤醒后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 后继节点为空,或已取消,则从tail开始向前遍历有效节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; // // 注意! 这里找到了以后并无return, 而是继续向前找 } if (s != null) LockSupport.unpark(s.thread); // 唤醒后继节点(或者是队列中距离head节点最近的有效节点)的线程 }
一般状况下, 要唤醒的节点就是本身的后继节点。若是后继节点存在且也在等待锁, 那就直接唤醒它。
可是有可能存在 后继节点取消等待锁 的状况,此时从尾节点开始向前找起, 直到找到距离 head 节点最近的未取消的节点,对它进行唤醒。
为何不从当前节点向后遍历有效节点呢?
对比 acquire,二者对获取锁过程当中发生中断的处理不一样。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly
/** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); // 线程在阻塞等待锁的过程当中,被中断唤醒,则放弃等待锁,直接抛出异常 } } finally { if (failed) cancelAcquire(node); } }
独占模式下释放锁/资源,是 Lock#unlock 的内部实现。
java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) { if (tryRelease(arg)) { // 释放锁资源 Node h = head; if (h != null && h.waitStatus != 0) // head.waitStatus == 0,说明head节点后没有须要唤醒的节点 unparkSuccessor(h); // 唤醒head的后继节点 return true; } return false; }
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
头节点 h 的状态:
相关阅读:
阅读 JDK 源码:AQS 中的独占模式
阅读 JDK 源码:AQS 中的共享模式
阅读 JDK 源码:AQS 对 Condition 的实现
做者:Sumkor