CountDownLatch源码解析

一 概述
   二 总览
   三 await方法
   四 countDown方法
   五 应用
   六 总结
复制代码

一 概述

CountDownLatch是一个同步计数工具类,它能够用于控制一个或多个线程等待其余线程任务完成。初始一个计数count后,每当线程完成任务则调用countDown方法使计数count减1,当调用await方法时则会阻塞等待count为0(也就是全部线程任务完成)后才会取消阻塞继续执行。java

二 总览

public class CountDownLatch {
    // 继承AQS的内部同步控制器
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // 初始计数值,复用的AQS中的state属性
        Sync(int count) { setState(count); }
        // 获取计数器
        int getCount() { return getState(); }
        // 初始尝试获取共享锁方法
        protected int tryAcquireShared(int acquires) { …… }
        // 释放共享锁方法
        protected boolean tryReleaseShared(int releases) { …… }
    }
    // 内部同步器属性
    private final Sync sync;
    // 构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    // 响应中断的阻塞等待全部任务完成的方法
    public void await() throws InterruptedException {……}
    // 响应中断且超时结束的阻塞等待全部任务完成的方法
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { …… }
    // 完成一个任务,计数器减1,当计数为0时则从AQS队列中唤醒全部等待的线程
    public void countDown() { …… }
    // 获取当前计数值
    public long getCount() { …… }
    // 重写的toString
    public String toString() { …… }
}
复制代码

总的来看CountDownLatch的内部构造很简单,内部维护了一个继承于AQS的同步控制器sync,使用AQS中的state属性做为计数值,并提供了如下几种操做方法:node

方法 说明
await() 调用该方法的线程阻塞等待全部线程任务完成后返回,支持响应中断
await(long timeout, TimeUnit unit) 调用该方法的线程阻塞等待全部线程任务完成后返回,支持响应中断及超时后直接返回
countDown() 完成一个任务,计数器减1,当计数为0时则从AQS队列中唤醒全部等待的线程
getCount() 获取当前计数值
toString() 重写的toString,输出当前计数器值

三 await方法

主方法体微信

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码

实际调用的是AQS中的acquireSharedInterruptibly方法:工具

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
复制代码

先尝试一次检查当前计数值是否为0,为0则说明全部任务都完成了,则直接返回成功oop

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
复制代码

不然建立同步等待节点并加入到AQS同步等待队列中进行阻塞等待:ui

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 在同步队列中增长等待节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 若是前驱节点为head节点,表示当前节点是同步等待队列中的第一个,故继续尝试一次获取锁
            if (p == head) {
                // 尝试获取令牌,此时会跳转到semaphore中(由于重写了该方法)
                int r = tryAcquireShared(arg);
                // 返回大于0则表示成功获取到令牌了
                if (r >= 0) {
                    // 将当前节点设为head节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 自旋几回后为避免强占CPU,则对该线程进行休眠处理
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 因中断请求则取消排队请求
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

四 countDown方法

主方法体this

public void countDown() {
    sync.releaseShared(1);
}
复制代码

调用AQS中的releaseShared方法:spa

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码

先对计数器进行减1操做,采用AQS方式进行,当计数器为0则返回true;线程

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
复制代码

计数器返回0则须要去唤醒在AQS同步队列中休眠的线程code

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
复制代码

五 应用

countDownLatch经常使用于一个线程须要等待其余几个线程的结果才执行的场景,这个场景与FutureTask中的get方法阻塞等待任务返回结果相似。简单demo以下:

CountDownLatch countDownLatch = new CountDownLatch(10);
try {
    for (int i = 0; i < 10; i++){
        new Thread(() -> {
            try {
                // 执行任务
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 任务执行完后释放
                countDownLatch.countDown();
            }
        });
    }
    // 等全部的线程跑完,并设置20分钟超时
    countDownLatch.await(20, TimeUnit.MINUTES);
    // 继续执行其余业务
}catch (Exception e){
    e.printStackTrace();
}
复制代码

六 总结

CountDownLatch内部构造比较简单,基本都是彻底复用的AQS的功能来实现的,好比采用state属性做为计数值,依赖计数器值个数任务完成的线程会加入到AQS同步等待队列中等待。每一个持有计数值的线程任务完成后对计数器减1,减为0的时候去同步队列中唤醒等待的线程。


更多原创文章请关注微信公众号
👇👇👇
唠吧嗑吧

相关文章
相关标签/搜索