CountDownLatch使用方法很是简单,主要就是两个方法,await()方法和countDown()方法,await()方法会使线程阻塞。countDown()会将线程同步状态减1,当同步状态为0使唤醒线程。node
仍是经过源码来理解这个类。oop
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
public class CountDownLatch { /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
与重入锁同样,也是有一个内部Sync类。从代码中能够看出,这个类得初始化须要设置一个大于0得count,这个count用来标记线程的同步状态。构造方法就很少说了,这里主要讲最重要的await()和countDown()方法。ui
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())
// 判断线程是否处于阻塞状态,若是是,抛出异常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //内部Sync类中的方法,线程同步状态为0,返回1,不然返回-1 doAcquireSharedInterruptibly(arg); }
而后咱们再来看doAcquireSharedInterruptibly(arg)这个方法this
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//生成一个共享模式的新节点,而且将这个节点添加到同步队列的队尾 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {
//找到这个节点的前一个节点 final Node p = node.predecessor(); if (p == head) {
// 若是前一个节点是head节点,也就是说这个节点处于同步队列的最前面 int r = tryAcquireShared(arg); if (r >= 0) {
// 查询线程的同步状态,若是大于等于0,将这个节点置为head节点。而且唤醒这个线程 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } }
// 判断是否须要将当前线程阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed)
// 若是失败,将这个节点从同步队列移除 cancelAcquire(node); } }
这里涉及到了三个方法,setHeadAndPropagate(),shouldPardAfterFailedAcquire()和cancelAcquire()方法,下面一一介绍。不过在这以前先要说一下Node的几种等待状态,他方便后面理解代码。spa
(1)CANCELLED:值为1,因为在同步队列中等待的线程等待超时或被中断,须要从同步队列中取消等待,节点进入该状态将不会变化线程
(2)SINGAL:值为-1,后继节点的线程处于等待状态,当前节点若是释放了同步状态,将会通知后继节点,使后继节点得以运行debug
(3)CONDITION:值为-2,节点在等待队列中(这个在Condition的博客里会讲到),节点线程等待在Condition上,当其余线程对Condition调用了singal后,该节点会从等待队列转移到同步队列,加入到对同步状态的获取中去code
(4)PROPAGEATE:值为-3,表示下一次共享式同步状态获取将会无条件被传播下去blog
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 将这个节点设置为head节点
setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())
// 唤醒处于同步队列最前面的线程,这个方法后面再说 doReleaseShared(); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) {
// 若是等待状态大于0,也就是处于CANCELLED状态,这里的循环是为了过滤同步状态中等待状态为CANCELLED的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {
// CAS操做,将前置节点的等待状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // 声明node的前置节点pred Node pred = node.prev;
// 过滤掉同步队列中等待状态为CANCELLED的节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 声明pred的下一个节点predNext Node predNext = pred.next; node.waitStatus = Node.CANCELLED;
//若是node为尾节点,设置pred为tail节点,而且设置pred的next节点为null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0)
// 设置node节点的下一个节点为pred的下一个节点 compareAndSetNext(pred, predNext, next); } else {
//唤醒线程 unparkSuccessor(node); } node.next = node; // help GC } }
整个await()方法的全部代码就是这些。若是认真看源代码而且仔细梳理的话会发现其实也没那么难懂,下面介绍countDown()方法。队列
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) {
//线程同步状态-1,若是状态为0返回true,不然返回false if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
相信看代码已经很清楚了,那么咱们再来看doReleaseShared()这个方法。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {
// 若是等待状态为SIGNAL,将head节点的等待状态改成0,若是成功,唤醒线程 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS }
// 若是执行了unparkSuccessor(h)方法,head节点会变化,这样下面的h == head就不成立了,就会继续执行循环 if (h == head) // loop if head changed break; } }
整个CountDownLatch类,我断断续续看了几天才看完,主要是开始看AQS这个类感受比较绕,就看的不是很认真,比较浮躁。其实若是真正用心,跟着代码一步一步去走,一步一步去理解也没那么难,而后经过debug跟着代码去走,查看每一步作了什么,变量,属性的变化,这样有助于深入理解代码。