本文主要介绍和对比咱们经常使用的几种并发工具类,主要涉及 CountDownLatch
、 CyclicBarrier
、 Semaphore
、 Exchanger
相关的内容,若是对多线程相关内容不熟悉,能够看笔者以前的一些文章:java
CountDownLatch
、CyclicBarrier
二者的使用与区别,他们都是等待多线程完成,是一种并发流程的控制手段,Semaphore
、Exchanger
的使用,semaphore
是信号量,能够用来控制容许的线程数,而 Exchanger
能够用来交换两个线程间的数据。CountDownLatch
是 JDK5
以后加入的一种并发流程控制工具,它在 java.util.concurrent
包下CountDownLatch
容许一个或多个线程等待其余线程完成操做,这里须要注意,是能够是一个等待也能够是多个来等待CountDownLatch
的构造函数以下,它接受一个 int
类型的参数做为计数器,即若是你想等待N
个线程完成,那么这里就传入 N
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
countDown
与 await
,其中 当咱们调用 countDown
方法时相应的 N
的值减 1,而 await
方法则会阻塞当前线程,直到 N
的值变为零。CountDownLatch
来实现这一案例,那么等待的个数 N
就是上面的裁判线程的个数,即为 1,/** * @url i-code.onlien * 云栖简码 */ public static void main(String[] args) throws InterruptedException { //模拟跑步比赛,裁判说开始,全部选手开始跑,咱们可使用countDownlatch来实现 //这里须要等待裁判说开始,因此时等着一个线程 CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(() ->{ try { System.out.println(Thread.currentThread().getName() +"已准备"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"开始跑~~"); },"选手1").start(); new Thread(() ->{ try { System.out.println(Thread.currentThread().getName() +"已准备"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"开始跑~~"); },"选手2").start(); TimeUnit.SECONDS.sleep(1); System.out.println("裁判:预备~~~"); countDownLatch.countDown(); System.out.println("裁判:跑~~~"); }
在上述代码中,咱们首先建立了一个计数为1 的
CountDownLatch
对象,这表明咱们须要等待的线程数,以后再建立了两个线程,用来表明选手线程,同时在选手的线程中咱们都调用了await
方法,让线程进入阻塞状态,直到CountDownLatch的计数为零后再执行后面的内容,在主线程main
方法中咱们等待 1秒后执行countDown
方法,这个方法就是减一,此时的N
则为零了,那么选手线程则开始执行后面的内容,总体的输出如上图所示面试
CountDownLatch
来实现,那么的计数个数 N
则为5,由于要等待这五个,经过代码实现以下:public static void main(String[] args) throws InterruptedException { /** * i-code.online * 云栖简码 */ //等待的个数 CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName() + "从住所出发..."); try { TimeUnit.SECONDS.sleep((long) (Math.random()*10)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 到达目的地-----"); countDownLatch.countDown(); },"人员-"+i).start(); } System.out.println("大巴正在等待人员中....."); countDownLatch.await(); System.out.println("-----全部人到齐,出发-----"); }
从上述代码中咱们能够看到,定义了一个计数为5的
countDownLatch
,以后经过循环建立五个线程,模拟五我的员,当他们到达指定地点后执行countDown
方法,对计数减一。主线程至关因而大巴车的线程,执行await
方法进行阻塞,只有当N
的值减到0后则执行后面的输出算法
public CountDownLatch(int count) { };
它的构造函数是传入一个参数,该参数
count
是须要倒数的数值。数据库
await()
:调用 await()
方法的线程开始等待,直到倒数结束,也就是 count
值为 0
的时候才会继续执行。await(long timeout, TimeUnit unit)
:await()
有一个重载的方法,里面会传入超时参数,这个方法的做用和 await()
相似,可是这里能够设置超时时间,若是超时就再也不等待了。countDown()
:把数值倒数 1
,也就是将 count
值减 1
,直到减为 0
时,以前等待的线程会被唤起。上面的案例介绍了
CountDownLatch
的使用,可是CountDownLatch
有个特色,那就是不可以重用,好比已经完成了倒数,那可不能够在下一次继续去从新倒数呢?是能够的,一旦倒数到0 则结束了,没法再次设置循环执行,可是咱们实际需求中有不少场景中须要循环来处理,这时候咱们可使用CyclicBarrier
来实现编程
CyclicBarrier
与 CountDownLatch
比较类似,当等待到必定数量的线程后开始执行某个任务CyclicBarrier
的字面意思是能够循环使用的屏障,它的功能就是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开会,此时全部被屏障阻塞的线程都将继续执行。以下演示public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
CyclicBarrier(int parties)
构造函数提供了int
类型的参数,表明的是须要拦截的线程数量,而每一个线程经过调用 await
方法来告诉 CyclicBarrier
我到达屏障点了,而后阻塞CyclicBarrier(int parties, Runnable barrierAction)
构造函数是为咱们提供的一个高级方法,加了一个 barrierAction
的参数,这是一个Runnable
类型的,也就是一个线程,它表示当全部线程到达屏障后,清闲触发 barrierAction
线程执行,再执行各个线程以后的内容CyclicBarrier
的来实现,这里咱们须要来拦截的线程就是两个。具体实现 以下:/* CyclicBarrier 与countDownLatch 比较类似,也是等待线程完成, 不过countDownLatch 是await等待其余的线程经过countDown的数量,达到必定数则执行, 而 CyclicBarrier 则是直接看await的数量,达到必定数量直接所有执行, */ public static void main(String[] args) { //比如情侣约会,无论谁先到都的等另外一个,这里就是两个线程, CyclicBarrier cyclicBarrier = new CyclicBarrier(2); new Thread(() ->{ System.out.println("快速收拾,出门~~~"); try { TimeUnit.MILLISECONDS.sleep(500); System.out.println("到了约会地点等待女友前来~~"); cyclicBarrier.await(); System.out.println("女友到来嗨皮出发~~约会"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },"男友").start(); new Thread(() ->{ System.out.println("慢慢收拾,出门~~~"); try { TimeUnit.MILLISECONDS.sleep(5000); System.out.println("到了约会地点等待男友前来~~"); cyclicBarrier.await(); System.out.println("男友到来嗨皮出发~~约会"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },"女友").start(); }
上面代码,相对简单,建立一个拦截数为2的屏障,以后建立两个线程,调用await方法,只有当调用两次才会触发后面的流程。微信
Runnable
参数的构造函数;和以前 CountDownLatch
的案例类似,公司组织出游,这时候确定有不少大巴在等待接送,大巴不会等全部的 人都到才出发,而是每坐满一辆车就出发一辆,这种场景咱们就可使用 CyclicBarrier
来实现,实现以下:/* CyclicBarrier是可重复使用到,也就是每当几个知足是再也不等待执行, 好比公司组织出游,安排了好多辆大把,每坐满一辆就发车,再也不等待,相似这种场景,实现以下: */ public static void main(String[] args) { //公司人数 int peopleNum = 2000; //每二十五我的一辆车,凑够二十五则发车~ CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{ //达到25人出发 System.out.println("------------25人数凑齐出发------------"); }); for (int j = 1; j <= peopleNum; j++) { new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start(); } } static class PeopleTask implements Runnable{ private String name; private CyclicBarrier cyclicBarrier; public PeopleTask(String name,CyclicBarrier cyclicBarrier){ this.name = name; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println(name+"从家里出发,正在前往聚合地...."); try { TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name+"到达集合地点,等待其余人.."); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
相同点:多线程
不一样点:并发
CountDownLatch
的计数器只能使用一次,到到达0后就不能再次使用了,除非新建实例;而 CyclicBarrier
的计数器是能够复用循环的,因此 CyclicBarrier
能够用在更复杂的场景,能够随时调用 reset
方法来重制拦截数,如计算发生错误时能够直接充值计数器,让线程从新执行一次。CyclicBarrier
要等固定数量的线程都到达了屏障位置才能继续执行,而 CountDownLatch
只需等待数字倒数到 0
,也就是说 CountDownLatch
做用于事件,但 CyclicBarrier
做用于线程;CountDownLatch
是在调用了 countDown
方法以后把数字倒数减 1
,而 CyclicBarrier
是在某线程开始等待后把计数减 1
。CyclicBarrier
有执行动做 barrierAction
,而 CountDownLatch
没这个功能。Semaphore
(信号量)是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源,acquire
方法)。线程能够从信号量中去“获取”一个许可证,一旦线程获取以后,信号量持有的许可证就转移过去了,因此信号量手中剩余的许可证要减一。release
方法),这个许可证至关于被归还给信号量了,因而信号量中的许可证的可用数量加一。当信号量拥有的许可证数量减到 0 时,若是下个线程还想要得到许可证,那么这个线程就必须等待,直到以前获得许可证的线程释放,它才能获取。因为线程在没有获取到许可证以前不能进一步去访问被保护的共享资源,因此这就控制了资源的并发访问量,这就是总体思路。IO
操做,咱们能够启动不少线程可是数据库的链接池是有限制的,假设咱们设置容许五个连接,若是咱们开启太多线程直接操做则会出现异常,这时候咱们能够经过信号量来控制,让一直最多只有五个线程来获取链接。代码以下:/* Semaphore 是信号量, 能够用来控制线程的并发数,能够协调各个线程,以达到合理的使用公共资源 */ public static void main(String[] args) { //建立10个容量的线程池 final ExecutorService service = Executors.newFixedThreadPool(100); //设置信号量的值5 ,也就是容许五个线程来执行 Semaphore s = new Semaphore(5); for (int i = 0; i < 100; i++) { service.submit(() ->{ try { s.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("数据库耗时操做"+Thread.currentThread().getName()); TimeUnit.MILLISECONDS.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "正在执行...."); s.release(); }); } }
如上代码,建立了一个容量100的线程池,模拟咱们程序中大量的线程,添加一百个任务,让线程池执行。建立了一个容量为5的信号量,在线程中咱们调用
acquire
来得到信号量的许可,只有得到了才能只能下面的内容否则阻塞。当执行完后释放该许可,经过release
方法,dom
private static int count = 0; /* Semaphore 中若是咱们容许的的许可证数量为1 ,那么它的效果与锁类似。 */ public static void main(String[] args) throws InterruptedException { final ExecutorService service = Executors.newFixedThreadPool(10); Semaphore semaphore = new Semaphore(1); for (int i = 0; i < 10000; i++) { service.submit(() ->{ try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "执行了"); count ++; semaphore.release(); }); } service.shutdown(); TimeUnit.SECONDS.sleep(5); System.out.println(count); }
public boolean tryAcquire()
:tryAcquire
和锁的 trylock
思惟是一致的,是尝试获取许可证,至关于看看如今有没有空闲的许可证,若是有就获取,若是如今获取不到也不要紧,没必要陷入阻塞,能够去作别的事。public boolean tryAcquire(long timeout, TimeUnit unit)
:是一个重载的方法,它里面传入了超时时间。好比传入了 3 秒钟,则意味着最多等待 3 秒钟,若是等待期间获取到了许可证,则往下继续执行;若是超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。int availablePermits()
:返回此信号量中当前可用的许可证数int getQueueLength()
:返回正在等待许可证的线程数boolean hasQueuedThreads()
:判断是否有线程正在等待获取许可证void reducePermits(int reduction)
:减小 reduction
个许可证,是个 protected
方法Collection<Thread> getQueuedThreads()
:返回正在等待获取许可证的线程集合,是个 protected
方法Exchanger
(交换者)是一个用于线程间协做的工具类,它主要用于进行线程间数据的交换,它有一个同步点,当两个线程到达同步点时能够将各自的数据传给对方,若是一个线程先到达同步点则会等待另外一个到达同步点,到达同步点后调用 exchange
方法能够传递本身的数据而且得到对方的数据。public class ExchangerTest { /* Exchanger 交换, 用于线程间协做的工具类,能够交换线程间的数据, 其提供一个同步点,当线程到达这个同步点后进行数据间的交互,遗传算法能够如此来实现, 以及校对工做也能够如此来实现 */ public static void main(String[] args) { /* 模拟 两个工做人员录入记录,为了防止错误,二者录的相同内容,程序仅从校对,看是否有错误不一致的 */ //开辟两个容量的线程池 final ExecutorService service = Executors.newFixedThreadPool(2); Exchanger<InfoMsg> exchanger = new Exchanger<>(); service.submit(() ->{ //模拟数据 线程 A的 InfoMsg infoMsg = new InfoMsg(); infoMsg.content="这是线程A"; infoMsg.id ="10001"; infoMsg.desc = "1"; infoMsg.message = "message"; System.out.println("正在执行其余..."); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } try { final InfoMsg exchange = exchanger.exchange(infoMsg); System.out.println("线程A 交换数据====== 获得"+ exchange); if (!exchange.equals(infoMsg)){ System.out.println("数据不一致~~请稽核"); return; } } catch (InterruptedException e) { e.printStackTrace(); } }); service.submit(() ->{ //模拟数据 线程 B的 InfoMsg infoMsg = new InfoMsg(); infoMsg.content="这是线程B"; infoMsg.id ="10001"; infoMsg.desc = "1"; infoMsg.message = "message"; System.out.println("正在执行其余..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } try { final InfoMsg exchange = exchanger.exchange(infoMsg); System.out.println("线程B 交换数据====== 获得"+ exchange); if (!exchange.equals(infoMsg)){ System.out.println("数据不一致~~请稽核"); return; } } catch (InterruptedException e) { e.printStackTrace(); } }); service.shutdown(); } static class InfoMsg{ String id; String name; String message; String content; String desc; @Override public String toString() { return "InfoMsg{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", message='" + message + '\'' + ", content='" + content + '\'' + ", desc='" + desc + '\'' + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; InfoMsg infoMsg = (InfoMsg) o; return Objects.equals(id, infoMsg.id) && Objects.equals(name, infoMsg.name) && Objects.equals(message, infoMsg.message) && Objects.equals(content, infoMsg.content) && Objects.equals(desc, infoMsg.desc); } @Override public int hashCode() { return Objects.hash(id, name, message, content, desc); } } }
上面代码运行能够看到,当咱们线程
A/B
到达同步点即调用exchange
后进行数据的交换,拿到对方的数据再与本身的数据对比能够作到稽核 的效果ide
Exchanger
一样能够用于遗传算法中,选出两个对象进行交互两个的数据经过交叉规则获得两个混淆的结果。Exchanger
中嗨提供了一个方法 public V exchange(V x, long timeout, TimeUnit unit)
主要是用来防止两个程序中一个一直没有执行 exchange
而致使另外一个一直陷入等待状态,这是能够用这个方法,设置超时时间,超过这个时间则再也不等待。本文由AnonyStar 发布,可转载但需声明原文出处。
欢迎关注微信公帐号 :云栖简码 获取更多优质文章
更多文章关注笔者博客 :云栖简码 i-code.online