JDK源码分析-AbstractQueuedSynchronizer(3)

概述node


前文「JDK源码分析-AbstractQueuedSynchronizer(2)」分析了 AQS 在独占模式下获取资源的流程,本文分析共享模式下的相关操做。并发


其实两者的操做大部分是相似的,理解了前面对独占模式的分析,再分析共享模式就相对容易了。app


共享模式工具


方法概述oop


与独占模式相似,共享模式下也有与之相似的相应操做,分别以下:源码分析


1. acquireShared(int arg): 以共享模式获取资源,忽略中断;flex


2. acquireSharedInterruptibly(int arg): 以共享模式获取资源,响应中断;ui


3. tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取资源,响应中断,且有超时等待;this


4. releaseShared(int arg): 释放资源,唤醒后继节点,并确保传播url


它们的操做与独占模式也比较相似,下面具体分析。


方法分析


1. 共享模式获取资源(忽略中断)


acquireShared:

public final void acquireShared(int arg) {// 返回值小于 0,表示获取失败    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}

// 尝试以共享模式获取资源(返回值为 int 类型)protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException();}

与独占模式的 tryAcquire 方法相似,tryAcquireShared 方法在 AQS 中也抛出异常,由子类实现其逻辑。


不一样的地方在于,tryAcquire 方法的返回结果是 boolean 类型,表示获取成功与否;而 tryAcquireShared 的返回结果是 int 类型,分别为:

1) 负数:表示获取失败;

2) 0:表示获取成功,但后续共享模式的获取会失败;

3) 正数:表示获取成功,后续共享模式的获取可能会成功(须要进行检测)。


tryAcquireShared 获取成功,则直接返回;不然执行 doAcquireShared 方法:

private void doAcquireShared(int arg) {    // 把当前线程封装成共享模式的 Node 节点,插入主队列末尾    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        // 中断标志位        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();            // 若前驱节点为头节点,则尝试获取资源            if (p == head) {                int r = tryAcquireShared(arg);                // 这里表示当前线程成功获取到了资源                if (r >= 0) {                    // 设置头节点,并传播状态(注意这里与独占模式不一样)                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            // 是否应该休眠(与独占模式相同,再也不赘述)            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            // 取消操做(与独占模式相同)            cancelAcquire(node);    }}

doAcquireShared 方法会把当前线程封装成一个共享模式(SHARED)的节点,并插入主队列末尾。addWaiter(Node mode) 方法前文已经分析过,再也不赘述。


该方法与 acquireQueued 方法的区别在于 setHeadAndPropagate 方法,把当前节点设置为头节点以后,还会有传播(propagate)行为:

private void setHeadAndPropagate(Node node, int propagate) {    // 记录旧的头节点    Node h = head; // Record old head for check below    // 将 node 设置为头节点    setHead(node);    /*     * Try to signal next queued node if:     *   Propagation was indicated by caller,     *     or was recorded (as h.waitStatus either before     *     or after setHead) by a previous operation     *     (note: this uses sign-check of waitStatus because     *      PROPAGATE status may transition to SIGNAL.)     * and     *   The next node is waiting in shared mode,     *     or we don't know, because it appears null     *     * The conservatism in both of these checks may cause     * unnecessary wake-ups, but>     * racing acquires/releases, so most need signals now or soon     * anyway.     */    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {        Node s = node.next;        // 后继节点为空或共享模式唤醒        if (s == null || s.isShared())            doReleaseShared();    }}

doReleaseShared:

private void doReleaseShared() {    /*     * Ensure that a release propagates, even if there are other     * in-progress acquires/releases.  This proceeds in the usual     * way of trying to unparkSuccessor of head if it needs     * signal. But if it does not, status is set to PROPAGATE to     * ensure that upon release, propagation continues.     * Additionally, we must loop in case a new node is added     * while we are doing this. Also, unlike other uses of     * unparkSuccessor, we need to know if CAS to reset status     * fails, if so rechecking.     */    for (;;) {        // 这里的头节点已是上面设置后的头节点了        Node h = head;        // 因为该方法有两个入口(setHeadAndPropagate 和 releaseShared),需考虑并发控制        if (h != null && h != tail) {            int ws = h.waitStatus;            if (ws == Node.SIGNAL) {                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                // 唤醒后继节点                unparkSuccessor(h);            }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop>        }        // 若头节点不变,则跳出循环;不然继续循环        if (h == head)                   // loop if head changed            break;    }}

该方法与独占模式下的获取方法 acquire 大致类似,不一样在于该方法中,节点获取资源后会传播状态,即,有可能会继续唤醒后继节点。值得注意的是:该方法有两个入口 setHeadAndPropagate 和 releaseShared,可能有多个线程操做,需考虑并发控制。


此外,本人对于将节点设置为 PROPAGATE 状态的理解还不是很清晰,网上说法也不止一种,待后续研究明白再补充。


2. 以共享模式获取资源(响应中断)


该方法与 acquireShared 相似:

public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (tryAcquireShared(arg) < 0)        doAcquireSharedInterruptibly(arg);}

tryAcquireShared 方法前面已分析,若获取资源失败,则会执行 doAcquireSharedInterruptly 方法:

private void doAcquireSharedInterruptibly(int arg)    throws InterruptedException {    // 把当前线程封装成共享模式节点,并插入主队列    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    failed = false;                    return;                }            }            // 与 doAcquireShared 相比,区别在于这里抛出了异常            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}

从代码能够看到,acquireSharedInterruptibly 方法与 acquireShared 方法几乎彻底同样,不一样之处仅在于前者会抛出 InterruptedException 异常响应中断;然后者仅记录标志位,获取结束后才响应。


3. 以共享模式获取资源(响应中断,且有超时)


代码以下(该方法可与前文独占模式下的超时获取方法比较分析)

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    return tryAcquireShared(arg) >= 0 ||        doAcquireSharedNanos(arg, nanosTimeout);}

doAcquireSharedNanos: 

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)        throws InterruptedException {    if (nanosTimeout <= 0L)        return false;    final long deadline = System.nanoTime() + nanosTimeout;    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    failed = false;                    return true;                }            }            nanosTimeout = deadline - System.nanoTime();            if (nanosTimeout <= 0L)                return false;            if (shouldParkAfterFailedAcquire(p, node) &&                nanosTimeout > spinForTimeoutThreshold)                LockSupport.parkNanos(this, nanosTimeout);            if (Thread.interrupted())                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}

该方法可与独占模式下的超时等待方法 tryAcquireNanos(int arg, long nanosTimeout) 进行对比,两者操做基本一致,再也不详细分析。


4. 释放资源,唤醒节点,传播状态


以下:

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

tryReleaseShared:

protected boolean tryReleaseShared(int arg) {    throw new UnsupportedOperationException();}

doReleaseShared() 方法前面已经分析过了。本方法与独占模式的 release 方法相似,不一样的地方在于“传播”二字。


场景分析


为了便于理解独占模式和共享模式下队列和节点的状态,下面简要举例分析。

场景以下:有 T0~T4 共 5 个线程按前后顺序获取资源,其中 T2 和 T3 为共享模式,其余均为独占模式。


就此场景分析:T0 先获取到资源(假设占用时间较长),然后 T1~T4 再获取则失败,会依次进入主队列。此时主队列中各个节点的状态示意图以下:


以后,T0 操做完毕并释放资源,会将 T1 唤醒。T1(独占模式) 会从 acquireQueued(final Node node, int arg) 方法的循环中继续获取资源,这时会获取成功,并将 T1 设置为头节点(T 被移除)。此时主队列节点示意图以下:

此时,T1 获取到资源并进行相关操做。


然后,T1 操做完释放资源,并唤醒下一个节点 T2,T2(共享模式) 继续从 doAcquireShared(int) 方法的循环中执行。此时 T2 获取资源成功,将自身设为头节点(T1 被移除),因为后继节点 T3 也是共享模式,所以 T1 会继续唤醒 T3;T3 唤醒后的操做与 T2 相同,但后继节点 T4 不是共享模式,所以再也不继续唤醒。此时队列节点状态示意图以下:

此时,T2 和 T3 同时获取到资源。


以后,当两者都释放资源后会唤醒 T4:

T4 获取资源的与 T1 相似。


PS: 该场景仅供参考,只为便于理解,如有不当之处敬请指正。


小结


本文分析了以共享模式获取资源的三种方式,以及释放资源的操做。分别为:


1. acquireShared: 共享模式获取资源,忽略中断;

2. acquireSharedInterruptibly: 共享模式获取资源,响应中断;

3. tryAcquireSharedNanos: 共享模式获取资源,响应中断,有超时;

4. releaseShared: 释放资源,唤醒后继节点,并确保传播。


并简要分析一个场景下主队列中各个节点的状态。此外,AQS 中还有嵌套类 ConditionObject 及条件队列的相关操做,后面涉及到的时候再进行分析。


单独去分析 AQS 的源码比较枯燥,后文会结合 ReentrantLock、CountdownLatch 等经常使用并发工具类的源码进行分析。


上述解析是参考其余资料及我的理解,如有不当之处欢迎指正。


相关阅读:

JDK源码分析-AbstractQueuedSynchronizer(2)

JDK源码分析-AbstractQueuedSynchronizer(1)



Stay hungry, stay foolish.

相关文章
相关标签/搜索