本文将主要讲述 AbstractQueuedSynchronizer
的内部结构和实现逻辑,在看本文以前最好先了解一下 CLH
队列锁,AbstractQueuedSynchronizer
就是根据 CLH
队列锁的变种实现的,由于自己 AQS
比较复杂不容易看清楚他自己的实现逻辑,因此查看 CLH
队列锁的实现,能够帮助咱们理清楚他内部的关系;关于队列锁的内容能够参考 ,CLH、MCS 队列锁简介 ;html
在 JDK 中除 synchronized
内置锁外,其余的锁和同步组件,基本能够分为:java
而 AbstractQueuedSynchronizer
即同步队列则是 Doug Lea 大神为咱们提供的底层线程调度的封装;AQS
自己是根据 CLH
队列锁实现的,这一点在注释中有详细的介绍,CLH、MCS 队列锁简介 ;node
简单来说,CLH
队列锁就是一个单项链表,想要获取锁的线程封装为节点添加到尾部,而后阻塞检查前任节点的状态 (必定要注意是前任节点,由于这样更容易实现取消、超时等功能,同时这也是选择 CLH 队列锁的缘由),而头结点则是当前已经得到锁的线程,其主要做用是通知后继节点(也就是说在没有发生竞争的状况下,是不须要头结点的,这一点后面会详细分析);编程
而对于 AQS
的结构大体能够表述为:并发
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractQueuedSynchronizer() { } private transient volatile Node head; // 懒加载,只有在发生竞争的时候才会初始化; private transient volatile Node tail; // 一样懒加载; private volatile int state; // 自定义的锁状态,能够用来表示锁的个数,以实现互斥锁和共享锁; }
这里的能够直观的看到链表结构的变化,其实next链表只是至关于遍历的优化,而node节点的变化才是主要的更新;ide
static final class Node { static final Node SHARED = new Node(); // 共享模式 static final Node EXCLUSIVE = null; // 互斥模式 static final int CANCELLED = 1; // 表示线程取消获取锁 static final int SIGNAL = -1; // 表示后继节点须要被唤醒 static final int CONDITION = -2; // 表示线程位于条件队列 static final int PROPAGATE = -3; // 共享模式下节点的最终状态,确保在doReleaseShared的时候将共享状态继续传播下去 /** * 节点状态(初始为0,使用CAS原则更新) * 互斥模式:0,SIGNAL,CANCELLED * 共享模式:0,SIGNAL,CANCELLED,PROPAGATE * 条件队列:CONDITION */ volatile int waitStatus; volatile Node prev; // 前继节点 volatile Node next; // 后继节点 volatile Thread thread; // 取锁线程 Node nextWaiter; // 模式标识,取值:SHARED、EXCLUSIVE // Used by addWaiter,用于添加同队队列 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // Used by Condition,同于添加条件队列 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
根据上面的代码和注释已经能够看到 AQS
为咱们提供了两种模式,独占模式和共享模式(彼此独立能够同时使用);其中:oop
AbstractQueuedSynchronizer.state
: 表示锁的资源状态,是咱们上面所说的面向用户逻辑的部分;Node.waitStatus
: 表示节点在队列中的状态,是面向底层线程调度的部分;这两个变量必定要分清楚,在后面的代码中也很容易弄混;源码分析
AQS 的运行逻辑能够简单表述为:优化
若是你熟悉 synchronized
,应该已经发现他们的运行逻辑实际上是差很少的,都用同步队列和条件队列,值得注意的是这里的条件队列和 Condition
一一对应,可能有多个;根据上图能够将 AQS
提供的功能总结为:ui
由于独占模式和共享模式彼此独立能够同时使用,因此在入队的时候须要首先指定 Node
的类型,同时入队的时候有竞争的可能,因此须要 CAS 入队;
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // SHARED、EXCLUSIVE // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
代码中注释也说明了,此处快速尝试入队,是一种优化手段,由于就通常状况而言大多数时候是没有竞争的;失败后在循环入队;
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 此时head和tail才初始化 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
而对于出队则稍微复杂一点,独占模式下直接出队,由于没有竞争;共享模式下,则须要 CAS 设置头结点,由于可能对有多个节点同时出队,同时还须要向后传播状态,保证后面的线程能够及时得到锁;此外还可能发生中断或者异常出队,此时则须要考虑头尾的状况,保证不会影响队列的结构;具体内容将会在源码中一次讲解;
public class Mutex implements Lock { private final Sync sync = new Sync(); private static final int lock = 1; private static final int unlock = 0; @Override public void lock() { sync.acquire(lock); } @Override public boolean tryLock() { return sync.tryAcquire(lock); } @Override public void unlock() { sync.release(unlock); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean isHeldExclusively() { return getState() == lock; } @Override public boolean tryAcquire(int acquires) { if (compareAndSetState(unlock, lock)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int releases) { if (getState() == unlock) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(unlock); return true; } } }
注意代码中特地将 AbstractQueuedSynchronizer.state
取值定为lock\unlock
,主要是便于理解 state
的含义,在互斥锁中能够任意取值,固然也能够是负数,可是通常状况下令其表示为锁的资源数量(也就是0、1)和共享模式对比,比较容易理解;
对于独占模式取锁而言有一共有四中方式,
tryAcquire
也就决定了,这个锁时公平锁/非公平锁,可重入锁/不重冲入锁等;(好比上面的实例就是不可重入非公平锁,具体分析之后还会详细讲解)流程图:
源码分析:
public final void acquire(int arg) { if (!tryAcquire(arg) && // 首先尝试快速获取锁 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失败后入队,而后阻塞获取 selfInterrupt(); // 最后若是取锁的有中断,则从新设置中断 }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 只要取锁过程当中有一次中断,返回时都要从新设置中断 for (;;) { final Node p = node.predecessor(); // 一直阻塞到前继节点为头结点 if (p == head && tryAcquire(arg)) { // 获取同步状态 setHead(node); // 设置头结点,此时头部不存在竞争,直接设置 // next 主要起优化做用,而且在入队的时候next不是CAS设置 // 也就是经过next不必定能够准确取到后继节点,因此在唤醒的时候不能依赖next,须要反向遍历 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && // 判断并整理前继节点 parkAndCheckInterrupt()) // 当循环最多第二次的时候,必然阻塞 interrupted = true; } } finally { if (failed) // 异常时取消获取 cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { // 大于0说明,前继节点异常或者取消获取,直接跳过; do { node.prev = pred = pred.prev; // 跳过pred并创建链接 } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 标记后继节点须要唤醒 } return false; }
其中 node.prev = pred = pred.prev;
相关的内存分析能够查看 JAVA 连等赋值问题;
流程图:
源码分析:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 中断退出 if (!tryAcquire(arg)) // 获取同步状态 doAcquireInterruptibly(arg); // 中断获取 }
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); // 加入队尾 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && // 判断并整理前继节点 parkAndCheckInterrupt()) // 等待 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
流程图:
源码分析:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; // 超时退出 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
释放锁时,判断有后继节点须要唤醒,则唤醒后继节点,而后退出;有唤醒的后继节点从新设置头结点,并标记状态
public final boolean release(int arg) { if (tryRelease(arg)) { // 由用户重写,尝试释放 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤醒后继节点 return true; } return false; }
public class ShareLock implements Lock { private Syn sync; public ShareLock(int count) { this.sync = new Syn(count); } @Override public void lock() { sync.acquireShared(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.releaseShared(1); } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } private static final class Syn extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 5854536238831876527L; Syn(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); } @Override public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; //若是新的状态小于0 则返回值,则表示没有锁资源,直接返回 if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override public boolean tryReleaseShared(int retrunCount) { for (; ; ) { int current = getState(); int newCount = current + retrunCount; if (compareAndSetState(current, newCount)) { return true; } } } } }
上述代码中的 AbstractQueuedSynchronizer.state
表示锁的资源数,可是仍然是不可重入的;
一样对于共享模式取锁也有四中方式:
@Override public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; //若是新的状态小于0 则返回值,则表示没有锁资源,直接返回 if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } }
须要注意的是 tryAcquireShared
方法是快速尝试获取锁,并更新锁状态,若是失败则必然锁资源不足,返回负值;
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 快速获取失败 doAcquireShared(arg); // 阻塞获取锁 }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); // 设置头结点,并是状况将信号传播下去 p.next = null; // help GC if (interrupted) selfInterrupt(); // 从新设置中断状态 failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
// propagate 表示线程获取锁后,共享锁剩余的锁资源 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // propagate > 0 :表示还有剩余的资源 // h.waitStatus < 0 : 表示后继节点须要被唤醒 // 其他还作了不少保守判断,确保后面的节点能及时那到锁 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); // 唤醒后继节点 } }
根据上面的代码能够看到,共享模式和独占模式获取锁的主要区别:
其他的思路和独占模式差很少,他家能够本身看源码;
一样 tryReleaseShared
是由用户本身重写的,这里须要注意的是若是不能确保释放成功(由于共享模式释放锁的时候可能有竞争,因此可能失败),则在外层 Lock
接口使用的时候,就须要额外处理;
@Override public boolean tryReleaseShared(int retrunCount) { for (; ; ) { int current = getState(); int newCount = current + retrunCount; if (compareAndSetState(current, newCount)) { return true; } } }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 尝试取锁成功,此时锁资源已从新设置 doReleaseShared(); // 唤醒后继节点 return true; } return false; }
doReleaseShared
方法必然执行两次,
最终使得头结点的状态必然是 PROPAGATE
;
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; ... }
如代码所示条件队列是一个由 Node
组成的链表,注意这里的链表不一样于同步队列,是经过 nextWaiter
链接的,在同步队列中 nextWaiter
用来表示独占和共享模式,因此区分条件队列的方法就有两个:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 添加节点到条件队列 int savedState = fullyRelease(node); // 确保释放锁,并唤醒后继节点 int interruptMode = 0; while (!isOnSyncQueue(node)) { // node 不在同步队列中 LockSupport.park(this); // 阻塞 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); // 从头结点一次唤醒 } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 将节点移动到同步节点中 (first = firstWaiter) != null); }
由于篇幅有点长了,因此条件队列讲的也就相对简单了一点,可是大致的思路仍是讲了;