ConcurrentHashMap之实现细节(转)

ConcurrentHashMap是Java 5中支持高并发、高吞吐量的线程安全HashMap实现。在这以前我对ConcurrentHashMap只有一些肤浅的理解,仅知道它采用了多个锁,大概也足够了。可是在通过一次惨痛的面试经历以后,我以为必须深刻研究它的实现。面试中被问到读是否要加锁,由于读写会发生冲突,我说必需要加锁,我和面试官也所以发生了冲突,结果可想而知。仍是闲话少说,经过仔细阅读源代码,如今总算理解ConcurrentHashMap实现机制了,其实现之精巧,使人叹服,与你们共享之。java

 

 

实现原理 
node

 

锁分离 (Lock Stripping)c++

 

ConcurrentHashMap容许多个修改操做并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不一样部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不一样的部分,每一个段其实就是一个小的hash table,它们有本身的锁。只要多个修改操做发生在不一样的段上,它们就能够并发进行。面试

 

有些方法须要跨段,好比size()和containsValue(),它们可能须要锁定整个表而而不只仅是某个段,这须要按顺序锁定全部段,操做完毕后,又按顺序释放全部段的锁。这里“按顺序”是很重要的,不然极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,而且其成员变量实际上也是final的,可是,仅仅是将数组声明为final的并不保证数组成员也是final的,这须要实现上的保证。这能够确保不会出现死锁,由于得到锁的顺序是固定的。不变性是多线程编程占有很重要的地位,下面还要谈到。算法

 

Java代码   收藏代码
  1. /** 
  2.  * The segments, each of which is a specialized hash table 
  3.  */  
  4. final Segment<K,V>[] segments;  

 

 

不变(Immutable)和易变(Volatile)编程

 

ConcurrentHashMap彻底容许多个读操做并发进行,读操做并不须要加锁。若是使用传统的技术,如HashMap中的实现,若是容许能够在hash链的中间添加或删除元素,读操做不加锁将获得不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry表明每一个hash链中的一个节点,其结构以下所示:数组

 

Java代码   收藏代码
  1. static final class HashEntry<K,V> {  
  2.     final K key;  
  3.     final int hash;  
  4.     volatile V value;  
  5.     final HashEntry<K,V> next;  
  6. }  

能够看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,由于这须要修改next引用值,全部的节点的修改只能从头部开始。对于put操做,能够一概添加到Hash链的头部。可是对于remove操做,可能须要从中间删除一个节点,这就须要将要删除节点的前面全部节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操做时还会详述。为了确保读操做可以看到最新的值,将value设置成volatile,这避免了加锁。安全

 


其它数据结构

 

为了加快定位段以及段中hash槽的速度,每一个段hash槽的的个数都是2^n,这使得经过位运算就能够定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪一个段中。可是咱们也不要忘记《算法导论》给咱们的教训:hash槽的的个数不该该是2^n,这可能致使hash槽分配不均,这须要对hash值从新再hash一次。(这段彷佛有点多余了 )多线程

 

这是从新hash的算法,还比较复杂,我也懒得去理解了。

Java代码   收藏代码
  1. private static int hash(int h) {  
  2.     // Spread bits to regularize both segment and index locations,  
  3.     // using variant of single-word Wang/Jenkins hash.  
  4.     h += (h <<  15) ^ 0xffffcd7d;  
  5.     h ^= (h >>> 10);  
  6.     h += (h <<   3);  
  7.     h ^= (h >>>  6);  
  8.     h += (h <<   2) + (h << 14);  
  9.     return h ^ (h >>> 16);  
  10. }  

 

这是定位段的方法:

Java代码   收藏代码
  1. final Segment<K,V> segmentFor(int hash) {  
  2.     return segments[(hash >>> segmentShift) & segmentMask];  
  3. }  

 

 

 

数据结构

 

关于Hash表的基础数据结构,这里不想作过多的探讨。Hash表的一个很重要方面就是如何解决hash冲突,ConcurrentHashMap和HashMap使用相同的方式,都是将hash值相同的节点放在一个hash链中。与HashMap不一样的是,ConcurrentHashMap使用多个子Hash表,也就是段(Segment)。下面是ConcurrentHashMap的数据成员:

 

Java代码   收藏代码
  1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>  
  2.         implements ConcurrentMap<K, V>, Serializable {  
  3.     /** 
  4.      * Mask value for indexing into segments. The upper bits of a 
  5.      * key's hash code are used to choose the segment. 
  6.      */  
  7.     final int segmentMask;  
  8.   
  9.     /** 
  10.      * Shift value for indexing within segments. 
  11.      */  
  12.     final int segmentShift;  
  13.   
  14.     /** 
  15.      * The segments, each of which is a specialized hash table 
  16.      */  
  17.     final Segment<K,V>[] segments;  
  18. }  

 

全部的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。

 

每一个Segment至关于一个子Hash表,它的数据成员以下:

 

Java代码   收藏代码
  1.     static final class Segment<K,V> extends ReentrantLock implements Serializable {  
  2. private static final long serialVersionUID = 2249069246763182397L;  
  3.         /** 
  4.          * The number of elements in this segment's region. 
  5.          */  
  6.         transient volatile int count;  
  7.   
  8.         /** 
  9.          * Number of updates that alter the size of the table. This is 
  10.          * used during bulk-read methods to make sure they see a 
  11.          * consistent snapshot: If modCounts change during a traversal 
  12.          * of segments computing size or checking containsValue, then 
  13.          * we might have an inconsistent view of state so (usually) 
  14.          * must retry. 
  15.          */  
  16.         transient int modCount;  
  17.   
  18.         /** 
  19.          * The table is rehashed when its size exceeds this threshold. 
  20.          * (The value of this field is always <tt>(int)(capacity * 
  21.          * loadFactor)</tt>.) 
  22.          */  
  23.         transient int threshold;  
  24.   
  25.         /** 
  26.          * The per-segment table. 
  27.          */  
  28.         transient volatile HashEntry<K,V>[] table;  
  29.   
  30.         /** 
  31.          * The load factor for the hash table.  Even though this value 
  32.          * is same for all segments, it is replicated to avoid needing 
  33.          * links to outer object. 
  34.          * @serial 
  35.          */  
  36.         final float loadFactor;  
  37. }  

count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操做,以保证读取操做可以读取到几乎最新的修改。协调方式是这样的,每次修改操做作告终构上的改变,如增长/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操做开始都要读取count的值。这利用了Java 5中对volatile语义的加强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程当中某个段是否发生改变,在讲述跨段操做时会还会详述。threashold用来表示须要进行rehash的界限值。table数组存储段中节点,每一个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得可以读取到最新的table值而不须要同步。loadFactor表示负载因子。

 

 

实现细节

 

修改操做

 

先来看下删除操做remove(key)。

Java代码   收藏代码
  1. public V remove(Object key) {  
  2.  hash = hash(key.hashCode());  
  3.     return segmentFor(hash).remove(key, hash, null);  
  4. }  

整个操做是先定位到段,而后委托给段的remove操做。当多个删除操做并发进行时,只要它们所在的段不相同,它们就能够同时进行。下面是Segment的remove方法实现:

Java代码   收藏代码
  1. V remove(Object key, int hash, Object value) {  
  2.     lock();  
  3.     try {  
  4.         int c = count - 1;  
  5.         HashEntry<K,V>[] tab = table;  
  6.         int index = hash & (tab.length - 1);  
  7.         HashEntry<K,V> first = tab[index];  
  8.         HashEntry<K,V> e = first;  
  9.         while (e != null && (e.hash != hash || !key.equals(e.key)))  
  10.             e = e.next;  
  11.   
  12.         V oldValue = null;  
  13.         if (e != null) {  
  14.             V v = e.value;  
  15.             if (value == null || value.equals(v)) {  
  16.                 oldValue = v;  
  17.                 // All entries following removed node can stay  
  18.                 // in list, but all preceding ones need to be  
  19.                 // cloned.  
  20.                 ++modCount;  
  21.                 HashEntry<K,V> newFirst = e.next;  
  22.                 for (HashEntry<K,V> p = first; p != e; p = p.next)  
  23.                     newFirst = new HashEntry<K,V>(p.key, p.hash,  
  24.                                                   newFirst, p.value);  
  25.                 tab[index] = newFirst;  
  26.                 count = c; // write-volatile  
  27.             }  
  28.         }  
  29.         return oldValue;  
  30.     } finally {  
  31.         unlock();  
  32.     }  
  33. }  

 整个操做是在持有段锁的状况下执行的,空白行以前的行主要是定位到要删除的节点e。接下来,若是不存在这个节点就直接返回null,不然就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不须要复制,它们能够重用。下面是个示意图,我直接从这个网站 上复制的(画这样的图实在是太麻烦了,若是哪位有好的画图工具,能够推荐一下)。

 

 

删除元素以前:

 

a hash chain before an element is removed

 

 

删除元素3以后:

the chain with element 3 removed

 

第二个图其实有点问题,复制的结点中应该是值为2的结点在前面,值为1的结点在后面,也就是恰好和原来结点顺序相反,还好这不影响咱们的讨论。

 

整个remove实现并不复杂,可是须要注意以下几点。第一,当要删除的结点存在时,删除的最后一步操做要将count的值减一。这必须是最后一步操做,不然读取操做可能看不到以前对段所作的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是由于table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写作任何优化,直接屡次访问非volatile实例变量没有多大影响,编译器会作相应优化。

 

 

接下来看put操做,一样地put操做也是委托给段的put方法。下面是段的put方法:

Java代码   收藏代码
  1. V put(K key, int hash, V value, boolean onlyIfAbsent) {  
  2.     lock();  
  3.     try {  
  4.         int c = count;  
  5.         if (c++ > threshold) // ensure capacity  
  6.             rehash();  
  7.         HashEntry<K,V>[] tab = table;  
  8.         int index = hash & (tab.length - 1);  
  9.         HashEntry<K,V> first = tab[index];  
  10.         HashEntry<K,V> e = first;  
  11.         while (e != null && (e.hash != hash || !key.equals(e.key)))  
  12.             e = e.next;  
  13.   
  14.         V oldValue;  
  15.         if (e != null) {  
  16.             oldValue = e.value;  
  17.             if (!onlyIfAbsent)  
  18.                 e.value = value;  
  19.         }  
  20.         else {  
  21.             oldValue = null;  
  22.             ++modCount;  
  23.             tab[index] = new HashEntry<K,V>(key, hash, first, value);  
  24.             count = c; // write-volatile  
  25.         }  
  26.         return oldValue;  
  27.     } finally {  
  28.         unlock();  
  29.     }  
  30. }  

该方法也是在持有段锁的状况下执行的,首先判断是否须要rehash,须要就先rehash。接着是找是否存在一样一个key的结点,若是存在就直接替换这个结点的值。不然建立一个新的结点并添加到hash链的头部,这时必定要修改modCount和count的值,一样修改count的值必定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n,这里就不介绍了。

 

修改操做还有putAll和replace。putAll就是屡次调用put方法,没什么好说的。replace甚至不用作结构上的更改,实现要比put和delete要简单得多,理解了put和delete,理解replace就不在话下了,这里也不介绍了。

 

 

获取操做

 

首先看下get操做,一样ConcurrentHashMap的get操做是直接委托给Segment的get方法,直接看Segment的get方法:

 

Java代码   收藏代码
  1. V get(Object key, int hash) {  
  2.     if (count != 0) { // read-volatile  
  3.         HashEntry<K,V> e = getFirst(hash);  
  4.         while (e != null) {  
  5.             if (e.hash == hash && key.equals(e.key)) {  
  6.                 V v = e.value;  
  7.                 if (v != null)  
  8.                     return v;  
  9.                 return readValueUnderLock(e); // recheck  
  10.             }  
  11.             e = e.next;  
  12.         }  
  13.     }  
  14.     return null;  
  15. }  

 

get操做不须要锁。第一步是访问count变量,这是一个volatile变量,因为全部的修改操做在进行结构修改时都会在最后一步写count变量,经过这种机制保证get操做可以获得几乎最新的结构更新。对于非结构更新,也就是结点值的改变,因为HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是对hash链进行遍历找到要获取的结点,若是没有找到,直接访回null。对hash链进行遍历不须要加锁的缘由在于链指针next是final的。可是头指针却不是final的,这是经过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过期的头结点,例如,当执行get方法时,刚执行完getFirst(hash)以后,另外一个线程执行了删除操做并更新头结点,这就致使get方法中返回的头结点不是最新的。这是能够容许,经过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要获得最新的数据,只有采用彻底的同步。

 

最后,若是找到了所求的结点,判断它的值若是非空就直接返回,不然在有锁的状态下再读一次。这彷佛有些费解,理论上结点的值不可能为空,这是由于put的时候就进行了判断,若是为空就要抛NullPointerException。空值的惟一源头就是HashEntry中的默认值,由于HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操做的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被从新排序,这就可能致使结点的值为空。这种状况应当很罕见,一旦发生这种状况,ConcurrentHashMap采起的方式是在持有锁的状况下再读一遍,这可以保证读到最新的值,而且必定不会为空值。

 

Java代码   收藏代码
  1. V readValueUnderLock(HashEntry<K,V> e) {  
  2.     lock();  
  3.     try {  
  4.         return e.value;  
  5.     } finally {  
  6.         unlock();  
  7.     }  
  8. }  

 

 

另外一个操做是containsKey,这个实现就要简单得多了,由于它不须要读取值:

Java代码   收藏代码
  1. boolean containsKey(Object key, int hash) {  
  2.     if (count != 0) { // read-volatile  
  3.         HashEntry<K,V> e = getFirst(hash);  
  4.         while (e != null) {  
  5.             if (e.hash == hash && key.equals(e.key))  
  6.                 return true;  
  7.             e = e.next;  
  8.         }  
  9.     }  
  10.     return false;  
  11. }  

 

 

跨段操做 

 

有些操做须要涉及到多个段,好比说size(), containsValaue()。先来看下size()方法:

 

Java代码   收藏代码
  1. public int size() {  
  2.     final Segment<K,V>[] segments = this.segments;  
  3.     long sum = 0;  
  4.     long check = 0;  
  5.     int[] mc = new int[segments.length];  
  6.     // Try a few times to get accurate count. On failure due to  
  7.     // continuous async changes in table, resort to locking.  
  8.     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {  
  9.         check = 0;  
  10.         sum = 0;  
  11.         int mcsum = 0;  
  12.         for (int i = 0; i < segments.length; ++i) {  
  13.             sum += segments[i].count;  
  14.             mcsum += mc[i] = segments[i].modCount;  
  15.         }  
  16.         if (mcsum != 0) {  
  17.             for (int i = 0; i < segments.length; ++i) {  
  18.                 check += segments[i].count;  
  19.                 if (mc[i] != segments[i].modCount) {  
  20.                     check = -1; // force retry  
  21.                     break;  
  22.                 }  
  23.             }  
  24.         }  
  25.         if (check == sum)  
  26.             break;  
  27.     }  
  28.     if (check != sum) { // Resort to locking all segments  
  29.         sum = 0;  
  30.         for (int i = 0; i < segments.length; ++i)  
  31.             segments[i].lock();  
  32.         for (int i = 0; i < segments.length; ++i)  
  33.             sum += segments[i].count;  
  34.         for (int i = 0; i < segments.length; ++i)  
  35.             segments[i].unlock();  
  36.     }  
  37.     if (sum > Integer.MAX_VALUE)  
  38.         return Integer.MAX_VALUE;  
  39.     else  
  40.         return (int)sum;  
  41. }  

 

size方法主要思路是先在没有锁的状况下对全部段大小求和,若是不能成功(这是由于遍历过程当中可能有其它线程正在对已经遍历过的段进行结构性更新),最多执行RETRIES_BEFORE_LOCK次,若是还不成功就在持有全部段锁的状况下再对全部段大小求和。在没有锁的状况下主要是利用Segment中的modCount进行检测,在遍历过程当中保存每一个Segment的modCount,遍历完成以后再检测每一个Segment的modCount有没有改变,若是有改变表示有其它线程正在对Segment进行结构性并发更新,须要从新计算。

 

 

其实这种方式是存在问题的,在第一个内层for循环中,在这两条语句sum += segments[i].count; mcsum += mc[i] = segments[i].modCount;之间,其它线程可能正在对Segment进行结构性的修改,致使segments[i].count和segments[i].modCount读取的数据并不一致。这可能使size()方法返回任什么时候候都未曾存在的大小,很奇怪javadoc竟然没有明确标出这一点,多是由于这个时间窗口过小了吧。size()的实现还有一点须要注意,必需要先segments[i].count,才能segments[i].modCount,这是由于segment[i].count是对volatile变量的访问,接下来segments[i].modCount才能获得几乎最新的值(前面我已经说了为何只是“几乎”了)。这点在containsValue方法中获得了淋漓尽致的展示:

 

 

Java代码   收藏代码
  1. public boolean containsValue(Object value) {  
  2.     if (value == null)  
  3.         throw new NullPointerException();  
  4.   
  5.     // See explanation of modCount use above  
  6.   
  7.     final Segment<K,V>[] segments = this.segments;  
  8.     int[] mc = new int[segments.length];  
  9.   
  10.     // Try a few times without locking  
  11.     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {  
  12.         int sum = 0;  
  13.         int mcsum = 0;  
  14.         for (int i = 0; i < segments.length; ++i) {  
  15.             int c = segments[i].count;  
  16.             mcsum += mc[i] = segments[i].modCount;  
  17.             if (segments[i].containsValue(value))  
  18.                 return true;  
  19.         }  
  20.         boolean cleanSweep = true;  
  21.         if (mcsum != 0) {  
  22.             for (int i = 0; i < segments.length; ++i) {  
  23.                 int c = segments[i].count;  
  24.                 if (mc[i] != segments[i].modCount) {  
  25.                     cleanSweep = false;  
  26.                     break;  
  27.                 }  
  28.             }  
  29.         }  
  30.         if (cleanSweep)  
  31.             return false;  
  32.     }  
  33.     // Resort to locking all segments  
  34.     for (int i = 0; i < segments.length; ++i)  
  35.         segments[i].lock();  
  36.     boolean found = false;  
  37.     try {  
  38.         for (int i = 0; i < segments.length; ++i) {  
  39.             if (segments[i].containsValue(value)) {  
  40.                 found = true;  
  41.                 break;  
  42.             }  
  43.         }  
  44.     } finally {  
  45.         for (int i = 0; i < segments.length; ++i)  
  46.             segments[i].unlock();  
  47.     }  
  48.     return found;  
  49. }  

 

一样注意内层的第一个for循环,里面有语句int c = segments[i].count; 可是c却历来没有被使用过,即便如此,编译器也不能作优化将这条语句去掉,由于存在对volatile变量count的读取,这条语句存在的惟一目的就是保证segments[i].modCount读取到几乎最新的值。关于containsValue方法的其它部分就不分析了,它和size方法差很少。

 

 

跨段方法中还有一个isEmpty()方法,其实现比size()方法还要简单,也不介绍了。最后简单地介绍下迭代方法,如keySet(), values(), entrySet()方法,这些方法都返回相应的迭代器,全部迭代器都继承于Hash_Iterator类(提交时竟然提醒我不能包含sh It,只得加了下划线),里实现了主要的方法。其结构是:

 

Java代码   收藏代码
  1. abstract class Hash_Iterator{  
  2.     int nextSegmentIndex;  
  3.     int nextTableIndex;  
  4.     HashEntry<K,V>[] currentTable;  
  5.     HashEntry<K, V> nextEntry;  
  6.     HashEntry<K, V> lastReturned;  
  7. }  

 

 nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex对应段中中hash链的索引,currentTable是nextSegmentIndex对应段的table。调用next方法时主要是调用了advance方法:

 

 

Java代码   收藏代码
  1. final void advance() {  
  2.     if (nextEntry != null && (nextEntry = nextEntry.next) != null)  
  3.         return;  
  4.   
  5.     while (nextTableIndex >= 0) {  
  6.         if ( (nextEntry = currentTable[nextTableIndex--]) != null)  
  7.             return;  
  8.     }  
  9.   
  10.     while (nextSegmentIndex >= 0) {  
  11.         Segment<K,V> seg = segments[nextSegmentIndex--];  
  12.         if (seg.count != 0) {  
  13.             currentTable = seg.table;  
  14.             for (int j = currentTable.length - 1; j >= 0; --j) {  
  15.                 if ( (nextEntry = currentTable[j]) != null) {  
  16.                     nextTableIndex = j - 1;  
  17.                     return;  
  18.                 }  
  19.             }  
  20.         }  
  21.     }  
  22. }  

不想再多介绍了,惟一须要注意的是跳到下一个段时,必定要先读取下一个段的count变量。 

 

这种迭代方式的主要效果是不会抛出ConcurrentModificationException。一旦获取到下一个段的table,也就意味着这个段的头结点在迭代过程当中就肯定了,在迭代过程当中就不能反映对这个段节点并发的删除和添加,对于节点的更新是可以反映的,由于节点的值是一个volatile变量。

 


结束语

 

ConcurrentHashMap是一个支持高并发的高性能的HashMap实现,它支持彻底并发的读以及必定程度并发的写。ConcurrentHashMap的实现也是很精巧,充分利用了最新的JMM规范,值得学习,却不值得模仿。最后因为本人水平有限,对大师的做品不免有误解,若是存在,还望大牛们不吝指出。

相关文章
相关标签/搜索