本文分析一下CountDownLatch是如何运用AQS的node
CountDownLatch顾名思义它是一个Latch(门闩),它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当state为0时,锁就会被释放,凡是以前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。app
public static void main(String[] args) throws InterruptedException{ int threadSize = 3; CountDownLatch doneSignal = new CountDownLatch(threadSize); for (int i = 1; i <= threadSize; i++) { final int threadNum = i; new Thread(() -> { System.out.println("thread" + threadNum + ":start"); try { Thread.sleep(1000 * threadNum); } catch (InterruptedException e) { System.out.println("thread" + threadNum + ":exception"); } doneSignal.countDown(); System.out.println("thread" + threadNum + ":complete"); }).start(); } System.out.println("main thread:await"); doneSignal.await(); System.out.println("main thread:go on"); }
例子中主线程启动了三条子线程,睡眠一段时间,此时主线程在等待全部子线程结束后才会继续执行下去;
看一下输出结果:函数
main thread:await thread1:start thread2:start thread3:start thread1:complete thread2:complete thread3:complete main thread:go on Process finished with exit code 0
既然CountDownLatch也是AQS的一种使用方式,咱们看一下它的内部类Syc是怎么实现AQS的:oop
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //构造函数,初始化同步状态state的值,即线程个数 Sync(int count) { setState(count); } int getCount() { return getState(); } //这里重写了方法,在共享模式下,告诉调用者是否能够抢占state锁了,正数表明能够,负数表明否认;当state为0时返回正数 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //共享模式下释放锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); //state为0时说明没有什么可释放 if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) //CAS对state操做成功后返回state值是否为0,为0则释放成功 return nextc == 0; } } }
看完了重写的AQS同步器后,咱们了解了CountDownLatch对state锁的描述。接下来先看主线程调用的await方法,在await方法里调用了AQS的acquireSharedInterruptibly:ui
//在共享模式下尝试抢占锁 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //线程中断抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试抢占前先查询一下是否能够抢占,若是返回值大于0程序往下执行,小于0则等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //在Reentrant解析中咱们看过,往队列中新增node(共享模式) final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { //若是当前node的前继时head,立刻尝试抢占锁 int r = tryAcquireShared(arg); if (r >= 0) { //若是state==0即容许往下执行,从新设置head并往下传播信号 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; //获得往下执行的容许 return; } } //如下都跟Reentrant同样 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //将当前node设置为head,清空node的thread、prev setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //若是propagate大于0,或者原来head的等待状态小于0或者如今head的等待状态小于0 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //准备唤醒下一个节点 if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //若是head的状态为SIGNAL,更改状态为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继节点 unparkSuccessor(h); } //若是head状态为0,更改状态为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //若是head没有改变,结束当前loop,若是遇到head被别的线程改变,继续loop if (h == head) // loop if head changed break; } }
释放锁的信号一直向后传播,直到全部node被唤醒并继续执行,那第一个信号时什么时候发起的呢?咱们来看一下CountDownLatch的countDown方法,该方法调用了sync的releaseShared方法:this
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //若是同步状态state为0时,调用doReleaseShared,在这里就发出了第一个唤醒全部等待node的信号,而后信号自动日后传播 doReleaseShared(); return true; } return false; }
CountDownLatch在调用await的时候判断state释放为0,若是大于0则阻塞当前线程,将当前线程的node添加到队列中等待;在调用countDown时当遇到state减到0时,发出释放共享锁的信号,从头节点的后记节点开始日后传递信号,将队列等待的线程逐个唤醒并继续往下执行;
在这里state跟Reentrant的state独占锁含义不一样,state的含义是由AQS的子类去描述的。线程