在上文已经说明,委托是构造线程安全类的一个最有效策略,也就是让现有的线程安全类管理全部的状态便可。如下将介绍这些基础构建模块。html
同步容器类包括Vector和Hashtable以及由Collections.synchronizedXxx
等工厂方法建立的同步封装器类。这些类实现线程安全的方式是:将它们的状态封装起来,并对每一个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。同步容器对全部容器状态的访问都串行化,严重下降了并发性;当多个线程竞争锁时,吞吐量严重降低。java
同步容器类都是线程安全的,可是在某些状况下可能须要额外的客户端加锁来保护复合操做。git
好比,在Vecotr中,getLast()和deleteLast()操做,若是是在多线程的环境下运行,若是不加锁,会产生异常状况。一个线程在getLast()后,另外一个线程deleteLast(),而后该线程继续执行,进行deleteLast()操做,此时会抛出下标越界的异常。编程
又好比,在迭代的过程当中,使用get(index)的操做,若是有多个线程运行,可能会删除其中元素,一样会形成异常。设计模式
对于如上的状况,咱们须要经过客户端加锁来解决线程安全的问题。如在迭代时加锁:数组
synchronized(vector){
for(int i=0;i<vector.size();i++){
vector.get(i);
}
}
复制代码
在迭代或者for-each循环语法时,对容器类进行迭代的标准方式都是使用Iterator。然而,在设计同步容器类的迭代器时并无考虑到并发修改的问题,而且它们表现出的行为时“及时失败”的,也就是当它们发现容器在迭代过程当中被修改时,就会抛出ConcurrentModificationException。缓存
若是在迭代期间,对容器加锁,首先会下降效率,提升线程的等待时间;而后还可能会产生死锁;下降了吞吐量和CPU的利用率。安全
若是不但愿在迭代期间加锁,可使用克隆容器的方法,并在克隆副本上进行迭代。多线程
加锁能够防止迭代器抛出ConcurrentModificationException,可是要在全部对容器进行迭代的地方都要加锁。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器为参数时,都会对容器进行迭代。这些间接的迭代操做可能抛出ConcurrentModificationException。并发
Java 5.0提供了多种并发容器类来改进同步容器的性能。同步容器对全部容器状态的访问都串行化,严重下降了并发性;当多个线程竞争锁时,吞吐量严重降低。
并发容器是针对多个线程并发访问设计的。经过并发容器来替代同步容器,能够极大地提升伸缩性并下降风险。并发容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。
同步容器类在执行每一个操做期间都持有一个锁。ConcurrentHashMap采用了不一样的加锁策略来提供更高的并发性和伸缩性。它并非将每一个方法都在同一个锁上同步,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。
分段锁机制使得任意数量的读取线程能够并发访问Map,执行读取操做的线程和执行写入操做的线程能够并发访问Map,而且必定数量的写入线程能够并发地修改Map,所以提升了并发访问的吞吐量。
并发容器加强了同步容器类,它们提供的迭代器不会抛出ConcurrentModificationException,所以不须要在迭代过程当中对容器加锁。其迭代器具备弱一致性,能够容忍并发的修改,在建立迭代器时会遍历已有元素,并能够(可是不保证)在迭代器被构造后将修改操做反映给容器。size(),isEmpty()等方法返回的是一个近似值。
因为ConcurrentHashMap与Hashtable和synchronizedMap有更多的优点,所以大多数状况应该使用并发容器类,至于当须要对整个容器加锁进行独占访问时,才应该放弃使用并发容器。
注意,此时不能再经过客户端加锁新建新的原子操做了,客户端只能对并发容器自身加锁,但并发容器内部使用的并非自身锁。
写入时复制容器,在每次修改时都会加锁并建立和从新发布一个新的容器副本,直接修改容器引用,从而实现可见性。 写操做在一个复制的数组上进行,读操做仍是在原始数组中进行,读写分离,互不影响。写操做须要加锁,防止并发写入时致使写入数据丢失。写操做结束以后须要把原始数组指向新的复制数组。
CopyOnWriteArrayList 在写操做的同时容许读操做,大大提升了读操做的性能,所以很适合读多写少的应用场景。 可是 CopyOnWriteArrayList 有其缺陷:
阻塞队列支持生产者-消费者模式。简化了开发过程,消除了生产者和消费者之间的代码依赖性。阻塞队列简化了生产者-消费者设计的实现过程。一种常见的生产者-消费者设计模式就是线程池与工做队列的组合。
阻塞队列提供了四种处理方法:
阻塞队列有多种实现。
Java 6提供了Dqueue和BlockingDeque,是双端队列,实现了在队列头和队列尾的高效插入和移除。双端队列适用于工做密取模式。在工做密取中,每一个消费者都有各自的双端队列。若是一个消费者完成了本身的双端队列的所有工做,能够从其余消费者双端队列末尾秘密的获取工做。由于工做者线程不会再单个共享的任务队列上发生竞争。适用于既是生产者又是消费者问题。
线程会阻塞或暂停执行。被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行。当在代码中调用一个能够抛出InterruptedException的方法时,本身的方法就编程了阻塞方法,必须处理中断的响应。若是这个方法被中断,那么它将努力提早结束状态。
处理中断的响应有两种基本选择:
public void run(){
try{
something();
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
复制代码
同步工具类能够是任何一个对象,只要它根据其自身的状态来协调线程的控制流。包括阻塞队列,信号量,栅栏以及闭锁。
闭锁用来确保某些活动直到其余活动都完成了才继续执行。若是有多个线程,其中一个线程须要等到其余全部线程活动结束后才继续执行,使用闭锁。
CountDownLatch是一种闭锁的实现,可使得一个或者多个线程等待一组事情发生。包括一个计数器,表示须要等待的事件数量;countDown方法用来递减计数器,表示有一个事件已经发生了;await方法等待计数器为0,表示全部须要等待的事情已经发生。
// 初始化闭锁,并设置资源个数
CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread( new Runnable(){
public void run(){
// 加载资源1
加载资源的代码……
// 本资源加载完后,闭锁-1
latch.countDown();
}
} ).start();
Thread t2 = new Thread( new Runnable(){
public void run(){
// 加载资源2
资源加载代码……
// 本资源加载完后,闭锁-1
latch.countDown();
}
} ).start();
Thread t3 = new Thread( new Runnable(){
public void run(){
// 本线程必须等待全部资源加载完后才能执行
latch.await();
// 当闭锁数量为0时,await返回,执行接下来的任务
任务代码……
}
} ).start();
复制代码
闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏相似于闭锁,能阻塞一组进程直到某个时间发生。栅栏与闭锁的区别在于,全部线程必须同时到达栅栏位置,才能继续执行。
如有多条线程,他们到达屏障时将会被阻塞,只有当全部线程都到达屏障时才能打开屏障,全部线程同时执行,如有这样的需求可使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。
闭锁只会阻塞一条线程,目的是为了让该条任务线程知足条件后执行; 而同步屏障会阻塞全部线程,目的是为了让全部线程同时执行(实际上并不会同时执行,而是尽可能把线程启动的时间间隔降为最少)。
// 建立同步屏障对象,并制定须要等待的线程个数 和 打开屏障时须要执行的任务
CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
public void run(){
//当全部线程准备完毕后触发此任务
}
});
// 启动三条线程
for( int i=0; i<3; i++ ){
new Thread( new Runnable(){
public void run(){
// 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障)
barrier.await();
// 任务
任务代码……
}
} ).start();
}
复制代码
信号量用于控制同时访问某个特定资源的操做数量,或者执行某个指定操做的数量。计数信号量还能够用来实现某种资源池,或者对容器施加边界。
信号量能够用于实现资源池,也能够用于将容器变为有界阻塞容器。信号量管理着一组虚拟的许可,在执行操做时首先获取许可,并在使用之后释放许可。若是没有许可,将阻塞直到有许可或被中断,超时。
信号量的使用场景是,有m个资源,n个线程,且n>m,同一时刻只能容许m条线程访问资源。
// 建立信号量对象,并给予3个资源
Semaphore semaphore = new Semaphore(3);
// 开启10条线程
for ( int i=0; i<10; i++ ) {
new Thread( new Runnbale(){
public void run(){
// 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源
semaphore.acquire();
// 任务代码
……
// 释放资源
semaphore.release();
}
} ).start();
}
复制代码
能够用做闭锁,是一种能够生成结果的Runnable,能够处于如下三种状态:等待运行,正在运行和运行完成。当FutureTask进入完成状态后,它会中止在这个状态上。
FutureTask在Executor框架中表示异步任务,此外还能够用来表示一些时间较长的运算,这些计算能够在使用计算结构以前启动。
首先,使用HashMap和同步机制来初始化缓存。
public interface Computable<A,V> {
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunc implements Computable<String,BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
return new BigInteger(arg);
}
}
public class Memoizer1<A,V> implements Computable<A,V> {
private final Map<A,V> cache=new HashMap<>();
private final Computable<A,V> c;
public Memoizer1(Computable<A,V> c){
this.c=c;
}
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result=cache.get(arg);
if(result==null){
result=c.compute(arg);
cache.put(arg,result);
}
return result;
}
}
复制代码
在这种实现方法中,使用HashMap保存以前计算的结果。首先检查须要的结果是否已经在缓存中,若是存在则返回以前计算,不然将计算结果缓存到HashMap再返回。
为了确保线程安全,将整个compute方法进行同步。可是这样伸缩性差,缓存的性能并无获得提高。
下面使用ConcurrentHashMap替换HashMap。可是,这种方法存在一些不足,当两个线程同时调用compute时,可能会致使计算获得相同的值。这样是低效的,由于缓存的做用就是避免相同的数据被计算屡次。其问题在于,若是某个线程启动了一个计算,而其余线程并不知道这个计算正在进行,极可能会重复这个计算。
针对如上问题,咱们考虑可使用FutureTask来解决。使用该类来表示计算的过程,若是有结果可用,则返回结果,不然一直阻塞。
public class Memo2 <A,V> implements Computable<A,V>{
private final Map<A,Future<V>> cache=new ConcurrentHashMap<>();
private final Computable<A,V>c;
public Memo2(Computable<A,V>c){
this.c=c;
}
@Override
public V compute(A arg) throws InterruptedException {
Future<V> future=cache.get(arg);
if(future==null){
Callable<V> eval=new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft=new FutureTask<>(eval);
future=cache.putIfAbsent(arg,ft);
if(future==null){
future=ft;
ft.run();
}
}
try{
return future.get();
}catch (ExecutionException e){
e.printStackTrace();
}
return null;
}
}
复制代码