一 概述
二 总览
三 await方法
四 countDown方法
五 应用
六 总结
复制代码
CountDownLatch是一个同步计数工具类,它能够用于控制一个或多个线程等待其余线程任务完成。初始一个计数count后,每当线程完成任务则调用countDown方法使计数count减1,当调用await方法时则会阻塞等待count为0(也就是全部线程任务完成)后才会取消阻塞继续执行。java
public class CountDownLatch {
// 继承AQS的内部同步控制器
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 初始计数值,复用的AQS中的state属性
Sync(int count) { setState(count); }
// 获取计数器
int getCount() { return getState(); }
// 初始尝试获取共享锁方法
protected int tryAcquireShared(int acquires) { …… }
// 释放共享锁方法
protected boolean tryReleaseShared(int releases) { …… }
}
// 内部同步器属性
private final Sync sync;
// 构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 响应中断的阻塞等待全部任务完成的方法
public void await() throws InterruptedException {……}
// 响应中断且超时结束的阻塞等待全部任务完成的方法
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { …… }
// 完成一个任务,计数器减1,当计数为0时则从AQS队列中唤醒全部等待的线程
public void countDown() { …… }
// 获取当前计数值
public long getCount() { …… }
// 重写的toString
public String toString() { …… }
}
复制代码
总的来看CountDownLatch的内部构造很简单,内部维护了一个继承于AQS的同步控制器sync,使用AQS中的state属性做为计数值,并提供了如下几种操做方法:node
方法 | 说明 |
---|---|
await() | 调用该方法的线程阻塞等待全部线程任务完成后返回,支持响应中断 |
await(long timeout, TimeUnit unit) | 调用该方法的线程阻塞等待全部线程任务完成后返回,支持响应中断及超时后直接返回 |
countDown() | 完成一个任务,计数器减1,当计数为0时则从AQS队列中唤醒全部等待的线程 |
getCount() | 获取当前计数值 |
toString() | 重写的toString,输出当前计数器值 |
主方法体微信
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
实际调用的是AQS中的acquireSharedInterruptibly方法:工具
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
复制代码
先尝试一次检查当前计数值是否为0,为0则说明全部任务都完成了,则直接返回成功oop
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
复制代码
不然建立同步等待节点并加入到AQS同步等待队列中进行阻塞等待:ui
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 在同步队列中增长等待节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 若是前驱节点为head节点,表示当前节点是同步等待队列中的第一个,故继续尝试一次获取锁
if (p == head) {
// 尝试获取令牌,此时会跳转到semaphore中(由于重写了该方法)
int r = tryAcquireShared(arg);
// 返回大于0则表示成功获取到令牌了
if (r >= 0) {
// 将当前节点设为head节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 自旋几回后为避免强占CPU,则对该线程进行休眠处理
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 因中断请求则取消排队请求
if (failed)
cancelAcquire(node);
}
}
复制代码
主方法体this
public void countDown() {
sync.releaseShared(1);
}
复制代码
调用AQS中的releaseShared方法:spa
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
先对计数器进行减1操做,采用AQS方式进行,当计数器为0则返回true;线程
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
计数器返回0则须要去唤醒在AQS同步队列中休眠的线程code
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
countDownLatch经常使用于一个线程须要等待其余几个线程的结果才执行的场景,这个场景与FutureTask中的get方法阻塞等待任务返回结果相似。简单demo以下:
CountDownLatch countDownLatch = new CountDownLatch(10);
try {
for (int i = 0; i < 10; i++){
new Thread(() -> {
try {
// 执行任务
} catch (Exception e) {
e.printStackTrace();
} finally {
// 任务执行完后释放
countDownLatch.countDown();
}
});
}
// 等全部的线程跑完,并设置20分钟超时
countDownLatch.await(20, TimeUnit.MINUTES);
// 继续执行其余业务
}catch (Exception e){
e.printStackTrace();
}
复制代码
CountDownLatch内部构造比较简单,基本都是彻底复用的AQS的功能来实现的,好比采用state属性做为计数值,依赖计数器值个数任务完成的线程会加入到AQS同步等待队列中等待。每一个持有计数值的线程任务完成后对计数器减1,减为0的时候去同步队列中唤醒等待的线程。