一、实现原理介绍java
ConcurrentHashMap采用分段锁的思想,将整个table分红不一样的segment。每一个Segment配有一个锁,这样每一个段都能并行的作增删改查等功能。算法
二、ConcurrentHashMap在代码实现上有哪些技巧编程
我的理解ConcurrentHashMap是比较好的线程安全容器不只仅是由于它的段锁。还在于他代码中包含了许多好的编程思想。数组
首先:ConcurrentHashMap在并发访问时采用了CAS技术,因此性能较好。安全
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } }
其次:ConcurrentHashMap的字段MAX_SCAN_RETRIES,用来限制尝试得到锁的次数。这样能够避免在尝试锁的时候发生修改而一直进行获取锁的操做。并发
第三:在isEmpty()方法的时候巧妙的使用了modCount字段判断是否为空,并无添加任何所信息。app
public boolean isEmpty() { long sum = 0L; final Segment<K,V>[] segments = this.segments; //遍历各个段信息 for (int j = 0; j < segments.length; ++j) { //得到第j个段 Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; //modCount相加 sum += seg.modCount; } } //若是不为0,继续判断一次每一个段是否为空 if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } //sum加一次、减一次 两次事后看看sum是否为0L。若是不是说明在判断过程当中有修改,而且数据非空 if (sum != 0L) return false; } //不然为空 return true; }
三、Unsafe对象字段的内存定位。less
在ConcurrentHashMap源码中有不少Unsafe方法得到物理地址的代码:从hash值定位段,从段内地址到定位HashEntry地址都须要Unsafe的方法。性能
Unsafe实例可以作两件事情,this
第1、经过Unsafe类能够分配内存,能够释放内存。allocateMemory、reallocateMemory、freeMemory分别用于分配内存,扩充内存和释放内存
第2、能够定位对象某字段的内存位置,也能够修改对象的字段值,即便它是私有的。
static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) { long u = (j << SSHIFT) + SBASE; return ss == null ? null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u); } static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) { long u = (j << SSHIFT) + SBASE; return ss == null ? null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u); } 这些SSHIFT、SBASE就是相对地址偏移量。他们的得到以下: static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; //arrayBaseOffset方法是一个本地方法,能够获取数组第一个元素的偏移地址 //TBASE 第一个HashEntry的地址偏移量 TBASE = UNSAFE.arrayBaseOffset(tc); //SBASE 第一个 Segment的地址偏移量 SBASE = UNSAFE.arrayBaseOffset(sc); //arrayIndexScale是数组中元素的增量地址 ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); //long objectFieldOffset (Field field); 获得 filed在对象中的偏移 HASHSEED_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("hashSeed")); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); //Segemnt增量按位数据非零的位数 SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); //HashEntry增量按位数据非零的值。 TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); }
有了上面的说明,咱们来看看最终定位一个key值对应的Entry的方法是如何实现的。
段定位:
private Segment<K,V> segmentForHash(int h) { //hash值向右无符号必定segmentShift个单位得到了h的高32-segmentShift位,而后将该值左移动SSHIFT位+段首地址,得到段偏移地址 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE //根据段偏移地址和段首信息定位具体的Segment。 return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u); } Entry定位: static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { //将hash值右移动 TSHIFT个位置得到HashEntry偏移量 +HashEntry基本地址 return (tab == null) ? null :(HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); }
四、为何说ConcurrentHashMap存在弱一致的问题
jdk1.7版本ConcurrentHashMap已经不存在弱一致性问题了。那么什么是弱一致性问题,jdk1.7又是如何解决的呢?
所谓的弱一致性问题,就是在某些状况下,放置进去的数据不能准确的访问到该数据。下文的put、get方法在(2)、(8)和(3)、(9)这两个地方因为执行顺序的问题会产生某些已经放置进容器的数据没有被正确的获取到。这是由于ConcurrentHashMap的段里面put方法加锁执行、而get方法并无加锁执行,有可能和put方法在指令执行顺序上即知足happen-before原则又产生了数据的不一致性问题。
如图所示,在上面的执行路径下是没法正确得到放置的数据的。
jdk1.7以后Segement里面的get方法已经被拿掉,接下来,重点来解读下jdk1.7 ConcurrentHashMap的放置和获取数据的过程。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; //得到该key值的再hash值,该算法散列的更好,减小碰撞 int h = hash(key); //得到该hash值对应的段偏移量 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; //得到具体段地址 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { //得到该段内 该hash值对应的具体HashEntry的地址 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; //若是hash值和key值和须要查询的key一致,则返回该值 if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } //不然返回null,未查询到信息 return null; } @SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K,V> s; //若是放置null值,则跑错 if (value == null) throw new NullPointerException(); //根据key值计算出hash值 int hash = hash(key); //得到该hash值的段偏移量 int j = (hash >>> segmentShift) & segmentMask; //根据偏移量计算出具体的短地址 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //首先判断该段数据数量是否超过了段能承受的阈值,超过则扩容将旧信息hash映射过去,没有超过则返回就segment s = ensureSegment(j); //将数据放置到该段的对应hashEntry中去。或者放置table上的HashEntry或者放置到table挂载的hashEntry中去。 return s.put(key, hash, value, false); }
参考文献:
http://ifeve.com/concurrenthashmap-weakly-consistent/
http://blog.csdn.net/aesop_wubo/article/details/7537278