同步工具类能够是任何一个对象。阻塞队列能够做为同步工具类,其余类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)、以及闭锁(Latch)。java
全部的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行仍是等待,此外还提供了一些方法对状态进行操做,以及另外一些方法用于高效地等待同步工具类进入到预期状态。算法
闭锁是一种同步工具类,能够延迟线程进度直到其到达终止状态。闭锁的做用至关于一扇门:在闭锁到达结束状态以前,这扇门一直是关闭的,而且没有任何线程能经过,当到达结束状态时容许全部的线程经过。当闭锁到达结束状态后,将不会再改变状态,所以这扇门将永远打开。闭锁能够用来确保某些活动直到其余活动都完成才继续执行。安全
CountDownLatch是一种灵活的闭锁实现,它可使一个或多个线程等待一组线程。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示须要等待的事件数量。countDown递减计数器,表示一个事件已经发生,而await方法等待计数器达到零,这表示全部须要等待的事件都已经发生。若是计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。并发
查看源码发现:咱们传进去的参数至关于内部Sync的状态,每次调用countDown的时候将状态值减一,状态值为0表示结束状态(await会解除阻塞)ide
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void countDown() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
查看sync的源码:函数
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } ... }
例如:实现一个统计多个线程并发执行任务的用时功能:工具
当线程执行run中代码的时候会阻塞到startLatch.await(); 直到主线程调用startLatch.countDown(); 将计数器减一。这时全部线程开始执行任务。测试
当线程执行完的时候endLatch.countDown();将结束必锁的计数器减一,此时主线程阻塞在endLatch.await();,直到5个线程都执行完主线程也解除阻塞。ui
package cn.qlq.thread.tone; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo4 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class); public static void main(String[] args) throws InterruptedException { final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch endLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { Thread.sleep(1 * 1000); new Thread(new Runnable() { @Override public void run() { try { startLatch.await();// 起始闭锁的计数器阻塞等到计数器减到零(标记第一个线程开始执行) Thread.sleep(1 * 1000); endLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } // 实现计时 long startTime = System.nanoTime(); startLatch.countDown();// 将起始闭锁的计数器减一 endLatch.await();// 结束闭锁阻塞直到计数器为零 long endTime = System.nanoTime(); LOGGER.error("结束,用时{}", endTime - startTime); } }
FutureTask也能够用作闭锁。(Futuretask实现了Future的语义,表示一种抽象的可生成计算结果的计算)。FutureTask的计算是经过Callable实现的,至关于一种可生产运算结果的Runnable,而且能够处于如下三种状态:等待运行、正在运行和运行完成。执行完成表示计算的全部可能结束方式,包括正常结束、异常取消和运行完成。当FutureTask进入完成状态后,它会永远中止在这个状态。this
Future.get取决于任务的状态。若是任务已经完成,那么get会当即返回结果;不然get将阻塞直到任务进入完成状态,而后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。
package threadTest; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 实现callable接口,实现Callable接口 * * */ public class MyCallable implements Callable<String> { /** * 实现call方法,接口中抛出异常。由于子类不能够比父类干更多的坏事,因此子类能够不抛出异常 */ @Override public String call() { System.out.println(Thread.currentThread().getName() + " 执行callable的call方法"); return "result"; } public static void main(String[] args) { test1(); } /** * 单个线程 */ public static void test1() { // 1.建立固定大小的线程池 ExecutorService es = Executors.newFixedThreadPool(1); // 2.提交线程任务,用Future接口接受返回的实现类 Future<String> future = es.submit(new MyCallable()); // 3.关闭线程池 es.shutdown(); // 4.调用future.get()获取callable执行完成的返回结果 String result; try { result = future.get(); System.out.println(Thread.currentThread().getName() + "\t" + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
结果:
pool-1-thread-1 执行callable的call方法
main result
查看源码:
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } ...}
计数信号量(counting Semaphore)用来控制同时访问某个资源的数量,或者同时执行某个操做的数量。计数信号量还能够实现某种资源池,或者对容器实施边界。
信号量是1个的Semaphore意味着只能被1个线程占用,能够用来设计同步(至关于互斥锁)。信号量大于1的Semaphore能够用来设计控制并发数,或者设计有界容器。
Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可由构造函数指定。在执行操做时首先得到许可(只要还有剩余的许可),并在使用后释放。若是没有许可,那么acquire将会一直阻塞直到有许可(或者直到中断或者操做超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量能够用做互斥体(mutex),并具有不可重入的加锁语义:谁拥有了这个惟一的许可谁就拥有了互斥锁。
例如:例如信号量构造一个有界阻塞容器:
信号量的计数值初始化为容器的最大值。add操做在向底层容器添加一个元素以前,首先要获取一个许可。若是add没有添加任何元素,那么会马上释放信号量。一样,remove操做释放一个许可,使更多的元素能加到容器中。
class BoundedHashSet<T> { private Set<T> set; private Semaphore semaphore; public BoundedHashSet(int bound) { set = Collections.synchronizedSet(new HashSet()); semaphore = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { semaphore.acquire();// 尝试获取信号量 boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) {// 若是添加失败就释放信号量,添加成功就占用一个信号量 semaphore.release(); } } } public boolean remove(T o) throws InterruptedException { boolean remove = set.remove(o); if (remove)// 若是删除成功以后就释放一个信号量 semaphore.release(); return remove; } }
测试代码:
BoundedHashSet<String> boundedHashSet = new BoundedHashSet<String>(3); System.out.println(boundedHashSet.add("1")); System.out.println(boundedHashSet.add("2")); System.out.println(boundedHashSet.add("2")); System.out.println(boundedHashSet.add("3")); System.out.println(boundedHashSet.add("4"));// 将会一直阻塞到这里 System.out.println("=========");
结果:(JVM不会关闭)
注意:
1.Semaphore能够指定公平锁仍是非公平锁,默认是非公平锁
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
2.acquire方法和release方法是能够有参数的,表示获取/返还的信号量个数
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
栅栏(Barrier)相似于闭锁(一种同步工具,能够延迟线程直到其达到其终止状态),它能阻塞一组线程直到某个事件发生。栅栏与闭锁的区别在于全部线程必须同时到达栅栏位置,才能继续执行。闭锁等于等待事件,而栅栏用于等待其余线程。栅栏能够用于实现一些协议,例如几个家庭成员决定在某个地方集合:"全部人6:00到达目的地,而后讨论下一步的事情"。
CyclicBarrier可使必定数量的参与方反复地在栅栏位置聚集,它在并行迭代算法中很是有用:这种算法一般将一个问题划分红一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞到全部线程到达栅栏位置。若是全部线程都到达栅栏,那么栅栏将打开全部线程被释放,而栅栏将被重置以便下次使用。若是对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,全部阻塞的await调用都将终止并抛出BrokenBarrierException。若是成功的经过栅栏,那么await将为每一个线程返回一个惟一的到达索引号,咱们能够用这些索引号"选举"产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工做。CyclicBarrier还可使你将一个栅栏操做传递给构造函数,这是一个Runnable,当成功的经过栅栏时会(在一个子线程)执行它,但在阻塞过程被释放以前是不能执行的。
CyclicBarrier的构造方法能够传入参与的数量(也就是被栅栏拦截的线程的数量),也能够传入一个Runnable对象。
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
例如:
package cn.qlq.thread.tone; import java.util.concurrent.CyclicBarrier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo2 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class); public static void main(String[] args) throws InterruptedException { final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); for (int i = 0; i < 4; i++) { Thread.sleep(2 * 1000); new Thread(new Runnable() { @Override public void run() { LOGGER.info("threadName -> {}", Thread.currentThread().getName()); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } LOGGER.info("threadName -> {}", Thread.currentThread().getName()); } }).start(); } } }
结果:
18:08:00 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:04 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
00的时候0线程到达栅栏进入阻塞,02的时候1线程到达栅栏,因为栅栏的参与者是2因此此时至关于全部线程到达栅栏,栅栏放开,而后栅栏被重置。
04的时候2线程到达栅栏进入阻塞,06的时候3线程到达栅栏,因为栅栏的参与者是2因此此时至关于全部参与者线程到达栅栏,而后栅栏放开。
咱们将栅栏的参与者改成5查看结果:
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
结果:4个线程会阻塞到await方法处,并且JVM不会关闭,由于栅栏的参与者不够5个因此被一直阻塞。
Exchanger至关于一个两方(Two-party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操做时,Exchanger很是有用。例如:一个线程向缓冲区写东西,另外一个线程从缓冲区读数据。Exchanger至关于参与者只有两个的CyclicBarrier。
两个线程会阻塞在exchanger.exchange方法上,泛型能够指定其交换的数据类型。
例如:两个线程交换本身的线程名称
package cn.qlq.thread.tone; import java.util.concurrent.Exchanger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo3 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class); public static void main(String[] args) throws InterruptedException { final Exchanger<String> exchanger = new Exchanger<String>();// 泛型指定交换的数据 for (int i = 0; i < 4; i++) { Thread.sleep(2 * 1000); new Thread(new Runnable() { @Override public void run() { LOGGER.info("threadName -> {}", Thread.currentThread().getName()); try { String exchange = exchanger.exchange(Thread.currentThread().getName()); LOGGER.error("threadName -> {},exchange->{}", Thread.currentThread().getName(), exchange); } catch (Exception e) { e.printStackTrace(); } LOGGER.info("threadName -> {}", Thread.currentThread().getName()); } }).start(); } } }
结果:
18:28:33 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-018:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-1,exchange->Thread-018:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-0,exchange->Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-018:28:37 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-218:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-3,exchange->Thread-218:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-2,exchange->Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2