CountDownLatch和CyclicBarrier

一、多线程开发中,常常会遇到一个线程等待一个或多个线程的状况,遇到这样的场景如何解决?
一个线程等待一个线程:能够经过wait和notify实现
一个线程等待多个线程:能够经过CountDownLatch实现
多个线程之间互相等待:能够经过CyclicBarrier实现java

二、countDownLatch和CyclicBarrier的区别:
countDownLatch    多线程

减计数方式
调用countDown()计数减1,调用await()只阻塞线程,对计数无影响
计数为0时,释放全部等待的线程
计数为0时,不可重复利用dom

CyclicBarrieride

加计数方式
调用await方法,计数加1,若加1后的值不等于构造方法的值,则线程阻塞
计数达到指定值时,计数从新置为0,释放全部阻塞线程
计数达到指定值时,可重复利用.函数

三、CountDownLatch的伪代码以下所示:测试

//Main thread startthis

//Create CountDownLatch for N threadsspa

//Create and start N threads线程

//Main thread wait on latchcode

//N threads completes there tasks are returns

//Main thread resume execution

四、CountDownLatch如何工做

CountDownLatch.java类中定义的构造函数:

//Constructs a CountDownLatch initialized with the given count.

public void CountDownLatch(int count) {...}

与CountDownLatch的第一次交互是主线程等待其余线程。主线程必须在启动其余线程后当即调用CountDownLatch.await()方法。这样主线程的操做就会在这个方法上阻塞,直到其余线程完成各自的任务。构造器中的计数值(count)实际上就是闭锁须要等待的线程数量。这个值只能被设置一次,并且CountDownLatch没有提供任何机制去从新设置这个计数值。

其余N 个线程必须引用闭锁对象,由于他们须要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是经过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。因此当N个线程都调 用了这个方法,count的值等于0,而后主线程就能经过await()方法,恢复执行本身的任务。

五、CountDownLatch使用例子

在这个例子中,模拟了一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,而且启动类一直在闭锁上等待着。一旦验证和检查了全部外部服务,那么启动类恢复执行。

BaseHealthChecker.java:这个类是一个Runnable,负责全部特定的外部服务健康的检测。

public abstract class BaseHealthChecker implements Runnable{

    private CountDownLatch countDownLatch;
    private String serviceName;
    private boolean serviceUp;
    
    public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch){
        this.serviceName = serviceName;
        this.countDownLatch = countDownLatch;
        this.serviceUp = false;
    }
    
    @Override
    public void run() {
        try {
            serviceVerify();
            serviceUp = true;
        } catch (Exception e) {
            e.printStackTrace();
            serviceUp = false;
        } finally {
            if (null != countDownLatch) {
                countDownLatch.countDown();
            }
        }
    }
    
    public String getServiceName() {
        return serviceName;
    }

    public boolean isServiceUp() {
        return serviceUp;
    }

    public abstract void serviceVerify() throws Exception;    
}

NetworkHealthChecker.java:这个类继承了BaseHealthChecker,实现了verifyService()方法。DatabaseHealthChecker.javaCacheHealthChecker.java除了服务名和休眠时间外,与NetworkHealthChecker.java是同样的。

public class NetworkHealthChecker extends BaseHealthChecker{

    public NetworkHealthChecker(CountDownLatch countDownLatch) {
        super("Network Service", countDownLatch);
    }

    @Override
    public void serviceVerify() throws InterruptedException {
        System.out.println("Checking " + this.getServiceName());
        Thread.sleep(3000);
        System.out.println(this.getServiceName() + " is checked");
    }

}

ApplicationStartupUtil.java:这个类是一个主启动类,它负责初始化闭锁,而后等待,直到全部服务都被检测完。

public class ApplicatiionStartupUtil {

    private static List<BaseHealthChecker> checkers;
    private ApplicatiionStartupUtil(){
        
    }

    private final ApplicatiionStartupUtil INSTANCE = new ApplicatiionStartupUtil();
    public ApplicatiionStartupUtil getInstance(){
        return INSTANCE;
    }
    
    public static boolean checkExternalService() throws InterruptedException{
        CountDownLatch countDownLatch = new CountDownLatch(3);
        checkers = new ArrayList<BaseHealthChecker>();
        checkers.add(new NetworkHealthChecker(countDownLatch));
        checkers.add(new CacheHealthChecker(countDownLatch));
        checkers.add(new DateBasekHealthChecker(countDownLatch));
        
        ExecutorService executor = Executors.newFixedThreadPool(checkers.size());
            
        for(final BaseHealthChecker checker : checkers){
            executor.execute(checker);
        }
        countDownLatch.await();
        executor.shutdown();
        
        for(final BaseHealthChecker checker : checkers){
            if (!checker.isServiceUp()) {
                return false;
            }
        }
        return true;
    }
}

测试代码:

public class Main {

    public static void main(String[] args) {
        boolean result = false;
        try {
            result = ApplicatiionStartupUtil.checkExternalService();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("External services validation completed !! Result was :: "+ result);
    }
}

运行结果

Checking Network Service
Checking DateBase Service
Checking Cache Service
Network Service is checked
Cache Service is checked
DateBase Serviceis Checked
External services validation completed !! Result was :: true

六、CyclicBarrier使用的例子

public class CyclicBarrierTest {

    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A());
    public static void main(String[] args) {
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(new Random().nextInt(5) * 1000);
                    System.out.println("thread is prepared...." + new Date());
                    cyclicBarrier.await();
                    System.out.println(1);
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        try {
            Thread.sleep(new Random().nextInt(5) * 1000);
            System.out.println("main is prepared...." + new Date());
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e ) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
    
    static class A implements Runnable{

        @Override
        public void run() {
            System.out.println("A.......");
        }
        
    }
}

运行结果:

thread is prepared....Fri Oct 27 11:08:44 CST 2017 main is prepared....Fri Oct 27 11:08:47 CST 2017 A....... 1 2

相关文章
相关标签/搜索