java 并发工具类CountDownLatch & CyclicBarrier

一块儿在java1.5被引入的并发工具类还有CountDownLatch、CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下。html

CountDownLatch

CountDownLatch 概念

CountDownLatch这个类可以使一个线程等待其余线程完成各自的工做后再执行。例如,应用程序的主线程但愿在负责启动框架服务的线程已经启动全部的框架服务以后再执行。java

CountDownLatch是经过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了本身的任务后,计数器的值就会减1。当计数器值到达0时,它表示全部的线程已经完成了任务,而后在闭锁上等待的线程就能够恢复执行任务。并发

图示:框架

伪代码:dom

//Main thread start
//Create CountDownLatch for N threads
//Create and start N threads
//Main thread wait on latch
//N threads completes there tasks are returns
//Main thread resume execution

CountDownLatch 如何工做

CountDownLatch.java类中定义的构造函数:ide

//Constructs a CountDownLatch initialized with the given count.
public void CountDownLatch(int count) {...}

构造器中的计数值(count)实际上就是闭锁须要等待的线程数量。这个值只能被设置一次,并且CountDownLatch没有提供任何机制去从新设置这个计数值。函数

与CountDownLatch的第一次交互是主线程等待其余线程。主线程必须在启动其余线程后当即调用CountDownLatch.await()方法。这样主线程的操做就会在这个方法上阻塞,直到其余线程完成各自的任务。工具

其余N 个线程必须引用闭锁对象,由于他们须要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是经过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。因此当N个线程都调 用了这个方法,count的值等于0,而后主线程就能经过await()方法,恢复执行本身的任务。测试

使用场景

咱们尝试罗列出在java实时系统中CountDownLatch都有哪些使用场景。我所罗列的都是我所能想到的。若是你有别的可能的使用方法,请在留言里列出来,这样会帮助到你们。this

  1. 实现最大的并行性:有时咱们想同时启动多个线程,实现最大程度的并行性。例如,咱们想测试一个单例类。若是咱们建立一个初始计数为1的CountDownLatch,并让全部线程都在这个锁上等待,那么咱们能够很轻松地完成测试。咱们只需调用 一次countDown()方法就可让全部的等待线程同时恢复执行。
  2. 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,全部N个外部系统已经启动和运行了。
  3. 死锁检测:一个很是方便的使用场景是,你可使用n个线程访问共享资源,在每次测试阶段的线程数目是不一样的,并尝试产生死锁。

CountDownLatch使用实例

在这个例子中,我模拟了一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,而且启动类一直在闭锁上等待着。一旦验证和检查了全部外部服务,那么启动类恢复执行。

BaseHealthChecker.java:这个类是一个Runnable,负责全部特定的外部服务健康的检测。它删除了重复的代码和闭锁的中心控制代码。

public abstract class BaseHealthChecker implements Runnable {
 
        private CountDownLatch _latch;
        private String _serviceName;
        private boolean _serviceUp;
     
        //Get latch object in constructor so that after completing the task, thread can countDown() the latch
        public BaseHealthChecker(String serviceName, CountDownLatch latch)
        {
            super();
            this._latch = latch;
            this._serviceName = serviceName;
            this._serviceUp = false;
        }
     
        @Override
        public void run() {
            try {
                verifyService();
                _serviceUp = true;
            } catch (Throwable t) {
                t.printStackTrace(System.err);
                _serviceUp = false;
            } finally {
                if(_latch != null) {
                    _latch.countDown();
                }
            }
        }
     
        public String getServiceName() {
            return _serviceName;
        }
     
        public boolean isServiceUp() {
            return _serviceUp;
        }
        //This methos needs to be implemented by all specific service checker
        public abstract void verifyService();
    }

NetworkHealthChecker.java:这个类继承了BaseHealthChecker,实现了verifyService()方法。DatabaseHealthChecker.java和CacheHealthChecker.java除了服务名和休眠时间外,与NetworkHealthChecker.java是同样的。

public class NetworkHealthChecker extends BaseHealthChecker
    {
        public NetworkHealthChecker (CountDownLatch latch)  {
            super("Network Service", latch);
        }
     
        @Override
        public void verifyService()
        {
            System.out.println("Checking " + this.getServiceName());
            try
            {
                Thread.sleep(7000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println(this.getServiceName() + " is UP");
        }
    }

ApplicationStartupUtil.java:这个类是一个主启动类,它负责初始化闭锁,而后等待,直到全部服务都被检测完。

public class ApplicationStartupUtil
    {
        //List of service checkers
        private static List<BaseHealthChecker> _services;
     
        //This latch will be used to wait on
        private static CountDownLatch _latch;
     
        private ApplicationStartupUtil()
        {
        }
     
        private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
     
        public static ApplicationStartupUtil getInstance()
        {
            return INSTANCE;
        }
     
        public static boolean checkExternalServices() throws Exception
        {
            //Initialize the latch with number of service checkers
            _latch = new CountDownLatch(3);
     
            //All add checker in lists
            _services = new ArrayList<BaseHealthChecker>();
            _services.add(new NetworkHealthChecker(_latch));
            _services.add(new CacheHealthChecker(_latch));
            _services.add(new DatabaseHealthChecker(_latch));
     
            //Start service checkers using executor framework
            Executor executor = Executors.newFixedThreadPool(_services.size());
     
            for(final BaseHealthChecker v : _services)
            {
                executor.execute(v);
            }
     
            //Now wait till all services are checked
            _latch.await();
     
            //Services are file and now proceed startup
            for(final BaseHealthChecker v : _services)
            {
                if( ! v.isServiceUp())
                {
                    return false;
                }
            }
            return true;
        }
    }

如今你能够写测试代码去检测一下闭锁的功能了。

public class Main {
        public static void main(String[] args)
        {
            boolean result = false;
            try {
                result = ApplicationStartupUtil.checkExternalServices();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("External services validation completed !! Result was :: "+ result);
        }
    }

输出以下:

//Output in console:

Checking Network Service
Checking Cache Service
Checking Database Service
Database Service is UP
Cache Service is UP
Network Service is UP
External services validation completed !! Result was :: true

CyclicBarrier

CyclicBarrier 概念

主要的方法就是一个:await()。

await() 方法每被调用一次,计数便会减小1,并阻塞住当前线程。当计数减至0时,阻塞解除,全部在此 CyclicBarrier 上面阻塞的线程开始运行。在这以后,若是再次调用 await() 方法,计数就又会变成 N-1,新一轮从新开始,这即是 Cyclic 的含义所在。

在全部参与者都已经在此 barrier 上调用 await方法以前,将一直等待。若是当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生如下状况之一前,该线程将一直处于休眠状态:

  • 最后一个线程到达;或者
  • 其余某个线程中断当前线程;或者
  • 其余某个线程中断另外一个等待线程;或者
  • 其余某个线程在等待 barrier 时超时;或者
  • 其余某个线程在此 barrier 上调用 reset()。
  • 若是当前线程,在进入此方法时已经设置了该线程的中断状态;或者
  • 若是当前线程,在等待时被中断

CyclicBarrier 的使用并不难,但须要注意它所相关的异常。除了常见的异常,CyclicBarrier.await() 方法会抛出一个独有的 BrokenBarrierException。这个异常发生在当某个线程在等待本 CyclicBarrier 时被中断或超时或被重置时,其它一样在这个 CyclicBarrier 上等待的线程便会受到 BrokenBarrierException。意思就是说,同志们,别等了,有个小伙伴已经挂了,我们若是继续等有可能会一直等下去,全部各回各家吧。

CyclicBarrier.await() 方法带有返回值,用来表示当前线程是第几个到达这个 Barrier 的线程。

和 CountDownLatch 同样,CyclicBarrier 一样能够能够在构造函数中设定总计数值。与 CountDownLatch 不一样的是,CyclicBarrier 的构造函数还能够接受一个 Runnable,会在 CyclicBarrier 被释放时执行。

CyclicBarrier 实例

/**
 * Description: 赛跑时,等待全部人都准备好时,才起跑:
 *
 * @author shenlongguang
 * @date: 2017/4/27 下午1:50.
 */
public class CyclicBarrierTest {

    public static void main(String[] args) throws IOException, InterruptedException {
        //若是将参数改成4,可是下面只加入了3个选手,这永远等待下去
        //Waits until all parties have invoked await on this barrier.
        CyclicBarrier barrier = new CyclicBarrier(5);

        ExecutorService executor = Executors.newFixedThreadPool(5);
        executor.submit(new Thread(new Runner(barrier, " No.1")));
        executor.submit(new Thread(new Runner(barrier, " No.2")));
        executor.submit(new Thread(new Runner(barrier, " No.3")));
        executor.submit(new Thread(new Runner(barrier, " No.4")));
        executor.submit(new Thread(new Runner(barrier, " No.5")));

        executor.shutdown();
    }
}

class Runner implements Runnable {
    // 一个同步辅助类,它容许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
    private CyclicBarrier barrier;

    private String name;

    public Runner(CyclicBarrier barrier, String name) {
        super();
        this.barrier = barrier;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000 * (new Random()).nextInt(8));
            System.out.println(System.currentTimeMillis()+ name + " ready...");
            // barrier的await方法,在全部参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis()+ name + " go!");
    }
}

console 输出:

1493272909108 No.5 ready...
1493272910101 No.2 ready...
1493272914101 No.1 ready...
1493272915106 No.3 ready...
1493272915106 No.4 ready...
1493272915106 No.4 go!
1493272915107 No.5 go!
1493272915107 No.3 go!
1493272915107 No.1 go!
1493272915107 No.2 go!

CyclicBarrier 和 CountDownLatch 用法上的不一样

CountDownLatch 适用于一组线程和另外一个主线程之间的工做协做。一个主线程等待一组工做线程的任务完毕才继续它的执行是使用 CountDownLatch 的主要场景;

CyclicBarrier 用于一组或几组线程,好比一组线程须要在一个时间点上达成一致,例如同时开始一个工做。另外,CyclicBarrier 的循环特性和构造函数所接受的 Runnable 参数也是 CountDownLatch 所不具有的

参考 / 转载:

CountDownLatch英文原文when-to-use-countdownlatch-java-concurrency
CountDownLatch翻译文何时使用CountDownLatch
cyclicbarrier示例the-introduction-and-use-of-cyclicbarrier

相关文章
相关标签/搜索