CountDownLatch、CyclicBarrier、Semaphore、Exchanger 的详细解析


本文主要介绍和对比咱们经常使用的几种并发工具类,主要涉及 CountDownLatchCyclicBarrierSemaphoreExchanger 相关的内容,若是对多线程相关内容不熟悉,能够看笔者以前的一些文章:java


  • 介绍 CountDownLatchCyclicBarrier 二者的使用与区别,他们都是等待多线程完成,是一种并发流程的控制手段,
  • 介绍 SemaphoreExchanger 的使用,semaphore 是信号量,能够用来控制容许的线程数,而 Exchanger 能够用来交换两个线程间的数据。

CountDownLatch

  • CountDownLatchJDK5 以后加入的一种并发流程控制工具,它在 java.util.concurrent 包下
  • CountDownLatch 容许一个或多个线程等待其余线程完成操做,这里须要注意,是能够是一个等待也能够是多个来等待
  • CountDownLatch 的构造函数以下,它接受一个 int 类型的参数做为计数器,即若是你想等待N 个线程完成,那么这里就传入 N
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
  • 其中有两个核心的方法 countDownawait ,其中 当咱们调用 countDown 方法时相应的 N 的值减 1,而 await 方法则会阻塞当前线程,直到 N 的值变为零。
  • 提及来比较抽象,下面咱们经过实际案例来讲明。

多个线程等待一个线程

  • 在咱们生活中最典型的案例就是体育中的跑步,假设如今咱们要进行一场赛跑,那么全部的选手都须要等待裁判员的起跑命令,这时候,咱们将其抽象化每一个选手对应的是一个线程,而裁判员也是一个线程,那么就是多个选手的线程再等待裁判员线程的命令来执行
  • 咱们经过 CountDownLatch 来实现这一案例,那么等待的个数 N 就是上面的裁判线程的个数,即为 1,
/**
     * @url i-code.onlien
     * 云栖简码
     */
    public static void main(String[] args) throws InterruptedException {
        //模拟跑步比赛,裁判说开始,全部选手开始跑,咱们可使用countDownlatch来实现

        //这里须要等待裁判说开始,因此时等着一个线程
        CountDownLatch countDownLatch = new CountDownLatch(1);

        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已准备");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手1").start();
        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已准备");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手2").start();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("裁判:预备~~~");
        countDownLatch.countDown();
        System.out.println("裁判:跑~~~");
    }
  • 运行结果以下:

在上述代码中,咱们首先建立了一个计数为1 的 CountDownLatch 对象,这表明咱们须要等待的线程数,以后再建立了两个线程,用来表明选手线程,同时在选手的线程中咱们都调用了 await 方法,让线程进入阻塞状态,直到CountDownLatch的计数为零后再执行后面的内容,在主线程 main 方法中咱们等待 1秒后执行 countDown 方法,这个方法就是减一,此时的 N 则为零了,那么选手线程则开始执行后面的内容,总体的输出如上图所示面试

一个/多个线程等待多个线程

  • 一样从咱们生活中的场景来抽象,假设公司要组织出游,大巴车接送,当凑够五我的大巴车则发车出发,这里就是大巴车须要等待这五我的所有到齐才能继续执行,咱们抽象以后用 CountDownLatch 来实现,那么的计数个数 N 则为5,由于要等待这五个,经过代码实现以下:
public static void main(String[] args) throws InterruptedException {
        /**
         * i-code.online
         * 云栖简码 
         */
        //等待的个数
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "从住所出发...");
                try {
                    TimeUnit.SECONDS.sleep((long) (Math.random()*10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 到达目的地-----");
                countDownLatch.countDown();
            },"人员-"+i).start();
        }

        System.out.println("大巴正在等待人员中.....");
        countDownLatch.await();
        System.out.println("-----全部人到齐,出发-----");
    }
  • 上面代码执行结果以下:

从上述代码中咱们能够看到,定义了一个计数为5的 countDownLatch ,以后经过循环建立五个线程,模拟五我的员,当他们到达指定地点后执行 countDown 方法,对计数减一。主线程至关因而大巴车的线程,执行 await 方法进行阻塞,只有当 N 的值减到0后则执行后面的输出算法

CountDownLatch 主要方法介绍

  • 构造函数:
public CountDownLatch(int count) {  };

它的构造函数是传入一个参数,该参数 count 是须要倒数的数值。数据库

  • await() :调用 await() 方法的线程开始等待,直到倒数结束,也就是 count 值为 0 的时候才会继续执行。
  • await(long timeout, TimeUnit unit)await() 有一个重载的方法,里面会传入超时参数,这个方法的做用和 await() 相似,可是这里能够设置超时时间,若是超时就再也不等待了。
  • countDown():把数值倒数 1,也就是将 count 值减 1,直到减为 0 时,以前等待的线程会被唤起。

上面的案例介绍了 CountDownLatch 的使用,可是 CountDownLatch 有个特色,那就是不可以重用,好比已经完成了倒数,那可不能够在下一次继续去从新倒数呢?是能够的,一旦倒数到0 则结束了,没法再次设置循环执行,可是咱们实际需求中有不少场景中须要循环来处理,这时候咱们可使用 CyclicBarrier 来实现编程

CyclicBarrier

  • CyclicBarrierCountDownLatch 比较类似,当等待到必定数量的线程后开始执行某个任务
  • CyclicBarrier 的字面意思是能够循环使用的屏障,它的功能就是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开会,此时全部被屏障阻塞的线程都将继续执行。以下演示

  • 上图中能够看到,到线程到达屏障后阻塞,直到最后一个也到达后,则所有放行
  • 首先咱们来看下它的构造函数,以下:
public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  • CyclicBarrier(int parties) 构造函数提供了int 类型的参数,表明的是须要拦截的线程数量,而每一个线程经过调用 await 方法来告诉 CyclicBarrier 我到达屏障点了,而后阻塞
  • CyclicBarrier(int parties, Runnable barrierAction) 构造函数是为咱们提供的一个高级方法,加了一个 barrierAction 的参数,这是一个Runnable类型的,也就是一个线程,它表示当全部线程到达屏障后,清闲触发 barrierAction 线程执行,再执行各个线程以后的内容

案例

  • 假设你要和你女友约会,约定了一个时间地点,那么无论大家谁先到都会等待另外一个到才会出发取约会~ 那么这时候咱们经过CyclicBarrier 的来实现,这里咱们须要来拦截的线程就是两个。具体实现 以下:
/*
    CyclicBarrier 与countDownLatch 比较类似,也是等待线程完成,
    不过countDownLatch 是await等待其余的线程经过countDown的数量,达到必定数则执行,
    而 CyclicBarrier 则是直接看await的数量,达到必定数量直接所有执行,
     */
    public static void main(String[] args) {
        //比如情侣约会,无论谁先到都的等另外一个,这里就是两个线程,
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() ->{
            System.out.println("快速收拾,出门~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("到了约会地点等待女友前来~~");
                cyclicBarrier.await();
                System.out.println("女友到来嗨皮出发~~约会");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

        },"男友").start();
        new Thread(() ->{
            System.out.println("慢慢收拾,出门~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(5000);
                System.out.println("到了约会地点等待男友前来~~");
                cyclicBarrier.await();
                System.out.println("男友到来嗨皮出发~~约会");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        },"女友").start();

    }
  • 代码执行结果以下:

上面代码,相对简单,建立一个拦截数为2的屏障,以后建立两个线程,调用await方法,只有当调用两次才会触发后面的流程。微信

  • 咱们再写一个案例sh,使用含有Runnable 参数的构造函数;和以前 CountDownLatch 的案例类似,公司组织出游,这时候确定有不少大巴在等待接送,大巴不会等全部的 人都到才出发,而是每坐满一辆车就出发一辆,这种场景咱们就可使用 CyclicBarrier 来实现,实现以下:
/*
    CyclicBarrier是可重复使用到,也就是每当几个知足是再也不等待执行,
    好比公司组织出游,安排了好多辆大把,每坐满一辆就发车,再也不等待,相似这种场景,实现以下:
     */

    public static void main(String[] args) {
        //公司人数
        int peopleNum = 2000;
        //每二十五我的一辆车,凑够二十五则发车~
        CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{
            //达到25人出发
            System.out.println("------------25人数凑齐出发------------");
        });

        for (int j = 1; j <= peopleNum; j++) {
            new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start();
        }

    }

    static class PeopleTask implements Runnable{

        private String name;
        private  CyclicBarrier cyclicBarrier;
        public PeopleTask(String name,CyclicBarrier cyclicBarrier){
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println(name+"从家里出发,正在前往聚合地....");
            try {
                TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name+"到达集合地点,等待其余人..");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

CyclicBarrier 和 CountDownLatch 的异同

相同点:多线程

  • 都能阻塞一个或一组线程,直到某个预设的条件达成发生,再统一出发

不一样点:并发

  • 可重复性:CountDownLatch 的计数器只能使用一次,到到达0后就不能再次使用了,除非新建实例;而 CyclicBarrier 的计数器是能够复用循环的,因此 CyclicBarrier 能够用在更复杂的场景,能够随时调用 reset方法来重制拦截数,如计算发生错误时能够直接充值计数器,让线程从新执行一次。
  • 做用对象:CyclicBarrier 要等固定数量的线程都到达了屏障位置才能继续执行,而 CountDownLatch 只需等待数字倒数到 0,也就是说 CountDownLatch 做用于事件,但 CyclicBarrier 做用于线程;CountDownLatch 是在调用了 countDown 方法以后把数字倒数减 1,而 CyclicBarrier 是在某线程开始等待后把计数减 1
  • 执行动做:CyclicBarrier 有执行动做 barrierAction,而 CountDownLatch 没这个功能。

Semaphore

  • Semaphore (信号量)是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源,

  • 从图中能够看出,信号量的一个最主要的做用就是,来控制那些须要限制并发访问量的资源。具体来说,信号量会维护“许可证”的计数,而线程去访问共享资源前,必须先拿到许可证(acquire 方法)。线程能够从信号量中去“获取”一个许可证,一旦线程获取以后,信号量持有的许可证就转移过去了,因此信号量手中剩余的许可证要减一。
  • 同理,线程也能够“释放”一个许可证,若是线程释放了许可证(release 方法),这个许可证至关于被归还给信号量了,因而信号量中的许可证的可用数量加一。当信号量拥有的许可证数量减到 0 时,若是下个线程还想要得到许可证,那么这个线程就必须等待,直到以前获得许可证的线程释放,它才能获取。因为线程在没有获取到许可证以前不能进一步去访问被保护的共享资源,因此这就控制了资源的并发访问量,这就是总体思路。

案例

  • 如咱们平时开发中典型的数据库操做,这是一个密集IO 操做,咱们能够启动不少线程可是数据库的链接池是有限制的,假设咱们设置容许五个连接,若是咱们开启太多线程直接操做则会出现异常,这时候咱们能够经过信号量来控制,让一直最多只有五个线程来获取链接。代码以下:
/*
        Semaphore 是信号量, 能够用来控制线程的并发数,能够协调各个线程,以达到合理的使用公共资源
     */

    public static void main(String[] args) {
        //建立10个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(100);
        //设置信号量的值5 ,也就是容许五个线程来执行
        Semaphore s = new Semaphore(5);
        for (int i = 0; i < 100; i++) {
            service.submit(() ->{
                try {
                    s.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println("数据库耗时操做"+Thread.currentThread().getName());
                    TimeUnit.MILLISECONDS.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "正在执行....");
                s.release();
            });
        }

    }

如上代码,建立了一个容量100的线程池,模拟咱们程序中大量的线程,添加一百个任务,让线程池执行。建立了一个容量为5的信号量,在线程中咱们调用 acquire 来得到信号量的许可,只有得到了才能只能下面的内容否则阻塞。当执行完后释放该许可,经过 release 方法,dom

  • 经过上面的演示,有没有以为很是眼熟,对,就是和咱们以前接触过的锁很类似,只是锁是只容许一个线程访问,那咱们能不能将信号量的容量设置为1呢? 这固然是能够的,当咱们设置为1时其实就和咱们的锁的功能是一致的,以下代码:
private static int count = 0;
    /*
        Semaphore 中若是咱们容许的的许可证数量为1 ,那么它的效果与锁类似。
     */
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService service = Executors.newFixedThreadPool(10);

        Semaphore semaphore = new Semaphore(1);
        for (int i = 0; i < 10000; i++) {
            service.submit(() ->{
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行了");
                count ++;
                semaphore.release();
            });
        }
        service.shutdown();
        TimeUnit.SECONDS.sleep(5);
        System.out.println(count);

    }

其余主要方法介绍

  • public boolean tryAcquire()tryAcquire 和锁的 trylock 思惟是一致的,是尝试获取许可证,至关于看看如今有没有空闲的许可证,若是有就获取,若是如今获取不到也不要紧,没必要陷入阻塞,能够去作别的事。
  • public boolean tryAcquire(long timeout, TimeUnit unit):是一个重载的方法,它里面传入了超时时间。好比传入了 3 秒钟,则意味着最多等待 3 秒钟,若是等待期间获取到了许可证,则往下继续执行;若是超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。
  • int availablePermits():返回此信号量中当前可用的许可证数
  • int getQueueLength():返回正在等待许可证的线程数
  • boolean hasQueuedThreads():判断是否有线程正在等待获取许可证
  • void reducePermits(int reduction):减小 reduction 个许可证,是个 protected 方法
  • Collection<Thread> getQueuedThreads():返回正在等待获取许可证的线程集合,是个 protected 方法

Exchanger

  • Exchanger(交换者)是一个用于线程间协做的工具类,它主要用于进行线程间数据的交换,它有一个同步点,当两个线程到达同步点时能够将各自的数据传给对方,若是一个线程先到达同步点则会等待另外一个到达同步点,到达同步点后调用 exchange 方法能够传递本身的数据而且得到对方的数据。
  • 咱们假设如今须要录入一些重要的帐单信息,为了保证准备,让两我的分别录入,以后再进行对比后是否一致,防止错误繁盛。下面经过代码来演示:
public class ExchangerTest {

    /*
    Exchanger 交换, 用于线程间协做的工具类,能够交换线程间的数据,
    其提供一个同步点,当线程到达这个同步点后进行数据间的交互,遗传算法能够如此来实现,
    以及校对工做也能够如此来实现
     */

    public static void main(String[] args) {
        /*
        模拟 两个工做人员录入记录,为了防止错误,二者录的相同内容,程序仅从校对,看是否有错误不一致的
         */

        //开辟两个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(2);

        Exchanger<InfoMsg> exchanger = new Exchanger<>();

        service.submit(() ->{
            //模拟数据 线程 A的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程A";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其余...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程A 交换数据====== 获得"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("数据不一致~~请稽核");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        service.submit(() ->{
            //模拟数据 线程 B的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程B";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其余...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程B 交换数据====== 获得"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("数据不一致~~请稽核");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        service.shutdown();
    }

    static class InfoMsg{
        String id;
        String name;
        String message;
        String content;
        String desc;

        @Override
        public String toString() {
            return "InfoMsg{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", message='" + message + '\'' +
                    ", content='" + content + '\'' +
                    ", desc='" + desc + '\'' +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            InfoMsg infoMsg = (InfoMsg) o;
            return Objects.equals(id, infoMsg.id) &&
                    Objects.equals(name, infoMsg.name) &&
                    Objects.equals(message, infoMsg.message) &&
                    Objects.equals(content, infoMsg.content) &&
                    Objects.equals(desc, infoMsg.desc);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name, message, content, desc);
        }
    }
}
  • 运行结果以下:

上面代码运行能够看到,当咱们线程 A/B 到达同步点即调用 exchange 后进行数据的交换,拿到对方的数据再与本身的数据对比能够作到稽核 的效果ide

  • Exchanger 一样能够用于遗传算法中,选出两个对象进行交互两个的数据经过交叉规则获得两个混淆的结果。
  • Exchanger 中嗨提供了一个方法 public V exchange(V x, long timeout, TimeUnit unit) 主要是用来防止两个程序中一个一直没有执行 exchange 而致使另外一个一直陷入等待状态,这是能够用这个方法,设置超时时间,超过这个时间则再也不等待。


本文由AnonyStar 发布,可转载但需声明原文出处。
欢迎关注微信公帐号 :云栖简码 获取更多优质文章
更多文章关注笔者博客 :云栖简码 i-code.online

相关文章
相关标签/搜索