JDK 1.5 以前, 主要包括:数组
Vector
和 Hashtable
)Collections.synchronizedXxx
)这些类的共同特征是, 公共方法都是由 synchronized
来修饰的, 以限制一次只能有一个线程能访问容器.安全
老的容器自身并不支持复合操做, 包括:并发
好在老的容器类遵循一个支持 客户端加锁 的同步策略. 来解决复合运算的问题:函数
解决迭代和导航:ui
synchronized(list) { // 确保调用 size() 后, list 大小不会改变 for (int i = 0; i < list.size(); ++i) { doSomething(list[i]); } }
解决条件运算:this
synchronized(list) { // 确保调用 size() 后, list 大小不会改变 int lastIndex = list.size() - 1; list.remove(lastIndex); }
这样作的弊端是:线程
作任何操做都要锁住整个容器, 效率低, 容易出错.设计
ConcurrentModificationException
Collection
进行迭代的标准方法是使用 Iterator
, 不管是显式使用仍是 经过 JDK 1.5 以后的 for-each
语法. code
在 迭代 的时候, 仍有其余线程在并发修改容器的可能性, 使用迭代器仍不可避免地须要在迭代期间对容器加锁.xml
迭代器在并发修改的时候, 策略是 及时失败(fail-fast) 的: 当发现迭代器被修改后(如: add
和 remove
), 会抛出一个未检查的 ConcurrentModificationException
.
以 ArrayList
为例子, 其父类 AbstractList
内部有一个字段名为 modCount
的计数器. 任何改变 List
大小的操做都须要改变 modCount
这个值.
这个值会被用来在迭代或者时, 检查有没有修改容器, 套路是这样的:
修改时:
if (modCount != expectedModCount) throw new ConcurrentModificationException(); } // Add or Remove // ....... expectedModCount = modCount;
迭代:
public E prev/next() { if (modCount != expectedModCount) throw new ConcurrentModificationException(); } // Other..... }
Note: ConcurrentModificationException
也能够出现单线程的代码中, 好比当在迭代期间调用 remove
方法
有时候, 一些操做会隐含的调用迭代器, 好比:
调用 toString()
方法, 尤为是写 log 时, 有
log("Set:" + set);
这样的语句.
hashCode
和 equals
方法, 如下是 HashTable
的 hashCode
和 equals
方法:
public synchronized boolean equals(Object o) { if (o == this) return true; if (!(o instanceof Map)) return false; Map<?,?> t = (Map<?,?>) o; if (t.size() != size()) return false; try { Iterator<Map.Entry<K,V>> i = entrySet().iterator(); while (i.hasNext()) { Map.Entry<K,V> e = i.next(); K key = e.getKey(); V value = e.getValue(); if (value == null) { if (!(t.get(key)==null && t.containsKey(key))) return false; } else { if (!value.equals(t.get(key))) return false; } } } catch (ClassCastException unused) { return false; } catch (NullPointerException unused) { return false; } return true; } public synchronized int hashCode() { int h = 0; if (count == 0 || loadFactor < 0) return h; // Returns zero loadFactor = -loadFactor; // Mark hashCode computation in progress Entry<?,?>[] tab = table; for (Entry<?,?> entry : tab) { while (entry != null) { h += entry.hashCode(); entry = entry.next; } } loadFactor = -loadFactor; // Mark hashCode computation complete return h; }
另外 containAll
, removeAll
和 retainAll
也会产生迭代.
JDK 1.5 后, 新增长了:
ConcurrentHashMap
, 来替代同步的 Map
实现, 增长了 put-if-absent
, 替换和条件删除CopyOnWriteArrayList
, 是 List
相应的同步实现Queue
, 用来临时保存正在等待进一步处理的一系列元素, 实现包括ConcurrentLinkedQueue
, 一个传统的 FIFO 队列PriorityQueue
, 一个(非并发)居右优先级顺序的队列BlockingQueue
, 拓展自 Queue
, 增长了可阻塞的插入和获取操做. JDK 1.6 后, 新增长了
Deque
和 BlockingDeque
, 分别扩展了 Queue
和 BlockingQueue
:
Deque
接口, 实现类是 ArrayDeque
, 不阻塞BlockingDeque
接口, 实现类是 LinkedBlockingDeque
, 阻塞.ConcurrentSkipListMap
和 ConcurrentSkipListSet
, 做为 SortedMap
和 SortedSet
的并发替代品
Note: 从一个空的Queue
中取元素, 并不会阻塞, 而是返回 null
ConcurrentHashMap
在 ConcurrentHashMap
以前, HashTable
和 SynchronizedMap
都是经过给整个方法加 synchronized
来达到同步的, 这样限制某一时刻只有一个线程能够访问容器.
ConcurrentHashMap
使用一个更加细化的锁机制, 名叫分离锁. 这个机制容许更深层次的共享访问:
因为并发环境中, Map 的大小一般是动态的, size
和 isEmpty
返回的只是个估算值(可能返回后接着过时).
支持的复合操做:
CopyOnWriteArrayList
写入时复制(COW)
容器的线程安全原理:
只要不可边对象被正确发布, 那么访问它将不须要更多的同步.
所以, 每次添加/修改一个元素, 容器内就会新建立一个新的数组, 容器底层的数组会指向这个新数组. 旧数组仍然被使用, 直到没有引用后被 GC 回收.
因为 COW 复制数组有开销, 因此 COW 适用于容器迭代操做远远高于对容器修改的频率.
FAQ: Arrays.copyOf
和 System.arraycopy
区别?
Arrays.copyOf
不只会复制元素, 还会建立新的数组. System.arrayCopy
拷贝到一个现有数组, Arrays.copyOf
实现中用了 System.arrayCopy
;
生产者-消费者设计分离了 "识别须要完成的工做" 和 "执行工做". 该模式不会发现一个工做便当即处理, 而是把工做置入一个任务清单中:
最多见的生产者-消费者设计是: 线程池和工做队列的结合
在设计初期就使用阻塞队列创建对资源的管理, 提前作这件事情会比往后再修复容易的多.
Blocking queue 提供了可阻塞的 put
和 take
方法. 常见的实现有:
LinkedBlockedQueue
, FIFO, 链表实现, 队列首 take, 队列尾 put.ArrayBlockingQueue
, FIFO, 数组实现, 能够在 putIndex(队列尾) 插入, 从 takeIndex(队列首) 取出.PriorityBlockingQueue
, 根据 Comparator 排序顺序取出SynchronousQueue
, 生产线程直接和消费线程对接, 若是生产线程找不到消费者或反之, 则, put 和 take 会一直阻止. 只有在消费者充足的时候比较适合, 他们总能为下一个任务作好准备.双端队列用来实现 窃取工做(work stealing) 模式.
在传统的 生产者-消费者 设计中, 全部的消费者只共享一个工做队列.
而在 窃取工做 设计中, 每个消费者都有一个本身的双端队列. 若是一个消费者完成了本身双端队列中的所有工做, 它能够偷取其余消费者的双端队列的 末尾 任务(其余消费者仍然从队列 首 取任务).
由于工做者线程并不会竞争一个共享的任务队列, 因此 窃取工做 模式比传统的 生产者-消费者 设计有更好的伸缩性.
阻塞: 线程被挂起, 状态变为BLOCKED
, WAITING
或是 TIMED_WAITING
, 等待直到一个事件发生才能继续进行.
BlockingQueue
的 put
和 take
方法会抛出一个受检查的 InterruptedException
, 这个异常说明这是个阻塞方法, 能够被中断来提早结束阻塞.
处理中断的方法:
InterruptedException
. 传递给调用者, 能够对其中特定活动进行简洁地清理后, 再抛出.恢复中断. 当代码是 Runnable
的一部分时, 必须捕获 InterruptedException
. 而且, 在当前线程中调用 interrupt
从新设置中断状态(抛出异常会清理中断标志位), 这样调用栈中更高层代码能够发现中断已经发生.
try { processTask(queue.take()); } catch (InterruptedException e) { // 恢复中断状态 Thread.currentThread().interrupt(); }
Synchronizer 是一个对象, 它根据自己的状态调节线程的控制流. 主要类型有:
他们的特性: 封装状态, 这些状态绝对着线程执行到某一点时是经过仍是被迫等待.
直到 闭锁 到达 终点状态 以前, 门一直是关闭的, 没有线程可以经过, 在 终点状态 到来的时候, 门开了, 容许全部线程经过. 一旦到了终点状态, 他就 不能 再改变状态了.
用例:
FutureTask 描述了一个抽象的可携带结果的计算. FutureTask的计算经过 Callable
实现.
Callable
等价于一个可携带结果的 Runnable
. Callable
有三种状态:
要获取 FutureTask
的结果, 能够调用 get()
方法. 调用 get()
时, 有两种状况:
FutureTask
保证了计算结果将计算线程安全的传递到当前线程.
假如FutureTask
执行的任务有异常抛出, 则异常会被封装在 ExecutionException
里. 如下代码能够从 ExecutionException
中取出异常:
try { futureTask.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceOf XXXException) { // 本身想要捕获的异常 } else { throw launderThrowable(cause); } } public static RuntimeException launderThrowable(Throwable cause) { if (t instanceOf RuntimeException) { return (RuntimeException)t; } else if (t instanceOf Error) { } else { throw new IllegalStateException("Not unchecked", t); } }
计数信号量用来控制可以同时访问某种资源的活动的数量, 或者同时执行某一操做的数量.
使用计数信号量以前须要先构造一个, 构造时能够将许可集(permit)总数传递进去. 在使用计数信号量时, 要先尝试获取(acquire)一个许可, 假如此时有剩余许可则继续执行, 若没有, 则 阻塞. 使用完以后, 要手动释放(release)一个许可.
用处:
关卡用来阻塞一组线程, 直到 全部线程 达到一个条件. 就像一些家庭成员指定商场的一个集合地点:"咱们每一个人6:00在麦当劳见, 到了之后不见不散, 以后咱们再决定接下来作什么".
关卡 与 闭锁 的不一样:
关卡: 等待的是其余线程, 能够重复被使用 闭锁: 等待的是事件, 只能使用一次
当一个线程到达关卡点时, 调用 await
, await
会被阻塞, 直到全部线程都到达关卡点.
若是对 await
的调用超时, 或者阻塞中的线程被中断, 那么关卡就被认为是 失败 的.
await
, 那么当这个线程 await
超时, 这个线程会抛出 TimeoutException
异常, 其余调用 barrior.await()
的线程会抛出 BrokenBarrierException
;若是成功地经过关卡, await
为每个线程返回一个惟一的到达索引号, 能够用来 "选举" 产生一个领导, 在下一次迭代中承担一些特殊工做.
CyclicBarrier
也容许你向构造函数传递一个 关卡行为(Barrier action), 这是一个 Runnable, 当成功经过关卡的时候, 会(在 某一个 子任务线程中) 执行, 可是在阻塞线程被释放以前是不能执行的.
Exchanger 是关卡的另外一种形式, 它是一种两步关卡, 在关卡点会交换数据.