HashMap是非线程安全的,在多线程访问时没有同步机制,并发场景下put操做可能致使同一数组下的链表造成闭环,get时候出现死循环,致使CPU利用率接近100%。html
HashTable是线程安全的,使用synchronized锁住整个table的方式来保证并发访问下的线程安全,但效率却比较低下。由于线程1调用HashTable的put同步方法时,线程2的put或get等方法则进入阻塞状态,因此竞争越激烈,效率越低。node
ConcurrentHashMap是支持高并发、高吞吐量的线程安全的Map实现。下面会经过阅读 ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的源码来了解它的演变过程。算法
HashTable在竞争激烈的并发环境中效率低下的缘由是:访问HashTable的线程都竞争同一把锁。数组
ConcurrentHashMap却容许多个修改操做并发的进行,其关键是运用了锁分段技术:将容器内的数据分为一段段(Segment)的来存储,每段能够当作一个小的HashTable,每段数据都分配一把锁。当一个线程占用锁访问这一段数据时,其余线程能够访问其余段的数据。那么当多线程并发访问容器内不一样锁锁住的数据时,线程间就不存在锁竞争,从而有效的提高效率。安全
ConcurrentHashMap的数据结构以下:数据结构
ConcurrentHashMap 由 Segment 数组和 HashEntry 数组组成。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 相似,是一种数组和链表结构。一个 Segment 里包含一个 HashEntry 数组,每一个 HashEntry 用于存储 key-value 键值对数据,同时指向下一个 HashEntry 节点。当定位 key 在 ConcurrentHashMap 中的位置时,须要先通过一次 hash 定位到 Segment 的位置,而后在 hash 定位到指定的HashEntry。多线程
Segment 是一种可重入锁 ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色,每一个 Segment 守护着一个HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先得到它对应的Segment锁。而大多读操做是不加锁的,可是 size()、containsValue() 遇到并发修改竞争时须要全表加锁。并发
ConcurrentHashMap:下面是ConcurrentHashMap中的数据成员以及构造函数源码:app
构造函数主要作了两件事:1)参数的校验;2)table初始化长度ssh
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * Mask value for indexing into segments. The upper bits of a * key's hash code are used to choose the segment. */ final int segmentMask; /** * Shift value for indexing within segments. */ final int segmentShift; /** * The segments, each of which is a specialized hash table. */ final Segment<K,V>[] segments; // 构造函数 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 找到第一个 >= concurrencyLevel 的 2次方数,做为后续 Segment数组大小 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 上文中经过比较concurrencyLevel而计算出的ssize做为数组大小 // 使用的是StoreStore内存屏障,而不是较慢的StoreLoad内存屏障(使用在volatile写操做上)。实现写入不会被JIT从新排序指令,性能虽然提高,但写结果不会被当即看到。 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } }
全部成员都是final修饰的,保证线程的安全发布。segmentMask 和 segmentShift 主要是为了定位段。
concurrencyLevel 参数表示指望并发的修改 ConcurrentHashMap 的线程数量,用于决定 Segment 的数量,具体方式是经过找到第一个 >= concurrentcyLevel 的 2的次方数做为 Segment 数组的大小。默认值为16,Segment数组大小也为16,若是设置 concurrentcyLevel = 100,那么 Segment 数组大小则为128。
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; }
table:链表数组,每一个数组元素是一个hash链表的头部。table是volatile的,使得每次都能读取到最新的table值而不须要同步。
count:Segment中元素的数量。每次修改操做作告终构上的改变,如添加/删除节点(更新节点的value不算结构上的改变),都要更新count值。
modCount:对table的结构进行修改的次数。
threshold:若Segment里的元素数量超过这个值,则就会对Segment进行扩容。
loadFactor:负载因子,threshold = capacity * threshold。
HashEntry:Segment中的类,表明 hash 链表中的一个节点,源码以下:
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
下面是CouncurrentHashMap的put操做的源码:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; // 对key求hash,并取模,而后找到对应Segment数组下标位置 if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) s = ensureSegment(j); // 当 Segment 为空时,会建立 return s.put(key, hash, value, false); // put操做会委托给Segment的put }
下面是ensureSegment的代码:
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // UNSAFE进行volatile读Segment数组对应位置的值 Segment<K,V> proto = ss[0]; // use segment 0 as prototype 若是为空,则从Segment[0]复制Entry数组长度capacity,loadFactor int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck 再次volatile读Segment数组,若是为空,继续新建Segment对象 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 使用while循环和UNSAFE的CAS原子性替换Segment数组对应下标的元素。使用乐观锁的方式 // 当线程t1和t2都读取Segment[u]==null时,只有一个线程能经过CAS替换成功。假设t1替换成功了,下一次while循环t2能volatile读取到替换的值 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
put最终交给 Segment 的 put 方法,每一个Segment至关于一个HashMap,put操做就是要在HashMap中寻找对应的key是否存在,若是存在则更新value,如不存在则新建一个HashEntry。put须要加锁,使用了ReetrantLock的tryLock的非阻塞加锁方法。源码以下:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // tryLock尝试加锁,若是加锁成功,node=null。不然自旋等待并寻找对应key的节点是否存在 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); // 获取锁后,对HashEntry的table数组取模,获取数组下标index的第一个节点 for (HashEntry<K,V> e = first;;) { // first节点后面的链表,向后遍历,寻找与key相同的节点 if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; // 找到与key相同的节点,按onlyIfAbsent判断是否替换旧的value值 ++modCount; } break; } e = e.next; } else { // e==null,表明没有找到。新建HashEntry节点或使用scanAndLockForPut中建立的节点做为新的链表头节点 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 若是当前总节点个数 > threashold,则rehash扩容 // Segment rehash是获取锁以后进行的,是将数组长度扩大一倍,将旧的数组元素复制到新数组中(先后有获取锁和释放锁的语义,不须要考虑多线程问题) rehash(node); else setEntryAt(tab, index, node); // 设置新HashEntry节点到table对应的位置中 ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); // 最后解锁 } return oldValue; }
有一个点:HashEntry<K,V>[] tab = table。一开始将table赋值给tab局部变量,能够减小直接引用table时带来的性能损失,由于table是一个volatile变量,不能进行优化,而赋值给tab普通变量后,能够实现编译、运行时的优化。
ConcurrentHashMap的get操做,源码以下:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); // 对key求hash long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // key对应的Segment的数组下标 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && // 先找对应的Segment。volatile读语义保证内存的可见性 (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); // 再在Segment中找对应的HashEntry e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
get操做不须要加锁,经过volatile保证可见性,若是同时有put并发操做增长HashEntry,因为是在链表头部添加(头插法),不会对get形成影响。
计算ConcurrentHashMap的 size 是一个有趣的问题,由于在计算的时候,还会在并发的插入数据,可能致使计算出的size和实际的size有出入。
JDK1.7中前后采起了两个方案:
第一种方案:先使用不加锁的模式先尝试遍历两次ConcurrentHashMap计算size,若是两次遍历过程当中全部segment中的modCount的和是一致的,则能够认为整个计算过程当中的Map没有发生变化(添加或删除HashEntry节点),返回size。
第二种方案:若是第一种方案不符合(Map发生告终构变化),就给每一个Segment加锁,而后计算ConcurrentHashMap的size,解锁,最后返回。
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { // 若是for循环达到RETRIES_BEFORE_LOCK,则表示前民几回累计的modCount都不相等,其余线程并发修改ConcurrentHashMap致使数据结构一直在改变。 // 降级为依次对Segment进行加锁,此时其余线程改变数据结构就会阻塞等待 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { // 循环每一个Segment,累加Segment中的count和modCount Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) // 若是当前sum和以前计算的sum相等,即各Segment累加的modCount相等。就能够认为两次for循环间没有其余线程修改内部数据结构。直接返回size break; last = sum; // last等于最近一次计算的sum值 } } finally { if (retries > RETRIES_BEFORE_LOCK) { // 若是加锁了,最终解锁 for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
改进1:取消segment分段锁,直接使用 transient volatile Node<K,V>[] table; 来保存数组。采用table数组元素做为锁,从而实现对每一行数据进行加锁,进一步减小了冲突的几率。
改进2:将原先table数组 + 链表的数据结构,变动为table数组 + 链表 / 红黑树 的结构,同HashMap在JDK1.8的数据结构的改进。优化为红黑树的好处是:当一个链表长度过长时,查询某个节点的时间复杂度为O(N),而当链表长度超过8时,将链表转化为红黑树,查询节点的时间复杂度能够下降为O(logN),从而提高了性能。
改进3:并发控制使用synchronized和CAS,使用synchronized替换ReetrantLock。在ConcurrentHashMap中能够看到不少的 U.compareAndSwapXXX,经过CAS算法实现无锁化修改值的操做,下降了锁的性能消耗。CAS的思想是不断的比较当前内存中的变量值和所指定的变量值是否相等,若是相等,则接受指定的修改值;不然,拒绝操做,相似与乐观锁。
数据结构以下:
Node是ConcurrentHashMap存储结构的基本单元,用于存储key-value键值对,是一个链表,但只容许查找数据,不容许修改数据。源码以下:
/** * Key-value entry. This class is never exported out as a * user-mutable Map.Entry (i.e., one supporting setValue; see * MapEntry below), but can be used for read-only traversals used * in bulk tasks. Subclasses of Node with a negative hash field * are special, and contain null keys and values (but are never * exported). Otherwise, keys and vals are never null. */ static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; // value和next使用volatile来保证可见性和禁止重排序 volatile V val; volatile Node<K,V> next; // 指向下一个Node节点 Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } // 不容许更新value public final V setValue(V value) { throw new UnsupportedOperationException(); } /** * Virtualized support for map.get(); overridden in subclasses. */ // 用于map.get()方法,子类重写 Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
TreeBin是封装TreeNode的容器,提供了转化红黑树的一些条件和锁的控制,部分源码以下:
/** * TreeNodes used at the heads of bins. TreeBins do not hold user * keys or values, but instead point to list of TreeNodes and * their root. They also maintain a parasitic read-write lock * forcing writers (who hold bin lock) to wait for readers (who do * not) to complete before tree restructuring operations. */ static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; // 指向TreeNode的根节点 volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock ... }
put源码以下:
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 求key的hash值,两次hash,使hash值能均匀分布 int hash = spread(key.hashCode()); int binCount = 0; // 迭代Node[] table数组 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 1) 若是table为空,则初始化,ConcurrentHashMap构造方法未初始化Node数组,而是在put中实现,属于懒汉式初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 2) 若是table不为空,根据hash值计算获得key在table中的索引i,若是table[i]为null,使用CAS方式新建Node节点(table[i]为链表或红黑树的首节点) else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin,不进行加锁 } // 3) 若是table[i]不为空,而且hash值为MOVED(-1),代表该链表正在进行transfer扩容操做,帮助扩容完成 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 4) 以上条件都不知足,也就是存在hash冲突。对链表或红黑树的头节点进行加锁操做(再也不是segment,进一步减小了线程冲突) synchronized (f) { if (tabAt(tab, i) == f) { // hash值>0,表示该节点是链表结构。只需向后遍历便可 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 若是在链表中找到值为key的节点,按onlyIfAbsent判断是否替换value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 没有找到值为key的节点,直接新建Node并加入链表尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 若是首节点为红黑树结构,putTreeValue存放key-value对 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 若是链表节点数>8,则将链表结构转换为红黑树结构 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // size加1,会检查当前size是否超过sizeCtl,若是是则触发transfer扩容操做 addCount(1L, binCount); return null; }
大体流程为:
下面看initTable的源码:
sizeCtl是控制ConcurrentHashMap的控制标示符,用来控制初始化和扩容操做的,其不一样的含义以下:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // sizeCtl < 0表示其余线程正在初始化,当前线程挂起 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // CAS 将 sizeCtl置为 -1,表示正在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 初始化 table = tab = nt; sc = n - (n >>> 2); // 下次扩容的阀值,等于 0.75*n } } finally { sizeCtl = sc; } break; } } return tab; }
get的源码以下:
/** * Returns the value to which the specified key is mapped, * or {@code null} if this map contains no mapping for the key. * * <p>More formally, if this map contains a mapping from a key * {@code k} to a value {@code v} such that {@code key.equals(k)}, * then this method returns {@code v}; otherwise it returns * {@code null}. (There can be at most one such mapping.) * * @throws NullPointerException if the specified key is null */ public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 计算两次hash int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 读取首节点Node元素 // 若是首节点Node.key相等,返回value if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 树的find()来查找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 在链表中遍历查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
在JDK1.8版本中,对于size的计算,在扩容和addCount()时已经在处理了。JDK1.7是在调用时才去计算。
为了帮助统计size,ConcurrentHashMap提供了baseCount和counterCells两个辅助变量和CounterCell辅助类:
@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } //ConcurrentHashMap中元素个数,但返回的不必定是当前Map的真实元素个数。基于CAS无锁更新 private transient volatile long baseCount; private transient volatile CounterCell[] counterCells; // 部分元素变化的个数保存在此数组中
经过累计baseCount和 counterCells数组中的数量,便可获得元素的总个数。size源码以下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { // 遍历,累加全部counterCells.value for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
http://www.cnblogs.com/study-everyday/p/6430462.html
http://blog.csdn.net/u010723709/article/details/48007881
http://blog.csdn.net/jianghuxiaojin/article/details/52006118