前面两篇咱们以ReentrantLock为例了解了AQS独占锁的获取与释放,本篇咱们来看看共享锁。因为AQS对于共享锁与独占锁的实现框架比较相似,所以若是你搞定了前面的独占锁模式,则共享锁也就很容易弄懂了。java
系列文章目录node
共享锁与独占锁最大的区别在于,独占锁是独占的,排他的,所以在独占锁中有一个exclusiveOwnerThread
属性,用来记录当前持有锁的线程。当独占锁已经被某个线程持有时,其余线程只能等待它被释放后,才能去争锁,而且同一时刻只有一个线程能争锁成功。segmentfault
而对于共享锁而言,因为锁是能够被共享的,所以它能够被多个线程同时持有。换句话说,若是一个线程成功获取了共享锁,那么其余等待在这个共享锁上的线程就也能够尝试去获取锁,而且极有可能获取成功。并发
共享锁的实现和独占锁是对应的,咱们能够从下面这张表中看出:框架
独占锁 | 共享锁 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
能够看出,除了最后一个属于共享锁的doReleaseShared()
方法没有对应外,其余的方法,独占锁和共享锁都是一一对应的。oop
事实上,其实与doReleaseShared()
对应的独占锁的方法应当是unparkSuccessor(h)
,只是doReleaseShared()
逻辑不只仅包含了unparkSuccessor(h)
,还包含了其余操做,这一点咱们下面分析源码的时候再看。性能
另外,尤为须要注意的是,在独占锁模式中,咱们只有在获取了独占锁的节点释放锁时,才会唤醒后继节点——这是合理的,由于独占锁只能被一个线程持有,若是它尚未被释放,就没有必要去唤醒它的后继节点。优化
然而,在共享锁模式下,当一个节点获取到了共享锁,咱们在获取成功后就能够唤醒后继节点了,而不须要等到该节点释放锁的时候,这是由于共享锁能够被多个线程同时持有,一个锁获取到了,则后继的节点均可以直接来获取。所以,在共享锁模式下,在获取锁和释放锁结束时,都会唤醒后继节点。 这一点也是doReleaseShared()
方法与unparkSuccessor(h)
方法没法直接对应的根本缘由所在。ui
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
咱们拿它和独占锁模式对比一下:this
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这二者的结构看上去彷佛有点差异,但事实上是同样的,只不过是共享锁模式下,将与addWaiter(Node.EXCLUSIVE)
对应的addWaiter(Node.SHARED)
,以及selfInterrupt()
操做所有移到了doAcquireShared
方法内部,这一点咱们在下面分析doAcquireShared
方法时就一目了然了。
不过这里先插一句,相对于独占的锁的tryAcquire(int arg)
返回boolean类型的值,共享锁的tryAcquireShared(int acquires)
返回的是一个整型值:
所以,只要该返回值大于等于0,就表示获取共享锁成功。
acquireShared
中的tryAcquireShared
方法由具体的子类负责实现,这里咱们暂且不表。
接下来咱们看看doAcquireShared
方法,它对应于独占锁的acquireQueued
,二者其实很相似,咱们把它们相同的部分注释掉,只看不一样的部分:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); /*boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();*/ if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } /*if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }*/ }
关于上面的if部分,独占锁对应的acquireQueued
方法为:
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
所以,综合来看,这二者的逻辑仅有两处不一样:
addWaiter(Node.EXCLUSIVE)
-> addWaiter(Node.SHARED)
setHead(node)
-> setHeadAndPropagate(node, r)
这里第一点不一样就是独占锁的acquireQueued
调用的是addWaiter(Node.EXCLUSIVE)
,而共享锁调用的是addWaiter(Node.SHARED)
,代表了该节点处于共享模式,这两种模式的定义为:
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
该模式被赋值给了节点的nextWaiter
属性:
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
咱们知道,在条件队列中,nextWaiter
是指向条件队列中的下一个节点的,它将条件队列中的节点串起来,构成了单链表。可是在sync queue
队列中,咱们只用prev
,next
属性来串联节点,造成双向链表,nextWaiter
属性在这里只起到一个标记做用,不会串联节点,这里不要被Node SHARED = new Node()
所指向的空节点迷惑,这个空节点并不属于sync queue
,不表明任何线程,它只起到标记做用,仅仅用做判断节点是否处于共享模式的依据:
// Node#isShard() final boolean isShared() { return nextWaiter == SHARED; }
这里的第二点不一样就在于获取锁成功后的行为,对于独占锁而言,是直接调用了setHead(node)
方法,而共享锁调用的是setHeadAndPropagate(node, r)
:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
在该方法内部咱们不只调用了setHead(node)
,还在必定条件下调用了doReleaseShared()
来唤醒后继的节点。这是由于在共享锁模式下,锁能够被多个线程所共同持有,既然当前线程已经拿到共享锁了,那么就能够直接通知后继节点来拿锁,而没必要等待锁被释放的时候再通知。
关于这个doReleaseShared
方法,咱们到下面分析锁释放的时候再看。
咱们使用releaseShared(int arg)
方法来释放共享锁:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
该方法对应于独占锁的release(int arg)
方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在独占锁模式下,因为头节点就是持有独占锁的节点,在它释放独占锁后,若是发现本身的waitStatus不为0,则它将负责唤醒它的后继节点。
在共享锁模式下,头节点就是持有共享锁的节点,在它释放共享锁后,它也应该唤醒它的后继节点,可是值得注意的是,咱们在以前的setHeadAndPropagate
方法中可能已经调用过该方法了,也就是说它可能会被同一个头节点调用两次,也有可能在咱们从releaseShared
方法中调用它时,当前的头节点已经易主了,下面咱们就来详细看看这个方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
该方法多是共享锁模式最难理解的方法了,在看该方法时,咱们须要明确如下几个问题:
(1) 该方法有几处调用?
该方法有两处调用,一处在acquireShared
方法的末尾,当线程成功获取到共享锁后,在必定条件下调用该方法;一处在releaseShared
方法中,当线程释放共享锁的时候调用。
(2) 调用该方法的线程是谁?
在独占锁中,只有获取了锁的线程才能调用release释放锁,所以调用unparkSuccessor(h)唤醒后继节点的必然是持有锁的线程,该线程可看作是当前的头节点(虽然在setHead方法中已经将头节点的thread属性设为了null,可是这个头节点曾经表明的就是这个线程)
在共享锁中,持有共享锁的线程能够有多个,这些线程均可以调用releaseShared
方法释放锁;而这些线程想要得到共享锁,则它们必然曾经成为过头节点,或者就是如今的头节点。所以,若是是在releaseShared
方法中调用的doReleaseShared
,可能此时调用方法的线程已经不是头节点所表明的线程了,头节点可能已经被易主好几回了。
(3) 调用该方法的目的是什么?
不管是在acquireShared
中调用,仍是在releaseShared
方法中调用,该方法的目的都是在当前共享锁是可获取的状态时,唤醒head节点的下一个节点。这一点看上去和独占锁彷佛同样,可是它们的一个重要的差异是——在共享锁中,当头节点发生变化时,是会回到循环中再当即唤醒head节点的下一个节点的。也就是说,在当前节点完成唤醒后继节点的任务以后将要退出时,若是发现被唤醒后继节点已经成为了新的头节点,则会当即触发唤醒head节点的下一个节点的操做,如此周而复始。
(4) 退出该方法的条件是什么
该方法是一个自旋操做(for(;;)
),退出该方法的惟一办法是走最后的break语句:
if (h == head) // loop if head changed break;
即,只有在当前head没有易主时,才会退出,不然继续循环。
这个怎么理解呢?
为了说明问题,这里咱们假设目前sync queue队列中依次排列有
dummy node -> A -> B -> C -> D
如今假设A已经拿到了共享锁,则它将成为新的dummy node,
dummy node (A) -> B -> C -> D
此时,A线程会调用doReleaseShared,咱们写作doReleaseShared[A]
,在该方法中将唤醒后继的节点B,它很快得到了共享锁,成为了新的头节点:
dummy node (B) -> C -> D
此时,B线程也会调用doReleaseShared,咱们写作doReleaseShared[B]
,在该方法中将唤醒后继的节点C,可是别忘了,在doReleaseShared[B]
调用的时候,doReleaseShared[A]
还没运行结束呢,当它运行到if(h == head)
时,发现头节点如今已经变了,因此它将继续回到for循环中,与此同时,doReleaseShared[B]
也没闲着,它在执行过程当中也进入到了for循环中。。。
因而可知,咱们这里造成了一个doReleaseShared的“调用风暴”,大量的线程在同时执行doReleaseShared,这极大地加速了唤醒后继节点的速度,提高了效率,同时该方法内部的CAS操做又保证了多个线程同时唤醒一个节点时,只有一个线程能操做成功。
那若是这里doReleaseShared[A]
执行结束时,节点B尚未成为新的头节点时,doReleaseShared[A]
方法不就退出了吗?是的,但即便这样也没有关系,由于它已经成功唤醒了线程B,即便doReleaseShared[A]
退出了,当B线程成为新的头节点时,doReleaseShared[B]
就开始执行了,它也会负责唤醒后继节点的,这样即便变成这种每一个节点只唤醒本身后继节点的模式,从功能上讲,最终也能够实现唤醒全部等待共享锁的节点的目的,只是效率上没有以前的“调用风暴”快。
由此咱们知道,这里的“调用风暴”事实上是一个优化操做,由于在咱们执行到该方法的末尾的时候,unparkSuccessor
基本上已经被调用过了,而因为如今是共享锁模式,因此被唤醒的后继节点极有可能已经获取到了共享锁,成为了新的head节点,当它成为新的head节点后,它可能仍是要在setHeadAndPropagate
方法中调用doReleaseShared
唤醒它的后继节点。
明确了上面几个问题后,咱们再来详细分析这个方法,它最重要的部分就是下面这两个if语句:
if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
第一个if很好理解,若是当前ws值为Node.SIGNAL,则说明后继节点须要唤醒,这里采用CAS操做先将Node.SIGNAL状态改成0,这是由于前面讲过,可能有大量的doReleaseShared方法在同时执行,咱们只须要其中一个执行unparkSuccessor(h)
操做就好了,这里经过CAS操做保证了unparkSuccessor(h)
只被执行一次。
比较难理解的是第二个else if,首先咱们要弄清楚ws啥时候为0,一种是上面的compareAndSetWaitStatus(h, Node.SIGNAL, 0)
会致使ws为0,可是很明显,若是是由于这个缘由,则它是不会进入到else if语句块的。因此这里的ws为0是指当前队列的最后一个节点成为了头节点。为何是最后一个节点呢,由于每次新的节点加进来,在挂起前必定会将本身的前驱节点的waitStatus修改为Node.SIGNAL的。(对这一点不理解的详细看这里)
其次,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
这个操做何时会失败?既然这个操做失败,说明就在执行这个操做的瞬间,ws此时已经不为0了,说明有新的节点入队了,ws的值被改成了Node.SIGNAL,此时咱们将调用continue
,在下次循环中直接将这个刚刚新入队但准备挂起的线程唤醒。
其实,若是咱们再结合外部的总体条件,就很容易理解这种状况所针对的场景,不要忘了,进入上面这段还有一个条件是
if (h != null && h != tail)
它处于最外层:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { // 注意这里说明了队列至少有两个节点 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
这个条件意味着,队列中至少有两个节点。
结合上面的分析,咱们能够看出,这个
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
描述了一个极其严苛且短暂的状态:
else if
语句,说明咱们跳过了前面的if条件,说明头节点是刚刚成为头节点的,它的waitStatus值还为0,尾节点是在这以后刚刚加进来的,它须要执行shouldParkAfterFailedAcquire
,将它的前驱节点(即头节点)的waitStatus值修改成Node.SIGNAL
,可是目前这个修改操做尚未来的及执行。这种状况使咱们得以进入else if的前半部分else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
这一条件,说明此时头节点的waitStatus
已经不是0了,这说明以前那个没有来得及执行的 在shouldParkAfterFailedAcquire
将前驱节点的的waitStatus值修改成Node.SIGNAL
的操做如今执行完了。因而可知,else if
的 &&
链接了两个不一致的状态,分别对应了shouldParkAfterFailedAcquire
的compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
执行成功前和执行成功后,由于doReleaseShared
和shouldParkAfterFailedAcquire
是能够并发执行的,因此这一条件是有可能知足的,只是知足的条件很是严苛,可能只是一瞬间的事。
这里不得不说,若是以上的分析没有错的话,那做者对于AQS性能的优化已经到了“使人发指”的地步!!!虽然说这种短暂的瞬间确实存在,也确实有必要从新回到for循环中再次去唤醒后继节点,可是这种优化也太太太~~~过于精细了吧!
咱们来看看若是不加入这个精细的控制条件有什么后果呢?
这里咱们复习一下新节点入队的过程,前面说过,在发现新节点的前驱不是head节点的时候,它将调用shouldParkAfterFailedAcquire
:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
因为前驱节点的ws值如今还为0,新节点将会把它改成Node.SIGNAL,
但修改后,该方法返回的是false,也就是说线程不会当即挂起,而是回到上层再尝试一次抢锁:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // shouldParkAfterFailedAcquire的返回处 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
当咱们再次回到for(;;)
循环中,因为此时当前节点的前驱节点已经成为了新的head,因此它能够参与抢锁,因为它抢的是共享锁,因此大几率它是抢的到的,因此极有可能它不会被挂起。这有可能致使在上面的doReleaseShared
调用unparkSuccessor
方法unpark
了一个并无被park
的线程。然而,这一操做是被容许的,当咱们unpark
一个并无被park
的线程时,该线程在下一次调用park
方法时就不会被挂起,而这一行为是符合咱们的场景的——由于当前的共享锁处于可获取的状态,后继的线程应该直接来获取锁,不该该被挂起。
事实上,我我的认为:
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
这一段其实也能够省略,固然有了这一段确定会加速唤醒后继节点的过程,做者针对上面那种极其短暂的状况进行了优化能够说是和它以前“调用风暴”的设计一脉相承,可能也正是因为做者对于性能的极致追求才使得AQS如此之优秀吧。
(完)