CountDownLatch与CyclicBarrier的基本使用

1 概述

CountDownLatch以及CyclicBarrier都是Java里面的同步工具之一,本文介绍了二者的基本原理以及基本使用方法。java

2 CountDownLatch

CountDownLatch是一个同步工具类,常见的使用场景包括:dom

  • 容许一个或多个线程等待一系列的其余线程结束
  • 在串行化任务中须要进行并行化处理,并等待全部并行化任务结束,串行化任务才能继续进行

好比考虑这样一个场景,在一个电商网站中,用户点击了首页,须要一部分的商品,同时显示它们的价格,那么,调用的流程应该是:工具

  • 获取商品
  • 计算售价
  • 返回全部商品的最终售价

解决这样的问题可使用串行化或并行化操做,串行化就是逐一计算商品的售价,并返回,并行化就是获取商品后,并行计算每个商品的售价,最后返回,显而后一种方案要比前一种要好,那么这时候就能够用上CountDownLatch了。网站

一份简单的模拟代码以下:this

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException{
        List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        //计数器大小为商品列表的长度
        final CountDownLatch latch = new CountDownLatch(list.size());
        //线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price ");
            try{
                //随机休眠模拟业务操做耗时
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                //每完成计算一个商品,将计数器减1,注意须要放在finally中
                latch.countDown();
            }
        }));
        //主线程阻塞直到全部的计数器为0,也就是等待全部的子任务计算价格完毕
        latch.await();
        System.out.println("All of prices calculate finished");
        //手动终止,否则不会结束运行
        executor.shutdown();
    }

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }
}

输出:spa

在这里插入图片描述

代码比较简单,关键地方用上了注释,能够看到代码执行顺序以下:线程

  • 建立多个任务计算商品的价格
  • 主线程阻塞
  • 计算完成后,将计数器减1
  • 当计数器为0时,主线程退出阻塞状态

值得注意的是计数器减1的操做须要放在finally中,由于有可能会出现异常,若是出现异常致使计数器不能减小,那么主线程会一直阻塞。code

另外,CountDownLatch还有一个await(long timeout,TimeUnit unit)方法,是带有超时参数的,也就是说,若是在超时时间内,计数器的值仍是大于0(还有任务没执行完成),会使得当前线程退出阻塞状态。队列

3 CyclicBarrier

CyclicBarrierCountDownLatch有不少相似的地方,也是一个同步工具类,容许多个线程在执行完相应的操做以后彼此等待到达同一个barrier point(屏障点)。CyclicBarrier也适合某个串行化的任务被拆分为多个并行化任务,这点与CountDownLatch相似,可是CyclicBarrier具有的一个更强大的功能是,CyclicBarrier能够被重复使用。图片

3.1 等待完成

先简单说一下CyclicBarrier的实现原理:

  • 初始化CyclicBarrier,传入一个int参数,表示分片(parites),一般意义上来讲分片数就是任务的数量
  • 同时串行化执行多个任务
  • 任务执行完成后,调用await(),等待其余线程也到达barrier point
  • 当全部线程到达后,继续以串行化方式运行任务

常见的使用方法是设置分片数为任务数+1,这样,能够在主线程中执行await(),等待全部子任务完成。好比下面是使用CyclicBarrier实现一样功能的模拟代码:

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
        List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        final CyclicBarrier barrier = new CyclicBarrier(11);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price ");
            try{
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                try{
                    barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished");
        executor.shutdown();
    }

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }
}

输出相同,代码大部分类似,不一样的地方有:

  • latch.countDown()替换成了barrier.await()
  • latch.await()替换成了barrier.await()
  • 线程池的核心线程数替换成了10

await()方法会等待全部的线程到达barrier point,上面代码执行流程简述以下:

  • 初始化CyclicBarrier,分片数为11(子线程数+1)
  • 主线程调用await(),等待子线程执行完成
  • 子线程各自进行商品价格的计算,计算完成后,调用await(),等待其余线程也到达barrier point
  • 当全部子线程计算完成后,因为没有后续操做,因此子线程运行结束,同时因为主线程还有后续操做,会先输出提示信息再终止线程池

注意一个很大的不一样就是这里的线程池核心线程数目改为了 10,那么,为何须要10?

由于若是是设置一个小于10的核心线程个数,因为线程池是会先建立核心线程来执行任务,核心线程满了以后,放进任务队列中,而假设只有5个核心线程,那么:

  • 5个线程进行计算价格
  • 另外5个任务放在任务队列中

这样的话,会出现死锁,由于计算中的线程须要队列中的任务到达barrier point才能结束,而队列中的任务须要核心线程计算完毕后,才能调度出来计算,这样死锁就出现了。

3.2 重复使用

CyclicBarrierCountDownLatch的一个最大不一样是,CyclicBarrier能够被重复使用,原理上来讲,await()会将内部计数器减1,当计数器减为0时,会自动进行计数器(分片数)重置。好比,在上面的代码中,因为赶上促销活动,须要对商品的价格再次进行计算:

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
        List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        final CyclicBarrier barrier = new CyclicBarrier(11);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price.");
            try{
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed.");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                try{
                    barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished.");
        
        //复制的一段相同代码
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price again.");
            try{
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed.");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                try{
                    barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished again.");
        executor.shutdown();
    }

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }
}

将计算价格的代码复制一遍,其中没有手动修改计数器,只是调用await(),输出以下:

在这里插入图片描述

能够看到,并无对CycliBarrier进行相似reset之类的操做,可是依然能按正常逻辑运行,这是由于await()内部会维护一个计数器,当计数器为0的时候,会自动进行重置,下面是await()OpenJDK 11下的源码:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return this.dowait(false, 0L);
    } catch (TimeoutException var2) {
        throw new Error(var2);
    }
}
    
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    ReentrantLock lock = this.lock;
    lock.lock();

    byte var9;
    try {
        //...
        int index = --this.count;
        if (index != 0) {
            //计数器不为0的状况
            //....
        }

        boolean ranAction = false;

        try {
            Runnable command = this.barrierCommand;
            if (command != null) {
                command.run();
            }

            ranAction = true;
            
            this.nextGeneration();
            var9 = 0;
        } finally {
            if (!ranAction) {
                this.breakBarrier();
            }

        }
    } finally {
        lock.unlock();
    }

    return var9;
}

private void nextGeneration() {
    this.trip.signalAll();
    this.count = this.parties;
    this.generation = new CyclicBarrier.Generation();
}

当计数器为0时,会生成新的Generation,并将var9置为0,最后返回var9(在这个方法中var9只有一处赋值,就是代码中的var9=0,能够理解成直接返回0)。

3.3 CyclicBarrier其余的一些经常使用方法

  • CyclicBarrier(int parties,Runnable barrierAction):构造的时候传入一个Runnable,表示全部线程到达barrier point时,会调用该Runnable
  • await(long timeout,TimeUnit unit):与无参的await()相似,底层调用的是相同的doWait(),不过增长了超时功能
  • isBroken():返回broken状态,某个线程因为执行await而进入阻塞,此时若是执行了中断操做(好比interrupt),那么isBroken()会返回true。须要注意,处于broken状态的CyclicBarrier不能被直接使用,须要调用reset()进行重置

4 总结

下面是CountDownLatchCyclicBarrier的一些简单比较,相同点以下:

  • 都是java.util.concurrent包下的线程同步工具类
  • 均可以用于“主线程阻塞一直等待,直到子任务完成,主线程才继续执行”的状况

不一样点:

  • CountDownLatchawait()方法会等待计数器归0,而CyclicBarrierawait()会等待其余线程到达barrier point
  • CyclicBarrier内部的计数器是能够被重置的,可是CountDownLatch不能够
  • CyclicBarrier是由LockCondition实现的,而CountDownLatch是由同步控制器AQS实现的
  • 构造时CyclicBarrier不容许parties为0,而CountDownLatch容许count为0
相关文章
相关标签/搜索