CountDownLatch
以及CyclicBarrier
都是Java
里面的同步工具之一,本文介绍了二者的基本原理以及基本使用方法。java
CountDownLatch
CountDownLatch
是一个同步工具类,常见的使用场景包括:dom
好比考虑这样一个场景,在一个电商网站中,用户点击了首页,须要一部分的商品,同时显示它们的价格,那么,调用的流程应该是:工具
解决这样的问题可使用串行化或并行化操做,串行化就是逐一计算商品的售价,并返回,并行化就是获取商品后,并行计算每个商品的售价,最后返回,显而后一种方案要比前一种要好,那么这时候就能够用上CountDownLatch
了。网站
一份简单的模拟代码以下:this
import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); //计数器大小为商品列表的长度 final CountDownLatch latch = new CountDownLatch(list.size()); //线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price "); try{ //随机休眠模拟业务操做耗时 TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed"); }catch (InterruptedException e){ e.printStackTrace(); }finally { //每完成计算一个商品,将计数器减1,注意须要放在finally中 latch.countDown(); } })); //主线程阻塞直到全部的计数器为0,也就是等待全部的子任务计算价格完毕 latch.await(); System.out.println("All of prices calculate finished"); //手动终止,否则不会结束运行 executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
输出:spa
代码比较简单,关键地方用上了注释,能够看到代码执行顺序以下:线程
值得注意的是计数器减1的操做须要放在finally
中,由于有可能会出现异常,若是出现异常致使计数器不能减小,那么主线程会一直阻塞。code
另外,CountDownLatch
还有一个await(long timeout,TimeUnit unit)
方法,是带有超时参数的,也就是说,若是在超时时间内,计数器的值仍是大于0(还有任务没执行完成),会使得当前线程退出阻塞状态。队列
CyclicBarrier
CyclicBarrier
与CountDownLatch
有不少相似的地方,也是一个同步工具类,容许多个线程在执行完相应的操做以后彼此等待到达同一个barrier point
(屏障点)。CyclicBarrier
也适合某个串行化的任务被拆分为多个并行化任务,这点与CountDownLatch
相似,可是CyclicBarrier
具有的一个更强大的功能是,CyclicBarrier
能够被重复使用。图片
先简单说一下CyclicBarrier
的实现原理:
CyclicBarrier
,传入一个int
参数,表示分片(parites
),一般意义上来讲分片数就是任务的数量await()
,等待其余线程也到达barrier point
常见的使用方法是设置分片数为任务数+1,这样,能够在主线程中执行await()
,等待全部子任务完成。好比下面是使用CyclicBarrier
实现一样功能的模拟代码:
import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException,BrokenBarrierException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); final CyclicBarrier barrier = new CyclicBarrier(11); ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price "); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed"); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished"); executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
输出相同,代码大部分类似,不一样的地方有:
latch.countDown()
替换成了barrier.await()
latch.await()
替换成了barrier.await()
10
await()
方法会等待全部的线程到达barrier point
,上面代码执行流程简述以下:
CyclicBarrier
,分片数为11(子线程数+1)await()
,等待子线程执行完成await()
,等待其余线程也到达barrier point
注意一个很大的不一样就是这里的线程池核心线程数目改为了 10,那么,为何须要10?
由于若是是设置一个小于10的核心线程个数,因为线程池是会先建立核心线程来执行任务,核心线程满了以后,放进任务队列中,而假设只有5个核心线程,那么:
这样的话,会出现死锁,由于计算中的线程须要队列中的任务到达barrier point
才能结束,而队列中的任务须要核心线程计算完毕后,才能调度出来计算,这样死锁就出现了。
CyclicBarrier
与CountDownLatch
的一个最大不一样是,CyclicBarrier
能够被重复使用,原理上来讲,await()
会将内部计数器减1,当计数器减为0时,会自动进行计数器(分片数)重置。好比,在上面的代码中,因为赶上促销活动,须要对商品的价格再次进行计算:
import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException,BrokenBarrierException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); final CyclicBarrier barrier = new CyclicBarrier(11); ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price."); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed."); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished."); //复制的一段相同代码 list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price again."); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed."); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished again."); executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
将计算价格的代码复制一遍,其中没有手动修改计数器,只是调用await()
,输出以下:
能够看到,并无对CycliBarrier
进行相似reset
之类的操做,可是依然能按正常逻辑运行,这是由于await()
内部会维护一个计数器,当计数器为0的时候,会自动进行重置,下面是await()
在OpenJDK 11
下的源码:
public int await() throws InterruptedException, BrokenBarrierException { try { return this.dowait(false, 0L); } catch (TimeoutException var2) { throw new Error(var2); } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { ReentrantLock lock = this.lock; lock.lock(); byte var9; try { //... int index = --this.count; if (index != 0) { //计数器不为0的状况 //.... } boolean ranAction = false; try { Runnable command = this.barrierCommand; if (command != null) { command.run(); } ranAction = true; this.nextGeneration(); var9 = 0; } finally { if (!ranAction) { this.breakBarrier(); } } } finally { lock.unlock(); } return var9; } private void nextGeneration() { this.trip.signalAll(); this.count = this.parties; this.generation = new CyclicBarrier.Generation(); }
当计数器为0时,会生成新的Generation
,并将var9
置为0,最后返回var9
(在这个方法中var9
只有一处赋值,就是代码中的var9=0
,能够理解成直接返回0)。
CyclicBarrier
其余的一些经常使用方法CyclicBarrier(int parties,Runnable barrierAction)
:构造的时候传入一个Runnable
,表示全部线程到达barrier point
时,会调用该Runnable
await(long timeout,TimeUnit unit)
:与无参的await()
相似,底层调用的是相同的doWait()
,不过增长了超时功能isBroken()
:返回broken
状态,某个线程因为执行await
而进入阻塞,此时若是执行了中断操做(好比interrupt
),那么isBroken()
会返回true
。须要注意,处于broken
状态的CyclicBarrier
不能被直接使用,须要调用reset()
进行重置下面是CountDownLatch
与CyclicBarrier
的一些简单比较,相同点以下:
java.util.concurrent
包下的线程同步工具类不一样点:
CountDownLatch
的await()
方法会等待计数器归0,而CyclicBarrier
的await()
会等待其余线程到达barrier point
CyclicBarrier
内部的计数器是能够被重置的,可是CountDownLatch
不能够CyclicBarrier
是由Lock
和Condition
实现的,而CountDownLatch
是由同步控制器AQS
实现的CyclicBarrier
不容许parties
为0,而CountDownLatch
容许count
为0