ConcurrentHashMap is built for high concurrency. Internally it uses a lock-striping (segmented lock) design, which effectively avoids hot-spot contention. Within each segment, ConcurrentHashMap relies on final fields and the memory-visibility modifier volatile (immediate visibility: the Java memory model guarantees that a write to the value field by one thread can be seen by any subsequent reading thread; note that it does not guarantee atomicity for other operations that depend on the volatile variable's state).
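A minimal sketch (mine, not from the source) of what that note means: volatile makes writes visible, but a read-modify-write on a volatile field is still not atomic.

class VolatileCounter {
    private volatile int value;  // writes become visible to all readers immediately

    int get() {
        return value;            // safe: a single volatile read
    }

    void increment() {
        value++;                 // NOT atomic: two threads can read the same value,
    }                            // both add one, and one update is lost
}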
Borrowing a structure diagram of ConcurrentHashMap from another blog (a segments array, with each segment guarding its own table of linked-list buckets):
It is easy to see that ConcurrentHashMap hashes twice: the first hash maps a key to its segment, and the second hash maps it to a bucket within that segment.
Why hash twice? Mainly to build the lock striping: a modification to the map locks only one segment rather than the whole container, which raises concurrency. Of course, nothing is perfect; the price of the double hash is that the whole hashing process takes longer than HashMap's single hash, so if your scenario is not concurrent, don't use ConcurrentHashMap.
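The routing can be sketched with a tiny standalone demo (all the concrete numbers here are made up for illustration; the shift/mask names mirror the JDK 1.7 fields discussed below):

public class TwoLevelHashDemo {
    public static void main(String[] args) {
        int h = 0xABCD1234;      // pretend this is the spread hash of some key
        int ssize = 16;          // 16 segments (always a power of two)
        int sshift = 4;          // log2(ssize)
        int segmentShift = 32 - sshift;
        int segmentMask = ssize - 1;
        int tableLength = 4;     // length of one segment's bucket array

        int segmentIndex = (h >>> segmentShift) & segmentMask; // hash #1: top bits
        int bucketIndex = (tableLength - 1) & h;               // hash #2: low bits
        System.out.println("segment " + segmentIndex + ", bucket " + bucketIndex);
        // prints: segment 10, bucket 0
    }
}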
The two most important parts of this data structure are the inner classes HashEntry and Segment.
ConcurrentHashMap maintains a Segment array, splitting the elements into segments (the first hash).
Data structure:
/**
 * The segments, each of which is a specialized hash table.
 */
final Segment<K,V>[] segments;
Each segment of segments maintains its own array of linked-list buckets.
Code:
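Each Segment is itself a small reentrant lock guarding its own table; an abridged sketch of its core fields, based on the JDK 1.7 source:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    // The per-segment bucket array; volatile so lock-free readers
    // always see the current table reference.
    transient volatile HashEntry<K,V>[] table;
    transient int count;       // number of elements in this segment
    transient int modCount;    // structural modification counter
    transient int threshold;   // rehash this segment when count exceeds it
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
}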
Now let's look at the constructor:
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
As the constructor shows, once concurrencyLevel (which fixes the size of the segments array) is specified, it can never change. So when a segment's threshold is exceeded, the rehash never touches the segments array itself: under heavy concurrency only the one affected segment rehashes, and the other segments carry on undisturbed.
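A worked illustration (the numbers are mine, not from the source): new ConcurrentHashMap<K,V>(33, 0.75f, 16) rounds ssize up to 16 with sshift = 4, so segmentShift = 32 - 4 = 28 and segmentMask = 15; then c = 33/16 = 2 is bumped to 3 because 2 * 16 < 33, and cap is rounded up to the next power of two, 4. The map therefore starts with 16 segments of 4 buckets each, and only those per-segment tables ever grow.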
(Every put must acquire its segment's lock.)
Like HashMap, ConcurrentHashMap uses a linked list for the entries in each hash bucket, but its entry class is a little different:
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * Sets next field with volatile write semantics.  (See above
     * about use of putOrderedObject.)
     */
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
HashEntry's key and hash are final, which rules out a class of concurrent-modification problems: the tail of a HashEntry chain can never be altered. next and value are volatile, which avoids the performance cost of full synchronization. The newer (JDK 1.7) ConcurrentHashMap also makes heavy use of the atomic operations in the sun.misc.Unsafe class, which call down to low-level CPU primitives for better performance (I'm not entirely clear on the details of this part).
JDK 1.6:
V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}
JDK 1.6 handles get optimistically: if a concurrent put is still constructing the entry when we read it, value may not have been assigned yet, so when a null is observed the reader falls back to re-reading the value under the lock.
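That locked fallback is tiny; in the 1.6 source it is simply:

V readValueUnderLock(HashEntry<K,V> e) {
    lock();                  // wait for the in-flight put to finish
    try {
        return e.value;      // now guaranteed to see the assigned value
    } finally {
        unlock();
    }
}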
JDK 1.7:
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
1.7 no longer checks for value == null, and I'm not sure why. One plausible explanation: the 1.7 get locates both the segment and the head of the bucket with UNSAFE.getObjectVolatile, and value itself is volatile, so the window in which a reader could observe a half-constructed entry is believed to be closed.
After talking it over with a colleague: both the 1.6 and the 1.7 implementations are essentially optimistic. Optimism buys performance, but it also brings weak consistency of the data. If your business requires strong consistency, you may need another approach (wrap the map with Collections.synchronizedMap, or do a second, locked read as JDK 6 does).
http://ifeve.com/concurrenthashmap-weakly-consistent/
The article above explains this weak-consistency problem well.
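Weak consistency is easy to observe (a minimal, illustrative demo; the class name is mine):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeakConsistencyDemo {
    public static void main(String[] args) {
        Map<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        for (Map.Entry<String, Integer> e : map.entrySet()) {
            map.put("b", 2); // no ConcurrentModificationException is thrown;
                             // the iterator may or may not see this new entry
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}

Now, on to put: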
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}
For put, the 1.7 ConcurrentHashMap acquires the segment lock by spinning, unlike 1.6, which simply blocks on the lock.
Note: my personal reading is that the author chose spinning because, under lock striping, contention is already low and the lock is held only briefly, so spinning cuts the cost of parking and re-waking threads.
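A simplified sketch of that spin-then-block idea (my illustration, not the actual scanAndLockForPut code; MAX_SPINS is an assumed constant):

import java.util.concurrent.locks.ReentrantLock;

class SpinThenLock {
    static final int MAX_SPINS = 64;       // assumption: a bounded spin budget

    static void acquire(ReentrantLock lock) {
        int spins = 0;
        while (!lock.tryLock()) {          // cheap retry, no thread parking
            if (++spins > MAX_SPINS) {
                lock.lock();               // contention persists: block instead
                return;
            }
        }                                  // caller releases with lock.unlock()
    }
}

Next, the hash function that feeds both levels of indexing: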
private int hash(Object k) {
    int h = hashSeed;

    if ((0 != h) && (k instanceof String)) {
        return sun.misc.Hashing.stringHash32((String) k);
    }

    h ^= k.hashCode();

    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}
On top of the key's own hashCode, ConcurrentHashMap runs a variant of the single-word Wang/Jenkins hash so that every bit is mixed in, greatly reducing the chance of collisions (otherwise performance would degrade badly, since both the segment index and the bucket index are derived from this one hash).
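To see why the spreading matters (an illustrative demo, not from the source): without it, hash codes that differ only in their high bits all collide in a small power-of-two table.

public class SpreadDemo {
    public static void main(String[] args) {
        int tableLength = 16;                     // a small power-of-two table
        for (int i = 1; i <= 4; i++) {
            int rawHash = i << 20;                // differs only above bit 19
            System.out.println(rawHash & (tableLength - 1)); // prints 0 every time
        }
    }
}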
For ConcurrentHashMap, the size of the segments array is fixed at construction time and never changes; which segment an element lands in is decided by the high bits of its hash. Looking back at put, j = (hash >>> segmentShift) & segmentMask selects the segment, where segmentShift is 32 minus the bit length of the segments array size (the sshift computed in the constructor), so the shift leaves exactly the top sshift bits of the hash and segmentMask keeps the index in range.
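For instance (numbers mine): with 16 segments, sshift = 4 and segmentShift = 28, so for h = 0xABCD1234 we get (h >>> 28) & 15 = 0xA = 10, and the entry is routed to segment 10.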
ConcurrentHashMap is designed first and foremost for concurrency. Unlike the Collections wrappers, it is not fully synchronized: get takes no lock at all, trading weak consistency for a large gain in performance, while the lock-striping strategy raises the level of concurrency for writes.
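The contrast in one snippet (illustrative; the class name is mine):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WrapperVsStriped {
    public static void main(String[] args) {
        // One lock for the whole map: every get and put serializes on it.
        Map<String, Integer> wrapped = Collections.synchronizedMap(new HashMap<>());
        // Lock striping: a put locks one segment; a get takes no lock.
        Map<String, Integer> striped = new ConcurrentHashMap<>();
        wrapped.put("k", 1);
        striped.put("k", 1);
    }
}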
References:
http://www.jb51.net/article/49699.htm
http://my.oschina.net/chihz/blog/58035
http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/