逐行分析AQS源码(3)——共享锁的获取与释放

前言

前面两篇咱们以ReentrantLock为例了解了AQS独占锁的获取释放,本篇咱们来看看共享锁。因为AQS对于共享锁与独占锁的实现框架比较相似,所以若是你搞定了前面的独占锁模式,则共享锁也就很容易弄懂了。java

系列文章目录node

共享锁与独占锁的区别

共享锁与独占锁最大的区别在于,独占锁是独占的,排他的,所以在独占锁中有一个exclusiveOwnerThread属性,用来记录当前持有锁的线程。当独占锁已经被某个线程持有时,其余线程只能等待它被释放后,才能去争锁,而且同一时刻只有一个线程能争锁成功。segmentfault

而对于共享锁而言,因为锁是能够被共享的,所以它能够被多个线程同时持有。换句话说,若是一个线程成功获取了共享锁,那么其余等待在这个共享锁上的线程就也能够尝试去获取锁,而且极有可能获取成功。并发

共享锁的实现和独占锁是对应的,咱们能够从下面这张表中看出:框架

独占锁 共享锁
tryAcquire(int arg) tryAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
acquire(int arg) acquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared(int arg)
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireInterruptibly(int arg) doAcquireSharedInterruptibly(int arg)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

能够看出,除了最后一个属于共享锁的doReleaseShared()方法没有对应外,其余的方法,独占锁和共享锁都是一一对应的。oop

事实上,其实与doReleaseShared()对应的独占锁的方法应当是unparkSuccessor(h),只是doReleaseShared()逻辑不只仅包含了unparkSuccessor(h),还包含了其余操做,这一点咱们下面分析源码的时候再看。性能

另外,尤为须要注意的是,在独占锁模式中,咱们只有在获取了独占锁的节点释放锁时,才会唤醒后继节点——这是合理的,由于独占锁只能被一个线程持有,若是它尚未被释放,就没有必要去唤醒它的后继节点。优化

然而,在共享锁模式下,当一个节点获取到了共享锁,咱们在获取成功后就能够唤醒后继节点了,而不须要等到该节点释放锁的时候,这是由于共享锁能够被多个线程同时持有,一个锁获取到了,则后继的节点均可以直接来获取。所以,在共享锁模式下,在获取锁和释放锁结束时,都会唤醒后继节点。 这一点也是doReleaseShared()方法与unparkSuccessor(h)方法没法直接对应的根本缘由所在。ui

共享锁的获取

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

咱们拿它和独占锁模式对比一下:this

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这二者的结构看上去彷佛有点差异,但事实上是同样的,只不过是共享锁模式下,将与addWaiter(Node.EXCLUSIVE)对应的addWaiter(Node.SHARED),以及selfInterrupt()操做所有移到了doAcquireShared方法内部,这一点咱们在下面分析doAcquireShared方法时就一目了然了。

不过这里先插一句,相对于独占的锁的tryAcquire(int arg)返回boolean类型的值,共享锁的tryAcquireShared(int acquires)返回的是一个整型值:

  • 若是该值小于0,则表明当前线程获取共享锁失败
  • 若是该值大于0,则表明当前线程获取共享锁成功,而且接下来其余线程尝试获取共享锁的行为极可能成功
  • 若是该值等于0,则表明当前线程获取共享锁成功,可是接下来其余线程尝试获取共享锁的行为会失败

所以,只要该返回值大于等于0,就表示获取共享锁成功。

acquireShared中的tryAcquireShared方法由具体的子类负责实现,这里咱们暂且不表。

接下来咱们看看doAcquireShared方法,它对应于独占锁的acquireQueued,二者其实很相似,咱们把它们相同的部分注释掉,只看不一样的部分:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    /*boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();*/
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            /*if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }*/
}

关于上面的if部分,独占锁对应的acquireQueued方法为:

if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

所以,综合来看,这二者的逻辑仅有两处不一样:

  1. addWaiter(Node.EXCLUSIVE) -> addWaiter(Node.SHARED)
  2. setHead(node) -> setHeadAndPropagate(node, r)

这里第一点不一样就是独占锁的acquireQueued调用的是addWaiter(Node.EXCLUSIVE),而共享锁调用的是addWaiter(Node.SHARED),代表了该节点处于共享模式,这两种模式的定义为:

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

该模式被赋值给了节点的nextWaiter属性:

Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

咱们知道,在条件队列中,nextWaiter是指向条件队列中的下一个节点的,它将条件队列中的节点串起来,构成了单链表。可是在sync queue队列中,咱们只用prev,next属性来串联节点,造成双向链表,nextWaiter属性在这里只起到一个标记做用,不会串联节点,这里不要被Node SHARED = new Node()所指向的空节点迷惑,这个空节点并不属于sync queue,不表明任何线程,它只起到标记做用,仅仅用做判断节点是否处于共享模式的依据:

// Node#isShard()
final boolean isShared() {
    return nextWaiter == SHARED;
}

这里的第二点不一样就在于获取锁成功后的行为,对于独占锁而言,是直接调用了setHead(node)方法,而共享锁调用的是setHeadAndPropagate(node, r)

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

在该方法内部咱们不只调用了setHead(node),还在必定条件下调用了doReleaseShared()来唤醒后继的节点。这是由于在共享锁模式下,锁能够被多个线程所共同持有,既然当前线程已经拿到共享锁了,那么就能够直接通知后继节点来拿锁,而没必要等待锁被释放的时候再通知。

关于这个doReleaseShared方法,咱们到下面分析锁释放的时候再看。

共享锁的释放

咱们使用releaseShared(int arg)方法来释放共享锁:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

该方法对应于独占锁的release(int arg)方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在独占锁模式下,因为头节点就是持有独占锁的节点,在它释放独占锁后,若是发现本身的waitStatus不为0,则它将负责唤醒它的后继节点。

在共享锁模式下,头节点就是持有共享锁的节点,在它释放共享锁后,它也应该唤醒它的后继节点,可是值得注意的是,咱们在以前的setHeadAndPropagate方法中可能已经调用过该方法了,也就是说它可能会被同一个头节点调用两次,也有可能在咱们从releaseShared方法中调用它时,当前的头节点已经易主了,下面咱们就来详细看看这个方法:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

该方法多是共享锁模式最难理解的方法了,在看该方法时,咱们须要明确如下几个问题:

(1) 该方法有几处调用?

该方法有两处调用,一处在acquireShared方法的末尾,当线程成功获取到共享锁后,在必定条件下调用该方法;一处在releaseShared方法中,当线程释放共享锁的时候调用。

(2) 调用该方法的线程是谁?

在独占锁中,只有获取了锁的线程才能调用release释放锁,所以调用unparkSuccessor(h)唤醒后继节点的必然是持有锁的线程,该线程可看作是当前的头节点(虽然在setHead方法中已经将头节点的thread属性设为了null,可是这个头节点曾经表明的就是这个线程)

在共享锁中,持有共享锁的线程能够有多个,这些线程均可以调用releaseShared方法释放锁;而这些线程想要得到共享锁,则它们必然曾经成为过头节点,或者就是如今的头节点。所以,若是是在releaseShared方法中调用的doReleaseShared,可能此时调用方法的线程已经不是头节点所表明的线程了,头节点可能已经被易主好几回了。

(3) 调用该方法的目的是什么?

不管是在acquireShared中调用,仍是在releaseShared方法中调用,该方法的目的都是在当前共享锁是可获取的状态时,唤醒head节点的下一个节点。这一点看上去和独占锁彷佛同样,可是它们的一个重要的差异是——在共享锁中,当头节点发生变化时,是会回到循环中再当即唤醒head节点的下一个节点的。也就是说,在当前节点完成唤醒后继节点的任务以后将要退出时,若是发现被唤醒后继节点已经成为了新的头节点,则会当即触发唤醒head节点的下一个节点的操做,如此周而复始。

(4) 退出该方法的条件是什么

该方法是一个自旋操做(for(;;)),退出该方法的惟一办法是走最后的break语句:

if (h == head)   // loop if head changed
    break;

即,只有在当前head没有易主时,才会退出,不然继续循环。
这个怎么理解呢?
为了说明问题,这里咱们假设目前sync queue队列中依次排列有

dummy node -> A -> B -> C -> D

如今假设A已经拿到了共享锁,则它将成为新的dummy node,

dummy node (A) -> B -> C -> D

此时,A线程会调用doReleaseShared,咱们写作doReleaseShared[A],在该方法中将唤醒后继的节点B,它很快得到了共享锁,成为了新的头节点:

dummy node (B) -> C -> D

此时,B线程也会调用doReleaseShared,咱们写作doReleaseShared[B],在该方法中将唤醒后继的节点C,可是别忘了,在doReleaseShared[B]调用的时候,doReleaseShared[A]还没运行结束呢,当它运行到if(h == head)时,发现头节点如今已经变了,因此它将继续回到for循环中,与此同时,doReleaseShared[B]也没闲着,它在执行过程当中也进入到了for循环中。。。

因而可知,咱们这里造成了一个doReleaseShared的“调用风暴”,大量的线程在同时执行doReleaseShared,这极大地加速了唤醒后继节点的速度,提高了效率,同时该方法内部的CAS操做又保证了多个线程同时唤醒一个节点时,只有一个线程能操做成功。

那若是这里doReleaseShared[A]执行结束时,节点B尚未成为新的头节点时,doReleaseShared[A]方法不就退出了吗?是的,但即便这样也没有关系,由于它已经成功唤醒了线程B,即便doReleaseShared[A]退出了,当B线程成为新的头节点时,doReleaseShared[B]就开始执行了,它也会负责唤醒后继节点的,这样即便变成这种每一个节点只唤醒本身后继节点的模式,从功能上讲,最终也能够实现唤醒全部等待共享锁的节点的目的,只是效率上没有以前的“调用风暴”快。

由此咱们知道,这里的“调用风暴”事实上是一个优化操做,由于在咱们执行到该方法的末尾的时候,unparkSuccessor基本上已经被调用过了,而因为如今是共享锁模式,因此被唤醒的后继节点极有可能已经获取到了共享锁,成为了新的head节点,当它成为新的head节点后,它可能仍是要在setHeadAndPropagate方法中调用doReleaseShared唤醒它的后继节点。

明确了上面几个问题后,咱们再来详细分析这个方法,它最重要的部分就是下面这两个if语句:

if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
        continue;            // loop to recheck cases
    unparkSuccessor(h);
}
else if (ws == 0 &&
         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;                // loop on failed CAS

第一个if很好理解,若是当前ws值为Node.SIGNAL,则说明后继节点须要唤醒,这里采用CAS操做先将Node.SIGNAL状态改成0,这是由于前面讲过,可能有大量的doReleaseShared方法在同时执行,咱们只须要其中一个执行unparkSuccessor(h)操做就好了,这里经过CAS操做保证了unparkSuccessor(h)只被执行一次。

比较难理解的是第二个else if,首先咱们要弄清楚ws啥时候为0,一种是上面的compareAndSetWaitStatus(h, Node.SIGNAL, 0)会致使ws为0,可是很明显,若是是由于这个缘由,则它是不会进入到else if语句块的。因此这里的ws为0是指当前队列的最后一个节点成为了头节点。为何是最后一个节点呢,由于每次新的节点加进来,在挂起前必定会将本身的前驱节点的waitStatus修改为Node.SIGNAL的。(对这一点不理解的详细看这里)

其次,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)这个操做何时会失败?既然这个操做失败,说明就在执行这个操做的瞬间,ws此时已经不为0了,说明有新的节点入队了,ws的值被改成了Node.SIGNAL,此时咱们将调用continue,在下次循环中直接将这个刚刚新入队但准备挂起的线程唤醒。

其实,若是咱们再结合外部的总体条件,就很容易理解这种状况所针对的场景,不要忘了,进入上面这段还有一个条件是

if (h != null && h != tail)

它处于最外层:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 注意这里说明了队列至少有两个节点
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        if (h == head)
            break;
    }
}

这个条件意味着,队列中至少有两个节点。

结合上面的分析,咱们能够看出,这个

else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

描述了一个极其严苛且短暂的状态:

  1. 首先,大前提是队列里至少有两个节点
  2. 其次,要执行到else if语句,说明咱们跳过了前面的if条件,说明头节点是刚刚成为头节点的,它的waitStatus值还为0,尾节点是在这以后刚刚加进来的,它须要执行shouldParkAfterFailedAcquire,将它的前驱节点(即头节点)的waitStatus值修改成Node.SIGNAL可是目前这个修改操做尚未来的及执行。这种状况使咱们得以进入else if的前半部分else if (ws == 0 &&
  3. 紧接着,要知足!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)这一条件,说明此时头节点的waitStatus已经不是0了,这说明以前那个没有来得及执行的 shouldParkAfterFailedAcquire将前驱节点的的waitStatus值修改成Node.SIGNAL的操做如今执行完了。

因而可知,else if&& 链接了两个不一致的状态,分别对应了shouldParkAfterFailedAcquirecompareAndSetWaitStatus(pred, ws, Node.SIGNAL)执行成功前和执行成功后,由于doReleaseShared
shouldParkAfterFailedAcquire是能够并发执行的,因此这一条件是有可能知足的,只是知足的条件很是严苛,可能只是一瞬间的事。

这里不得不说,若是以上的分析没有错的话,那做者对于AQS性能的优化已经到了“使人发指”的地步!!!虽然说这种短暂的瞬间确实存在,也确实有必要从新回到for循环中再次去唤醒后继节点,可是这种优化也太太太~~~过于精细了吧!

咱们来看看若是不加入这个精细的控制条件有什么后果呢?

这里咱们复习一下新节点入队的过程,前面说过,在发现新节点的前驱不是head节点的时候,它将调用shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

因为前驱节点的ws值如今还为0,新节点将会把它改成Node.SIGNAL,

但修改后,该方法返回的是false,也就是说线程不会当即挂起,而是回到上层再尝试一次抢锁:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire的返回处
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

当咱们再次回到for(;;)循环中,因为此时当前节点的前驱节点已经成为了新的head,因此它能够参与抢锁,因为它抢的是共享锁,因此大几率它是抢的到的,因此极有可能它不会被挂起。这有可能致使在上面的doReleaseShared调用unparkSuccessor方法unpark了一个并无被park的线程。然而,这一操做是被容许的,当咱们unpark一个并无被park的线程时,该线程在下一次调用park方法时就不会被挂起,而这一行为是符合咱们的场景的——由于当前的共享锁处于可获取的状态,后继的线程应该直接来获取锁,不该该被挂起。

事实上,我我的认为:

else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;  // loop on failed CAS

这一段其实也能够省略,固然有了这一段确定会加速唤醒后继节点的过程,做者针对上面那种极其短暂的状况进行了优化能够说是和它以前“调用风暴”的设计一脉相承,可能也正是因为做者对于性能的极致追求才使得AQS如此之优秀吧。

总结

  • 共享锁的调用框架和独占锁很类似,它们最大的不一样在于获取锁的逻辑——共享锁能够被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
  • 因为共享锁同一时刻能够被多个线程持有,所以当头节点获取到共享锁时,能够当即唤醒后继节点来争锁,而没必要等到释放锁的时候。所以,共享锁触发唤醒后继节点的行为可能有两处,一处在当前节点成功得到共享锁后,一处在当前节点释放共享锁后。

(完)

系列文章目录

相关文章
相关标签/搜索