AbstractQueuedSynchronizer源码解析

AQS是什么?

AQS是什么,相信你们都不陌生这个题目,那么AQS究竟是什么呢? AQS的全称是 Abstract Queued Synchronizer, 从字面意思理解也就是 抽象队列同步器 ,实际上AQS确实就是排队同步队列 , 也是一个抽象类,须要 自定义 同步队列中 可执行权获取和释放中的逻辑(从新定义获取和释放语义),也就是重写 tryAcquire tryRelease tryAcquireShared tryReleaseShared 等方法,固然也能够 自定义方法 来经过调用 AQS 提供的 判断方法进行逻辑判断。在 JDK9 以前 AQS 是依赖于 CAS 的,其底层是经过 UnsafecompareAndSwap* 方法实现同步更改,在以后则是使用 VarHandle , 也替代了 Unsafe说白了 AQS 利用 VarHandle 保证操做的原子性java

大白话就能够理解为: 表示某件事情同一时间点仅有一人能够进行操做,若有多人则须要排队等待, 等到当前操做人完成后通知下一我的。node

AQS源码解析

前言

在源码中 AbstractQueuedSynchronizer 继承了 AbstractOwnableSynchronizer , 同时也就继承了 exclusiveOwnerThread 属性,也就是 独占模式同步器的拥有者 ** , 也就意味着该线程是当前正在执行的线程**。git

AQS 中有几个重点方法,分别是: acquire acquireInterruptibly tryAcquireNanos release acquireShared acquireSharedInterruptibly tryAcquireSharedNanos releaseShared 下面逐一分析。github

在分析源代码以前,先来看一张图来了解一下 AQS排队同步队列Node 节点中的 waitStatus 状态 。并发

waitStatus 状态都分为是什么app

  • CANCELLED,值为1,表明同步队列中等待的线程 等待超时 或者 被中断,须要从同步队列中剔除,节点进入该状态之后不会再发生变化了。
  • SIGNAL,值为-1,表明后继节点的线程处于等待状态,而若是当前节点的线程若是释放了同步状态或被取消,将会通知后继结点,使后继节点的线程得以运行。
  • CONDITION, 值为-2,节点在等待队列,节点线程等待在Condition上,当其余线程调用Condition的signal方法后,该节点将会从等待队列中转移到同步队列中。
  • PROPAGATE, 值为-3,表示共享式同步状态回去将会无条件的被传播下去,
  • INITAL, 值为0,初始状态。

acquire(获取)

官方解释就是 Acquires in exclusive mode, ignoring interrupts 获取独占模式并忽略interrupt(中断), 翻译成大白话就是就能够理解为获取独占模式, 看一下源码oop

public final void acquire(int arg) {
    // 判断线程是否有可继续执行的权限, 若是没有则建立node 加入到队列中
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
复制代码

acquire 方法中分别调用了 tryAcquire acquireQueuedaddWaiter 方法,其中 tryAcquire 方法是须要自定义(重写) 获取、 执行权限 的逻辑,这里咱们以 AbstractQueuedSynchronizer 的实现 ReentrantLock 为例,简单分析一下,先看 tryAcquire 方法源码分析

protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取当前线程的重入次数 若是是 0 则表明第一次
    int c = getState();
    if (c == 0) {
        // 判断是否存在队列 && 能够获取到可执行权
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            // 设置独占模式同步器的拥有者 也就是是哪一个线程持有
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是进入线程是 持有可执行权的线程 则作重入 + 1 操做
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
复制代码

tryAcquire 方法核心代码就是 判断执行权限 ,这里就不具体分析了,会在下一篇文章中进行ReentrantLock的源码分析,接下来重点看 acquireQueuedaddWaiter 方法。ui

private Node addWaiter(Node mode) {
    // 经过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占
  	Node node = new Node(mode);
    for (;;) {
        Node oldTail = tail;
      	// 若是 tail 不是 null 则 设置 新建Node的前驱节点(prev) 指向 tail节点 反之 初始化同步队列
        if (oldTail != null) {
            // 设置 新建Node的前驱节点(prev) 指向 tail节点
            node.setPrevRelaxed(oldTail);
          	// 从新设置 tail 节点 指向 新建的Node节点
            // 白话就是 队列中的最后一个节点 == tail节点
            if (compareAndSetTail(oldTail, node)) {
                // 设置 未更改时的 tail节点 中 next 节点, 指向 新建Node节点
                oldTail.next = node;
                return node;
            }
        } else {
            // 初始化同步队列器
            initializeSyncQueue();
        }
    }
}
复制代码
// 若是未存在同步队列 则初始化同步队列
private final void initializeSyncQueue() {
    Node h;
  	// 设置 AQS head节点 为一个新建节点
    if (HEAD.compareAndSet(this, null, (h = new Node())))
      	// 赋值操做
        tail = h;
}
复制代码

addWaiterinitializeSyncQueue 方法中,核心就是新建 Node 节点并经过 acquireQueued 方法将节点加入到 AQS 中,接下来分析一下 addWaiter 具体作了什么this

  1. 经过构造方法建立新的Node节点,并经过入参 mode 指定Node节点的模式,共享或独占。固然这里是设置的独占模式。

  2. 循环操做 新建Node节点并将 新建节点tail 节点创建关系。首先判断 tail 是不是null,若是是则 步骤3,反之 步骤4

  3. 若是 tail 节点不为null, 首先将新建的 Node节点 中的 前驱节(prev) 点设置为当前的 tail 节点,而后经过 VarHandleAQStail 节点改成 新建的Node 节点,若是修改为功则将上一步 未更改时的 tail 节点 (也就是代码中的oldTail) 中的 next 指向 新建的Node节点 ,反之则可能由于并发操做致使 tial 节点已经被其余线程变动,须要再次循环操做直至成功。

  4. 若是 tial 节点是null, 则须要实例化同步对列,也就是 AQS , 经过调用 initializeSyncQueue 进行初始化操做,经过 VarHandle 设置 AQShead 指向一个新建节点 (new Node) , 而后将 head 节点的引用赋值给 tail 节点。这里注意一下,是将 head 节点的 引用 赋值给 tail 节点, 也就是这时候 head 节点 和 tail 节点是同时指向一块内存地址 , 这里的用意就是在新建队列的时候, head 节点和新建节点的 prev 节点要保持是同一个引用 ,由于在后续的判断中, 获取可执行权的条件就是 AQShead 节点是否等于当前节点的 prev 节点。

    由于 addWaiter 方法中是一个循环,在 建立队列后 须要将队列新建的Node节点作关联,因此还须要在执行一次 步骤3

addWaiter 方法分析完后,再来看一下 acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) {
    // 线程中断状态
    boolean interrupted = false;
    try {
        for (;;) {
          	// 获取经过 addWaiter 建立的Node方法
            final Node p = node.predecessor();
          	// 判断 新建的Node节点是否等于head节点 && 能够获取到 可执行权
            if (p == head && tryAcquire(arg)) {
              	// 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head
                // 节点设置为当前线程建立的Node节点,能够保证head节点永远均可以和后续节点有关联关系
                setHead(node);
                // 设置 next 
                p.next = null; // help GC
                // 返回
                return interrupted;
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park 
            if (shouldParkAfterFailedAcquire(p, node))
                // 将线程 wait 而且线程被唤醒后 判断线程是否被中断
                // |= 操做等于 interrupted = interrupted | parkAndCheckInterrupt()
              	// |(按位或) 会将 | 两边的值进行二进制计算,运算规则一个为真即为真, 例如 1|1 = 1 或 1|0 = 1,
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
复制代码

acquireQueued 也是核心方法,在其中会对线程进行 LockSupport.park 进行控制,其实现方式是 循环 ,下面就具体分析一下

  1. 首先会获取 当前线程 所建立的 Node 节点中的 前置节点(prev)
  2. 判断 前置节点(prev) 是否等于 AQShead 节点 && 能够获取到 可执行权 ,若是这两个条件成立则看 步骤3 ,反之看 步骤4, 若是知足这两个条件,也就表明着 head 节点 所对应的线程 已经执行完成而且作了释放**(release方法)**操做。
  3. 若是 步骤2 条件成立,也就是 线程被唤醒后并获取到了可执行权,则将 head 节点设置为 当前线程建立的Node节点
  4. 若是 步骤2 条件不成立,则判断 Node节点所对应的线程的状态是否符合改成 wait 状态。这个逻辑在 shouldParkAfterFailedAcquire 方法中, 接下来看一下。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 若是前置节点的 waitStatus == Signal 也就是 == -1 
    // 若是是 知足 线程 wait 条件
    if (ws == Node.SIGNAL)
        /* * This node has already set status asking a release * to signal it, so it can safely park. */
        return true;
    // 若是状态 > 0 也就是1 也就是线程已经被中断了
    // 在这里就会判断 前置节点的前置节点 是否仍是被中断,若是是 循环继续判断前置节点, 
    // 若是不是 则将前置节点的next节点改成 入参的 node 节点 而后 返回false 继续循环判断 
    // 这里的做用就是 排除掉已经被中断的线程
    if (ws > 0) {
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        // 不然设置 状态为 -1 等待唤醒状态 再次进来之后就会被 wait 
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}
复制代码

shouldParkAfterFailedAcquire 方法中主要就是判断节点所属的线程是否符规则,也就是更改成 wait 状态

  1. 判断当前线程节点前置节点waitStatus 是不是 SIGNAL,若是是知足条件,返回 true,线程将会 wait
  2. 判断当前线程节点前置节点waitStatus 是否大于 0, 也就是1 ,若是条件成立,则表明 当前线程节点前置节点 所对应的线程已经被中断了,须要从新指定当前线程节点的前置节点(prev),经过循环的方式找到前置节点的节点,若是依然被中断,则继续循环,直到找到未中断线程所对应的Node节点为止。若是条件不成立则将 waitStatus 状态改成 SIGNAL 返回false, 再经过 acquireQueued 方法中的循环在执行一次 。
  3. 这里要说一下为何要更改当前节点的 prev 节点中的 waitStatus 状态,是由于只有 前置节点(prev)waitStatus 等于 SIGNAL 也就是 -1 时,就表明当前线程新建的Node节点的线程处于等待状态在当前节点的前置节点(prev) 的线程释放了同步状态或被取消,将会通知当前节点,使当前节点的线程得以运行

到这里咱们整个的 acquire 方法就解析完了,接下来分享 release,有获取才有释放,会在release讲完后为你们分享一下 acquire 到 release的整个流程。

release(释放)

release 字面一次就是释放,释放经过 acquire 获取的独占模式,可让 AQS 后续节点所对应的线程能够获得执行权,下面就看一下 release 方法

public final boolean release(int arg) {
    // 步骤 1 2 3 
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
复制代码

release 方法中首先会调用 tryRelease 方法,这里 tryRelease 方法将会有子类实现,先以 RenntrantLook 为例,这里就不展现代码了,就简单描述一下逻辑

  1. 首先会先用 state 减去 argstate 表明重入次数。
  2. 若是 步骤1 结果是0,则将 独占模式同步器的拥有者 改成null并返回true。
  3. 若是 步骤1 结果不是0, 则从新设置 state,返回false,表示还不能够释放。

接下来判断 AQShead 节点不是null而且 waitStatus 状态不等于0,表明 释放成功,而后进入 unparkSuccessor 方法,进行对下一个Node节点所对应的线程进行唤醒操做。

private void unparkSuccessor(Node node) {
    /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
    // 若是head的waitStatus<0 则将head的waitStatus改成0
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
    // 若是 head 节点的 next 节点 == null 或者 节点 的状态 大于0 也就是1 也就是 下一个节点所对应的线程被中断了
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 将会循环整个同步队列,从tail节点开始 往前循环,直到只找到 waitStatus <= 0 的Node节点
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // 若是节点不是 null 则唤醒该节点的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}
复制代码

接下来分析一下在 unparkSuccessor 方法中都作了什么

  1. 首先若是 head 的 waitStatus <0, 则将 headwaitStatus 改成0。
  2. 若是 head 节点的 next 节点等于null 或者 waitStatus 状态 大于0也就是1, 表示 head 节点所对应的 next 节点所对应的线程已经被中断了,将会循环整个同步队列,从 tail 节点开始往前循环,直到找到最前面的一个 waitStatus <= 0 的Node节点
  3. 若是 步骤2 条件不知足 则表明 head 的 next 节点不是null 或 waitStatus状态不等于1,调用 unpark 方法唤醒线程。

至此 release 方法就解析完成了,很简单,核心功能仅仅是若是符合规则,则调用 unpark 方法唤醒 AQS 队列中下一个节点所对应的线程。下面就分析一下 acquire 和 releae 整个流程。

总结 acquire 和 release

分析一次 acquirerelease 的总体流程

接下来分析 acquireInterruptibly 方法,acquireInterruptibly 方法和 acquire 实际上是同样的,只不过多判断了一下是否被中断。

acquireInterruptibly

acquireInterruptibly 方法就是 可中断的获取可执行权 ,具体流程和 acquire 类似。

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
复制代码

acquireInterruptibly 方法中,首先会经过 Thread.interrupted() 方法判断线程是否被中断,如已经被中断,则抛出 InterruptedException, 反之则调用 tryAcquire 方法,判断是否 获取到执行权,若是未获取到则调用 doAcquireInterruptibly 方法进行建立 AQS新的Node节点,并将 新建的Node节点AQShead 节点进行关联。 到这里可能就会想到,这不是和 acquire 方法是同样的嘛,没错,就是同样。看一下源码

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // 新建 Node 节点 并将节点 和 AQS 队列简历关联
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

看到源码是否是很熟悉,这不就是上边咱们分析过的 acquire 方法嘛,惟一和 acquire 方法不一样的就是,若是线程在被唤醒之后,也就是 head 节点的线程调用了 release 释放了可执行权,而且经过 LockSupport.park 方法唤醒了 head 的 next节点所属的线程时,headnext 节点所属的线程已经被中断了就会抛出 InterruptedException 异常。

这里就不进行 addWaiter 方法 和 parkAndCheckInterrupt 方法的源码展现了,若是还不明白就看一下上边 acquire 方法的源码分析。

tryAcquireNanos

tryAcquireNanos 方法的含义就是 可超时的获取执行权 ,若是设置的 超时时间 到了,还未获取到可执行权,则直接返回 false 。这里的超时时间单位是 纳秒 ns1秒(s)=1000000000纳秒(ns)

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
复制代码

看到 tryAcquireNanos 方法会想到什么? 看到方法上的 throws InterruptedException 就一下想到了上面刚刚刚说的 acquireInterruptibly 方法,支持可中断的获取执行权。首先这里会先调用 tryAcquire 方法获取执行权,若是能够获取到执行权则直接返回,反之则调用 doAcquireNanos(arg, nanosTimeout) 方法进行 新建 Node 节点 并和 AQShead 节点进行关联,而且会将节点加入到 AQS的队列中而后将节点所属的线程放入等待队列中

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  // 判断超时时间 是否小于等于 0 若是这 则直接返回 false 
  if (nanosTimeout <= 0L)
        return false;
    // 使用当前时间的 纳秒 + 超时时间的纳秒 = 将来超时的超时时间,用来作parkNanos, 
    // 就至关于 Object.wait(long timeoutMillis) 方法
    final long deadline = System.nanoTime() + nanosTimeout;
  
    //经过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            // 判断 新建的Node节点是否等于head节点 && 能够获取到'可执行权'
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // 设置 head 节点为 当前线程的新建的Node节点,也就是线程被唤醒后并获取到了可执行权,则将head
                // 节点设置为当前线程建立的Node节点,能够保证head节点永远均可以和后续节点有关联关系
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // 判断计算过的 deadline 时间 - 当前时间 是否小于0 是则 超时时间已过,返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park 而且 纳秒必须大于 1000 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // 判断线程是否被中断 若是中断则抛出 InterruptedException 异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

若是在 acquirerelease 分析中理解其中原理是否是以为这里很简单,这里也不列举已经分析过的方法了,直接说出不一样点

  1. 增长超时时间,在这里使用了 LockSupport.parkNanos(this, nanosTimeout) 方法,也就至关于 Object.wait(long timeoutMillis) 方法,等待的线程的状态会在超时时间失效从 wait 变为 run
  2. 线程被唤醒之后,也就是过了超时时间则会判断计算过的 deadline时间 - 当前时间 是否小于0, 若果是则表明超时时间已过,直接返回false,反之则继续执行。
  3. 支持中断 ,若是已中断则会抛出 InterruptedException 异常。

是否是很简单,读者要把重点放到 acquirerelease 上,其余的就很容易了。上面的内容均是获取的独占模式,下面来说解一下 共享模式。

acquireShared

public final void acquireShared(int arg) {
    // 经过调用 tryAcquireShared 方法获取可执行权,若是未获取到则调用 doAcquireShared
    // 方法进行 新建 Node节点 并和 AQS的 head 节点创建关系,
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
复制代码

经过 acquireShared 方法能够看到和 acquire 并无什么区别,获取可执行权的代码须要 自定义同步器 实现,在共享模式分析中就不对 ReentrantReadWriteLock 源码进行分析了 ,会在后面对 ReentrantLockReentrantReadWriteLock 进行源码分析,接下来看一下 doAcquireShared ,看它是否是和 acquireQueued 方法也是同样的逻辑呢?

private void doAcquireShared(int arg) {
  	// 获取经过 addWaiter 建立的Node方法
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            // 获取新建节点的前置节点
            final Node p = node.predecessor();
         	  // 判断 新建的Node节点是否等于head节点
            if (p == head) {
                // 若是上边的 p==head 须要在此判断是否能够获取到'可执行权'
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                  	// 若是获取到了可执行权
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park 
            if (shouldParkAfterFailedAcquire(p, node))
                // 将线程 wait 而且线程被唤醒后 判断线程是否被中断
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}
复制代码

doAcquireShared 方法中,咱们看到,首先依然是调用 addWaiter 方法进行新建Node,这里就很少说,能够看一下上边的方法详解,doAcquireShared 也是核心方法,在其中会对线程进行 LockSupport.park 进行控制,其实现方式是 循环 ,下面就具体分析一下

  1. 首先会获取 当前线程 所建立的 Node 节点中的 前置节点(prev)
  2. 判断 前置节点(prev) 是否等于 AQShead 节点,若是条件成立则看 步骤3 ,反之看 步骤4, 若是知足条件,也就表明着 head 节点 所对应的线程 已经执行完成而且作了释放**(release方法)**操做。
  3. 若是 步骤2 条件成立,则再次判断 当前线程是否能够获取到可执行权 ,若是能够则设置 AQShead 节点为当前线程的 新建的Node节点, 反之则看 步骤3
  4. 若是 步骤2步骤3 条件不成立,则判断 Node节点所对应的线程的状态是否符合改成 wait 状态,也就是是否能够加入到等待队列中。这个逻辑在 shouldParkAfterFailedAcquire 方法中,能够看一下上边的方法详解。

acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
复制代码
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  	// 获取经过 addWaiter 建立的Node方法
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            // 获取新建节点的前置节点
            final Node p = node.predecessor();
            // 判断 新建的Node节点是否等于head节点
            if (p == head) {
                // 若是获取到了可执行权
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判断Node节点的线程是否符合被 wait && 将线程 wait 而且线程被唤醒后判断线程是否被中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

能够看到 acquireSharedInterruptiblyacquireShared 方法并无什么太大区别,惟一的区别就是在调用 parkAndCheckInterrupt 线程状态被 wait ,等到当前节点 prev 节点的所属线程调用了 release 方法后,唤醒当前节点所属线程时,若是当前线程被中断了会抛出 InterruptedException 异常。

tryAcquireSharedNanos

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
复制代码
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 若是超时时间 小于等于0 则直接 返回失败
    if (nanosTimeout <= 0L)
        return false;
    // 使用当前时间的 纳秒 + 超时时间的纳秒 = 将来超时的超时时间,用来作parkNanos, 
    // 就至关于 Object.wait(long timeoutMillis) 方法
    final long deadline = System.nanoTime() + nanosTimeout;
  	
    //经过构造方法 新建 Node节点, 根据入参mode指定了 Node的模式,共享或独占。这里是共享
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            // 判断 新建的Node节点是否等于head节点
            final Node p = node.predecessor();
            if (p == head) {
                // 是否能够得到可执行权
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
          
            // 判断计算过的 deadline 时间 - 当前时间 是否小于或等于0 是则超时时间已过,返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 判断Node节点的线程是否符合被 wait ,在这里用的是 park 而且 纳秒必须大于 1000 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
          	// // 判断线程是否被中断 若是中断则抛出 InterruptedException 异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码

有没有发现,doAcquireSharedNanos 方法和 doAcquireNanos 方法很类似呢,若是在 acquireShared 分析中理解其原理是否是以为这里很简单,这里也不列举已经分析过的方法了,直接说出不一样点

  1. 增长超时时间,在这里使用了 LockSupport.parkNanos(this, nanosTimeout) 方法,也就至关于 Object.wait(long timeoutMillis) 方法,等待的线程的状态会在超时时间失效从 wait 变为 run
  2. 线程被唤醒之后,也就是过了超时时间则会判断计算过的 deadline时间 - 当前时间 是否小于0, 若果是则表明超时时间已过,直接返回false,反之则继续执行。
  3. 支持中断 ,若是已中断则会抛出 InterruptedException 异常。

是否是很简单,读者要把重点放到 acquireacquireShared 上,其余的就很容易了。

releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码
private void doReleaseShared() {
        /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 若是更新失败则循环
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒 head 节点的 next 节点所属的线程
                    unparkSuccessor(h);
                }
                // 若是更新失败则循环
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 若是 head 改变了则再次循环
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

tryReleaseShared 方法和 release 方法稍微有一点区别,下面咱们就具体分析一下

  1. 首先去尝试释放资源经过 tryReleaseShared 方法,若是释放成功了,就表明有资源空闲出来,那么就看 步骤2
  2. 调用doReleaseShared 去唤醒后续结点, 在 doReleaseShared 方法中采用了 loop,每一次循环的过程都是首先得到 head 节点,若是 head 结点不为空且不等于 tail 结点,那么先得到该节点的状态,若是是SIGNAL的状态,则表明它须要有后继结点去唤醒,首先将其的状态变为0(初始状态),而后经过 unparkSuccessor 方法唤醒后续节点所属的线程,若是结点状态一开始就是0,那么就给他转换成 PROPAGATE 状态,保证在后续获取资源的时候,还可以向后面传播。

至此咱们已经分析完了 AbstractQueuedSynchronizer 的源码,是否是很简单呢?最主要的仍是要理解AQS的总体流程,说白了AQS是依赖两大利器,也就是 VarHandle 和 LockSupport。

博客地址:lantaoblog.site

相关文章
相关标签/搜索