【Java并发编程】常见工具类总结:CountDownLatch,CyclicBarrier,Semphore,Exchanger

CountDownLatch

CountDownLatch容许一个或多个线程等待其余线程完成操做。相似于join的操做,能够进行类比:java

join用于让当前执行线程等待join线程执行结束,如A.join()方法,将不停检查A线程是否存活,若是A存活,则当前线程永远等待。编程

public class JoinCDLT {

    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                System.out.println("parser1 start");
                Thread.sleep(5000);
                System.out.println("parser1 finish");
            }
        });
        Thread parser2 = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                System.out.println("parser2 start");
                Thread.sleep(10000);
                System.out.println("parser2 finish");
            }
        });
        long start = System.currentTimeMillis();
        parser1.start();
        parser2.start();
        //join用于让当前执行线程等待join线程执行结束
        parser1.join();
        parser2.join();
        long end = System.currentTimeMillis();
        System.out.println("all parser finish , spend " + (end - start) + " ms");
    }
    
}

CountDownLatch使用AQS实现,经过AQS的状态变量state来做为计数器值,当多个线程调用countdown方法时实际是原子性递减AQS的状态值,当线程调用await方法后当前线程会被放入AQS阻塞队列等待计数器为0再返回。segmentfault

public class CountDownLatchTest {

    //传入int参数做为计数器
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                Thread.sleep(3000);
                System.out.println(1);
                c.countDown();//每当调用该方法,计数器N - 1
            }
        });
        Thread b = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                Thread.sleep(3000);
                System.out.println(2);
                c.countDown();
            }
        });
        long start = System.currentTimeMillis();
        a.start();
        b.start();
        //await方法会阻塞当前线程
        c.await();
        long end = System.currentTimeMillis();
        System.out.println(3 + " " + (end - start));
    }
}

计数器必须大于等于0,只是等于0时候,计数器就是0,调用await方法时不会阻塞当前线程。CountDownLatch不可能从新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before,另一个线程调用await方法。并发

CyclicBarrier

CyclicBarrier能够让一组线程达到一个屏障【同步点】时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续运行。app

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。ide

public class CyclicBarrierTest {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //线程调用await,告诉CyclicBarrier已经到达了屏障,而后当前线程被阻塞
                    c.await(); 
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }, "thread-1");
        thread.start();
        try {
            //主线程到达了屏障,由于设置了parties设置为2,所以能够继续下去
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

上述程序,输出的结果多是先1后2的次序,也多是先2后1,缘由在于:主线程和子线程的调度由CPU决定,两个线程均可能先执行。函数

可是,若是将屏障数量改成3,此时主线程和子线程会永远等待,由于没有第三个线程达到屏障了。ui

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可使用reset()方法重置,所以CyclicBarrier可以处理更为复杂的业务场景。
  • CyclicBarrier还提供了其余有用的方法,如getNumberWaiting方法能够得到CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

Phaser 的实现

Phaser能够替代CountDownLatch 和CyclicBarrier,但比二者更增强大,能够动态调整须要的线程个数,能够经过构造函数传入父Phaser实现层次Phaser。线程

参考:http://www.javashuo.com/article/p-gholfgmo-de.htmlcode

Semaphore

Semaphore 能够用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源,适用场景能够是流量控制。

使用AQS实现,AQS的状态变量state作为许可证数量,每次经过acquire()/tryAcquire(),许可证数量经过CAS原子性递减,调用release()释放许可证,原子性递增,只要有许可证就能够重复使用。

public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);//10个许可证数量,最大并发数为10

    public static void main(String[] args) {
        for(int i = 0; i < THREAD_COUNT; i ++){ //执行30个线程
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    s.tryAcquire(); //尝试获取一个许可证
                    System.out.println("save data");
                    s.release(); //使用完以后归还许可证
                }
            });
        }
        threadPool.shutdown();
    }
}

Exchanger 原理

用于进行线程间的数据交换,它提供一个同步点,在这个同步点两个线程能够交换彼此的数据。

若是第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当都达到同步点时,这两个线程能够交换数据。

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                String A = "银行流水A";
                exgr.exchange(A);
            }
        });
        threadPool.execute(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                String B = "银行流水B";
                String A = exgr.exchange(B);
                System.out.println("A 和 B 的数据是否一致 :"
                        + A.equals(B) + " A录入的数据为 :" + A + " B录入的数据为 :" + B);
            }
        });
        threadPool.shutdown();
    }
}

参考

相关文章
相关标签/搜索