AQS
是什么,相信你们都不陌生这个题目,那么AQS
究竟是什么呢? AQS的全称是 Abstract Queued Synchronizer, 从字面意思理解也就是 抽象队列同步器 ,实际上AQS
确实就是排队同步队列 , 也是一个抽象类,须要 自定义 同步队列中 可执行权 的 获取和释放中的逻辑(从新定义获取和释放语义),也就是重写 tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
等方法,固然也能够 自定义方法 来经过调用 AQS
提供的 判断方法进行逻辑判断。在 JDK9
以前 AQS
是依赖于 CAS
的,其底层是经过 Unsafe
的compareAndSwap*
方法实现同步更改,在以后则是使用 VarHandle
, 也替代了 Unsafe
。说白了 AQS 利用 VarHandle 保证操做的原子性。java
大白话就能够理解为: 表示某件事情同一时间点仅有一人能够进行操做,若有多人则须要排队等待, 等到当前操做人完成后通知下一我的。node
在源码中 AbstractQueuedSynchronizer
继承了 AbstractOwnableSynchronizer
, 同时也就继承了 exclusiveOwnerThread
属性,也就是 独占模式同步器的拥有者 ** , 也就意味着该线程是当前正在执行的线程**。git
在 AQS
中有几个重点方法,分别是: acquire
acquireInterruptibly
tryAcquireNanos
release
acquireShared
acquireSharedInterruptibly
tryAcquireSharedNanos
releaseShared
下面逐一分析。github
在分析源代码以前,先来看一张图来了解一下 AQS排队同步队列
和 Node
节点中的 waitStatus
状态 。并发
waitStatus
状态都分为是什么app
等待超时
或者 被中断
,须要从同步队列中剔除,节点进入该状态之后不会再发生变化了。官方解释就是 Acquires in exclusive mode, ignoring interrupts 获取独占模式并忽略interrupt(中断), 翻译成大白话就是就能够理解为获取独占模式, 看一下源码oop
public final void acquire(int arg) {
// 判断线程是否有可继续执行的权限, 若是没有则建立node 加入到队列中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
在 acquire
方法中分别调用了 tryAcquire
acquireQueued
和 addWaiter
方法,其中 tryAcquire
方法是须要自定义(重写) 获取、 执行权限 的逻辑,这里咱们以 AbstractQueuedSynchronizer
的实现 ReentrantLock
为例,简单分析一下,先看 tryAcquire
方法源码分析
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前线程的重入次数 若是是 0 则表明第一次
int c = getState();
if (c == 0) {
// 判断是否存在队列 && 能够获取到可执行权
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
// 设置独占模式同步器的拥有者 也就是是哪一个线程持有
setExclusiveOwnerThread(current);
return true;
}
}
// 若是进入线程是 持有可执行权的线程 则作重入 + 1 操做
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
tryAcquire
方法核心代码就是 判断执行权限 ,这里就不具体分析了,会在下一篇文章中进行ReentrantLock的源码分析,接下来重点看 acquireQueued
和 addWaiter
方法。ui
private Node addWaiter(Node mode) {
// 经过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
// 若是 tail 不是 null 则 设置 新建Node的前驱节点(prev) 指向 tail节点 反之 初始化同步队列
if (oldTail != null) {
// 设置 新建Node的前驱节点(prev) 指向 tail节点
node.setPrevRelaxed(oldTail);
// 从新设置 tail 节点 指向 新建的Node节点
// 白话就是 队列中的最后一个节点 == tail节点
if (compareAndSetTail(oldTail, node)) {
// 设置 未更改时的 tail节点 中 next 节点, 指向 新建Node节点
oldTail.next = node;
return node;
}
} else {
// 初始化同步队列器
initializeSyncQueue();
}
}
}
复制代码
// 若是未存在同步队列 则初始化同步队列
private final void initializeSyncQueue() {
Node h;
// 设置 AQS head节点 为一个新建节点
if (HEAD.compareAndSet(this, null, (h = new Node())))
// 赋值操做
tail = h;
}
复制代码
在 addWaiter
和 initializeSyncQueue
方法中,核心就是新建 Node 节点并经过 acquireQueued
方法将节点加入到 AQS
中,接下来分析一下 addWaiter
具体作了什么this
经过构造方法建立新的Node节点,并经过入参 mode
指定Node节点的模式,共享或独占。固然这里是设置的独占模式。
循环操做 新建Node节点并将 新建节点 和 tail
节点创建关系。首先判断 tail
是不是null,若是是则 步骤3
,反之 步骤4
若是 tail
节点不为null, 首先将新建的 Node节点
中的 前驱节(prev)
点设置为当前的 tail
节点,而后经过 VarHandle
将 AQS
的 tail
节点改成 新建的Node
节点,若是修改为功则将上一步 未更改时的 tail
节点 (也就是代码中的oldTail) 中的 next
指向 新建的Node节点
,反之则可能由于并发操做致使 tial
节点已经被其余线程变动,须要再次循环操做直至成功。
若是 tial
节点是null, 则须要实例化同步对列,也就是 AQS
, 经过调用 initializeSyncQueue
进行初始化操做,经过 VarHandle
设置 AQS
的 head
指向一个新建节点 (new Node) , 而后将 head
节点的引用赋值给 tail
节点。这里注意一下,是将 head
节点的 引用
赋值给 tail
节点, 也就是这时候 head
节点 和 tail
节点是同时指向一块内存地址 , 这里的用意就是在新建队列的时候, head
节点和新建节点的 prev
节点要保持是同一个引用 ,由于在后续的判断中, 获取可执行权的条件就是 AQS
的 head
节点是否等于当前节点的 prev
节点。
由于 addWaiter
方法中是一个循环,在 建立队列后 须要将队列新建的Node节点作关联,因此还须要在执行一次 步骤3
addWaiter
方法分析完后,再来看一下 acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) {
// 线程中断状态
boolean interrupted = false;
try {
for (;;) {
// 获取经过 addWaiter 建立的Node方法
final Node p = node.predecessor();
// 判断 新建的Node节点是否等于head节点 && 能够获取到 可执行权
if (p == head && tryAcquire(arg)) {
// 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head
// 节点设置为当前线程建立的Node节点,能够保证head节点永远均可以和后续节点有关联关系
setHead(node);
// 设置 next
p.next = null; // help GC
// 返回
return interrupted;
}
// 判断Node节点的线程是否符合被 wait ,在这里用的是 park
if (shouldParkAfterFailedAcquire(p, node))
// 将线程 wait 而且线程被唤醒后 判断线程是否被中断
// |= 操做等于 interrupted = interrupted | parkAndCheckInterrupt()
// |(按位或) 会将 | 两边的值进行二进制计算,运算规则一个为真即为真, 例如 1|1 = 1 或 1|0 = 1,
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
复制代码
acquireQueued
也是核心方法,在其中会对线程进行 LockSupport.park
进行控制,其实现方式是 循环 ,下面就具体分析一下
当前线程
所建立的 Node
节点中的 前置节点(prev)
。前置节点(prev)
是否等于 AQS
的 head
节点 && 能够获取到 可执行权
,若是这两个条件成立则看 步骤3
,反之看 步骤4
, 若是知足这两个条件,也就表明着 head
节点 所对应的线程
已经执行完成而且作了释放**(release方法)**操做。步骤2
条件成立,也就是 线程被唤醒后并获取到了可执行权,则将 head
节点设置为 当前线程建立的Node节点 。步骤2
条件不成立,则判断 Node
节点所对应的线程的状态是否符合改成 wait
状态。这个逻辑在 shouldParkAfterFailedAcquire
方法中, 接下来看一下。private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 若是前置节点的 waitStatus == Signal 也就是 == -1
// 若是是 知足 线程 wait 条件
if (ws == Node.SIGNAL)
/* * This node has already set status asking a release * to signal it, so it can safely park. */
return true;
// 若是状态 > 0 也就是1 也就是线程已经被中断了
// 在这里就会判断 前置节点的前置节点 是否仍是被中断,若是是 循环继续判断前置节点,
// 若是不是 则将前置节点的next节点改成 入参的 node 节点 而后 返回false 继续循环判断
// 这里的做用就是 排除掉已经被中断的线程
if (ws > 0) {
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
// 不然设置 状态为 -1 等待唤醒状态 再次进来之后就会被 wait
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
复制代码
在 shouldParkAfterFailedAcquire
方法中主要就是判断节点所属的线程是否符规则,也就是更改成 wait
状态
waitStatus
是不是 SIGNAL,若是是知足条件,返回 true
,线程将会 wait
。waitStatus
是否大于 0, 也就是1 ,若是条件成立,则表明 当前线程节点 的 前置节点 所对应的线程已经被中断了,须要从新指定当前线程节点的前置节点(prev),经过循环的方式找到前置节点的节点,若是依然被中断,则继续循环,直到找到未中断线程所对应的Node节点为止。若是条件不成立则将 waitStatus
状态改成 SIGNAL
返回false, 再经过 acquireQueued
方法中的循环在执行一次 。prev
节点中的 waitStatus
状态,是由于只有 前置节点(prev)
的 waitStatus
等于 SIGNAL 也就是 -1
时,就表明当前线程新建的Node节点的线程处于等待状态,在当前节点的前置节点(prev)
的线程释放了同步状态或被取消,将会通知当前节点,使当前节点的线程得以运行到这里咱们整个的 acquire
方法就解析完了,接下来分享 release,有获取才有释放,会在release讲完后为你们分享一下 acquire 到 release的整个流程。
release
字面一次就是释放,释放经过 acquire
获取的独占模式,可让 AQS
后续节点所对应的线程能够获得执行权,下面就看一下 release 方法
public final boolean release(int arg) {
// 步骤 1 2 3
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
在 release
方法中首先会调用 tryRelease
方法,这里 tryRelease
方法将会有子类实现,先以 RenntrantLook
为例,这里就不展现代码了,就简单描述一下逻辑
state
减去 arg
,state
表明重入次数。步骤1
结果是0,则将 独占模式同步器的拥有者 改成null并返回true。步骤1
结果不是0, 则从新设置 state,返回false,表示还不能够释放。接下来判断 AQS
的 head
节点不是null而且 waitStatus
状态不等于0,表明 释放成功,而后进入 unparkSuccessor 方法,进行对下一个Node节点所对应的线程进行唤醒操做。
private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
// 若是head的waitStatus<0 则将head的waitStatus改成0
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
// 若是 head 节点的 next 节点 == null 或者 节点 的状态 大于0 也就是1 也就是 下一个节点所对应的线程被中断了
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 将会循环整个同步队列,从tail节点开始 往前循环,直到只找到 waitStatus <= 0 的Node节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 若是节点不是 null 则唤醒该节点的线程
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
接下来分析一下在 unparkSuccessor 方法中都作了什么
waitStatus
<0, 则将 head
的 waitStatus
改成0。head
节点的 next
节点等于null 或者 waitStatus
状态 大于0也就是1, 表示 head
节点所对应的 next
节点所对应的线程已经被中断了,将会循环整个同步队列,从 tail
节点开始往前循环,直到找到最前面的一个 waitStatus <= 0 的Node节点。步骤2
条件不知足 则表明 head 的 next 节点不是null 或 waitStatus状态不等于1,调用 unpark
方法唤醒线程。至此 release
方法就解析完成了,很简单,核心功能仅仅是若是符合规则,则调用 unpark
方法唤醒 AQS
队列中下一个节点所对应的线程。下面就分析一下 acquire 和 releae 整个流程。
acquire
和 release
的总体流程接下来分析 acquireInterruptibly
方法,acquireInterruptibly
方法和 acquire
实际上是同样的,只不过多判断了一下是否被中断。
acquireInterruptibly
方法就是 可中断的获取可执行权 ,具体流程和 acquire
类似。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
复制代码
在 acquireInterruptibly
方法中,首先会经过 Thread.interrupted()
方法判断线程是否被中断,如已经被中断,则抛出 InterruptedException
, 反之则调用 tryAcquire
方法,判断是否 获取到执行权,若是未获取到则调用 doAcquireInterruptibly
方法进行建立 AQS
和 新的Node节点
,并将 新建的Node节点
和 AQS
的 head
节点进行关联。 到这里可能就会想到,这不是和 acquire
方法是同样的嘛,没错,就是同样。看一下源码
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 新建 Node 节点 并将节点 和 AQS 队列简历关联
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
复制代码
看到源码是否是很熟悉,这不就是上边咱们分析过的 acquire
方法嘛,惟一和 acquire
方法不一样的就是,若是线程在被唤醒之后,也就是 head
节点的线程调用了 release
释放了可执行权,而且经过 LockSupport.park
方法唤醒了 head 的 next节点所属的线程时,head
的 next
节点所属的线程已经被中断了就会抛出 InterruptedException
异常。
这里就不进行 addWaiter 方法 和 parkAndCheckInterrupt 方法的源码展现了,若是还不明白就看一下上边 acquire
方法的源码分析。
tryAcquireNanos
方法的含义就是 可超时的获取执行权 ,若是设置的 超时时间 到了,还未获取到可执行权,则直接返回 false 。这里的超时时间单位是 纳秒 ns
, 1秒(s)=1000000000纳秒(ns)
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
复制代码
看到 tryAcquireNanos
方法会想到什么? 看到方法上的 throws InterruptedException
就一下想到了上面刚刚刚说的 acquireInterruptibly
方法,支持可中断的获取执行权。首先这里会先调用 tryAcquire
方法获取执行权,若是能够获取到执行权则直接返回,反之则调用 doAcquireNanos(arg, nanosTimeout)
方法进行 新建 Node 节点
并和 AQS
的 head
节点进行关联,而且会将节点加入到 AQS的队列中,而后将节点所属的线程放入等待队列中。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 判断超时时间 是否小于等于 0 若是这 则直接返回 false
if (nanosTimeout <= 0L)
return false;
// 使用当前时间的 纳秒 + 超时时间的纳秒 = 将来超时的超时时间,用来作parkNanos,
// 就至关于 Object.wait(long timeoutMillis) 方法
final long deadline = System.nanoTime() + nanosTimeout;
//经过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
// 判断 新建的Node节点是否等于head节点 && 能够获取到'可执行权'
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head
// 节点设置为当前线程建立的Node节点,能够保证head节点永远均可以和后续节点有关联关系
setHead(node);
p.next = null; // help GC
return true;
}
// 判断计算过的 deadline 时间 - 当前时间 是否小于0 是则 超时时间已过,返回false
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 判断Node节点的线程是否符合被 wait ,在这里用的是 park 而且 纳秒必须大于 1000
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 判断线程是否被中断 若是中断则抛出 InterruptedException 异常
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
复制代码
若是在 acquire
和 release
分析中理解其中原理是否是以为这里很简单,这里也不列举已经分析过的方法了,直接说出不一样点
LockSupport.parkNanos(this, nanosTimeout)
方法,也就至关于 Object.wait(long timeoutMillis)
方法,等待的线程的状态会在超时时间失效从 wait
变为 run
deadline时间
- 当前时间
是否小于0, 若果是则表明超时时间已过,直接返回false,反之则继续执行。InterruptedException
异常。是否是很简单,读者要把重点放到 acquire
和 release
上,其余的就很容易了。上面的内容均是获取的独占模式,下面来说解一下 共享模式。
public final void acquireShared(int arg) {
// 经过调用 tryAcquireShared 方法获取可执行权,若是未获取到则调用 doAcquireShared
// 方法进行 新建 Node节点 并和 AQS的 head 节点创建关系,
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
经过 acquireShared
方法能够看到和 acquire
并无什么区别,获取可执行权的代码须要 自定义同步器
实现,在共享模式分析中就不对 ReentrantReadWriteLock
源码进行分析了 ,会在后面对 ReentrantLock
和 ReentrantReadWriteLock
进行源码分析,接下来看一下 doAcquireShared
,看它是否是和 acquireQueued
方法也是同样的逻辑呢?
private void doAcquireShared(int arg) {
// 获取经过 addWaiter 建立的Node方法
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
// 获取新建节点的前置节点
final Node p = node.predecessor();
// 判断 新建的Node节点是否等于head节点
if (p == head) {
// 若是上边的 p==head 须要在此判断是否能够获取到'可执行权'
int r = tryAcquireShared(arg);
if (r >= 0) {
// 若是获取到了可执行权
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 判断Node节点的线程是否符合被 wait ,在这里用的是 park
if (shouldParkAfterFailedAcquire(p, node))
// 将线程 wait 而且线程被唤醒后 判断线程是否被中断
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
复制代码
在 doAcquireShared
方法中,咱们看到,首先依然是调用 addWaiter
方法进行新建Node,这里就很少说,能够看一下上边的方法详解,doAcquireShared
也是核心方法,在其中会对线程进行 LockSupport.park
进行控制,其实现方式是 循环 ,下面就具体分析一下
当前线程
所建立的 Node
节点中的 前置节点(prev)
。前置节点(prev)
是否等于 AQS
的 head
节点,若是条件成立则看 步骤3
,反之看 步骤4
, 若是知足条件,也就表明着 head
节点 所对应的线程
已经执行完成而且作了释放**(release方法)**操做。步骤2
条件成立,则再次判断 当前线程是否能够获取到可执行权 ,若是能够则设置 AQS
的 head
节点为当前线程的 新建的Node节点
, 反之则看 步骤3
。步骤2
或 步骤3
条件不成立,则判断 Node
节点所对应的线程的状态是否符合改成 wait
状态,也就是是否能够加入到等待队列中。这个逻辑在 shouldParkAfterFailedAcquire
方法中,能够看一下上边的方法详解。public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
复制代码
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 获取经过 addWaiter 建立的Node方法
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 获取新建节点的前置节点
final Node p = node.predecessor();
// 判断 新建的Node节点是否等于head节点
if (p == head) {
// 若是获取到了可执行权
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 判断Node节点的线程是否符合被 wait && 将线程 wait 而且线程被唤醒后判断线程是否被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
复制代码
能够看到 acquireSharedInterruptibly
和 acquireShared
方法并无什么太大区别,惟一的区别就是在调用 parkAndCheckInterrupt
线程状态被 wait ,等到当前节点 prev
节点的所属线程调用了 release
方法后,唤醒当前节点所属线程时,若是当前线程被中断了会抛出 InterruptedException
异常。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
复制代码
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
// 若是超时时间 小于等于0 则直接 返回失败
if (nanosTimeout <= 0L)
return false;
// 使用当前时间的 纳秒 + 超时时间的纳秒 = 将来超时的超时时间,用来作parkNanos,
// 就至关于 Object.wait(long timeoutMillis) 方法
final long deadline = System.nanoTime() + nanosTimeout;
//经过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占。这里是共享
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 判断 新建的Node节点是否等于head节点
final Node p = node.predecessor();
if (p == head) {
// 是否能够得到可执行权
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
// 判断计算过的 deadline 时间 - 当前时间 是否小于或等于0 是则超时时间已过,返回false
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 判断Node节点的线程是否符合被 wait ,在这里用的是 park 而且 纳秒必须大于 1000
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// // 判断线程是否被中断 若是中断则抛出 InterruptedException 异常
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
复制代码
有没有发现,doAcquireSharedNanos
方法和 doAcquireNanos
方法很类似呢,若是在 acquireShared
分析中理解其原理是否是以为这里很简单,这里也不列举已经分析过的方法了,直接说出不一样点
LockSupport.parkNanos(this, nanosTimeout)
方法,也就至关于 Object.wait(long timeoutMillis)
方法,等待的线程的状态会在超时时间失效从 wait
变为 run
deadline时间
- 当前时间
是否小于0, 若果是则表明超时时间已过,直接返回false,反之则继续执行。InterruptedException
异常。是否是很简单,读者要把重点放到 acquire
和 acquireShared
上,其余的就很容易了。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 若是更新失败则循环
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒 head 节点的 next 节点所属的线程
unparkSuccessor(h);
}
// 若是更新失败则循环
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 若是 head 改变了则再次循环
if (h == head) // loop if head changed
break;
}
}
复制代码
tryReleaseShared 方法和 release 方法稍微有一点区别,下面咱们就具体分析一下
tryReleaseShared
方法,若是释放成功了,就表明有资源空闲出来,那么就看 步骤2doReleaseShared
去唤醒后续结点, 在 doReleaseShared
方法中采用了 loop
,每一次循环的过程都是首先得到 head
节点,若是 head
结点不为空且不等于 tail
结点,那么先得到该节点的状态,若是是SIGNAL的状态,则表明它须要有后继结点去唤醒,首先将其的状态变为0(初始状态),而后经过 unparkSuccessor
方法唤醒后续节点所属的线程,若是结点状态一开始就是0,那么就给他转换成 PROPAGATE 状态,保证在后续获取资源的时候,还可以向后面传播。至此咱们已经分析完了 AbstractQueuedSynchronizer 的源码,是否是很简单呢?最主要的仍是要理解AQS的总体流程,说白了AQS是依赖两大利器,也就是 VarHandle 和 LockSupport。
博客地址:lantaoblog.site