从源码看JDK8并发工具类CountDownLatch的实现原理

CountDownLatch,是几个重要的并发编程工具类之一,字面意思就是门锁的意思,内部会维护一个计数器的常量,这个常量表明执行的线程数。java

在多线程协做完成业务功能时,有时候须要等待其余多个线程完成任务以后,主线程才能继续往下执行业务功能,在这种的业务场景下,一般可使用Thread类的join方法,让主线程等待被join的线程执行完以后,主线程才能继续往下执行。固然,使用线程间消息通讯机制也能够完成。其实,java并发工具类中为咱们提供了相似“倒计时”(CountDownLatch)这样的工具类,能够十分方便的完成所说的这种业务场景。编程

CountDownLatch容许一个或多个线程等待其余线程完成操做,调用await()方法的线程回去判断count的值来判断是否会被挂起,它会等待直到count值为0才会继续执行。控制台输出count=0最后输出,这个时候就看cpu切换到哪一个线程上执行了,在初始化的时候咱们会设置好count的值,当每调用一次countDown()方法,会使count的值减一也就是将AQS维护同步状态的state值减一。安全

在咱们阅读源码以前,若是你看过AQS源码(www.jianshu.com/p/e0066f934…)与跟可重入锁(www.jianshu.com/p/5d57573b0…)相关的内容,你会更加对CountDownLatch自己是如何实现的以及他的本质有一个更透彻的理解。bash

说说个人理解以前看看他的类多线程

能够看到他一样运用了一个继承了AQS同步器的静态内部类来重写父类AQS里面的一些方法而后再调用该父类里面的获取锁的方法来实现具体的功能。

来看看构造方法中:并发

//设置初始化count的值,并传递给Sync类
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
复制代码

Sync类的源码以下:CountDownLatch的实现依赖于AQSdom

先介绍下两个方法 countDown()每执行一次该方法,也就是将由AQS维护的同步状态值state值减1,其通常是执行任务的线程调用。 调用countDown()释放同步状态,每次调用同步状态值-1。ide

public void countDown() {
        sync.releaseShared(1);
    }

//父类AQS中
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {   //若是释放同步状态线程成功,若是返回false,则表示,获取失败同步状态。
  //返回flase,以CountDownLatch的实现角度来说,此时还要等待N(N>0)个线程,由于state还没减到等于0,若是返回true,表示此时已经执行N次了,此时state已经减到0了,这时候会执行doReleaseShared(),表示释放其余处于等待的节点。
            doReleaseShared();   //唤醒后续处于等待的节点,看下面具体的解释。
            return true;
        }
        return false;
    }

//在CountDownLatch的静态内部工具类Sync继承了AQS重写的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
            // 自旋
            for (;;) {
                int c = getState();  //获取AQS维护的state值
                if (c == 0)   //若是为0,表示没有一个线程在运行返回false
                    return false;
                int nextc = c-1;     //若是不等于0,这里确定会>0的,因此减去1
                if (compareAndSetState(c, nextc))  //CAS去直接修改内存地址的偏移量去修改值,保证线程安全。
                 return nextc == 0;       //重点来了。这里的意思是若是共享式获取同步状态后,state还不是为0,则获取失败。返回false
            }
        }
复制代码

下面这个类,在个人这篇文章也解析过了。 工具

await(),当执行该方法是,内部会检查那个计数常量的值,若是不等于0,就会进入等待(waiting)状态,直到执行了countDown使内部的值减到0的时候,就会恢复线程,同时执行,咱们来看看实现:测试

public void await() throws InterruptedException {
        //共享式获取
        sync.acquireSharedInterruptibly(1);
    }

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())  //响应中断
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)   
      //tryAcquireShared(arg) 返回1,此时state=0不阻塞,返回的是-1,执行doAcquireSharedInterruptibly(arg);
            doAcquireSharedInterruptibly(arg);
    }

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;   
        }
复制代码

这里须要解释下doAcquireSharedInterruptibly的主要做用:一、将当前线程构形成共享模式的节点,经过自旋的方式尝试获取同步状态二、若是获取同步状态成功,则唤醒后续处于共享模式的节点;若是没有获取到同步状态,则对调用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法挂起当前线程,这样能够避免该线程无限循环而获取不到共享锁,从而形成资源浪费。这里须要注意的是:当有多个线程调用await()方法时,这些线程都会经过addWaiter(Node.SHARED)方法被构形成节点加入到等待队列中。当最后一个调用countDown()方法的线程执行了countDown()后(这里有点拗口),会唤醒处于等待队列中距离头节点最近的一个节点,也就是说该线程被唤醒以后会继续自旋尝试获取同步状态,此时执行到tryAcquireShared(int)方法时,发现r大于0(由于state已经被置为0了),该线程就会调用setHeadAndPropagate(Node, int)方法将唤醒传递下去,而且退出当前循环,开始执行awat()方法以后的代码。

而后说说CountDownLatch的两种用法:

1.能够设置new CountDownLatch(1); 若是须要控制多个线程同时开始执行的时候,能够每一个线程刚开始执行run的时候,先执行await, 进入等待状态。当最后全部线程都准备好了,就调用countDown,减一,这时全部线程就会主动同时开始执行。 2.假设能够设置new CountDownLatch(10),这时有10个线程,咱们须要作的是等10个线程,依次执行countDown(),等到全部线程都执行好了,这时候再执行await。全部线程都准备就绪了。

await(long timeout,TimeUtil unit) 做用使线程在指定的最大时间内,处于await状态,超过这个时间就会自动唤醒了。 getCount()
可以获取当前计数的值。

下面举一个实现的例子:

默认10个运动员进行跑步比赛的全过程:

public class MyThread extends Thread{

    /**等待运动员到来*/
    private CountDownLatch comingTag;
    /**等待裁判说开始*/
    private CountDownLatch waitTag;
    /** 等待起跑*/
    private CountDownLatch waitRunTag;
    /**起跑*/
    private CountDownLatch beginTag;
    /** 全部运动员道终点*/
    private CountDownLatch endTag;

    public MyThread(CountDownLatch comingTag, CountDownLatch waitTag, CountDownLatch waitRunTag, CountDownLatch beginTag, CountDownLatch endTag) {
        super();
        this.comingTag = comingTag;
        this.waitTag = waitTag;
        this.waitRunTag = waitRunTag;
        this.beginTag = beginTag;
        this.endTag = endTag;
    }

    @Override
    public void run() {
        try {
            System.out.println("运动员正陆续入场");
            Thread.sleep((int)Math.random()*10000);
            System.out.println(Thread.currentThread().getName()+"到起跑点了");
            comingTag.countDown();
            System.out.println("等待裁判说准备");
            waitTag.await();
            System.out.println("准备。。。。。开始");
            waitRunTag.countDown();
            beginTag.await();
            System.out.println(Thread.currentThread().getName()+"开始跑,而且跑步过程不肯定");
            Thread.sleep((int)Math.random()*10000);
            endTag.countDown();
            System.out.println(Thread.currentThread().getName()+"到达终点");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

测试类:

public class Run {

    public static void main(String[] args) {


        CountDownLatch comingTag = new CountDownLatch(10);
        CountDownLatch waitTag=new CountDownLatch(1);
        CountDownLatch waitRunTag = new CountDownLatch(10);
        CountDownLatch beginTag=new CountDownLatch(1);
        CountDownLatch endTag = new CountDownLatch(10);

        MyThread[] threads=new MyThread[10];

        for(int i=0;i<threads.length;i++){
            threads[i]=new MyThread(comingTag,waitTag,waitRunTag,beginTag,endTag);
            threads[i].setName("运动员"+(i+1));
            threads[i].start();
        }

        try {
            System.out.println("裁判正在等待选手的到来。。。。");
            comingTag.await();
            System.out.println("全部的运动员都到齐了,准备开始,各就位。。。。预备");
            Thread.sleep(5000);
            waitTag.countDown();
            System.out.println("各就各位。。。。");
            waitRunTag.await();
            Thread.sleep(2000);
            System.out.println("命令枪,开!!!");
            beginTag.countDown();
            endTag.await();
            System.out.println("全部运动员都到得终点了。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
复制代码

image.png
image.png

通常来讲,都会把CountDownLatch与CyclicBarrier进行比较?

CountDownLatch通常用于某个线程A等待若干个其余线程执行完任务以后,它才执行;而CyclicBarrier通常用于一组线程互相等待至某个状态,而后这一组线程再同时执行;CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等你们都完成,再携手共进。 调用CountDownLatch的countDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrier的await方法,会阻塞当前线程,直到CyclicBarrier指定的线程所有都到达了指定点的时候,才能继续往下执行; CountDownLatch方法比较少,操做比较简单,而CyclicBarrier提供的方法更多,好比可以经过getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,而且CyclicBarrier的构造方法能够传入barrierAction,指定当全部线程都到达时执行的业务功能; CountDownLatch是不能复用的,而CyclicBarrier是能够复用的。就是说,当CountDownLatch执行countDown时若是此时countDown执行的state的值减到0了,这时候再调用,不能循环执行了,而CyclicBarrier是能够的,能够看一下这篇文章: www.jianshu.com/p/ff6c2ef5e…

整理不易,喜欢能够关注我

相关文章
相关标签/搜索