CountDownLatch容许一个或多个线程等待其余线程完成操做。相似于join的操做,能够进行类比:java
join用于让当前执行线程等待join线程执行结束,如
A.join()
方法,将不停检查A线程是否存活,若是A存活,则当前线程永远等待。编程
public class JoinCDLT { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @SneakyThrows @Override public void run() { System.out.println("parser1 start"); Thread.sleep(5000); System.out.println("parser1 finish"); } }); Thread parser2 = new Thread(new Runnable() { @SneakyThrows @Override public void run() { System.out.println("parser2 start"); Thread.sleep(10000); System.out.println("parser2 finish"); } }); long start = System.currentTimeMillis(); parser1.start(); parser2.start(); //join用于让当前执行线程等待join线程执行结束 parser1.join(); parser2.join(); long end = System.currentTimeMillis(); System.out.println("all parser finish , spend " + (end - start) + " ms"); } }
CountDownLatch使用AQS实现,经过AQS的状态变量state来做为计数器值,当多个线程调用countdown方法时实际是原子性递减AQS的状态值,当线程调用await方法后当前线程会被放入AQS阻塞队列等待计数器为0再返回。segmentfault
public class CountDownLatchTest { //传入int参数做为计数器 static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { Thread a = new Thread(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(3000); System.out.println(1); c.countDown();//每当调用该方法,计数器N - 1 } }); Thread b = new Thread(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(3000); System.out.println(2); c.countDown(); } }); long start = System.currentTimeMillis(); a.start(); b.start(); //await方法会阻塞当前线程 c.await(); long end = System.currentTimeMillis(); System.out.println(3 + " " + (end - start)); } }
计数器必须大于等于0,只是等于0时候,计数器就是0,调用await方法时不会阻塞当前线程。CountDownLatch不可能从新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before,另一个线程调用await方法。并发
CyclicBarrier能够让一组线程达到一个屏障【同步点】时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续运行。app
CyclicBarrier默认的构造方法是
CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。ide
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { //线程调用await,告诉CyclicBarrier已经到达了屏障,而后当前线程被阻塞 c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(1); } }, "thread-1"); thread.start(); try { //主线程到达了屏障,由于设置了parties设置为2,所以能够继续下去 c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(2); } }
上述程序,输出的结果多是先1后2的次序,也多是先2后1,缘由在于:主线程和子线程的调度由CPU决定,两个线程均可能先执行。函数
可是,若是将屏障数量改成3,此时主线程和子线程会永远等待,由于没有第三个线程达到屏障了。ui
getNumberWaiting
方法能够得到CyclicBarrier阻塞的线程数量。isBroken()
方法用来了解阻塞的线程是否被中断。Phaser能够替代CountDownLatch 和CyclicBarrier,但比二者更增强大,能够动态调整须要的线程个数,能够经过构造函数传入父Phaser实现层次Phaser。线程
参考:http://www.javashuo.com/article/p-gholfgmo-de.htmlcode
Semaphore 能够用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源,适用场景能够是流量控制。
使用AQS实现,AQS的状态变量state作为许可证数量,每次经过acquire()/tryAcquire()
,许可证数量经过CAS原子性递减,调用release()释放许可证,原子性递增,只要有许可证就能够重复使用。
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10);//10个许可证数量,最大并发数为10 public static void main(String[] args) { for(int i = 0; i < THREAD_COUNT; i ++){ //执行30个线程 threadPool.execute(new Runnable() { @Override public void run() { s.tryAcquire(); //尝试获取一个许可证 System.out.println("save data"); s.release(); //使用完以后归还许可证 } }); } threadPool.shutdown(); } }
用于进行线程间的数据交换,它提供一个同步点,在这个同步点两个线程能够交换彼此的数据。
若是第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当都达到同步点时,这两个线程能够交换数据。
public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @SneakyThrows @Override public void run() { String A = "银行流水A"; exgr.exchange(A); } }); threadPool.execute(new Runnable() { @SneakyThrows @Override public void run() { String B = "银行流水B"; String A = exgr.exchange(B); System.out.println("A 和 B 的数据是否一致 :" + A.equals(B) + " A录入的数据为 :" + A + " B录入的数据为 :" + B); } }); threadPool.shutdown(); } }
参考
《Java并发编程的艺术》 方腾飞