继续 上一篇 《 学习笔记五:线程间的协做与通讯》java
在jdk的并发包里提供了几个很是有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段,以下主要介绍工具类的使用。数据库
CountDownLatch容许一个或多个线程等待其余线程完成操做。多线程
CountDownLatch类其实是使用计数器的方式去控制的,当咱们初始化CountDownLatch的时候传入了一个int变量这个时候在类的内部初始化一个int的变量,每当咱们调用countDownt()方法的时候就使得这个变量的值减1,而对于await()方法则去判断这个int的变量的值是否为0,是则表示全部的操做都已经完成,不然继续等待。并发
构造器中的计数值(count)实际上就是闭锁须要等待的线程数量。计数器的值必须大于等于0,只是等于0的时候,计数器就是零,调用await方法时不会阻塞当前线程;这个值只能被设置一次,并且CountDownLatch没有提供任何机制去从新设置这个计数值;一个线程调用countDown方法happen-before,另一个线程调用await方法。app
与CountDownLatch的第一次交互是主线程等待其余线程。主线程必须在启动其余线程后当即调用CountDownLatch.await()方法。这样主线程的操做就会在这个方法上阻塞,直到其余线程完成各自的任务。ide
示例:boss等待全部员工来开会,当全部人员都到齐以后,boss宣布开始会议!!!函数
package com.black.example.mutilThread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * Created by 10250H on 2018/7/26. */ public class CountDownLatchDemo { //声明countDownLatch 变量,初始化线程数量(内部计数器) static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) { new Thread(new MyRunner(countDownLatch, "小李", 2000)).start(); new Thread(new MyRunner(countDownLatch, "小张", 4000)).start(); new Thread(new MyRunner(countDownLatch, "小王", 5000)).start(); try { System.out.println("等待员工到来开会。。。。。。。"); //注意这里是await。主线程将会一直等待在这里,当全部线程都执行 countDownLatch.countDown();以后当前线程才会继续执行 countDownLatch.await(); startMeeting("老板"); } catch (InterruptedException e) { e.printStackTrace(); } } private static void startMeeting(String name) { System.out.println(name + "说:人齐了。会议开始!!"); } static class MyRunner implements Runnable { CountDownLatch countDownLatch; String name; int time; public MyRunner(CountDownLatch countDownLatch, String name, int time) { this.countDownLatch = countDownLatch; this.name = name; this.time = time; } @Override public void run() { try { System.out.println(name + " 开始出发去公司。"); TimeUnit.SECONDS.sleep(1); System.out.println(name + " 终于到会议室!!!"); countDownLatch.countDown(); System.out.println(name + " 准备好了!!"); } catch (Exception e) { e.printStackTrace(); } } } }运行结果:注意,至因而谁先到会议室,每次运行结果都会不同。由于主线程和子线程的调用时由CPU决定的工具
若是某我的缺席会议,咱们不能让主线程一直等待,因此可使用另一个带指定时间的await方法-await(long time,TimeUtil unit)的那个带指定时间后,就好再也不阻塞当前线程。jion也有相似的方法。学习
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier),它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续运行。ui
CyclicBarrier默认构造方法时CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达屏障了,而后当前线程被阻塞。 示例代码以下:
package com.black.example.mutilThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; /** * Created by 10250H on 2018/7/26. */ public class CyclicBarrierDemo1 { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new MyThread(1)).start(); new Thread(new MyThread(2)).start(); } static class MyThread implements Runnable{ private int counter; public MyThread(int counter){ this.counter = counter; } @Override public void run() { try { System.out.println("当前值输出:"+counter); TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行"); } catch (InterruptedException e) { e.printStackTrace(); }catch (BrokenBarrierException e) { e.printStackTrace(); } } } }运行结果:由于主线程和子线程的调用时由CPU决定的,两个线程都有可能先执行,因此会产生不一样的结果
若是把new CyclicBarrier(2) 修改成new CyclicBarrier(3),则主线程和子线程会永远等待,不会继续执行,由于第三个尚未到达屏障,因此以前到达屏障的两个线程都不会继续执行。
CyclicBarrier还提供了高级构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在现场到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,示例代码以下:
package com.black.example.mutilThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; public class CyclicBarrierDemo2 { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new MyThread(3)); public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { try { System.out.println("通用计数当前值:1"); cyclicBarrier.await(); System.out.println("默认执行通用线程2"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { System.out.println("通用计数当前值:2"); cyclicBarrier.await(); System.out.println("默认执行通用线程2"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }).start(); } static class MyThread implements Runnable{ private int counter; public MyThread(int counter){ this.counter = counter; } @Override public void run() { try { System.out.println("优先执行:"+counter); TimeUnit.SECONDS.sleep(1); System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行"); } catch (InterruptedException e) { e.printStackTrace(); } } } }运行结果:当两个线程都到达屏障后,优先执行对象MyThread 的任务。
CyclicBarrier的应用场景
能够用于多线程计算数据,最后合并计算结果的场景。例如:用一个Excel保存用户的全部银行流水,每一个Sheet保存一个帐户近一年的每笔银行流水,如今须要统计用户的日均银行流水,先用多线程处理每一个sheet里的银行流水,都执行完以后,获得每一个sheet的日均银行流水。最后再用barrierAction总结出整个Excel的日均银行流水,示例代码以下:
package com.black.example.mutilThread; import java.util.Map; import java.util.concurrent.*; public class BankWaterService implements Runnable { //建立4个屏障,处理完以后执行当前类的run方法 private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this); //假设只有4个sheet,因此启动4个线程 private Executor executor = Executors.newFixedThreadPool(4); //保存每一个sheet计算出的银行流水结果 private ConcurrentHashMap<String,Integer> sheetCountMap = new ConcurrentHashMap<String,Integer>(); private void count(){ for (int i=0;i<4;i++){ executor.execute(new Runnable() { @Override public void run() { //1:计算当前sheet的银行流水数据,计算代码省略....伪代码以下: sheetCountMap.put(Thread.currentThread().getName(),1); //计算完成,插入屏障 try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } } @Override public void run() { int result=0; //汇总每一个sheet的计算结果 for (Map.Entry<String,Integer> entry:sheetCountMap.entrySet()){ result+=entry.getValue(); } //输出结果 sheetCountMap.put("result",result); System.out.println("最终结果:"+result); } public static void main(String[] args) { BankWaterService bankWaterService = new BankWaterService(); bankWaterService.count(); } }运行结果:最终结果:4
CountDownLatch计数器只能使用一次,而CyclicBarrier计数器可使用reset()方法重置。
因此CyclicBarrier能够处理更为复杂的业务场景,如:计算发生错误,能够重置计数器,并让线程从新执行一次。
CyclicBarrier其余用法及源代码以下,如:
isBroken() :阻塞的线程是否被中断,返回值boolean类型
getNumberWaiting() :获取Cyclic-Barrier阻塞的线程数量
Semaphore(信号量)用来控制同时访问特定资源的线程数量,经过协调各个线程,以保证合理的使用公共资源。
应用场景
Semaphore能够用于流量控制,特别是公用资源有限的应用场景,好比数据库连接。
假若有个需求,要读几万个文件的数据,由于是IO密集型任务,咱们能够启动几十个线程并发读取,可是若是读到内存后,还须要存储到数据库中,而数据库的链接数只有10个,这时咱们必须控制只有10个线程同时获取数据库连接保存数据,不然会报错没法获取数据库连接。这时可使用Semaphore来作流量控制,代码示例以下:
package com.black.example.mutilThread; import java.util.concurrent.*; public class SemaphoreDemo { private static final int THREAD_COUNT=30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(10);//10个并发 public static void main(String[] args) { for (int i=0;i<THREAD_COUNT;i++){ threadPool.execute(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"-----请求资源"); //请求获取资源,若是有空闲资源则会当即获取,进入临界区,不然将会等待,一直等待到获取到临界区资源 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"----获取资源,保存数据!"); TimeUnit.SECONDS.sleep(1); semaphore.release();//释放资源 } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }运行结果:
pool-1-thread-4-----请求资源 pool-1-thread-2-----请求资源 pool-1-thread-4----获取资源,保存数据! pool-1-thread-1-----请求资源 pool-1-thread-5-----请求资源 pool-1-thread-3-----请求资源 pool-1-thread-5----获取资源,保存数据! pool-1-thread-8-----请求资源 pool-1-thread-1----获取资源,保存数据! pool-1-thread-9-----请求资源 pool-1-thread-2----获取资源,保存数据! pool-1-thread-7-----请求资源 pool-1-thread-9----获取资源,保存数据! pool-1-thread-10-----请求资源 pool-1-thread-11-----请求资源 pool-1-thread-8----获取资源,保存数据! pool-1-thread-6-----请求资源 pool-1-thread-3----获取资源,保存数据! pool-1-thread-13-----请求资源 pool-1-thread-11----获取资源,保存数据! pool-1-thread-10----获取资源,保存数据! pool-1-thread-12-----请求资源 pool-1-thread-7----获取资源,保存数据! pool-1-thread-15-----请求资源 pool-1-thread-14-----请求资源 pool-1-thread-16-----请求资源 pool-1-thread-17-----请求资源 pool-1-thread-18-----请求资源 pool-1-thread-19-----请求资源 pool-1-thread-21-----请求资源 pool-1-thread-20-----请求资源 pool-1-thread-22-----请求资源 pool-1-thread-23-----请求资源 pool-1-thread-24-----请求资源 pool-1-thread-25-----请求资源 pool-1-thread-27-----请求资源 pool-1-thread-26-----请求资源 pool-1-thread-28-----请求资源 pool-1-thread-29-----请求资源 pool-1-thread-30-----请求资源 pool-1-thread-6----获取资源,保存数据! pool-1-thread-13----获取资源,保存数据! pool-1-thread-12----获取资源,保存数据! pool-1-thread-15----获取资源,保存数据! pool-1-thread-16----获取资源,保存数据! pool-1-thread-17----获取资源,保存数据! pool-1-thread-14----获取资源,保存数据! pool-1-thread-21----获取资源,保存数据! pool-1-thread-19----获取资源,保存数据! pool-1-thread-18----获取资源,保存数据! pool-1-thread-20----获取资源,保存数据! pool-1-thread-22----获取资源,保存数据! pool-1-thread-23----获取资源,保存数据! pool-1-thread-26----获取资源,保存数据! pool-1-thread-27----获取资源,保存数据! pool-1-thread-25----获取资源,保存数据! pool-1-thread-24----获取资源,保存数据! pool-1-thread-30----获取资源,保存数据! pool-1-thread-29----获取资源,保存数据! pool-1-thread-28----获取资源,保存数据!Semaphore简单用法以下:
//构造方法,可用的许可证数量,默认使用非公平锁的方式建立 public Semaphore(int permits){...} //尝试获取许可证 public boolean tryAcquire(){...} //尝试获取许可证,在指定时间内若获取不到则返回 public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException{...} //返回信号量中当前可用的许可证数 public int availablePermits(){...} //返回正在等待获取许可证的线程数(估计值) public final int getQueueLength() {...} //查询是否有任何线程等待获取许可证 public final boolean hasQueuedThreads(){...} //减小reduction个许可证,这个方法在使用的子类中颇有用跟踪不可用资源的信号量 protected void reducePermits(int reduction) {...} //返回全部等待获取许可证的线程集合 protected Collection<Thread> getQueuedThreads(){...}
上一篇:学习笔记五:线程间的协做与通讯
下一篇:待续....