并发工具类(二)同步屏障CyclicBarrier

前言

  JDK中为了处理线程之间的同步问题,除了提供锁机制以外,还提供了几个很是有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
  CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。html

简介

  CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。java

构造方法摘要
bash

方法名称 说明
CyclicBarrier(int parties) 建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预约义的操做。
CyclicBarrier(int parties, Runnable barrierAction) 建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操做,该操做由最后一个进入 barrier 的线程执行。

方法摘要
多线程

方法名称 说明
public int await() throws
InterruptedException, BrokenBarrierException
在全部参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。
返回:
到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,
零指示最后一个到达的线程.
public int await(long timeout,TimeUnit unit) throws
InterruptedException,BrokenBarrierException,
ITimeoutException
在全部参与者都已经在此屏障上调用 await 方法以前将一直等待,或者超出了指定的等待时间。
public void reset() 将屏障重置为其初始状态。若是全部参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在因为其余缘由形成损坏以后,实行重置可能会变得很复杂;
public boolean isBroken() 查询此屏障是否处于损坏状态。
public int getNumberWaiting() 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
public int getParties() 返回要求启动此 barrier 的参与者数目。

注意:并发

  • 对于失败的同步尝试,CyclicBarrier 使用了一种要么所有要么全不 (all-or-none) 的破坏模式:
    若是由于中断、失败或者超时等缘由,致使线程过早地离开了屏障点,那么在该屏障点等待的其余全部线程也将经过 BrokenBarrierException(若是它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
  • 内存一致性效果:
    线程中调用 await() 以前的操做 happen-before 那些是屏障操做的一部份的操做,后者依次 happen-before 紧跟在从另外一个线程中对应 await() 成功返回的操做。
@ Example1 屏障操做的例子

public static void main(String[] args) {
    //设置5个屏障,而且有屏障操做
    CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {
        @Override
        public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"执行了屏障操做");        
        }
    });
    
    for(int i=0;i<5;i++){
       //建立5个线程
        Thread thread = new Thread(new MyRunable(barrier),"thread_"+i);
        thread.start();
    }
}
复制代码

class MyRunable implements Runnable{

    CyclicBarrier barrier;
    public MyRunable(CyclicBarrier barrier ){
         this.barrier = barrier; 
    }
    
    @Override
    public void run() {
        //一系列操做...
         System.out.println("线程 "+Thread.currentThread().getName()+" 到达了屏障点!");
         try {
             int index = barrier.await();
            if(index== (barrier.getParties()-1)){
                //第一个到达屏障点的线程,执行特殊操做....
                System.out.println("全部线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是第一个到达屏障点");
            }else if(index == 0){//最后一个到达屏障点的线程
                System.out.println("全部线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是最后一个到达屏障点");
            }else{
                System.out.println("全部线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!");
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
复制代码

运行结果:
app

线程 thread_1 到达了屏障点!
线程 thread_4 到达了屏障点!
线程 thread_3 到达了屏障点!
线程 thread_0 到达了屏障点!
线程 thread_2 到达了屏障点!
线程thread_3执行了屏障操做
全部线程到达屏障点,线程 thread_3 被唤醒!!此线程是最后一个到达屏障点
全部线程到达屏障点,线程 thread_0 被唤醒!!
全部线程到达屏障点,线程 thread_4 被唤醒!!
全部线程到达屏障点,线程 thread_1 被唤醒!!此线程是第一个到达屏障点
全部线程到达屏障点,线程 thread_2 被唤醒!!
复制代码

 上面的例子,使用了传入屏障操做的Runable参数的构造方法,ide

屏障操做是由最后一个到达屏障点的线程执行的,这是不能够改变的

。然而,在实际使用中,工具

可能会出现由第n个到达屏障点的线程执行特殊的操做(或者说 屏障操做),那么就可使用 CyclicBarrier.await()进行判断

,如上面的例子,第一个和最后一个到达屏障点的线程都执行特殊的操做。this

   顺便说一下,可能会对本例子中前5个输出的顺序 有所疑惑:thread_3 经过awiat()方法返回的索引值,可知 thread_3 是最后一个到达屏障点的,但为何输出的顺序倒是第三个,而不是最后一个;在这就要真正理解CyclicBarrier,CyclicBarrier 本质上是一把锁,多个线程在使用CyclicBarrier 对象时,是须要先获取锁,即须要互斥访问,因此调用await( )方法不必定可以立刻获取锁。上面的例子,是先打印输出,再去获取锁,因此输出顺序不是到达屏障点的顺序。
spa

@ Example2 应用场景

   下面的例子是:CyclicBarrier用于多线程计算数据,最后合并计算结果的场景。好比咱们用一个Excel保存了用户全部银行流水,每一个Sheet保存一个账户近一年的每笔银行流水,如今须要统计用户的日均银行流水,先用多线程处理每一个sheet里的银行流水,都执行完以后,获得每一个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

public class BankWaterService implements Runnable {
    
    //建立4个屏障,处理完后执行当前类的run方法
    private CyclicBarrier barrier = new CyclicBarrier(4,this);
    
    //假设只有4个sheet,因此只启动4个线程
    private Executor excutor = Executors.newFixedThreadPool(4);
    
    //保存每一个sheet计算出的结果
    private ConcurrentHashMap< String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
    
    private void count(){
        for(int i=0;i<4;i++){
            excutor.execute(new Runnable() {
                
                @Override
                public void run() {
                    //计算过程.....
                    //存储计算结果
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    try {
                        //计算完成,插入屏障
                        barrier.await();
                        //后续操做,将会使用到四个线程的运行结果....
                        System.out.println("线程"+Thread.currentThread().getName()+"运行结束,最终的计算结果:"+sheetBankWaterCount.get("result"));
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    @Override
    public void run() {
        int result = 0;
        for(Entry<String, Integer> item : sheetBankWaterCount.entrySet()){
            result += item.getValue();
        }
        sheetBankWaterCount.put("result", result);
    }
    
    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}
复制代码

运行结果:

线程pool-1-thread-4运行结束,最终的计算结果:4
线程pool-1-thread-2运行结束,最终的计算结果:4
线程pool-1-thread-1运行结束,最终的计算结果:4
线程pool-1-thread-3运行结束,最终的计算结果:4
复制代码

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch: 一个线程(或者多个线程), 等待另外N个线程完成某个事情以后才能执行。而这N个线程经过调用CountDownLatch.countDown()方法 来告知“某件事件”完成,即计数减一。而一个线程(或者多个线程)则经过CountDownLatch.awiat( ) 进入等待状态,直到 CountDownLatch的计数为0时,才会所有被唤醒
  • CyclicBarrier : N个线程相互等待,任何一个线程完成某个事情以前,全部的线程都必须等待。
    CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数同样, 只不过是递减的.
    而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
  • CountDownLatch只能使用一次,CyclicBarrier则能够经过reset( )方法重置后,从新使用。因此
    CyclicBarrier能够用于更复杂的业务场景。
    例如:计算错误,能够重置计数器,并让线程从新执行一次。

文章源地址:https://www.cnblogs.com/jinggod/p/8494193.html

相关文章
相关标签/搜索