JUC锁框架——CountDownLatch

CountDownLatch简单介绍

CountDownLatch是同步工具类之一,能够指定一个计数值,在并发环境下由线程进行减1操做,当计数值变为0以后,被await方法阻塞的线程将会唤醒,实现线程间的同步。java

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,以后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。node

CountDownLatch的简单应用

public static void main(String[] args) {
        int threadNum = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int tNum = i + 1;
            new Thread(() -> {
                System.out.println("thread " + tNum + " start");
                Random random = new Random();
                try {
                    Thread.sleep(random.nextInt(10000) + 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread " + tNum + " finish");
                countDownLatch.countDown();//每个线程执行完就会减一
            }).start();
        }
        //主线程启动10个子线程后阻塞在await方法,须要等子线程都执行完毕,主线程才能唤醒继续执行。
        try {
            countDownLatch.await();//当前线程被阻塞
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(threadNum + " thread finish");
    }

CountDownLatch的源码分析

CountDownLatch和ReentrantLock同样,内部使用Sync继承AQS。Sync的构造函数接收了计数值并设置为state。咱们知道AQS的state是一个由子类决定含义的“状态”。对于ReentrantLock来讲,state是线程获取锁的次数;对于CountDownLatch来讲,则表示计数值的大小。并发

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
Sync(int count) {
    setState(count);//设置state,表示计数值的大小
}

CountDownLatch的await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())//判断当前线程是否被中断
        throw new InterruptedException();
    //tryAcquireShared:是否须要阻塞当前线程
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);//由CountDownLatch实现阻塞当前线程的逻辑。
}
/**
* tryAcquireShared方法其实就是判断一下当前计数器的值,是否为0。
* 为0则返回1:表示当前线程不会阻塞在countDownLatch.await()
*/
protected int tryAcquireShared(int acquires) {
   return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);//建立共享的node节点,并加入等待队列
    boolean failed = true;
    try {
        for (;;) {//死循环
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);//判断state是否为0
                if (r >= 0) {
                    setHeadAndPropagate(node, r);//设置本身为head,并通知其余等待的线程
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

以上对于countDownLatch.await()方法是怎么“阻塞”当前线程的咱们已经很明白了,即当咱们调用了countDownLatch.await()方法后,你当前线程就会进入了一个死循环当中,在这个死循环里面,会不断的进行判断,经过调用tryAcquireShared方法,不断判断咱们上面说的那个计数器,看看它的值是否为0了(为0的时候,其实就是咱们调用了足够屡次数的countDownLatch.countDown()方法的时候),若是是为0的话,tryAcquireShared就会返回1,代码会执行if(r>=0){...},而后跳出了循环,也就再也不“阻塞”当前线程了。须要注意的是,说是在不停的循环,其实也并不是在不停的执行for循环里面的内容,由于在后面调用parkAndCheckInterrupt()方法时,在这个方法里面是会调用 LockSupport.park(this);,来禁用当前线程的。dom

CountDownLatch的countDown方法

//释放锁的操做,每调用一次,计数值减小1
public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//尝试释放锁,具体实如今CountDownLatch中
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))//state的减1操做
            return nextc == 0;//当计数值等于0,表明全部子线程都执行完毕,被await阻塞的线程就能够被唤醒
    }
}
//唤醒被await阻塞的线程
private void doReleaseShared() {
   for (;;) {
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;
           if (ws == Node.SIGNAL) {
               //头节点状态若是SIGNAL,则状态重置为0,
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;
               unparkSuccessor(h);//唤醒下个节点
           }
           //被唤醒的节点状态会重置成0,在下一次循环中被设置成PROPAGATE状态,
           //表明状态要向后传播。
           else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;
       }
       if (h == head)
           break;
   }
}
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//设置非CANCELLED节点的等待状态为0
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒节点
}
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())//若是节点为共享类型
            doReleaseShared();//释放,下个节点被唤醒后,再次唤醒后续的等待节点,达到共享状态向后传播。
    }
}

参考地址:函数

相关文章
相关标签/搜索