线程间的同步与通讯(6)——CountDownLatch源码分析

前言

系列文章目录 java

CountDownLatch是一个颇有用的工具,latch是门闩的意思,该工具是为了解决某些操做只能在一组操做所有执行完成后才能执行的情景。例如,小组早上开会,只有等全部人到齐了才能开;再如,游乐园里的过山车,一次能够坐10我的,为了节约成本,一般是等够10我的了才开。CountDown是倒数计数,因此CountDownLatch的用法一般是设定一个大于0的值,该值即表明须要等待的总任务数,每完成一个任务后,将总任务数减一,直到最后该值为0,说明全部等待的任务都执行完了,“门闩”此时就被打开,后面的任务能够继续执行。node

CountDownLatch自己是基于共享锁实现的,若是你还不了解共享锁,建议先读一下逐行分析AQS源码(3)——共享锁的获取与释放,而后再继续往下看。segmentfault

核心属性

CountDownLatch主要是经过AQS的共享锁机制实现的,所以它的核心属性只有一个sync,它继承自AQS,同时覆写了tryAcquireSharedtryReleaseShared,以完成具体的实现共享锁的获取与释放的逻辑。框架

private final Sync sync;
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

构造函数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在构造函数中,咱们就是简单传入了一个不小于0的任务数,由上面Sync的构造函数可知,这个任务数就是AQS的state的初始值。函数

核心方法

CountDownLatch最核心的方法只有两个,一个是countDown方法,每调用一次,就会将当前的count减一,当count值为0时,就会唤醒全部等待中的线程;另外一个是await方法,它有两种形式,一种是阻塞式,一种是带超时机制的形式,该方法用于将当前等待“门闩”开启的线程挂起,直到count值为0,这一点很相似于条件队列,至关于等待的条件就是count值为0,然而其底层的实现并非用条件队列,而是共享锁。工具

countDown()

public void countDown() {
    sync.releaseShared(1);
}

前面说过,countDown()方法的目的就是将count值减一,而且在count值为0时,唤醒全部等待的线程,它内部调用的实际上是释放共享锁的操做:学习

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

该方法由AQS实现,可是tryReleaseShared方法由Sync类本身实现:ui

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

该方法的实现很简单,就是获取当前的state值,若是已经为0了,直接返回false;不然经过CAS操做将state值减一,以后返回的是nextc == 0,因而可知,该方法只有在count值原来不为0,可是调用后变为0时,才会返回true,不然返回false,而且也能够看出,该方法在返回true以后,后面若是再次调用,仍是会返回false。也就是说,调用该方法只有一种状况会返回true,那就是state值从大于0变为0值时,这时也是全部在门闩前的任务都完成了。this

tryReleaseShared返回true之后,将调用doReleaseShared方法唤醒全部等待中的线程,该方法咱们在前面的文章中已经详细分析过了,这里就再也不赘述了。spa

值得一提的是,咱们其实并不关心releaseShared的返回值,而只关心tryReleaseShared的返回值,或者只关心count到0了没有,这里更像是借了共享锁的“壳”,来完成咱们的目的,事实上咱们彻底能够本身设一个全局变量count来实现相同的效果,只不过对这个全局变量的操做也必须使用CAS。

await()

Condition的await()方法的语义相同,该方法是阻塞式地等待,而且是响应中断的,只不过它不是在等待signal操做,而是在等待count值为0:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

可见,await方法内部调用的是acquireSharedInterruptibly方法,至关于借用了获取共享锁的“壳”:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

咱们来回忆一下独占模式下对应的方法:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

可见,二者用的是同一个框架,只是这里:

  • tryAcquire(arg) 换成了 tryAcquireShared(arg) (子类实现)
  • doAcquireInterruptibly(arg) 换成了 doAcquireSharedInterruptibly(arg) (AQS提供)

咱们先来看看Sync子类对于tryAcquireShared的实现:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

该方法彷佛有点挂羊头卖狗肉的感受——所谓的获取共享锁,事实上并非什么抢锁的行为,没有任何CAS操做,它就是判断当前的state值是否是0,是就返回1,不是就返回-1。

值得注意的是,在逐行分析AQS源码(3)——共享锁的获取与释放中咱们特别提到过tryAcquireShared返回值的含义:

  • 若是该值小于0,则表明当前线程获取共享锁失败
  • 若是该值大于0,则表明当前线程获取共享锁成功,而且接下来其余线程尝试获取共享锁的行为极可能成功
  • 若是该值等于0,则表明当前线程获取共享锁成功,可是接下来其余线程尝试获取共享锁的行为会失败

因此,当该方法的返回值不小于0时,就说明抢锁成功,能够直接退出了,所对应的就是count值已经为0,全部等待的事件都知足了。不然,咱们调用doAcquireSharedInterruptibly(arg)将当前线程封装成Node,丢到sync queue中去阻塞等待:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

前面咱们介绍共享锁的获取时,已经分析过了doAcquireShared方法,只是它是不抛出InterruptedException的,doAcquireSharedInterruptibly(arg)是它的可中断版本,咱们能够直接对比一下:
doAcquireShared-vs-doAcquireSharedInterruptibly

可见,它们仅仅是在对待中断的处理方式上有所不一样,其余部分都是同样的,因为doAcquireShared前面的文章中咱们已经详细分析过了,这里就再也不赘述了。

await(long timeout, TimeUnit unit)

相较于await()方法,await(long timeout, TimeUnit unit)提供了超时等待机制:

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意,在tryAcquireSharedNanos方法中,咱们用到了doAcquireSharedNanos的返回值,若是该方法由于超时而退出时,则将返回false。因为await()方法是阻塞式的,也就是说没有获取到锁是不会退出的,所以它没有返回值,换句话说,若是它正常返回了,则必定是由于获取到了锁而返回; 而await(long timeout, TimeUnit unit)因为有了超时机制,它是有返回值的,返回值为true则表示获取锁成功,为false则表示获取锁失败。doAcquireSharedNanos的这个返回值有助于咱们理解该方法到底是由于获取到了锁而返回,仍是由于超时时间到了而返回。

至于doAcquireSharedNanos的实现细节,因为他和doAcquireSharedInterruptibly相比只是多了一个超时机制:
doAcquireSharedInterruptibly-vs-doAcquireSharedNanos

代码自己很简单,就不赘述了。

实战

接下来咱们来学习一个使用CountDownLatch的实际例子,Java的官方源码已经为咱们提供了一个使用的示例代码:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don't let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

在这个例子中,有两个“闸门”,一个是CountDownLatch startSignal = new CountDownLatch(1),它开启后,等待在这个“闸门”上的任务才能开始运行;另外一个“闸门”是CountDownLatch doneSignal = new CountDownLatch(N), 它表示等待N个任务都执行完成后,才能继续往下。

Worker实现了Runnable接口,表明了要执行的任务,在它的run方法中,咱们先调用了startSignal.await(),等待startSignal这一“闸门”开启,闸门开启后,咱们就执行本身的任务,任务完成后再执行doneSignal.countDown(),将等待的总任务数减一。

代码自己的逻辑很是简单好懂,这里不赘述了。

总结

  • CountDownLatch至关于一个“门栓”,一个“闸门”,只有它开启了,代码才能继续往下执行。一般状况下,若是当前线程须要等其余线程执行完成后才能执行,咱们就可使用CountDownLatch。
  • 使用CountDownLatch#await方法阻塞等待一个“闸门”的开启。
  • 使用CountDownLatch#countDown方法减小闸门所等待的任务数。
  • CountDownLatch基于共享锁实现。
  • CountDownLatch是一次性的,“闸门”开启后,没法再重复使用,若是想重复使用,应该用[CyclicBarrier]()

(完)

系列文章目录

相关文章
相关标签/搜索