CountDownLatch
是多线程控制的一种工具,它被称为 门阀
、 计数器
或者 闭锁
。这个工具常常用来用来协调多个线程之间的同步,或者提及到线程之间的通讯(而不是用做互斥的做用)。下面咱们就来一块儿认识一下 CountDownLatchjava
我把本身以往的文章汇总成为了 Github ,欢迎各位大佬 star
https://github.com/crisxuan/bestJavaernode
CountDownLatch 可以使一个线程在等待另一些线程完成各自工做以后,再继续执行。它至关因而一个计数器,这个计数器的初始值就是线程的数量,每当一个任务完成后,计数器的值就会减一,当计数器的值为 0 时,表示全部的线程都已经任务了,而后在 CountDownLatch 上等待的线程就能够恢复执行接下来的任务。git
CountDownLatch 提供了一个构造方法,你必须指定其初始值,还指定了 countDown
方法,这个方法的做用主要用来减少计数器的值,当计数器变为 0 时,在 CountDownLatch 上 await
的线程就会被唤醒,继续执行其余任务。固然也能够延迟唤醒,给 CountDownLatch 加一个延迟时间就能够实现。程序员
其主要方法以下github
CountDownLatch 主要有下面这几个应用场景微信
典型的应用场景就是当一个服务启动时,同时会加载不少组件和服务,这时候主线程会等待组件和服务的加载。当全部的组件和服务都加载完毕后,主线程和其余线程在一块儿完成某个任务。数据结构
CountDownLatch 还能够实现学生一块儿比赛跑步的程序,CountDownLatch 初始化为学生数量的线程,鸣枪后,每一个学生就是一条线程,来完成各自的任务,当第一个学生跑彻底程后,CountDownLatch 就会减一,直到全部的学生完成后,CountDownLatch 会变为 0 ,接下来再一块儿宣布跑步成绩。多线程
顺着这个场景,你本身就能够延伸、拓展出来不少其余任务场景。并发
下面咱们经过一个简单的计数器来演示一下 CountDownLatch 的用法框架
public class TCountDownLatch { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(5); Increment increment = new Increment(latch); Decrement decrement = new Decrement(latch); new Thread(increment).start(); new Thread(decrement).start(); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } } } class Decrement implements Runnable { CountDownLatch countDownLatch; public Decrement(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { try { for(long i = countDownLatch.getCount();i > 0;i--){ Thread.sleep(1000); System.out.println("countdown"); this.countDownLatch.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Increment implements Runnable { CountDownLatch countDownLatch; public Increment(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println("await"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Waiter Released"); } }
在 main 方法中咱们初始化了一个计数器为 5 的 CountDownLatch,在 Decrement 方法中咱们使用 countDown
执行减一操做,而后睡眠一段时间,同时在 Increment 类中进行等待,直到 Decrement 中的线程完成计数减一的操做后,唤醒 Increment 类中的 run 方法,使其继续执行。
下面咱们再来经过学生赛跑这个例子来演示一下 CountDownLatch 的具体用法
public class StudentRunRace { CountDownLatch stopLatch = new CountDownLatch(1); CountDownLatch runLatch = new CountDownLatch(10); public void waitSignal() throws Exception{ System.out.println("选手" + Thread.currentThread().getName() + "正在等待裁判发布口令"); stopLatch.await(); System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("选手" + Thread.currentThread().getName() + "到达终点"); runLatch.countDown(); } public void waitStop() throws Exception{ Thread.sleep((long) (Math.random() * 10000)); System.out.println("裁判"+Thread.currentThread().getName()+"即将发布口令"); stopLatch.countDown(); System.out.println("裁判"+Thread.currentThread().getName()+"已发送口令,正在等待全部选手到达终点"); runLatch.await(); System.out.println("全部选手都到达终点"); System.out.println("裁判"+Thread.currentThread().getName()+"汇总成绩排名"); } public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); StudentRunRace studentRunRace = new StudentRunRace(); for (int i = 0; i < 10; i++) { Runnable runnable = () -> { try { studentRunRace.waitSignal(); } catch (Exception e) { e.printStackTrace(); } }; service.execute(runnable); } try { studentRunRace.waitStop(); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
下面咱们就来一块儿分析一下 CountDownLatch
的源码
CountDownLatch 使用起来比较简单,可是却很是有用,如今你能够在你的工具箱中加上 CountDownLatch 这个工具类了。下面咱们就来深刻认识一下 CountDownLatch。
CountDownLatch 的底层是由 AbstractQueuedSynchronizer
支持,而 AQS 的数据结构的核心就是两个队列,一个是 同步队列(sync queue)
,一个是条件队列(condition queue)
。
CountDownLatch 在其内部是一个 Sync ,它继承了 AQS 抽象类。
private static final class Sync extends AbstractQueuedSynchronizer {...}
CountDownLatch 其实其内部只有一个 sync
属性,而且是 final 的
private final Sync sync;
CountDownLatch 只有一个带参数的构造方法
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
也就是说,初始化的时候必须指定计数器的数量,若是数量为负会直接抛出异常。
而后把 count 初始化为 Sync 内部的 count,也就是
Sync(int count) { setState(count); }
注意这里有一个 setState(count),这是什么意思呢?见闻知意这只是一个设置状态的操做,可是实际上不仅仅是,还有一层意思是 state 的值表明着待达到条件的线程数。这个咱们在聊 countDown 方法的时候再讨论。
getCount()
方法的返回值是 getState()
方法,它是 AbstractQueuedSynchronizer 中的方法,这个方法会返回当前线程计数,具备 volatile 读取的内存语义。
// ---- CountDownLatch ---- int getCount() { return getState(); } // ---- AbstractQueuedSynchronizer ---- protected final int getState() { return state; }
tryAcquireShared()
方法用于获取·共享状态下对象的状态,判断对象是否为 0 ,若是为 0 返回 1 ,表示可以尝试获取,若是不为 0,那么返回 -1,表示没法获取。
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // ---- getState() 方法和上面的方法相同 ----
这个 共享状态
属于 AQS 中的概念,在 AQS 中分为两种模式,一种是 独占模式
,一种是 共享模式
。
tryReleaseShared()
方法用于共享模式下的释放
protected boolean tryReleaseShared(int releases) { // 减少数量,变为 0 的时候进行通知。 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
这个方法是一个无限循环,获取线程状态,若是线程状态是 0 则表示没有被线程占有,没有占有的话那么直接返回 false ,表示已经释放;而后下一个状态进行 - 1 ,使用 compareAndSetState CAS 方法进行和内存值的比较,若是内存值也是 1 的话,就会更新内存值为 0 ,判断 nextc 是否为 0 ,若是 CAS 比较不成功的话,会再次进行循环判断。
若是 CAS 用法不清楚的话,读者朋友们能够参考这篇文章 告诉你一个 AtomicInteger 的惊天大秘密!
await()
方法是 CountDownLatch 一个很是重要的方法,基本上能够说只有 countDown 和 await 方法才是 CountDownLatch 的精髓所在,这个方法将会使当前线程在 CountDownLatch 计数减至零以前一直等待,除非线程被中断。
CountDownLatch 中的 await 方法有两种,一种是不带任何参数的 await()
,一种是能够等待一段时间的await(long timeout, TimeUnit unit)
。下面咱们先来看一下 await() 方法。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await 方法内部会调用 acquireSharedInterruptibly 方法,这个 acquireSharedInterruptibly 是 AQS 中的方法,以共享模式进行中断。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
能够看到,acquireSharedInterruptibly 方法的内部会首先判断线程是否中断
,若是线程中断,则直接抛出线程中断异常。若是没有中断,那么会以共享的方式获取。若是可以在共享的方式下不能获取锁,那么就会以共享的方式断开连接。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这个方法有些长,咱们分开来看
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
首先会设置头节点,而后进行一系列的判断,获取节点的获取节点的后继,以共享模式进行释放,就会调用 doReleaseShared 方法,咱们再来看一下 doReleaseShared 方法
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
这个方法会以无限循环的方式首先判断头节点是否等于尾节点,若是头节点等于尾节点的话,就会直接退出。若是头节点不等于尾节点,会判断状态是否为 SIGNAL,不是的话就继续循环 compareAndSetWaitStatus,而后断开后继节点。若是状态不是 SIGNAL,也会调用 compareAndSetWaitStatus 设置状态为 PROPAGATE,状态为 0 而且不成功,就会继续循环。
也就是说 setHeadAndPropagate 就是设置头节点而且释放后继节点的一系列过程。
shouldParkAfterFailedAcquire(p, node)
这里if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();
若是上面 Node p = node.predecessor() 获取前驱节点不是头节点,就会进行 park 断开操做,判断此时是否可以断开,判断的标准以下
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
这个方法会判断 Node p 的前驱节点的结点状态(waitStatus)
,节点状态一共有五种,分别是
CANCELLED(1)
:表示当前结点已取消调度。当超时或被中断(响应中断的状况下),会触发变动为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1)
:表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。
CONDITION(-2)
:表示结点等待在 Condition 上,当其余线程调用了 Condition 的 signal() 方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3)
:共享模式下,前继结点不只会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0
:新结点入队时的默认状态。
若是前驱节点是 SIGNAL 就会返回 true 表示能够断开,若是前驱节点的状态大于 0 (此时为何不用 ws == Node.CANCELLED ) 呢?由于 ws 大于 0 的条件只有 CANCELLED 状态了。而后就是一系列的查找遍历操做直到前驱节点的 waitStatus > 0。若是 ws <= 0 ,并且还不是 SIGNAL 状态的话,就会使用 CAS 替换前驱节点的 ws 为 SIGNAL 状态。
若是检查判断是中断状态的话,就会返回 false。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
这个方法使用 LockSupport.park
断开链接,而后返回线程是否中断的标志。
cancelAcquire()
用于取消等待队列,若是等待过程当中没有成功获取资源(如timeout,或者可中断的状况下被中断了),那么取消结点在队列中的等待。private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
因此,对 CountDownLatch 的 await 调用大体会有以下的调用过程。
一个和 await 重载的方法是 await(long timeout, TimeUnit unit)
,这个方法和 await 最主要的区别就是这个方法可以能够等待计数器一段时间再执行后续操做。
countDown 是和 await 同等重要的方法,countDown 用于减小计数器的数量,若是计数减为 0 的话,就会释放全部的线程。
public void countDown() { sync.releaseShared(1); }
这个方法会调用 releaseShared 方法,此方法用于共享模式下的释放操做,首先会判断是否可以进行释放,判断的方法就是 CountDownLatch 内部类 Sync 的 tryReleaseShared 方法
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // ---- CountDownLatch ---- protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
tryReleaseShared 会进行 for 循环判断线程状态值,使用 CAS 不断尝试进行替换。
若是可以释放,就会调用 doReleaseShared 方法
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
能够看到,doReleaseShared 其实也是一个无限循环不断使用 CAS 尝试替换的操做。
本文是 CountDownLatch 的基本使用和源码分析,CountDownLatch 就是一个基于 AQS 的计数器,它内部的方法都是围绕 AQS 框架来谈的,除此以外还有其余好比 ReentrantLock、Semaphore 等都是 AQS 的实现,因此要研究并发的话,离不开对 AQS 的探讨。CountDownLatch 的源码看起来不多,比较简单,可是其内部好比 await 方法的调用链路却很长,也值得花费时间深刻研究。
我是 cxuan,一枚技术创做的程序员。若是本文你以为不错的话,跪求读者点赞、在看、分享!
另外,我本身肝了六本 PDF,微信搜索「程序员cxuan」关注公众号后,在后台回复 cxuan ,领取所有 PDF,这些 PDF 以下