java并发编程(四): 基础构建模块

基础构建模块:

  • 委托是建立线程安全类的一个最有效的策略:只需让现有的线程安全类管理全部的状态便可。

同步容器类:

  • 同步容器类包括:Vector, HashtableCollections.synchronizedXxx()方法产生的实例。
  • 同步容器类是线程安全的,但在某些状况下须要客户端加锁保护来实现一些复合操做
  • 常见复合操做:迭代跳转条件运算,如"若没有则添加"。

以下面的复合操做就有可能不安全: java

/**
 * getLast, rmLast没有同步,可能致使lastIndex错乱
 */
@NotThreadSafe
public class UnsafeVector<E> {
	private final Vector<E> v = new Vector<>();
	
	public E getLast(){
		int lastIndex = v.size()-1;
		return v.get(lastIndex);
	}
	
	public E rmLast(){
		int lastIndex = v.size()-1;
		return v.remove(lastIndex);
	}
}
  • 因为同步容器类要遵照同步策略,即支持客户端加锁,上面代码能够经过客户端加锁实现线程安全:
/**
 * 经过客户端加锁实现线程安全
 */
@ThreadSafe
public class SafeVector<E> {
	private final Vector<E> v = new Vector<>();
	
	public E getLast(){
		synchronized (v) {
			int lastIndex = v.size()-1;
			return v.get(lastIndex);
		}
	}
	
	public E rmLast(){
		synchronized(v){
			int lastIndex = v.size()-1;
			return v.remove(lastIndex);
		}
	}
}

迭代器与ConcurrentModificationException:

  • 容器在迭代过程当中被修改时 ,就会抛出一个ConcurrentModificationException异常。
/**
 * 下面将会抛出:ConcurrentModificationException
 * 可经过在迭代前锁住vector, 但这样会损失并发性能
 */
@NotThreadSafe
public class ModificationExceptionVector {
	public static void main(String[] args) {
		Vector<Person> vector = new Vector<>();
		for (int i=0; i<10; i++){
			vector.add(new Person(i, "person" + i));
		}
		new Thread(new IterateThread(vector)).start();
		new Thread(new RemoveThread(vector)).start();
	}
	
	private static class RemoveThread implements Runnable{
		private Vector<Person> v;
		private Random ran = new Random();
		public RemoveThread(Vector<Person> v) {
			this.v = v;
		}
		
		@Override
		public void run() {
			try {
				// do 100 times' remove
				for (int i=0 ;i<5; i++){
					v.remove(ran.nextInt(v.size()));
					Thread.sleep(500);
				}
			} catch (InterruptedException e) {
			}
		}
	}
	
	private static class IterateThread implements Runnable{
		private Vector<Person> v;
		
		public IterateThread(Vector<Person> v) {
			this.v = v;
		}
		
		@Override
		public void run() {
			try {
				Iterator<Person> it = v.iterator();
				while (it.hasNext()){
					System.out.println(it.next());
					Thread.sleep(500);
				}
			} catch (InterruptedException e) {
			}
		}
	}
}

隐藏迭代器:

  • 正如封装对象的状态有助于维持不变性条件同样,封装对象的同步机制一样有助于确保实施同步策略。
  • 一些隐藏的迭代操做:hashCode, equals, containsAll, removeAll, retainAll等。

并发容器:

  • 经过并发容器来代替同步容器,能够极大地提升伸缩性下降风险

ConrrentHashMap:

以前有一篇文章介绍过ConcurrentHashMap: http://my.oschina.net/indestiny/blog/209458 编程

  • ConcurrentHashMap使用一种粒度更细的加锁机制来实现大程度的共享,这种机制称为分段锁(Lock Striping);
  • ConcurrentHashMap的迭代器不会抛出ConcurrentModificationException,所以不须要在迭代过程当中加锁,由于其返回的迭代器具备弱一致性,而非"及时失败"
  • ConcurrentHashMap对一些操做进行了弱化,如size(计算的是近似值,而不是精确值), isEmpty等。

额外的原子Map操做:

  • ConcurrentMap声明了一些原子操做接口:
public interface ConcurrentMap<K, V> extends Map<K, V> {    
    V putIfAbsent(K key, V value);
    boolean remove(Object key, Object value);
    boolean replace(K key, V oldValue, V newValue);
    V replace(K key, V value);
}

CopyOnWriteArrayList:

  • CopyOnWriteArrayList比同步List具备更高的并发性能,并且在迭代时不须要加锁或复制
  • 其安全性在于:只要发布一个事实不可变的对象,那么在访问该对象时就不须要进一步同步;在每次修改都会建立一个新的容器副本,从而实现可变性
  • 仅当迭代操做远远多于修改操做时,才应该使用"写入时复制"容器。好比事件通知系统,对监听器列表中的每一个监听器进行通知。

阻塞队列和生产者--消费者模式:

  • 在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具;它们可以意志或防止产生过多的工做项,使应用程序在负荷过载的状况下变得更加健壮。
  • BlockingQueue实现:LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue;

串行线程封闭:

  • 对于可变对象生产者--消费者这种设计与阻塞队列一块儿,促进了串行线程封闭,从而将对象全部权从生产者交付给消费者。

双端队列与工做密取:

  • java6提供了双端队列:ArrayDeque, LinkedBlockingDeque;
  • 双端队列适用于另外一种模式:工做密取,每一个消费者有各自的双端队列,这种模式很是适合既是消费者又是生产者问题。
  • 当消费者本身的双端队列为空时,它会从其余消费者队列末尾中密取任务。

阻塞方法与中断方法:

  • 阻塞的缘由:等待I/O操做结束等待得到一个锁等待从Thread.sleep方法中醒来,或是等待另外一个线程的计算结果等。
  • 传递InterreuptedException: 抛出异常给方法调用者,或捕获异常,作一些清理工做再抛出抛出异常。
  • 恢复中断:有时不能抛出InterruptedException, 好比在Runnable中,则能够恢复中断
/**
 * 恢复中断状态以免屏蔽中断
 */
public class TaskRunnable implements Runnable {
	private final BlockingQueue<Task> queue;
	public TaskRunnable(BlockingQueue<Task> queue) {
		this.queue = queue;
	}
	
	@Override
	public void run() {
		try {
			doTask(queue.take());
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
       ...
}

同步工具类:

  • 任何一个对象均可以是同步工具类,java平台提供的一些同步工具类有:Semaphore(信号量), Barrier(栅栏), Latch(闭锁);

闭锁:

  • 闭锁能够用来确保某些活动直到其余活动都完成后才继续执行;

一个计算多个线程启动到结束耗时的例子: 缓存

/**
 * 在计时测试中使用CountDownLatch来启动和中止线程
 */
public class TestHarness {
	public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{
		final CountDownLatch startGate = new CountDownLatch(1); //全部线程同时开始执行task的阀门
		final CountDownLatch endGate = new CountDownLatch(nThreads); //全部线程结束的阀门
		
		for (int i=0; i<nThreads; i++){
			Thread t = new Thread(){
				@Override
				public void run() {
					try {
						startGate.await(); //等待startGate值减为0
						try {
							task.run();
						} finally{
							endGate.countDown(); //一个线程运行结束,值减1
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			};
			t.start();
		}
		long start = System.nanoTime();
		startGate.countDown(); //全部线程开始执行task
		endGate.await(); //等待全部线程执行结束
		long end = System.nanoTime();
		return end - start;
	}
}

FutureTask:

  • FutureTask也可用作闭锁,表示一种抽象的可生成结果的计算。
/**
 * 使用FutureTask来提早加载稍后须要的数据
 */
public class Preloader {
	private final FutureTask<ProductInfo> future = new FutureTask<>(
			new Callable<ProductInfo>() {
				@Override
				public ProductInfo call() throws Exception {
					return loadProductInfo();
				}
			});
	private final Thread thread = new Thread(future);

	public void start() {
		thread.start();
	}

	private ProductInfo loadProductInfo() {
		// TODO Auto-generated method stub
		return null;
	}

	public ProductInfo get() throws InterruptedException {
		try {
			return future.get();
		} catch (ExecutionException e) {
			// exception handle
			return null;
		}
	}
}

信号量:

  • 计数信号量用来控制同时访问某个特定资源的操做数量,或者同时执行某个制定操做的数量,也能够用来实现某种资源池,或者对容器施加边界
/**
 * 使用Semaphore为容器设置边界
 */
public class BoundedHashSet<T> {
	private final Set<T> set;
	private final Semaphore sem;
	
	public BoundedHashSet(int bound){
		this.set = Collections.synchronizedSet(new HashSet<T>());
		sem = new Semaphore(bound); //非公平
	}
	
	public boolean add(T t) throws InterruptedException{
		sem.acquire(); //请求semaphore, permits-1或阻塞到permits > 0
		boolean wasAdded = false;
		
		try {
			wasAdded = set.add(t);
			return wasAdded;
		} finally{
			if (!wasAdded) //未添加成功则释放semaphore
				sem.release();
		}
	}
	
	public boolean remove(T t){
		boolean wasRemoved = set.remove(t);
		if (wasRemoved) //删除成功permits+1;
			sem.release();
		return wasRemoved;
	}
}

栅栏:

  • 栅栏(Barrier)相似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,全部线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件(CutDownLatch值减为0)栅栏用于等待其余线程
/**
 * CyclicBarrier测试
 */
public class CyclicBarrierTest {
	
	public static void main(String[] args) {
		int threadCount = 3;
		CyclicBarrier barrier =
				 new CyclicBarrier(threadCount, new Runnable() {
					@Override
					public void run() { //最后一个线程到达栅栏时触发
						System.out.println("all have finished.");
					}
		});
		
		for (int i=0 ;i<threadCount; i++){
			new Thread(new WorkThread(barrier)).start();
		}
	}
	
	private static class WorkThread implements Runnable{
		private CyclicBarrier barrier;
		
		public WorkThread(CyclicBarrier barrier) {
			this.barrier = barrier;
		}
		
		@Override
		public void run() {
			System.out.println(
					Thread.currentThread().getId() + " Working...");
			try {
				barrier.await(); //当前线程阻塞直到最后一个线程到达
				System.out.println(Thread.currentThread().getId() + " awaiting finished.");
			} catch (InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}
}
  • 除Barrier栅栏外,还有Exchanger栅栏,它是一种两方栅栏, 能够实现两个线程之间交换数据
/**
 * 经过Exchanger交换2个线程数据
 */
public class ExchangerTest {
	public static void main(String[] args) {
		Exchanger<String> exchanger = new Exchanger<>();

		ExchangerRunnable exchangerRunnable1 =
		        new ExchangerRunnable(exchanger, "A");

		ExchangerRunnable exchangerRunnable2 =
		        new ExchangerRunnable(exchanger, "B");

		new Thread(exchangerRunnable1).start();
		new Thread(exchangerRunnable2).start();
	}
	
	private static class ExchangerRunnable implements Runnable{
		private Exchanger<String> exchanger;
		private String data;
		public ExchangerRunnable(Exchanger<String> exchanger, String data){
			this.exchanger = exchanger;
			this.data = data;
		}

		@Override
		public void run() {
			try {
	            String beforeData = this.data;
	            //阻塞直到另外一个线程调用exchanger.exchange(), 交换数据
	            this.data = this.exchanger.exchange(this.data); 
	            System.out.println(
	                    Thread.currentThread().getName() +
	                    " exchanged " + beforeData + " for " + this.data
	            );
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
		}
	}
}

构建高效且可伸缩的结果缓存:

  • 一个简单安全,性能低下的缓存设计:
/**
 * 计算缓存器
 * 内部使用HashMap实现计算结果的缓存
 * 经过外部接口同步操做实现线程安全
 * 但有可能因为计算时间过长致使性能低下
 */
public class Memoizer1<A, V> implements Computable<A, V> {
	private final Map<A, V> cache = new HashMap<A, V>();
	private final Computable<A, V> c;
	
	public Memoizer1(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public synchronized V compute(A key) throws InterruptedException {
		V result = cache.get(key);
		if (result == null){
			result = c.compute(key); //计算
			cache.put(key, result);
		}
		return result;
	}
}
  • 经过并发容器ConcurrentHashMap代替HashMap,提高并发性能:
/**
 * 计算缓存器
 * 经过ConcurrentHashMap代替HashMap, 提高并发性能
 * 但这样有可能多个线程同时调用compute方法,
 * 因为计算过程当中尚未结果,有可能致使多个线程计算一样的值
 */
public class Memoizer2<A, V> implements Computable<A, V> {
	private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
	private final Computable<A, V> c;
	
	public Memoizer2(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public V compute(A key) throws InterruptedException {
		V result = cache.get(key);
		if (result == null){
			result = c.compute(key); //计算
			cache.put(key, result);
		}
		return result;
	}
}
  • 经过FutureTask来弥补重复结果计算问题:
/**
 * 计算缓存器
 * 经过FutureTask代替map中的Value
 * 这样能够在计算结果计算完成,就当即返回,
 * 但仍然有可能重复计算,由于存在非原子的复合操做"若没有则添加": if (f == null){...}
 */
public class Memoizer3<A, V> implements Computable<A, V> {
	private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
	private final Computable<A, V> c;
	
	public Memoizer3(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public V compute(final A key) throws InterruptedException {
		Future<V> f = cache.get(key);
		if (f == null){
			Callable<V> computeTask = new Callable<V>() {
				@Override
				public V call() throws Exception {
					return c.compute(key);
				}
			};
			FutureTask<V> ft = new FutureTask<>(computeTask);
			f = ft;
			cache.put(key, ft);
			ft.run(); //执行计算
		}
		try {
			return f.get(); //获取计算结果
		} catch (ExecutionException e) {
			//do exception handle
		}
		return null;
	}
}
  • 经过对CocurrentHashMap.putIfAbsent()对上面的问题进行修复:
/**
 * 计算缓存器
 * 经过ConcurrentHashMap.putIfAbsent避免重复任务
 */
public class Memoizer<A, V> implements Computable<A, V> {
	private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
	private final Computable<A, V> c;
	
	public Memoizer(Computable<A, V> c) {
		this.c = c;
	}
	
	@Override
	public V compute(final A key) throws InterruptedException {
		while(true){
			Future<V> f = cache.get(key);
			if (f == null){
				Callable<V> computeTask = new Callable<V>() {
					@Override
					public V call() throws Exception {
						return c.compute(key);
					}
				};
				FutureTask<V> ft = new FutureTask<>(computeTask);
				f = cache.putIfAbsent(key, ft); //该方法不会对相同key的值进行覆盖,这样避免了相同key的任务被计算
				if (f == null) ft.run(); //执行计算
			}
			try {
				return f.get(); //获取计算结果
			} catch (CancellationException e){ 
				cache.remove(key); //计算取消则移除对应的计算任务key
			} catch (ExecutionException e) {
				//do exception handle
			}
		}
	}
}
一,二,三,四就讲述了java并发编程的基础知识。

并发技巧清单:

  • 可变状态相当重要的。
       全部并发访问均可以归结为如何协调对并发状态的访问,可变状态越少,越容易确保线程安全性。
  • 尽可能将域声明为final类型,除非须要它们是可变的。
  • 不可变对象必定是线程安全的。

       不可变对象能极大地下降并发编程的复杂性。它们更为简单且安全,能够任意共享而无须使用加锁或保护性复制等机制。 安全

  • 封装有助于管理复杂性。
      将数据封装在对象中,更易于维护不变性;将同步机制封装在对象中,更易于遵循同步策略。
  • 用锁保护每一个可变变量
  • 当保护同一个不变性条件中的全部变量时,要使用同一个锁。
  • 在执行复合操做期间,要持有锁。
  • 若是从多个线程中访问同一个可变变量时没有同步机制,那么程序会可能出问题。
  • 不要自行推断不须要使用同步。
  • 设计过程当中考虑线程安全,不要在上线出问题后再作。
  • 同步策略文档化

不吝指正。 并发

相关文章
相关标签/搜索