前言:多线程并发在咱们的业务场景很常见,可是为了解决多线程的并发问题并不容易,即使你是中高级程序员亦或是架构师,你都不必定可以吃透并解决多线程并发场景所暴露的问题,今天抛砖引玉来给你们介绍下多程序并发的五种同步辅助类,但愿在线程并发上能为你们之后提供一下思路以及解决问题。html
1.CountDownLatch java
CountDownLatch 是一种很是简单、但很经常使用的同步辅助类。其做用是在完成一组正在其余线程中执行的操做以前,容许一个或多个线程一直阻塞。用给定的计数初始化CountDownLatch。因为调用了countDown()方法,因此在当前计数到达零以前,await方法会一直受阻塞。以后,会释放全部等待的线程,await的全部后续调用都将当即返回程序员
示例代码编程
//建立时,就须要指定参与的parties个数 数组
int parties = 12; 安全
CountDownLatch latch = new CountDownLatch(parties); 多线程
//线程池中同步task 架构
ExecutorService executor = Executors.newFixedThreadPool(parties); 并发
for(int i = 0; i < parties; i++) { dom
executor.execute(new Runnable() {
@Override
public void run() {
try {
//能够在任务执行开始时执行,表示全部的任务都启动后,主线程的await便可解除
//latch.countDown();
//run
//..
Thread.sleep(3000);
} catch (Exception e) {
}
finally {
//任务执行完毕后:到达
//表示全部的任务都结束,主线程才能继续
latch.countDown();
}
}
});
}
latch.await();//主线程阻塞,直到全部的parties到达
//latch上全部的parties都达到后,再次执行await将不会有效,
//即barrier是不可重用的
executor.shutdown();
2.CyclicBarrier
CyclicBarrier 一种可重置的多路同步点,在某些并发编程场景颇有用。它容许一组线程互相等待,直到到达某个公共的屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 颇有用。由于该 barrier在释放等待线程后能够重用,因此称它为循环的barrier。
下面看看对应的方法。
public CyclicBarrier(int parties, Runnable barrierAction)
建立一个新的CycleBarrier,它将在给定数量的参与者(线程)处于等待状态时候启动,并在启动barrier时执行给定的屏障操做,该操做由最后一个今日的屏障的线程执行。参数barrierAction是在启动屏障的时候执行命令,若是不执行任何操做则该参数是null。
public int await()
在全部参与者都已经在此barrier上调用 await方法以前,将一直等待。若是当前线程部署将要到达的最后一个线程将禁用它。
reset
将屏障重置为其初始化状态,若是全部参与者目前都在屏障处等待,则它们将返回,并且会抛出一个异常。
getNumberWaiting
返回当前在屏障处等待的参与者数目。
3.Semaphore(信号量)
信号量是一类经典的同步工具。信号量一般用来限制线程能够同时访问的(物理或逻辑)资源数量。
咱们以一个停车场运做为例来讲明信号量的做用。假设停车场只有三个车位,一开始三个车位都是空的。这时若是同时来了三辆车,看门人容许其中它们进入进入,而后放下车拦。之后来的车必须在入口等待,直到停车场中有车辆离开。这时,若是有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,若是又离开一辆,则又能够放入一辆,如此往复。
在这个停车场系统中,车位是公共资源,每辆车比如一个线程,看门人起的就是信号量的做用。信号量是一个非负整数,表示了当前公共资源的可用数目(在上面的例子中能够用空闲的停车位类比信号量),当一个线程要使用公共资源时(在上面的例子中能够用车辆类比线程),首先要查看信号量,若是信号量的值大于1,则将其减1,而后去占有公共资源。若是信号量的值为0,则线程会将本身阻塞,直到有其它线程释放公共资源。
在信号量上咱们定义两种操做: acquire(获取) 和 release(释放)。当一个线程调用acquire操做时,它要么经过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,而后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另外一个用于并发线程数的控制。
使用示例
papublic class SemaphoreDemo {
private Semaphore smp = new Semaphore(3); private Random rnd = new Random(); class TaskDemo implements Runnable{
private String id; TaskDemo(String id){
this.id = id; }
public void run(){
try { smp.acquire(); System.out.println("Thread " + id + " is working"); Thread.sleep(rnd.nextInt(1000)); smp.release(); System.out.println("Thread " + id + " is over"); } catch (InterruptedException e) { } } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); //注意我建立的线程池类型, ExecutorService se = Executors.newCachedThreadPool(); se.submit(semaphoreDemo.new TaskDemo("a")); se.submit(semaphoreDemo.new TaskDemo("b")); se.submit(semaphoreDemo.new TaskDemo("c")); se.submit(semaphoreDemo.new TaskDemo("d")); se.submit(semaphoreDemo.new TaskDemo("e")); se.submit(semaphoreDemo.new TaskDemo("f")); se.shutdown(); }}
运行结果
Thread c is working
Thread b is working
Thread a is working
Thread c is over
Thread d is working
Thread b is over
Thread e is working
Thread a is over
Thread f is working
Thread d is over
Thread e is over
Thread f is over
能够看出,最多同时有三个线程并发执行,也能够认为有三个公共资源(好比计算机的三个串口)。
参考内容:http://www.cnblogs.com/nullzx/p/5270233.html
4.Phaser
在JAVA 1.7引入了一个新的并发API:Phaser,一个可重用的同步barrier。在此前,JAVA已经有CyclicBarrier、CountDownLatch这两种同步barrier,可是Phaser更加灵活,并且侧重于“重用”。
API简述
一、Phaser():构造函数,建立一个Phaser;默认parties个数为0。此后咱们能够经过register()、bulkRegister()方法来注册新的parties。每一个Phaser实例内部,都持有几个状态数据:termination状态、已经注册的parties个数(registeredParties)、当前phase下已到达的parties个数(arrivedParties)、当前phase周期数,还有2个同步阻塞队列Queue。Queue中保存了全部的waiter,即由于advance而等待的线程信息;这两个Queue分别为evenQ和oddQ,这两个Queue在实现上没有任何区别,Queue的元素为QNode,每一个QNode保存一个waiter的信息,好比Thread引用、阻塞的phase、超时的deadline、是否支持interrupted响应等。两个Queue,其中一个保存当前phase中正在使用的waiter,另外一个备用,当phase为奇数时使用evenQ、oddQ备用,偶数时相反,即两个Queue轮换使用。当advance事件触发期间,新register的parties将会被放在备用的Queue中,advance只须要响应另外一个Queue中的waiters便可,避免出现混乱。
二、Phaser(int parties):构造函数,初始必定数量的parties;至关于直接regsiter此数量的parties。
三、arrive():到达,阻塞,等到当前phase下其余parties到达。若是没有register(即已register数量为0),调用此方法将会抛出异常,此方法返回当前phase周期数,若是Phaser已经终止,则返回负数。
四、arriveAndDeregister():到达,并注销一个parties数量,非阻塞方法。注销,将会致使Phaser内部的parties个数减一(只影响当前phase),即下一个phase须要等待arrive的parties数量将减一。异常机制和返回值,与arrive方法一致。
五、arriveAndAwaitAdvance():到达,且阻塞直到其余parties都到达,且advance。此方法等同于awaitAdvance(arrive())。若是你但愿阻塞机制支持timeout、interrupted响应,可使用相似的其余方法(参见下文)。若是你但愿到达后且注销,并且阻塞等到当前phase下其余的parties到达,可使用awaitAdvance(arriveAndDeregister())方法组合。此方法的异常机制和返回值同arrive()。
六、awaitAdvance(int phase):阻塞方法,等待phase周期数下其余全部的parties都到达。若是指定的phase与Phaser当前的phase不一致,则当即返回。
七、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted响应,即waiter线程若是被外部中断,则此方法当即返回,并抛出InterrutedException。
八、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout类型的interrupted响应,即当前线程阻塞等待约定的时长,超时后以TimeoutException异常方式返回。
九、forceTermination():强制终止,此后Phaser对象将不可用,即register等将再也不有效。此方法将会致使Queue中全部的waiter线程被唤醒。
十、register():新注册一个party,致使Phaser内部registerPaties数量加1;若是此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,若是Phaser已经中断,将会返回负数。
十一、bulkRegister(int parties):批量注册多个parties数组,规则同十、。
十二、getArrivedParties():获取已经到达的parties个数。
1三、getPhase():获取当前phase周期数。若是Phaser已经中断,则返回负值。
1四、getRegisteredParties():获取已经注册的parties个数。
1五、getUnarrivedParties():获取还没有到达的parties个数。
代码示例:
//建立时,就须要指定参与的parties个数
int parties = 12;
//能够在建立时不指定parties
// 而是在运行时,随时注册和注销新的parties
Phaser phaser = new Phaser();
//主线程先注册一个
//对应下文中,主线程能够等待全部的parties到达后再解除阻塞(相似与CountDownLatch)
phaser.register();
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {
phaser.register();//每建立一个task,咱们就注册一个party
executor.execute(new Runnable() {
@Override
public void run() {
try {
int i = 0;
while (i < 3 && !phaser.isTerminated()) {
System.out.println("Generation:" + phaser.getPhase());
Thread.sleep(3000);
//等待同一周期内,其余Task到达
//而后进入新的周期,并继续同步进行
phaser.arriveAndAwaitAdvance();
i++;//咱们假定,运行三个周期便可
}
} catch (Exception e) {
}
finally {
phaser.arriveAndDeregister();
}
}
});
}
//主线程到达,且注销本身
//此后线程池中的线程便可开始按照周期,同步执行。
phaser.arriveAndDeregister();
参考内容:http://shift-alt-ctrl.iteye.com/blog/2302923
5.Exchanger
类java.util.concurrent.Exchanger提供了一个同步点,在这个同步点,一对线程能够交换数据。每一个线程经过exchange()方法的入口提供数据给他的伙伴线程,并接收他的伙伴线程提供的数据,并返回。
当在运行不对称的活动时颇有用。好比说,一个线程向buffer中填充数据,另外一个线程从buffer中消费数据;这些线程能够用Exchange来交换数据。这个交换对于两个线程来讲都是安全的。
package com.clzhang.sample.thread;import java.util.*;import java.util.concurrent.Exchanger;public class SyncExchanger {
private static final Exchanger exchanger = new Exchanger();
class DataProducer implements Runnable {
private List list = new ArrayList(); public void run() {
for (int i = 0; i < 5; i++) { System.out.println("生产了一个数据,耗时1秒"); list.add(new Date()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } try { list = (List) exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); }
for (Iterator iterator = list.iterator(); iterator.hasNext();) { System.out.println("Producer " + iterator.next()); } } }
class DataConsumer implements Runnable {
private List list = new ArrayList();
public void run() {
for (int i = 0; i < 5; i++) { list.add("这是一个收条。"); }
try { list = (List) exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); }
for (Iterator iterator = list.iterator(); iterator.hasNext();) { Date d = (Date) iterator.next(); System.out.println("Consumer: " + d); } } }
public static void main(String[] args) { SyncExchanger ins = new SyncExchanger();
new Thread(ins.new DataProducer()).start();
new Thread(ins.new DataConsumer()).start(); } }
输出
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
Producer 这是一个收条。
Producer 这是一个收条。
Producer 这是一个收条。
Producer 这是一个收条。
Producer 这是一个收条。
Consumer: Thu Sep 12 17:21:39 CST 2013
Consumer: Thu Sep 12 17:21:40 CST 2013
Consumer: Thu Sep 12 17:21:41 CST 2013
Consumer: Thu Sep 12 17:21:42 CST 2013
Consumer: Thu Sep 12 17:21:43 CST 2013
https://mp.weixin.qq.com/s/JvORQc7fAS7AVIRsO8SdHA