当咱们提到 juc 包下的锁,就不得不联系到 AbstractQueuedSynchronizer 这个类,这个类就是大名鼎鼎的 AQS,AQS 按字面意思翻译为抽象队列同步器,调用者能够经过继承该类快速的实现同步多线程下的同步容器。不论是咱们熟悉的 ReadWriteLock 亦或是 ReentrantLock,或者 CountDownLatch 与 Semaphore,甚至是线程池类 ThreadPoolExecutor 都继承了 AQS。java
在本文,将深刻源码,了解 AQS 的运行机制,了解经过 AQS 实现非公平锁,公平锁,可重入锁等的原理。node
AQS 的底层数据结构实际上是一条双向链表以及一个表明锁状态的变量 state
。当加锁后,state
会改变,而竞争锁的线程会被封装到节点中造成链表,而且尝试改变 state
以获取锁。数据结构
在 AQS 中有一个 Node 内部类,该类即为链表的节点类。当经过 AQS 竞争锁的时候,线程会被封装到一个对应的节点中,多个竞争不到锁的线程最终会连成一条链表,这条链表上节点表明的线程处于等待状态,所以咱们称之为等待队列,也就是 CLH。多线程
节点类中封装了竞争锁的线程的等待状态:工具
和线程池中的状态同样,Node 只有小于 0 的时候才处于正常的等待状态中,所以不少地方经过判断是否小于 0 来肯定节点是否处于等待状态。oop
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 等待状态 volatile int waitStatus; volatile Node prev; volatile Node next; // 等待线程 volatile Thread thread; // 下一等待节点 Node nextWaiter; }
private volatile int state;
AQS 中提供了 state
变量作为锁状态,通常来讲,0 被视为无锁状态,1 被视为加锁状态,若是是可重入锁,就会大于 1。ui
所以,AQS 中的加锁解锁实际上就是经过 CAS 改变 state
的过程,即下列三个方法:this
protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS 的同步过程其实就是同步队列节点中依次获取锁的过程。AQS 一共提供了独占和非独占两种获取资源的方法:线程
acquire()
:以独占模式获取锁;release()
:以独占模式释放锁;acquireShared()
:以共享模式获取锁;releaseShared()
:以共享模式释放锁;独占锁和非独占锁二者从流程上来讲都差很少,只在一些实现上有区别。翻译
独占锁,顾名思义,即只有占有锁的线程才能操做资源,在 synchronize 底层的锁中,独占经过锁对象对象头中的指针来声明独占的线程,而在 AQS 中则经过父类 AbstractOwnableSynchronizer 提供的 exclusiveOwnerThread
变量来声明独占的线程:
private transient Thread exclusiveOwnerThread;
此外,AQS 并未提供其余具体实现。AQS 独占锁加锁的方法是 acquire()
,其中涉及到 tryAcquire()
方法是一个空实现,须要由子类实现并在在里面进行具体的独占判断:
public final void acquire(int arg) { // 尝试获取锁 if (!tryAcquire(arg) && // 添加到等待队列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 进入等待队列后阻塞 selfInterrupt(); }
里面还涉及到 addWaiter()
,acquireQueued()
和 selfInterrupt()
四个方法。
在 AQS 中,tryAccquire()
是一个未实现的方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
他须要由具体的实现类去实现,并完成获取资源的功能。这里咱们以用可重入锁 ReentrantLock 内为例(后文无声明亦同)。
在 ReentrantLock 中,锁分为公平锁和非公平锁两种,两者的区别在于公平锁中等待队列中的线程严格按顺序获取锁,非公平锁中的线程可能不会按顺序获取锁。ReentrantLock 有一个内部类 Sync 继承了 AQS,提供基本的加锁解锁方法。
而后分别有非公平锁 NonfairSync 类与公平锁类 FairSync 去继承 Sync,进一步区别公平锁与非公平的锁的实现逻辑。咱们先看公平锁 FairSync 的tryAccquire()
方法:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 若是当前等待队列中没有线程在等待 if (!hasQueuedPredecessors() && // 尝试CAS修改state compareAndSetState(0, acquires)) { // 将当前锁设为本身独占 setExclusiveOwnerThread(current); return true; } } // 若是锁已经被本身获取过了,即重入 else if (current == getExclusiveOwnerThread()) { // state + 1,即多获取一次锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 没有获取锁 return false; }
而非公平锁与公平锁的tryAccquire()
主要差异在于,公平锁会先看看有没有线程在等待,没有才去竞争锁,而非公平锁不会看有没有线程在等待,不管如何都会先去竞争一次锁。
其余锁的 tryAccquire()
与 ReentrantLock 的大致相同。
addWaiter()
方法用于建立并添加等待节点。
private Node addWaiter(Node mode) { // 以共享或者独占模式建立节点 Node node = new Node(Thread.currentThread(), mode); // 尾插法插入节点 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是队列为空 enq(node); return node; }
这里涉及到一个 enq()
方法,这个方法不复杂,主要是自旋初始化 AQS 中的头结点和尾节点,值得注意的是,这里的头结点其实是一个哨兵节点,自己并没有意义,当等待队列排队获取资源的时候,会直接从 head.next 开始。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这个方法主要作两件事:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的上一节点 final Node p = node.predecessor(); // 再次尝试获取锁,若是前驱节点已是头节点了,或者获取资源成功 if (p == head && tryAcquire(arg)) { // 将当前节点设置为头结点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 前驱节点不是头结点或者获取锁失败 // 若是前驱节点须要被 park 挂起 if (shouldParkAfterFailedAcquire(p, node) && // 挂起当前线程 parkAndCheckInterrupt()) interrupted = true; } } finally { // 旧头结点已经处理完了,直接删除 if (failed) cancelAcquire(node); } }
这里涉及到一个 shouldParkAfterFailedAcquire()
方法:
这个方法主要是根据前驱节点的状态判断当前节点是否须要被 park 的。若是这个方法返回 true,那么说明前驱节点被设置为 SIGNAL 状态,而后进入 parkAndCheckInterrupt()
方法把当前线程挂起,等待前驱节点的唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 若是前驱节点状态为SIGNAL if (ws == Node.SIGNAL) return true; // 若是前驱节点已经失效 if (ws > 0) { // 移除所有失效节点,直到前驱节点为正常等待状态的节点为止 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将前驱节点设置为SIGNAL,确保不影响后续节点的唤醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若是上述shouldParkAfterFailedAcquire()
返回 ture,那么就会接着执行 parkAndCheckInterrupt()
方法挂起线程:
private final boolean parkAndCheckInterrupt() { // 让当前线程等待,并中断任务 LockSupport.park(this); return Thread.interrupted(); }
当线程使用 acquire()
方法获取锁的时候:
tryAcquire()
方法,这是个须要由子类实现的空方法,公平或者非公平在这个方法中让线程去获取锁,得到锁的线程要修改 state
;addWaiter()
方法,这个方法用于将线程封装到节点中,并以尾插法插入等待队列的链表,同时,若是等待队列没有初始化就会在此处先初始化;acquireQueued()
方法,此时会再次试图获取锁,若是此时仍是失败,就会判断当前节点的前驱节点是否失效,若是不是就直接将前驱节点状态改成 SIGNAL ,而后执行 parkAndCheckInterrupt()
方法挂起当前线程,若是是就一直找到一个正常等待的前驱节点为止,改前驱节点状态而后再挂起线程。和 AQS 使用 acquire()
方法加锁的过程相似,AQS 也有一个 release()
的解锁方法,他们一样须要实现类本身去实现 tryRelease()
方法。
public final boolean release(int arg) { // 尝试释放锁 if (tryRelease(arg)) { Node h = head; // 若是当前头节点为空且不为初始状态 if (h != null && h.waitStatus != 0) // 唤醒后继节点 unparkSuccessor(h); return true; } return false; }
和 tryAcquire()
同样,AQS 不提供 tryRelease()
的具体实现,而是交由子类去实现它。
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
咱们依然以可重入锁 ReentrantLock 为例,去了解 ReentrantLock 中 tryRelease()
的实现。
虽然 ReentrantLock 中有公平锁和非公平锁两种实现,可是他们是释放过程都是同样的,都经过他们的父类,即继承 AQS 的内部类 Sync 的 tryRelease()
方法来实现释放的功能:
protected final boolean tryRelease(int releases) { // 可重入锁,减去一次持锁次数 int c = getState() - releases; // 若是当前线程不是持有锁的线程则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 若是可重入次数为0,说明确实释放锁了 if (c == 0) { free = true; // 独占线程设置为null setExclusiveOwnerThread(null); } setState(c); return free; }
这个地方也很好理解,就是让 tryRelease()
去执行释放锁的过程,换句话说,就是改变 state
。
unparkSuccessor()
方法的主要用途是
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 若是头节点状态还处于等待状态,则改回初始状态 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 若是后继节点存在并被标记为CANCELLED状态 if (s == null || s.waitStatus > 0) { s = null; // 从尾节点开始,找到离node最近的处于等待状态的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒节点 LockSupport.unpark(s.thread); }
当线程使用 release()
方法释放锁的时候:
tryRelease()
方法释放资源,改变 state
以释放锁unparkSuccessor()
方法唤醒后继节点,若是后继节点挂了,就找到最近的下一个处于等待状态的有效节点唤醒。相对 AQS 独占锁,共享锁在 AQS 中以及提供好的相关的实现。共享锁经过 acquireShared()
方法加锁,经过releaseShared()
方法解锁。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()
也是一个空实现方法,须要由子类去实现。根据注释,咱们不难理解它的做用:
其中,针对共享锁,比较具备表明性的是读写锁 ReentrantReadWriteLock,它经过 state
的高 16 位记录读锁,低 16 位记录写锁,在获取锁资源的时候,若是检测存在写锁则没法得到锁,若是是读锁则获取资源并递增读锁计数器,这部分的逻辑就是在其子类中获得的实现。
基于上面的 tryAcquireShared()
方法,doAcquireShared()
要作的事情显然很明了了:
private void doAcquireShared(int arg) { // 建立共享模式的节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 若是前驱节点已是头结点,即当前节点须要获取锁 if (p == head) { // 尝试获取共享锁 int r = tryAcquireShared(arg); // 唤醒后继节点 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 判断当前线程是否须要被挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这里有一个 setHeadAndPropagate()
方法,根据方法名能够猜出是用来设置头结点和唤醒后继共享节点的:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 设置头结点 setHead(node); // 若是后续有须要唤醒的节点,而且当前节点没有被CANCELLED if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 若是下一节点处于共享状态 if (s == null || s.isShared()) // 释放共享锁 doReleaseShared(); } }
这里其实只作了一些条件判断,确保有后继节点而且后继节点是正常节点,核心逻辑实际上是 doReleaseShared()
方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒节点 unparkSuccessor(h); } else if (ws == 0 && // 若是后续节点不须要唤醒,则设置为PROPAGATE避免影响后继节点的唤醒 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
解锁使用的releaseShared()
方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
这里的 tryReleaseShared()
其实跟独占锁的tryRelease()
相似,即改变状态以表示释放资源,而 doReleaseShared()
即上文唤醒后继节点的方法。
共享锁和独占锁的根本区别在于,当头是共享模式时,它被唤醒后会直接尝试唤醒后继全部共享模式的节点,直到遇到第一个非共享模式的节点为止,而不是跟独占锁同样只唤醒后继节点。
AQS 在内部为此了一个变量 state
,用于记录锁状态,线程经过 CAS 修改 state
便是加锁解锁过程。
AQS 内存维护了一条双向链表,即等待队列 CLH,等待锁的线程被封装为 Node 节点连成链表,经过 LockSuppor 工具类的 park()
和 unpark()
方法切换等待状态。
AQS 提供了独占和非独占两种锁实现方式,分别提供了 acquire()/release()
和acquireShared()/releaseShared()
两套加锁解锁方式,同时,基于 state
有衍生出可重入和非可重入锁的实现——即重入锁在state=1
的状况下继续递增,解锁在 state
上递减直到为 0 为止。而且,根据是否先判断等待队列中是否已存在等待线程,而后再尝试获取锁的状况,又分出了公平锁和非公平锁两种实现。