8.并发容器ConcurrentHashMap#put方法解析

jdk1.7.0_79node

  HashMap能够说是每一个Java程序员用的最多的数据结构之一了,无处不见它的身影。关于HashMap,一般也能说出它不是线程安全的。这篇文章要提到的是在多线程并发环境下的HashMap——ConcurrentHashMap,显然它必然是线程安全的,一样咱们不可避免的要讨论散列表,以及它是如何实现线程安全的,它的效率又是怎样的,由于对于映射容器还有一个Hashtable也是线程安全的但它彷佛只出如今笔试、面试题里,在现实编码中它已经基本被遗弃。程序员

  关于HashMap的线程不安全,在多线程并发环境下它所带来的影响毫不仅仅是出现脏数据等数据不一致的状况,严重的是它有可能带来程序死循环,这可能有点难以想象,但确实在不久前的项目里同事有遇到了CPU100%满负荷运行,分析结果是在多线程环境下HashMap致使程序死循环。对于Hashtable,查看其源码可知,Hashtable保证线程安全的方式就是利用synchronized关键字,这样会致使效率低下,但对于ConcurrentHashMap则采用了不一样的线程安全保证方式——分段锁。它不像Hashtable那样将整个table锁住而是将数组元素分段加锁,若是线程1访问的元素在分段segment1,而线程2访问的元素在分段segment2,则它们互不影响能够同时进行操做。若是合理的进行分段就是其关键问题。面试

  ConcurrentHashMap和HashMap的结果基本一致,一样也是Entry做为存放数据的对象,另一个就是上面提到的分段锁——Segment。它继承自ReentrantLock(关于ReentrantLock,可参考《5.Lock接口及其实现ReentrantLock》),故它具备ReentrantLock一切特性——可重入,独占等。算法

  ConcurrentHashMap的结构图以下所示:数组

  能够看到相比较于HashMapConcurrentHashMap在Entry数组之上是Segment,这个就是咱们上面提到的分段锁,合理的肯定分段数就能更好的提升并发效率,咱们来看ConcurrentHashMap是如何肯定分段数的。 安全

  ConcurrentHashMap的初始化时经过其构造函数public ConcurrentHashMap(int initialCapacity, float loadFactorint concurrencyLevel)完成的,若在不指定各参数的状况下,初始容量initialCapacity=DAFAULT_INITIAL_CAPACITY=16,负载因子loadFactor=DEFAULT_LOAD_FACTOR=0.75f,并发等级concurrencyLevel=DEFAULT_CONCURRENCY_LEVEL=16,前二者和HashMap相同。至于负载因子表示一个散列表的空间的使用程度initialCapacity(总容量) * loadFactor(负载因子) = 数据量,有此公式可知,若负载因子越大,则散列表的装填程度越高,也就是能容纳更多的元素,但这样元素就多,链表就大,此时索引效率就会下降。若负载因子越小,则相反,索引效率就会高,换来的代价就是浪费的空间越多并发等级它表示估计最多有多少个线程来共同修改这个Map,稍后能够看到它和segment数组相关,segment数组的长度就是经过concurrencyLevel计算得出。数据结构

 1 //以默认参数为例initalCapacity=16,loadFactor=0.75,concurrencyLevel=16 
 2 public ConcurrentHashMap(int initalCapacity, float loadFactor, int concurrencyLevel) { 
 3   if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 
 4     throw new IllegalArgumentException(); 
 5   if (concurrencyLevel > MAX_SEGMENTS) 
 6     concurrencyLevel = MAX_SEGMENTS; 
 7   int sshift = 0; 
 8   int ssize = 1;//segment数组长度 
 9   while (ssize < concurrencyLevel) { 
10     ++sshift; 
11     ssize <= 1; 
12   }//通过ssize左移4位后,ssize=16,ssift=4 
13 /*segmentShift用于参与散列运算的位数,segmentMask是散列运算的掩码,这里有关的散列函数运算和HashMap有相似之处*/ 
14   this.segmentShift = 32 – ssift;//段偏移量segmentShift=28 
15   this.segmentMask = ssize – 1;//段掩码segmentMask=15(1111) 
16   if (initialCapacity > MAXIMUM_CAPACITY) 
17     initialCapacity = MAXIMUM_CAPACITY; 
18   int c = initialCapacity / ssize;//c = 1 
19   if (c * ssize < initialCapacity) 
20     ++c; 
21   int cap = MIN_SEGMENT_TABLE_CAPACITY;//MIN_SEGMENT_TABLE_CAPACITY=2 
22   while (cap < c)//cap = 2, c = 1,false 
23       cap <<= 1;//cap是segment里HashEntry数组的长度,最小为2 
24 /*建立segments数组和segment[0]*/ 
25   Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[]) new HashEntry[cap]);//参数意为:负载因子=1,数据容量=(int)(2 * 0.75)=1,总容量=2,故每一个Segment的HashEntry总容量为2,实际数据容量为1 
26   Segment<K,V> ss = (Segment<K,V>[])new Segment[ssize];//segments数组大小为16 
27   UNSAFE.putOrderedObject(ss, SBASE, s0); 
28   this.segments = ss; 
29 } 

  以上就是整个初始化过程,主要是初始化segments的长度大小以及经过负载因子肯定每一个Segment的容量大小。肯定好Segment事后,接下来的重点就是如何准肯定位Segment。定位Segment的方法就是经过散列函数来定位,先经过hash方法对元素进行二次散列,这个算法较为复杂,其目的只有一个——减小散列冲突,使元素能均匀分布在不一样的Segment上,提升容器的存取效率。 多线程

  咱们经过最直观最经常使用的put方法来观察ConcurrentHashMap是如何经过key值计算hash值在定位到Segment的 并发

 1 //ConcurrentHashMap#put 
 2 public V put(K key, V value) { 
 3   Segment<K,V> s; 
 4   if (value == null) 
 5     throw new NullPointerException(); 
 6   int hash = hash(key);//根据散列函数,计算出key值的散列值 
 7   int j = (hash >>> segmentShift) & segmentMask;//这个操做就是定位Segment的数组下标,jdk1.7以前是segmentFor返回Segment,1.7以后直接就取消了这个方法,直接计算数组下标,而后经过偏移量底层操做获取Segment 
 8   if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck 
 9       (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment 
10     s = ensureSegment(j);//经过便宜量定位不到就调用ensureSegment方法定位Segment 
11   return s.put(key, hash, value, false); 
12 } 

  Segment.put方法就是将键、值构造为Entry节点加入到对应的Segment段里,若是段中已经有元素(即表示两个key键值的hash值重复)则将最新加入的放到链表的头),整个过程必然是加锁安全的。 ssh

 

  不妨继续深刻Segment.put方法 :

 1 //Segment#put 
 2 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 
 3   HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);//非阻塞获取锁,获取成功node=null,失败 
 4   V oldValue; 
 5   try { 
 6     HashEntry<K,V>[] tab = table;//Segment对应的HashEntry数组长度 
 7     int index = (tab.length - 1) & hash; 
 8     HashEntry<K,V> first = entryAt(tab, index);//获取HashEntry数组的第一个值 
 9     for (HashEntry<K,V> e = first;;) { 
10       if (e != null) {//HashEntry数组已经存在值 
11         K k; 
12         if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {//key值和hash值都相等,则直接替换旧值 
13           oldValue = e.value; 
14           if (!onlyIfAbsent) { 
15             e.value = value; 
16             ++modCount; 
17           } 
18           break; 
19         } 
20         e = e.next;//不是同一个值则继续遍历,直到找到相等的key值或者为null的HashEntry数组元素 
21       } 
22       else {//HashEntry数组中的某个位置元素为null 
23         if (node != null) 
24           node.setNext(first);//将新加入节点(key)的next引用指向HashEntry数组第一个元素 
25         else//已经获取到了Segment锁 
26           node = new HashEntry<K,V>(hash, key, value, first) 
27         int c = count + 1; 
28         if (c > threshold && tab.lenth < MAXIUM_CAPACITY)//插入前先判断是否扩容,ConcurrentHashMap扩容与HashMap不一样,ConcurrentHashMap只扩Segment的容量,HashMap则是整个扩容 
29           rehash(node); 
30         else 
31           setEntryAt(tab, index, node);//设置为头节点 
32         ++modCount;//总容量 
33         count = c; 
34         oldValue = null; 
35         break; 
36       } 
37      } 
38   } finally { 
39     unlock(); 
40   } 
41   return oldValue; 
42 } 

  上面大体就是ConcurrentHashMap加入一个元素的过程,须要明白的就是ConcurrentHashMap分段锁的概念。在JDK1.6中定位Segment较为简单,直接计算出Segment数组下标后就返回具体的Segment,而JDK1.7则经过偏移量来计算,算出为空时,还有一次检查获取Segment,猜想是1.7使用底层native是为了提升效率,JDK1.8的ConcurrentHashMap又有不一样,暂未深刻研究,它的数据结果彷佛变成了红黑树。 

  有关ConcurrentHashMap的get方法再也不分析,过程总结为一句话:根据key值计算出hash值,根据hash值计算出对应的Segment,再在Segment下的HashEntry链表遍历查找。

相关文章
相关标签/搜索