CountDownLatch源码分析

1、简介

          CountDownLatch是并发包中提供的一个同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待。 用给定的计数值初始化CountDownLatch。调用countDown()方法将计数减一,因此在当前计数到达零以前,调用await()方法会一直受阻塞, 直到这个CountDownLatch对象的计数值减到0为止。计数值等于0后,会释放全部等待的线程,await 的全部后续调用都将当即返回。 计数没法被重置,若是CountDownLatch的计数减为0时,后续有线程调用await()方法会直接经过。CountDownLatch也是经过AQS的共享模式进行实现。若是须要重置计数, 请考虑使用CyclicBarrier,能够看个人另外一篇CyclicBarrier源码分析juejin.im/post/5d3bf8…。基于Java8。java

2、属性

//CountDownLatch是经过AbstractQueuedSynchronizer的实现类Sync进行实现,能够看下下面对Sync的介绍
private final Sync sync;复制代码

3、内部类

//Sync实现AbstractQueuedSynchronizer,AQS不清楚的能够看下个人另外一篇AbstractQueuedSynchronizer源码分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        //传入计数值(即AQS的状态,表示调用await方法的线程须要等待调用count次的countDown方法)
        Sync(int count) {
            //设置AQS的属性state值
            setState(count);
        }
        
        //获取当前的计数值,即AQS中的state属性
        int getCount() {
            //返回AQS中的state属性
            return getState();
        }
        
        //重写AQS中的tryAcquireShared方法,tryAcquireShared在AQS中是模板方法,子类进行实现,不然抛出UnsupportedOperationException异常,AQS的tryAcquireSharedNanos方法中进行调用,await调用AQS的tryAcquireSharedNanos方法,返回1表示await方法无需等待,返回-1调用await方法线程须要等待 
        protected int tryAcquireShared(int acquires) {
            //若是当前计数值等于0,返回1,表示获取共享锁成功,不然返回-1,获取共享锁失败,返回1即调用await方法的线程能够直接执行,无需等待
            return (getState() == 0) ? 1 : -1;
        }
        
        //重写AQS中的tryReleaseShared方法,tryReleaseShared在AQS中是模板方法,子类进行实现,不然抛出UnsupportedOperationException异常,AQS的releaseShared方法中进行调用,countDown调用AQS的releaseShared方法,返回true表示调用线程将其计数值减为0,若是原先就等于0,直接返回false
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            //使用循环的将计数值减1,由于tryReleaseShared方法会被多个线程进行调用,循环进行递减成功,或者计数值已经被递减为0
            for (;;) {
                //获取计数值,即AQS的属性值
                int c = getState();
                //若是计数值已经等于0
                if (c == 0) 
                    //直接返回失败 
                    return false;
                //将计数值作减1操做
                int nextc = c-1;
                //使用CAS将计数值,从c修改为nextc
                if (compareAndSetState(c, nextc))
                    //若是cas更改计数值成功,而且计数值减到0,返回true
                    return nextc == 0;
            }
        }
    }复制代码

4、构造函数

//传入count计数值构造CountDownLatch实例
public CountDownLatch(int count) {
        //若是count小于0,抛出IllegalArgumentException异常
        if (count < 0) throw new IllegalArgumentException("count < 0");
        //调用上面的介绍的Sync的构造函数,将计数值count设置为sync从AQS继承下来的state属性,不清楚的能够看下上面对Sync的介绍
        this.sync = new Sync(count);
}复制代码

5、线程等待流程

//调用await方法的线程等待CountDownLatch的计数值等于0,在等待CountDownLatch的计数值等于0的时候等待过程当中被中断,抛出中断异常,支持中断
public void await() throws InterruptedException {
       //调用sync从AQS中继承下来的acquireSharedInterruptibly方法进行判断计数值是否等于0,即AQS的属性state是否等于0 
       sync.acquireSharedInterruptibly(1);
}

//Sync从AbstractQueuedSynchronizer继承下来的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        //检查调用线程是否有被中断
        if (Thread.interrupted())
            //有的话直接抛出中断异常 
            throw new InterruptedException();
        //调用Sync重写AQS的tryAcquireShared方法,判断计数值是否等于0,若是等于0返回1,就无需调用doAcquireSharedInterruptibly方法等待计数值等于0,详细的能够看下上面对Sync的tryAcquireShared方法的介绍 
        if (tryAcquireShared(arg) < 0)
            //doAcquireSharedInterruptibly是线程在等待CountDownLatch的计数值等于0的时候过程当中被中断,线程等待被唤醒时,若是有中断请求直接抛出中断异常 
            doAcquireSharedInterruptibly(arg);
}

//Sync从AbstractQueuedSynchronizer继承下来的doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        //往同步队列中从尾节点加入一个模式为共享锁的节点,看下面对addWaiter方法的介绍
        final Node node = addWaiter(Node.SHARED);
        //执行是否失败的标志位,若是失败会将此节点从同步队列中移除 
        boolean failed = true;
        try {
            //死循环的判断CountDownLatch的计数值是否等于0,即AQS的state属性是否等于0,除非CountDownLatch的计数值等于0,获取线程在等待的过程当中,被中断,抛出中断异常
            for (;;) {
                //获取当前节点的前一个节点,前一个节点不存在抛出空指针异常,CLH(同步队列必定会有个有效的前置节点),predecessor方法会在下面进行介绍
                final Node p = node.predecessor();
                //若是当前节点的前置节点为表头
                if (p == head) {
                    //使用上面介绍的Sync重写的tryAcquireShared尝试判断CountDownLatch的计数值是否等于0,若是等于0表示线程await方法能够直接执行 
                    int r = tryAcquireShared(arg);
                    //若是r大于0,即r等于1,CountDownLatch的计数值等于0
                    if (r >= 0) {
                        //设置当前节点为头节点,而且唤醒等待CountDownLatch的计数值等于0的下一个节点,能够看下面对setHeadAndPropagate方法的介绍
                        setHeadAndPropagate(node, r);
                        //将当前节点的前置节点下一个节点置为空,即当前节点的前置节点从同步队列中移除
                        p.next = null; // help GC
                        //CountDownLatch的计数值等于0,线程执行成功
                        failed = false;
                        //直接返回 
                        return;
                    }
                }
                //判断当前节点的前置节点的状态值是否为SIGNAL,若是是调用parkAndCheckInterrupt方法阻塞当前线程,shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法能够看下面对这两个方法的详细介绍
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //若是线程在等待CountDownLatch的计数值等于0的过程当中,有其余线程调用当前线程的interrupt方法,中断请求,抛出中断异常
                    throw new InterruptedException();
            }
        } finally {
            //若是线程等待CountDownLatch的计数值等于0失败,即等待线程被中断 
            if (failed)
                //将当前节点从同步队列中移除,能够看下面对cancelAcquire方法的介绍 
                cancelAcquire(node);
        }
}

//Sync从AbstractQueuedSynchronizer继承下来的addWaiter方法
//@param mode 要建立节点的模式,是要等待CountDownLatch的计数值等于0
private Node addWaiter(Node mode) {
        //根据当前线程和传入的节点模式建立新节点 
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //获取同步队列(CLH)的尾节点
        Node pred = tail;
        //若是尾节点不为空 
        if (pred != null) {
            //将新建节点的前置节点设置为尾节点
            node.prev = pred;
            //使用CAS将新建节点设置为尾节点 
            if (compareAndSetTail(pred, node)) {
                //若是CAS成功,尾节点的下一节点为新建节点 
                pred.next = node;
                //返回新建节点
                return node;
            }
        }
        //不然调用enq方法进行循环的将新建节点加入同步队列中,作为同步队列的尾节点,详细的能够看下面enq方法的介绍
        enq(node);
        //返回新建节点 
        return node;
}

//Sync从AbstractQueuedSynchronizer继承下来的enq方法
private Node enq(final Node node) {
        //死循环的将传入节点加入到同步队列中,作为同步队列的尾节点,直到节点加入队列成功为止
        for (;;) {
            //获取尾节点 
            Node t = tail;
            //若是尾节点为空,代表同步队列不存在节点
            if (t == null) {
                //新建个节点作为同步队列的头节点,使用CAS进行头节点的设置
                if (compareAndSetHead(new Node()))
                    //若是头节点设置成功,将尾节点设置为头节点
                    tail = head;
            } else {//不然队列不为空
                //将新建节点的前置节点设置为尾节点
                node.prev = t;
                //使用CAS将新建节点设置为尾节点 
                if (compareAndSetTail(t, node)) {
                    //若是CAS成功,尾节点的下一节点为新建节点
                    t.next = node;
                    //返回新建节点
                    return t;
                }
            }
        }
}

//AbstractQueuedSynchronizer中的内部类Node的predecessor方法
//获取当前节点的前驱节点,若是调用节点的前置节点为null,则抛出空指针异常
final Node predecessor() throws NullPointerException {
            //当前节点的前驱节点
            Node p = prev;
            //若是前驱节点为空
            if (p == null)
                //抛出空指针异常
                throw new NullPointerException();
            else
                //返回当前节点的前驱节点
                return p;
}

//Sync从AbstractQueuedSynchronizer继承下来的setHeadAndPropagate方法,从新设置CLH队列头,若是CLH队列头的下一个节点为null或者头节点的下一节点模式为共享模式,那么就要唤醒同步队列的下一等待的线程
private void setHeadAndPropagate(Node node, int propagate) {
        //获取同步队列的头节点
        Node h = head; // Record old head for check below
        //将传入进来的节点设置为同步队列的表头,将传入进来的前置节点和线程都置为空
        setHead(node);
        //当propagate等于1,CountDownLatch的计数值等于0,或者头节点为空,或者头节点的状态为SIGNAL
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 获取新的CLH队列头的下一个节点s 
            Node s = node.next;
            // 若是节点s是空或者共享模式节点,那么就要唤醒同步队列上等待的线程 
            if (s == null || s.isShared())
                //唤醒下一个等待队列的线程,在下面对此方法进行介绍 
                doReleaseShared();
        }
}

//Sync从AbstractQueuedSynchronizer继承下来的doReleaseShared方法,唤醒同步队列头节点的下一等待CountDownLatch的计数值等于0的节点
private void doReleaseShared() {
        for (;;) {
            // 将同步队列头赋值给节点h
            Node h = head;
             // 若是节点h不为null,且不等于同步队列尾
            if (h != null && h != tail) {
                //获得节点h的状态
                int ws = h.waitStatus;
                //若是状态是Node.SIGNAL,就要唤醒节点h后继节点的线程
                if (ws == Node.SIGNAL) {
                    // 将节点h的状态设置成0,若是设置失败,就继续循环,再试一次。
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒节点h后继节点的线程
                    unparkSuccessor(h);
                }
                //若是节点h的状态是0,就设置ws的状态是PROPAGATE,对AQS的内部类Node节点的状态不清楚的,能够看下个人另外一篇对AQS源码的分析 
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 若是同步队列头head节点发生改变,继续循环,
            // 若是没有改变,就跳出循环 
            if (h == head)                   // loop if head changed
                break;
        }
}

/** * Sync从AbstractQueuedSynchronizer继承下来的shouldParkAfterFailedAcquire方法,根据前一个节点pred的状态,来判断当前节点对应的线程是否应该被阻塞 * @param pred : node节点的前一个节点 * @param node : 等待CountDownLatch的计数值等于0的节点 * @return 返回true 表示当前线程应该被阻塞,而后调用parkAndCheckInterrupt方法来阻塞当前线程 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取前置节点的状态值 
        int ws = pred.waitStatus;
        //若是前置节点的状态值为SIGNAL 
        if (ws == Node.SIGNAL)
            // 若是前一个pred的状态是Node.SIGNAL,那么直接返回true,当前线程应该被阻塞
            return true;
        //若是前置节点已经取消,循环获取不是取消的前置节点 
        if (ws > 0) {
            // 若是前一个节点状态是Node.CANCELLED(大于0就是CANCELLED),
            // 表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。
            // 因此从pred节点一直向前查找直到找到不是CANCELLED状态的节点,并把它赋值给node.prev,
            // 表示node节点的前一个节点已经改变
            do {
                //从新赋值当前节点的前置节点 
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //不是取消的前置节点的下一节点从新赋值为当前节点
            pred.next = node;
        } else {
            // 此时前一个节点pred的状态只能是0或者PROPAGATE,不多是CONDITION状态
            // CONDITION(只在condition条件队列中节点存在,CLH同步队列中没有此状态的节点)
            // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        //返回要等待CountDownLatch的计数值等于0的节点对应的线程不会阻塞 
        return false;
}

//Sync从AbstractQueuedSynchronizer继承下来的parkAndCheckInterrupt方法,阻塞当前线程,而且唤醒时检验当前线程在等待的过程当中是否有其余线程发起中断请求
private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程
        LockSupport.park(this);
        //当前线程被唤醒后,返回当前线程的中断标志位
        return Thread.interrupted();
}

//将传入进来的节点从同步队列中移除,将传入节点对应的线程置为空,状态置为CANCELLED
private void cancelAcquire(Node node) {
        //若是当前节点为空,直接退出
        if (node == null)
            //直接退出
            return;
        //将当前节点对应的线程置为空
        node.thread = null;
        
        //获取当前要取消节点的前置节点
        Node pred = node.prev;
        //循环跳过前置节点状态为CANNELLED的值
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        //获取状态不是取消的前置的节点的下一个节点,在设置前置节点的下一个节点使用到
        Node predNext = pred.next;
        //将当前要取消的节点状态赋值为CANCELLED
        node.waitStatus = Node.CANCELLED;
        
        //若是要取消节点为尾节点,将尾节点设置为要取消节点的前一个节点 
        if (node == tail && compareAndSetTail(node, pred)) {
            //若是设置成功,将要取消节点的前置节点的下一个节点设置为空
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            //若是前置不是头节点,而且前置节点的状态值为SIGNAL,或者将前置节点的状态值设置为SIGNAL,而且前置节点的线程不为空
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                //获取要取消节点的下一个节点
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                //不然的话唤醒node节点的下一个节点
                unparkSuccessor(node);
            }
            //将要取消节点的下一节点设置为自身,加快gc
            node.next = node;
        }
}

//调用await方法的线程,在一段时间内等待CountDownLatch的计数值等于0,在等待CountDownLatch的计数值等于0的时候等待过程当中被中断,抛出中断异常,支持中断
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        //调用sync从AQS中继承下来的tryAcquireSharedNanos方法在必定时间内进行判断计数值是否等于0,即AQS的属性state是否等于0,可能阻塞当前线程,在必定时间内CountDownLatch的计数值不等于0,返回失败,不然CountDownLatch的计数值等于0,唤醒当前线程 
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

//Sync从AbstractQueuedSynchronizer继承下来的tryAcquireSharedNanos方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        //检查线程是否有被中断 
        if (Thread.interrupted())
            //有的话直接抛出中断异常
            throw new InterruptedException();
        //调用Sync重写AQS的tryAcquireShared方法,若是CountDownLatch的计数值等于0,返回1,不然返回-1,若是返回-1调用doAcquireSharedNanos方法,详细的能够看下Sync的tryAcquireShared方法 
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
}

//Sync从AbstractQueuedSynchronizer继承下来的doAcquireSharedNanos方法,死循环的判断CountDownLatch的计数值是否等于0,直到超时、或者获取到写锁
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        //若是传入的超时时间小于等于0 
        if (nanosTimeout <= 0L)
            //返回调用await(long timeout, TimeUnit unit)方法的线程等待CountDownLatch的计数值等于0失败
            return false;
        //当前时间加上超时时间获得死亡时间
        final long deadline = System.nanoTime() + nanosTimeout;
        //根据当前线程和共享模式建立新节点,addWaiter方法能够看上面的介绍 
        final Node node = addWaiter(Node.SHARED);
        //在等待CountDownLatch的计数值等于0是否失败,若是失败在finally中将节点从同步队列中移除
        boolean failed = true;
        try {
            for (;;) {
                //获取新建节点的前置节点 
                final Node p = node.predecessor();
                //新建节点的前置节点若是为头节点 
                if (p == head) {
                    //调用Sync重写AQS的tryAcquireShared方法,若是CountDownLatch的计数值等于0,返回1,不然返回-1,若是返回-1调用doAcquireSharedNanos方法,详细的能够看下Sync的tryAcquireShared方法 
                    int r = tryAcquireShared(arg);
                    //若是r大于0,即r等于1,CountDownLatch的计数值等于0
                    if (r >= 0) {
                        //设置当前节点为头节点,而且唤醒等待CountDownLatch的计数值等于0的下一个节点,能够看下面对setHeadAndPropagate方法的介绍 
                        setHeadAndPropagate(node, r);
                        //新建节点的前置节点的下一节点设置为空 
                        p.next = null; // help GC
                        //等待CountDownLatch的计数值等于0标志位成功
                        failed = false;
                        //返回等待CountDownLatch的计数值等于0成功
                        return true;
                    }
                }
                //死亡时间减去当前时间,获得超时时间 
                nanosTimeout = deadline - System.nanoTime();
                //若是超时时间小于等于0,直接返回等待CountDownLatch的计数值等于0失败
                if (nanosTimeout <= 0L)
                    //返回等待CountDownLatch的计数值等于0失败 
                    return false;
                //shouldParkAfterFailedAcquire方法在上一次等待CountDownLatch的计数值等于0失败时,是否须要阻塞,根据当前节点的前置节点状态来判断,详细的能够看上面的介绍
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold) //超时时间大于必定的阈值,才会阻塞等待CountDownLatch的计数值等于0的线程
                    //超时阻塞要等待CountDownLatch的计数值等于0的线程
                    LockSupport.parkNanos(this, nanosTimeout);
                //在等待CountDownLatch的计数值等于0的线程被唤醒时,等待的过程当中有其余线程发起中断请求,直接抛出中断异常
                if (Thread.interrupted())
                    //抛出中断异常
                    throw new InterruptedException();
            }
        } finally {
            //若是等待CountDownLatch的计数值等于0标志位失败 
            if (failed)
                //若是等待CountDownLatch的计数值等于0标志位失败,从同步队列中移除当前节点,根据当前节点的前置节点状态是否唤醒当前节点的不为空的下一节点线程,cancelAcquire方法能够看上面详细介绍
                cancelAcquire(node);
        }
}
复制代码

6、线程等待释放流程

//CountDownLatch的计数值减1操做,即AQS的属性state值作减1操做,使用Sync对AQS的属性state值作减1操做
public void countDown() {
        sync.releaseShared(1);
}

//Sync从AbstractQueuedSynchronizer继承下来的releaseShared方法,参数arg没有使用到
public final boolean releaseShared(int arg) {
        //调用Sync重写AQS的tryReleaseShared方法,尝试将CountDownLatch的计数值减1,若是计数值减一操做成功,而且减1操做后计数值等于0,表示CountDownLatch的计数值等于0,唤醒等待CountDownLatch的计数值等于0的全部线程
        if (tryReleaseShared(arg)) {
            //唤醒等待CountDownLatch的计数值等于0的全部线程
            doReleaseShared();
            //返回计数值减一操做,而且减1操做后计数值等于0,唤醒等待CountDownLatch的计数值等于0的全部线程成功
            return true;
        }
        //返回失败,可能计数值减一操做成功,但减1操做后计数值不等于0,或者计数值原先就等于0 
        return false;
}

//Sync从AbstractQueuedSynchronizer继承下来的doReleaseShared方法,唤醒等待CountDownLatch的计数值等于0的全部线程
private void doReleaseShared() {        
        for (;;) {
            // 将同步队列头赋值给节点h
            Node h = head;
            // 若是节点h不为null,且不等于同步队列尾 
            if (h != null && h != tail) {
                // 获得节点h的状态
                int ws = h.waitStatus;
                //若是头节点的状态是Node.SIGNAL,就要唤醒节点h后继节点的线程
                if (ws == Node.SIGNAL) {
                    // 将节点h的状态设置成0,若是设置失败,就继续循环,再试一次。
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    // 唤醒头节点h后继节点的线程
                    unparkSuccessor(h);
                }
                // 若是节点h的状态是0,就设置ws的状态是PROPAGATE。 
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            // 若是同步队列头head节点发生改变,继续循环,
            // 若是没有改变,就跳出循环 
            if (h == head)  
                break;
        }
    }
复制代码

7、其余方法

//获取CountDownLatch当前的计数值
public long getCount() {
        //直接调用sync的getCount方法,获取计数值,即AQS的state属性
        return sync.getCount();
}

//CountDownLatch重写Object的toString方法
public String toString() {
        //返回Object的toString方法和CountDownLatch的计数值组合
        return super.toString() + "[Count = " + sync.getCount() + "]";
}
复制代码
相关文章
相关标签/搜索