JDK中为了处理线程之间的同步问题,除了提供锁机制以外,还提供了几个很是有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。html
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。java
构造方法摘要
bash
方法名称 | 说明 |
---|---|
CyclicBarrier(int parties) | 建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预约义的操做。 |
CyclicBarrier(int parties, Runnable barrierAction) | 建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操做,该操做由最后一个进入 barrier 的线程执行。 |
方法摘要
多线程
方法名称 | 说明 |
---|---|
public int await() throws InterruptedException, BrokenBarrierException |
在全部参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。
返回:
到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,
零指示最后一个到达的线程.
|
public int await(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException, ITimeoutException |
在全部参与者都已经在此屏障上调用 await 方法以前将一直等待,或者超出了指定的等待时间。 |
public void reset() | 将屏障重置为其初始状态。若是全部参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在因为其余缘由形成损坏以后,实行重置可能会变得很复杂; |
public boolean isBroken() | 查询此屏障是否处于损坏状态。 |
public int getNumberWaiting() | 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。 |
public int getParties() | 返回要求启动此 barrier 的参与者数目。 |
注意:并发
public static void main(String[] args) {
//设置5个屏障,而且有屏障操做
CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"执行了屏障操做");
}
});
for(int i=0;i<5;i++){
//建立5个线程
Thread thread = new Thread(new MyRunable(barrier),"thread_"+i);
thread.start();
}
}
复制代码
class MyRunable implements Runnable{
CyclicBarrier barrier;
public MyRunable(CyclicBarrier barrier ){
this.barrier = barrier;
}
@Override
public void run() {
//一系列操做...
System.out.println("线程 "+Thread.currentThread().getName()+" 到达了屏障点!");
try {
int index = barrier.await();
if(index== (barrier.getParties()-1)){
//第一个到达屏障点的线程,执行特殊操做....
System.out.println("全部线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是第一个到达屏障点");
}else if(index == 0){//最后一个到达屏障点的线程
System.out.println("全部线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是最后一个到达屏障点");
}else{
System.out.println("全部线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!");
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
复制代码
运行结果:
app
线程 thread_1 到达了屏障点!
线程 thread_4 到达了屏障点!
线程 thread_3 到达了屏障点!
线程 thread_0 到达了屏障点!
线程 thread_2 到达了屏障点!
线程thread_3执行了屏障操做
全部线程到达屏障点,线程 thread_3 被唤醒!!此线程是最后一个到达屏障点
全部线程到达屏障点,线程 thread_0 被唤醒!!
全部线程到达屏障点,线程 thread_4 被唤醒!!
全部线程到达屏障点,线程 thread_1 被唤醒!!此线程是第一个到达屏障点
全部线程到达屏障点,线程 thread_2 被唤醒!!
复制代码
上面的例子,使用了传入屏障操做的Runable参数的构造方法,ide
。然而,在实际使用中,工具
,如上面的例子,第一个和最后一个到达屏障点的线程都执行特殊的操做。this
顺便说一下,可能会对本例子中前5个输出的顺序 有所疑惑:thread_3 经过awiat()方法返回的索引值,可知 thread_3 是最后一个到达屏障点的,但为何输出的顺序倒是第三个,而不是最后一个;在这就要真正理解CyclicBarrier,CyclicBarrier 本质上是一把锁,多个线程在使用CyclicBarrier 对象时,是须要先获取锁,即须要互斥访问,因此调用await( )方法不必定可以立刻获取锁。上面的例子,是先打印输出,再去获取锁,因此输出顺序不是到达屏障点的顺序。
spa
下面的例子是:CyclicBarrier用于多线程计算数据,最后合并计算结果的场景。好比咱们用一个Excel保存了用户全部银行流水,每一个Sheet保存一个账户近一年的每笔银行流水,如今须要统计用户的日均银行流水,先用多线程处理每一个sheet里的银行流水,都执行完以后,获得每一个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
public class BankWaterService implements Runnable {
//建立4个屏障,处理完后执行当前类的run方法
private CyclicBarrier barrier = new CyclicBarrier(4,this);
//假设只有4个sheet,因此只启动4个线程
private Executor excutor = Executors.newFixedThreadPool(4);
//保存每一个sheet计算出的结果
private ConcurrentHashMap< String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private void count(){
for(int i=0;i<4;i++){
excutor.execute(new Runnable() {
@Override
public void run() {
//计算过程.....
//存储计算结果
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
try {
//计算完成,插入屏障
barrier.await();
//后续操做,将会使用到四个线程的运行结果....
System.out.println("线程"+Thread.currentThread().getName()+"运行结束,最终的计算结果:"+sheetBankWaterCount.get("result"));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void run() {
int result = 0;
for(Entry<String, Integer> item : sheetBankWaterCount.entrySet()){
result += item.getValue();
}
sheetBankWaterCount.put("result", result);
}
public static void main(String[] args) {
BankWaterService bankWaterService = new BankWaterService();
bankWaterService.count();
}
}
复制代码
运行结果:
线程pool-1-thread-4运行结束,最终的计算结果:4
线程pool-1-thread-2运行结束,最终的计算结果:4
线程pool-1-thread-1运行结束,最终的计算结果:4
线程pool-1-thread-3运行结束,最终的计算结果:4
复制代码
文章源地址:https://www.cnblogs.com/jinggod/p/8494193.html