CountDownLatch理解

  CountDownLatch使用方法很是简单,主要就是两个方法,await()方法和countDown()方法,await()方法会使线程阻塞。countDown()会将线程同步状态减1,当同步状态为0使唤醒线程。node

仍是经过源码来理解这个类。oop

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

与重入锁同样,也是有一个内部Sync类。从代码中能够看出,这个类得初始化须要设置一个大于0得count,这个count用来标记线程的同步状态。构造方法就很少说了,这里主要讲最重要的await()和countDown()方法。ui

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
       // 判断线程是否处于阻塞状态,若是是,抛出异常
throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //内部Sync类中的方法,线程同步状态为0,返回1,不然返回-1 doAcquireSharedInterruptibly(arg); }

而后咱们再来看doAcquireSharedInterruptibly(arg)这个方法this

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
     //生成一个共享模式的新节点,而且将这个节点添加到同步队列的队尾
final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {
          //找到这个节点的前一个节点
final Node p = node.predecessor(); if (p == head) {
            // 若是前一个节点是head节点,也就是说这个节点处于同步队列的最前面
int r = tryAcquireShared(arg); if (r >= 0) {
              // 查询线程的同步状态,若是大于等于0,将这个节点置为head节点。而且唤醒这个线程 setHeadAndPropagate(node, r); p.next
= null; // help GC failed = false; return; } }
// 判断是否须要将当前线程阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed)
// 若是失败,将这个节点从同步队列移除 cancelAcquire(node); } }

这里涉及到了三个方法,setHeadAndPropagate(),shouldPardAfterFailedAcquire()和cancelAcquire()方法,下面一一介绍。不过在这以前先要说一下Node的几种等待状态,他方便后面理解代码。spa

(1)CANCELLED:值为1,因为在同步队列中等待的线程等待超时或被中断,须要从同步队列中取消等待,节点进入该状态将不会变化线程

(2)SINGAL:值为-1,后继节点的线程处于等待状态,当前节点若是释放了同步状态,将会通知后继节点,使后继节点得以运行debug

(3)CONDITION:值为-2,节点在等待队列中(这个在Condition的博客里会讲到),节点线程等待在Condition上,当其余线程对Condition调用了singal后,该节点会从等待队列转移到同步队列,加入到对同步状态的获取中去code

(4)PROPAGEATE:值为-3,表示下一次共享式同步状态获取将会无条件被传播下去blog

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 将这个节点设置为head节点
     setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())
// 唤醒处于同步队列最前面的线程,这个方法后面再说 doReleaseShared(); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
// 若是等待状态大于0,也就是处于CANCELLED状态,这里的循环是为了过滤同步状态中等待状态为CANCELLED的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {
// CAS操做,将前置节点的等待状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;
        // 声明node的前置节点pred
        Node pred = node.prev;
// 过滤掉同步队列中等待状态为CANCELLED的节点
while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 声明pred的下一个节点predNext Node predNext = pred.next; node.waitStatus = Node.CANCELLED;
//若是node为尾节点,设置pred为tail节点,而且设置pred的next节点为null
if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0)
// 设置node节点的下一个节点为pred的下一个节点 compareAndSetNext(pred, predNext, next); }
else {
//唤醒线程 unparkSuccessor(node); } node.next
= node; // help GC } }

整个await()方法的全部代码就是这些。若是认真看源代码而且仔细梳理的话会发现其实也没那么难懂,下面介绍countDown()方法。队列

public void countDown() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
//线程同步状态-1,若是状态为0返回true,不然返回false
if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

相信看代码已经很清楚了,那么咱们再来看doReleaseShared()这个方法。

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
// 若是等待状态为SIGNAL,将head节点的等待状态改成0,若是成功,唤醒线程
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS }
// 若是执行了unparkSuccessor(h)方法,head节点会变化,这样下面的h == head就不成立了,就会继续执行循环
if (h == head) // loop if head changed break; } }

整个CountDownLatch类,我断断续续看了几天才看完,主要是开始看AQS这个类感受比较绕,就看的不是很认真,比较浮躁。其实若是真正用心,跟着代码一步一步去走,一步一步去理解也没那么难,而后经过debug跟着代码去走,查看每一步作了什么,变量,属性的变化,这样有助于深入理解代码。

相关文章
相关标签/搜索