以下面的复合操做就有可能不安全: java
/** * getLast, rmLast没有同步,可能致使lastIndex错乱 */ @NotThreadSafe public class UnsafeVector<E> { private final Vector<E> v = new Vector<>(); public E getLast(){ int lastIndex = v.size()-1; return v.get(lastIndex); } public E rmLast(){ int lastIndex = v.size()-1; return v.remove(lastIndex); } }
/** * 经过客户端加锁实现线程安全 */ @ThreadSafe public class SafeVector<E> { private final Vector<E> v = new Vector<>(); public E getLast(){ synchronized (v) { int lastIndex = v.size()-1; return v.get(lastIndex); } } public E rmLast(){ synchronized(v){ int lastIndex = v.size()-1; return v.remove(lastIndex); } } }
/** * 下面将会抛出:ConcurrentModificationException * 可经过在迭代前锁住vector, 但这样会损失并发性能 */ @NotThreadSafe public class ModificationExceptionVector { public static void main(String[] args) { Vector<Person> vector = new Vector<>(); for (int i=0; i<10; i++){ vector.add(new Person(i, "person" + i)); } new Thread(new IterateThread(vector)).start(); new Thread(new RemoveThread(vector)).start(); } private static class RemoveThread implements Runnable{ private Vector<Person> v; private Random ran = new Random(); public RemoveThread(Vector<Person> v) { this.v = v; } @Override public void run() { try { // do 100 times' remove for (int i=0 ;i<5; i++){ v.remove(ran.nextInt(v.size())); Thread.sleep(500); } } catch (InterruptedException e) { } } } private static class IterateThread implements Runnable{ private Vector<Person> v; public IterateThread(Vector<Person> v) { this.v = v; } @Override public void run() { try { Iterator<Person> it = v.iterator(); while (it.hasNext()){ System.out.println(it.next()); Thread.sleep(500); } } catch (InterruptedException e) { } } } }
以前有一篇文章介绍过ConcurrentHashMap: http://my.oschina.net/indestiny/blog/209458 编程
public interface ConcurrentMap<K, V> extends Map<K, V> { V putIfAbsent(K key, V value); boolean remove(Object key, Object value); boolean replace(K key, V oldValue, V newValue); V replace(K key, V value); }
/** * 恢复中断状态以免屏蔽中断 */ public class TaskRunnable implements Runnable { private final BlockingQueue<Task> queue; public TaskRunnable(BlockingQueue<Task> queue) { this.queue = queue; } @Override public void run() { try { doTask(queue.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } ... }
一个计算多个线程启动到结束耗时的例子: 缓存
/** * 在计时测试中使用CountDownLatch来启动和中止线程 */ public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{ final CountDownLatch startGate = new CountDownLatch(1); //全部线程同时开始执行task的阀门 final CountDownLatch endGate = new CountDownLatch(nThreads); //全部线程结束的阀门 for (int i=0; i<nThreads; i++){ Thread t = new Thread(){ @Override public void run() { try { startGate.await(); //等待startGate值减为0 try { task.run(); } finally{ endGate.countDown(); //一个线程运行结束,值减1 } } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); //全部线程开始执行task endGate.await(); //等待全部线程执行结束 long end = System.nanoTime(); return end - start; } }
/** * 使用FutureTask来提早加载稍后须要的数据 */ public class Preloader { private final FutureTask<ProductInfo> future = new FutureTask<>( new Callable<ProductInfo>() { @Override public ProductInfo call() throws Exception { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } private ProductInfo loadProductInfo() { // TODO Auto-generated method stub return null; } public ProductInfo get() throws InterruptedException { try { return future.get(); } catch (ExecutionException e) { // exception handle return null; } } }
/** * 使用Semaphore为容器设置边界 */ public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound){ this.set = Collections.synchronizedSet(new HashSet<T>()); sem = new Semaphore(bound); //非公平 } public boolean add(T t) throws InterruptedException{ sem.acquire(); //请求semaphore, permits-1或阻塞到permits > 0 boolean wasAdded = false; try { wasAdded = set.add(t); return wasAdded; } finally{ if (!wasAdded) //未添加成功则释放semaphore sem.release(); } } public boolean remove(T t){ boolean wasRemoved = set.remove(t); if (wasRemoved) //删除成功permits+1; sem.release(); return wasRemoved; } }
/** * CyclicBarrier测试 */ public class CyclicBarrierTest { public static void main(String[] args) { int threadCount = 3; CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() { @Override public void run() { //最后一个线程到达栅栏时触发 System.out.println("all have finished."); } }); for (int i=0 ;i<threadCount; i++){ new Thread(new WorkThread(barrier)).start(); } } private static class WorkThread implements Runnable{ private CyclicBarrier barrier; public WorkThread(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { System.out.println( Thread.currentThread().getId() + " Working..."); try { barrier.await(); //当前线程阻塞直到最后一个线程到达 System.out.println(Thread.currentThread().getId() + " awaiting finished."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }
/** * 经过Exchanger交换2个线程数据 */ public class ExchangerTest { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); ExchangerRunnable exchangerRunnable1 = new ExchangerRunnable(exchanger, "A"); ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger, "B"); new Thread(exchangerRunnable1).start(); new Thread(exchangerRunnable2).start(); } private static class ExchangerRunnable implements Runnable{ private Exchanger<String> exchanger; private String data; public ExchangerRunnable(Exchanger<String> exchanger, String data){ this.exchanger = exchanger; this.data = data; } @Override public void run() { try { String beforeData = this.data; //阻塞直到另外一个线程调用exchanger.exchange(), 交换数据 this.data = this.exchanger.exchange(this.data); System.out.println( Thread.currentThread().getName() + " exchanged " + beforeData + " for " + this.data ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
/** * 计算缓存器 * 内部使用HashMap实现计算结果的缓存 * 经过外部接口同步操做实现线程安全 * 但有可能因为计算时间过长致使性能低下 */ public class Memoizer1<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<A, V>(); private final Computable<A, V> c; public Memoizer1(Computable<A, V> c) { this.c = c; } @Override public synchronized V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //计算 cache.put(key, result); } return result; } }
/** * 计算缓存器 * 经过ConcurrentHashMap代替HashMap, 提高并发性能 * 但这样有可能多个线程同时调用compute方法, * 因为计算过程当中尚未结果,有可能致使多个线程计算一样的值 */ public class Memoizer2<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); private final Computable<A, V> c; public Memoizer2(Computable<A, V> c) { this.c = c; } @Override public V compute(A key) throws InterruptedException { V result = cache.get(key); if (result == null){ result = c.compute(key); //计算 cache.put(key, result); } return result; } }
/** * 计算缓存器 * 经过FutureTask代替map中的Value * 这样能够在计算结果计算完成,就当即返回, * 但仍然有可能重复计算,由于存在非原子的复合操做"若没有则添加": if (f == null){...} */ public class Memoizer3<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer3(Computable<A, V> c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { Future<V> f = cache.get(key); if (f == null){ Callable<V> computeTask = new Callable<V>() { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask<V> ft = new FutureTask<>(computeTask); f = ft; cache.put(key, ft); ft.run(); //执行计算 } try { return f.get(); //获取计算结果 } catch (ExecutionException e) { //do exception handle } return null; } }
/** * 计算缓存器 * 经过ConcurrentHashMap.putIfAbsent避免重复任务 */ public class Memoizer<A, V> implements Computable<A, V> { private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer(Computable<A, V> c) { this.c = c; } @Override public V compute(final A key) throws InterruptedException { while(true){ Future<V> f = cache.get(key); if (f == null){ Callable<V> computeTask = new Callable<V>() { @Override public V call() throws Exception { return c.compute(key); } }; FutureTask<V> ft = new FutureTask<>(computeTask); f = cache.putIfAbsent(key, ft); //该方法不会对相同key的值进行覆盖,这样避免了相同key的任务被计算 if (f == null) ft.run(); //执行计算 } try { return f.get(); //获取计算结果 } catch (CancellationException e){ cache.remove(key); //计算取消则移除对应的计算任务key } catch (ExecutionException e) { //do exception handle } } } }一,二,三,四就讲述了java并发编程的基础知识。
不可变对象能极大地下降并发编程的复杂性。它们更为简单且安全,能够任意共享而无须使用加锁或保护性复制等机制。 安全
不吝指正。 并发