曾经在 [高并发Java 五] JDK并发包1 中提到过ConcurrentHashMap,只是简单的提到了下ConcurrentHashMap的优势,以及大概的实现原理。html
而本文则重点介绍ConcurrentHashMap实现的细节。node
HashMap就不介绍了,具体请查看JDK7与JDK8中HashMap的实现算法
HashTable是一个线程安全的类,它使用synchronized来锁住整张Hash表来实现线程安全,即每次锁住整张表让线程独占。ConcurrentHashMap容许多个修改操做并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不一样部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不一样的部分,每一个段其实就是一个小的Hashtable,它们有本身的锁。只要多个修改操做发生在不一样的段上,它们就能够并发进行。数组
有些方法须要跨段,好比size()和containsValue(),它们可能须要锁定整个表而而不只仅是某个段,这须要按顺序锁定全部段,操做完毕后,又按顺序释放全部段的锁。这里“按顺序”是很重要的,不然极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,而且其成员变量实际上也是final的,可是,仅仅是将数组声明为final的并不保证数组成员也是final的,这须要实现上的保证。这能够确保不会出现死锁,由于得到锁的顺序是固定的。安全
ConcurrentHashMap使用分段锁技术,将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问,可以实现真正的并发访问。以下图是ConcurrentHashMap的内部结构图:多线程
从图中能够看到,ConcurrentHashMap内部分为不少个Segment,每个Segment拥有一把锁,而后每一个Segment(继承ReentrantLock)并发
static final class Segment<K,V> extends ReentrantLock implements Serializable
Segment继承了ReentrantLock,代表每一个segment均可以当作一个锁。(ReentrantLock前文已经提到,不了解的话就把当作synchronized的替代者吧)这样对每一个segment中的数据须要同步操做的话都是使用每一个segment容器对象自身的锁来实现。只有对全局须要改变时锁定的是全部的segment。app
Segment下面包含不少个HashEntry列表数组。对于一个key,须要通过三次(为何要hash三次下文会详细讲解)hash操做,才能最终定位这个元素的位置,这三次hash分别为:ssh
ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图能够看出之间的关系async
/** * The segments, each of which is a specialized hash table */ final Segment<K,V>[] segments;
不变(Immutable)和易变(Volatile)ConcurrentHashMap彻底容许多个读操做并发进行,读操做并不须要加锁。若是使用传统的技术,如HashMap中的实现,若是容许能够在hash链的中间添加或删除元素,读操做不加锁将获得不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry表明每一个hash链中的一个节点,其结构以下所示:
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; volatile HashEntry<K,V> next; }
在JDK 1.6中,HashEntry中的next指针也定义为final,而且每次插入将新添加节点做为链的头节点(同HashMap实现),并且每次删除一个节点时,会将删除节点以前的全部节点 拷贝一份组成一个新的链,而将当前节点的上一个节点的next指向当前节点的下一个节点,从而在删除之后 有两条链存在,于是能够保证即便在同一条链中,有一个线程在删除,而另外一个线程在遍历,它们都能工做良好,由于遍历的线程能继续使用原有的链。于是这种实现是一种更加细粒度的happens-before关系,即若是遍历线程在删除线程结束后开始,则它能看到删除后的变化,若是它发生在删除线程正在执行中间,则它会使用原有的链,而不会等到删除线程结束后再执行,即看不到删除线程的影响。若是这不符合你的需求,仍是乖乖的用Hashtable或HashMap的synchronized版本,Collections.synchronizedMap()作的包装。
而HashMap中的Entry只有key是final的
static class Entry<K,V> implements Map.Entry<K,V> { final K key; V value; Entry<K,V> next; int hash;
不变 模式(immutable)是多线程安全里最简单的一种保障方式。由于你拿他没有办法,想改变它也没有机会。
不变模式主要经过final关键字来限定的。在JMM中final关键字还有特殊的语义。Final域使得确保初始化安全性(initialization safety)成为可能,初始化安全性让不可变形对象不须要同步就能自由地被访问和共享。
先看看ConcurrentHashMap的初始化作了哪些事情,构造函数的源码以下:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
传入的参数有initialCapacity,loadFactor,concurrencyLevel这三个。
初始化的一些动做:
put操做的源码以下:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
操做步骤以下:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
put操做是要加锁的。
get操做的源码以下:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
操做步骤为:
值得注意的是,get操做是不须要加锁的(若是value为null,会调用readValueUnderLock,只有这个步骤会加锁),经过前面提到的volatile和final来确保数据安全。
size操做与put和get操做最大的区别在于,size操做须要遍历全部的Segment才能算出整个Map的大小,而put和get都只关心一个Segment。假设咱们当前遍历的Segment为SA,那么在遍历SA过程当中其余的Segment好比SB可能会被修改,因而这一次运算出来的size值可能并非Map当前的真正大小。因此一个比较简单的办法就是计算Map大小的时候全部的Segment都Lock住,不能更新(包含put,remove等等)数据,计算完以后再Unlock。这是普通人可以想到的方案,可是牛逼的做者还有一个更好的Idea:先给3次机会,不lock全部的Segment,遍历全部Segment,累加各个Segment的大小获得整个Map的大小,若是某相邻的两次计算获取的全部Segment的更新的次数(每一个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,经过这个值能够获得每一个Segment的更新操做的次数)是同样的,说明计算过程当中没有更新操做,则直接返回这个值。若是这三次不加锁的计算过程当中Map的更新次数有变化,则以后的计算先对全部的Segment加锁,再遍历全部Segment计算Map大小,最后再解锁全部Segment。源代码以下:
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
举个例子:
一个Map有4个Segment,标记为S1,S2,S3,S4,如今咱们要获取Map的size。计算过程是这样的:第一次计算,不对S1,S2,S3,S4加锁,遍历全部的Segment,假设每一个Segment的大小分别为1,2,3,4,更新操做次数分别为:2,2,3,1,则此次计算能够获得Map的总大小为1+2+3+4=10,总共更新操做次数为2+2+3+1=8;第二次计算,不对S1,S2,S3,S4加锁,遍历全部Segment,假设此次每一个Segment的大小变成了2,2,3,4,更新次数分别为3,2,3,1,由于两次计算获得的Map更新次数不一致(第一次是8,第二次是9)则能够判定这段时间Map数据被更新,则此时应该再试一次;第三次计算,不对S1,S2,S3,S4加锁,遍历全部Segment,假设每一个Segment的更新操做次数仍是为3,2,3,1,则由于第二次计算和第三次计算获得的Map的更新操做的次数是一致的,就能说明第二次计算和第三次计算这段时间内Map数据没有被更新,此时能够直接返回第三次计算获得的Map的大小。最坏的状况:第三次计算获得的数据更新次数和第二次也不同,则只能先对全部Segment加锁再计算最后解锁。
containsValue操做采用了和size操做同样的想法:
public boolean containsValue(Object value) { // Same idea as size() if (value == null) throw new NullPointerException(); final Segment<K,V>[] segments = this.segments; boolean found = false; long last = 0; int retries = -1; try { outer: for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } long hashSum = 0L; int sum = 0; for (int j = 0; j < segments.length; ++j) { HashEntry<K,V>[] tab; Segment<K,V> seg = segmentAt(segments, j); if (seg != null && (tab = seg.table) != null) { for (int i = 0 ; i < tab.length; i++) { HashEntry<K,V> e; for (e = entryAt(tab, i); e != null; e = e.next) { V v = e.value; if (v != null && value.equals(v)) { found = true; break outer; } } } sum += seg.modCount; } } if (retries > 0 && sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return found; }
看看hash的源代码:
private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
源码中的注释是这样的:
Applies a supplemental hash function to a given hashCode, which defends against poor quality hash functions. This is critical because ConcurrentHashMap uses power-of-two length hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits.
这里用到了Wang/Jenkins hash算法的变种,主要的目的是为了减小哈希冲突,使元素可以均匀的分布在不一样的Segment上,从而提升容器的存取效率。假如哈希的质量差到极点,那么全部的元素都在一个Segment中,不只存取元素缓慢,分段锁也会失去意义。
举个简单的例子:
System.out.println(Integer.parseInt("0001111", 2) & 15); System.out.println(Integer.parseInt("0011111", 2) & 15); System.out.println(Integer.parseInt("0111111", 2) & 15); System.out.println(Integer.parseInt("1111111", 2) & 15);
这些数字获得的hash值都是同样的,全是15,因此若是不进行第一次预hash,发生冲突的概率仍是很大的,可是若是咱们先把上例中的二进制数字使用hash()函数先进行一次预hash,获得的结果是这样的:
0100|0111|0110|0111|1101|1010|0100|1110 1111|0111|0100|0011|0000|0001|1011|1000 0111|0111|0110|1001|0100|0110|0011|1110 1000|0011|0000|0000|1100|1000|0001|1010
上面这个例子引用自: InfoQ
能够看到每一位的数据都散开了,而且ConcurrentHashMap中是使用预hash值的高位参与运算的。好比以前说的先将hash值向右按位移动28位,再与15作&运算,获得的结果都别为:4,15,7,8,没有冲突!
1. http://www.cnblogs.com/ITtangtang/p/3948786.html
2. http://qifuguang.me/2015/09/10/[Java%E5%B9%B6%E5%8F%91%E5%8C%85%E5%AD%A6%E4%B9%A0%E5%85%AB]%E6%B7%B1%E5%BA%A6%E5%89%96%E6%9E%90ConcurrentHashMap/
3. http://www.cnblogs.com/yydcdut/p/3959815.html