在多线程环境下,使用HashMap
进行put
操做时存在丢失数据的状况,为了不这种bug的隐患,强烈建议使用ConcurrentHashMap
代替HashMap。
node
HashTable是一个线程安全的类,它使用synchronized来锁住整张Hash表来实现线程安全,即每次锁住整张表让线程独占,至关于全部线程进行读写时都去竞争一把锁,致使效率很是低下。ConcurrentHashMap能够作到读取数据不加锁,而且其内部的结构可让其在进行写操做的时候可以将锁的粒度保持地尽可能地小,容许多个修改操做并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不一样部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不一样的部分,每一个段其实就是一个小的Hashtable,它们有本身的锁。只要多个修改操做发生在不一样的段上,它们就能够并发进行。面试
CouncurrentHashMap实现原理算法
ConcurrentHashMap 为了提升自己的并发能力,在内部采用了一个叫作 Segment 的结构,一个 Segment 其实就是一个类 Hash Table 的结构,Segment 内部维护了一个链表数组,咱们用下面这一幅图来看下 ConcurrentHashMap 的内部结构,从下面的结构咱们能够了解到,ConcurrentHashMap 定位一个元素的过程须要进行两次Hash操做,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的链表的头部,所以,这一种结构的带来的反作用是 Hash 的过程要比普通的 HashMap 要长,可是带来的好处是写操做的时候能够只对元素所在的 Segment 进行操做便可,不会影响到其余的 Segment,这样,在最理想的状况下,ConcurrentHashMap 能够最高同时支持 Segment 数量大小的写操做(恰好这些写操做都很是平均地分布在全部的 Segment上),因此,经过这一种结构,ConcurrentHashMap 的并发能力能够大大的提升。咱们用下面这一幅图来看下ConcurrentHashMap的内部结构详情图,以下:编程
不难看出,ConcurrentHashMap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不一样桶(bucket)中。数组
为何要用二次hash,主要缘由是为了构造分离锁,使得对于map的修改不会锁住整个容器,提升并发能力。固然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,因此,若是不是并发情形,不要使用concurrentHashmap。安全
JAVA7以前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操做时,将该Segment锁定,不容许对其进行非查询操做,而在JAVA8以后采用CAS无锁算法,这种乐观操做在完成前进行判断,若是符合预期结果才给予执行,对并发操做提供良好的优化.数据结构
让咱们先看JDK1.7的ConcurrentHashMap的原理分析多线程
如上所示,是由 Segment 数组、HashEntry 组成,和 HashMap 同样,仍然是数组加链表。并发
让咱们看看Segment里面的成员变量,源码以下:app
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile int count; //Segment中元素的数量 transient int modCount; //对table的大小形成影响的操做的数量(好比put或者remove操做) transient int threshold; //阈值,Segment里面元素的数量超过这个值那么就会对Segment进行扩容 final float loadFactor; //负载因子,用于肯定threshold transient volatile HashEntry<K,V>[] table; //链表数组,数组中的每个元素表明了一个链表的头部 }
接着再看看HashEntry中的组成,源码以下:
/** * ConcurrentHashMap列表Entry。注意,这不会做为用户可见的Map.Entry导出。 */ static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * 设置具备volatile写语义的next字段。 final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE;
//下一个HashEntry的偏移量 static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class;
//获取HashEntry next在内存中的偏移量 nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
和 HashMap 很是相似,惟一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。
原理上来讲:ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。不会像 HashTable 那样无论是 put 仍是 get 操做都须要作同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其余的 Segment。
接着让咱们继续看看JDK1.7中ConcurrentHashMap的成员变量和构造函数,源码以下:
// 默认初始容量 static final int DEFAULT_INITIAL_CAPACITY = 16; // 默认加载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; // 默认segment层级 static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; // segment最小容量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; // 一个segment最大容量 static final int MAX_SEGMENTS = 1 << 16; // 锁以前重试次数 static final int RETRIES_BEFORE_LOCK = 2; public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 找到两种大小的最匹配参数 int sshift = 0; // segment数组的长度是由concurrentLevel计算来的,segment数组的长度是2的N次方, // 默认concurrencyLevel = 16, 因此ssize在默认状况下也是16,此时 sshift = 4 // sshift至关于ssize从1向左移的次数 int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 段偏移量,默认值状况下此时segmentShift = 28 this.segmentShift = 32 - sshift; // 散列算法的掩码,默认值状况下segmentMask = 15 this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); // 建立ssize长度的Segment数组 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
其中,concurrencyLevel 一经指定,不可改变,后续若是ConcurrentHashMap的元素数量增长致使ConrruentHashMap须要扩容,ConcurrentHashMap不会增长Segment的数量,而只会增长Segment中链表数组的容量大小,这样的好处是扩容过程不须要对整个ConcurrentHashMap作rehash,而只须要对Segment里面的元素作一次rehash就能够了。
整个ConcurrentHashMap的初始化方法仍是很是简单的,先是根据concurrencyLevel来new出Segment,这里Segment的数量是不大于concurrencyLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操做来进行hash,加快hash的过程。接下来就是根据intialCapacity肯定Segment的容量的大小,每个Segment的容量大小也是2的指数,一样使为了加快hash的过程。
注意一下两个变量segmentShift和segmentMask,这两个变量在后面将会起到很大的做用,假设构造函数肯定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。
接下来让咱们看看JDK1.7中的ConcurrentHashMap的核心方法 put 方法和get 方法。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException();
//(1) int hash = hash(key);
//(2) int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);
//(3) return s.put(key, hash, value, false); }
代码(1)计算key的hash值
代码(2)根据hash值,segmentShift,segmentMask定位到哪一个Segment。
代码(3)将键值对保存到对应的segment中。
能够看到首先是经过 key 定位到 Segment,以后在对应的 Segment 中进行具体的 put。 Segment 中进行具体的 put的源码以下:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//(1) HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try {
//(2) HashEntry<K,V>[] tab = table;
//(3) int index = (tab.length - 1) & hash;
//(4) HashEntry<K,V> first = entryAt(tab, index);
//(5) for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; }
//(6) else {
if (node != null)
//(7) node.setNext(first); else //(8) node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1;
//(9) if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else //(10) setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally {
//(11) unlock(); } return oldValue; }
虽然 HashEntry 中的 value 是用 volatile 关键词修饰的,可是并不能保证并发的原子性,因此 put 操做时仍然须要加锁处理。
代码(1)首先第一步的时候会尝试获取锁,若是获取失败确定就有其余线程存在竞争,则利用 scanAndLockForPut()
自旋获取锁。
代码(2)每个Segment对应一个HashEntry[ ]数组。
代码(3)计算对应HashEntry数组的下标 ,每一个segment中数组的长度都是2的N次方,因此这里通过运算以后,取的是hash的低几位数据。
代码(4)定位到HashEntry的某一个结点(对应链表的表头结点)。
代码(5)遍历链表。
代码(6)若是链表为空(即表头为空)
代码(7)将新节点插入到链表做为链表头。、
代码(8)根据key和value 建立结点并插入链表。
代码(9)判断元素个数是否超过了阈值或者segment中数组的长度超过了MAXIMUM_CAPACITY,若是知足条件则rehash扩容!
代码(10)不须要扩容时,将node放到数组(HashEntry[])中对应的位置
代码(11)最后释放锁。
总的来讲,put 的流程以下:
接着让咱们看看其扩容,rehash源码以下:
/** * 两倍于以前的容量 */ @SuppressWarnings("unchecked") private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 扩大1倍(左移一位) int newCapacity = oldCapacity << 1; // 计算新的阈值 threshold = (int)(newCapacity * loadFactor); // 建立新的数组 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // mask int sizeMask = newCapacity - 1; // 遍历旧数组数据 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; // 对应一个链表的表头结点 if (e != null) { HashEntry<K,V> next = e.next; // 计算e对应的这条链表在新数组中对应的下标 int idx = e.hash & sizeMask; if (next == null) // 只有一个结点时直接放入(新的)数组中 newTable[idx] = e; else { // 链表有多个结点时: HashEntry<K,V> lastRun = e; // 就链表的表头结点作为新链表的尾结点 int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { // 旧数组中一个链表中的数据并不必定在新数组中属于同一个链表,因此这里须要每次都从新计算 int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // lastRun(和以后的元素)插入数组中。 newTable[lastIdx] = lastRun; // 从(旧链表)头结点向后遍历,遍历到最后一组不一样于前面hash值的组头。 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); // 拼接链表 } } } } // 将以前的旧数据都添加到新的结构中以后,才会插入新的结点(依旧是插入表头) int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
接着,再看看scanAndLockForPut()
自旋获取锁,源码以下:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // 定位节点时为负数
//(1) while (!tryLock()) { HashEntry<K,V> f; // 首先在下面从新检查 if (retries < 0) { if (e == null) { if (node == null) // 推测性地建立节点 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; }
//(2) else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // 若是Entry改变则从新遍历 retries = -1; } } return node; }
扫描包含给定key的节点,同时尝试获取锁,若是没有找到,则建立并返回一个。
返回时,保证锁被持有。
与大多数方法不一样,对方法equals的调用不进行筛选:因为遍历速度可有可无,咱们还能够帮助预热相关代码和访问。
代码(1)尝试自旋获取锁。
代码(2)若是重试的次数达到了 MAX_SCAN_RETRIES
则改成阻塞锁获取,保证能获取成功。
接下来,再让咱们看看JDK1.7中的get方法,源码以下:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); // 首先计算出segment数组的下标 ((h >>> segmentShift) & segmentMask)) long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // 根据下标找到segment // 而后(tab.length - 1) & h) 获得对应HashEntry数组的下标 // 遍历链表 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
能够看到get 逻辑没有前面的方法复杂:
只须要将 Key 经过 Hash 以后定位到具体的 Segment ,再经过一次 Hash 定位到具体的元素上。
因为 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,因此每次获取时都是最新值。
ConcurrentHashMap 的 get 方法是很是高效的,由于整个过程都不须要加锁。
接着再看看remove方法,源码以下:
public V remove(Object key) { // 计算hash值 int hash = hash(key); // 根据hash值找到对应的segment Segment<K,V> s = segmentForHash(hash); // 调用Segment.remove 函数 return s == null ? null : s.remove(key, hash, null); } public boolean remove(Object key, Object value) { int hash = hash(key); Segment<K,V> s; return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null; }
Segment.remove函数的源码以下:
/** * Remove; match on key only if value null, else match both. */ final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; // 计算HashEntry数组下标 int index = (tab.length - 1) & hash; // 找到头结点 HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 找到对应节点 V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) // 当pred为空时,表示要移除的是链表的表头节点,从新设置链表 setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; // 记录旧value值 oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
1.7 已经解决了并发问题,而且能支持 N 个 Segment 这么屡次数的并发,但依然存在 HashMap 在 1.7 版本中的问题。那么是什么问题呢?
很明显那就是查询遍历链表效率过低。
所以 1.8 作了一些数据结构上的调整。,在 JAVA8 中它摒弃了 Segment(锁段)的概念,而是启用了一种全新的方式实现,利用 CAS 算法。底层依然由“数组”+链表+红黑树的方式思想,可是为了作到并发,又增长了不少辅助的类,例如 TreeBin、Traverser等对象内部类。
如何让多线程之间,对象的状态对于各线程的“可视性”是顺序一致的:ConcurrentHashMap 使用了 happens-before 规则来实现。 happens-before规则(摘取自 JAVA 并发编程):
假设代码有两条语句,代码顺序是语句1先于语句2执行;那么只要语句之间不存在依赖关系,那么打乱它们的顺序对最终的结果没有影响的话,那么真正交给CPU去执行时,他们的执行顺序能够是先执行语句2而后语句1。
首先来看下底层的组成结构(下图是百度来的,懒得画了):
能够看到JDK1.8ConcurrentHashMap 和JDK1.8的HashMap是很类似的。其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized
来保证并发安全性。
//键值输入。 此类永远不会做为用户可变的Map.Entry导出(即,一个支持setValue;请参阅下面的MapEntry),
//但能够用于批量任务中使用的只读遍历。 具备负哈希字段的节点的子类是特殊的,而且包含空键和值(但永远不会导出)。 不然,键和val永远不会为空。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * 对map.get()的虚拟化支持; 在子类中重写。 */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
也将 1.7 中存放数据的 HashEntry 改成 Node,但做用都是相同的。
其中的 val next
都用了 volatile 修饰,保证了可见性。
接着再看看put方法的源码,源码以下:
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) {
//(1) if (key == null || value == null) throw new NullPointerException();
//(2) int hash = spread(key.hashCode()); int binCount = 0;
//(3) for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh;
//(4) if (tab == null || (n = tab.length) == 0) tab = initTable();
//(5) else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
//(6) else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null;
//(7) synchronized (f) { if (tabAt(tab, i) == f) {
//(8) if (fh >= 0) { binCount = 1;
//(9) for (Node<K,V> e = f;; ++binCount) { K ek;
//(10) if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e;
//(11)若是遍历到了最后一个结点,那么就证实新的节点须要插入 就把它插入在链表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }
//(12) else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) {
//(13) if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } }
//代码(14) addCount(1L, binCount); return null; }
代码(1)若为空 抛异常
代码(2)计算hash值
代码(3)
代码(4)判断是否须要进行初始化。
代码(5)f
即为当前 key 定位出的 Node,若是为空表示当前位置能够写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
代码(6)若是当前位置的 hashcode == MOVED == -1
,则须要进行扩容。
代码(7)若是都不知足,则利用 synchronized 锁写入数据。结点上锁 这里的结点能够理解为hash值相同组成的链表的头结点
代码(8)fh〉0 说明这个节点是一个链表的节点 不是树的节点.
代码(9)在这里遍历链表全部的结点
代码(10)若是hash值和key值相同 则修改对应结点的value值
代码(11)若是遍历到了最后一个结点,那么就证实新的节点须要插入 就把它插入在链表尾部
代码(12)若是这个节点是树节点,就按照树的方式插入值
代码(13)若是链表长度已经达到临界值8 就须要把链表转换为树结构。若是数量大于 TREEIFY_THRESHOLD
则要转换为红黑树。
代码(14)将当前ConcurrentHashMap的元素数量+1
接着我咱们在看看JDK1.8中ConcurrentHashMap的get方法源码,源码以下:
// GET方法(JAVA8) public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //计算hash值 int h = spread(key.hashCode()); //根据hash值肯定节点位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //若是搜索到的节点key与传入的key相同且不为null,直接返回这个节点 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //若是eh<0 说明这个节点在树上 直接寻找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //不然遍历链表 找到对应的值并返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
接着再看看JDK1.8中ConcurrentHashMap的remove方法源码,源码以下:
// REMOVE OR REPLACE方法(JAVA8) final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 数组不为空,长度不为0,指定hash码值为0 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; // 是一个 forwardNode else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; // 循环寻找 for (Node<K,V> e = f, pred = null;;) { K ek; // equal 相同 取出 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; // value为null或value和查到的值相等 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } // 如果树 红黑树高效查找/删除 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
能够看出 JDK1.8 和 JDK1.7 对 ConcurrentHashMap 的实现改变,笔者更喜欢 CAS 无锁机制,若是只是看我写以上代码注释明显不足以了解 JAVA8 的 ConcurrentHashMap 的实现,我也仅仅提供源码阅读的思路,其中 cas、volatile、final 等注意已经给解释,因此若是你们真的感兴趣仍是写程序,打断点,一步步看看这个代码的实现.
1.8 在 1.7 的数据结构上作了大的改动,采用红黑树以后能够保证查询效率(O(logn)
),甚至取消了 ReentrantLock 改成了 synchronized,这样能够看出在新版的 JDK 中对 synchronized 优化是很到位的。
相信到这里为止,理解上面的内容,遇到面试,问题都迎刃而解,下面是网上找的面试题,以下:
(1)你知道 HashMap 的工做原理吗?你知道 HashMap 的 get() 方法的工做原理吗?
HashMap 是基于 hashing 的原理,咱们使用 put(key, value) 存储对象到 HashMap 中,使用 get(key) 从 HashMap 中获取对象。当咱们给 put() 方法传递键和值时,咱们先对键调用 hashCode() 方法,返回的 hashCode 用于找到 bucket 位置来储存 Entry 对象。
(2)你知道 ConcurrentHashMap 的工做原理吗?你知道 ConcurrentHashMap 在 JAVA8 和 JAVA7 对比有哪些不一样呢?
ConcurrentHashMap 为了提升自己的并发能力,在内部采用了一个叫作 Segment 的结构,一个 Segment 其实就是一个类 Hash Table 的结构,Segment 内部维护了一个链表数组,咱们用下面这一幅图来看下 ConcurrentHashMap 的内部结构,从下面的结构咱们能够了解到,ConcurrentHashMap 定位一个元素的过程须要进行两次Hash操做,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的链表的头部,所以,这一种结构的带来的反作用是 Hash 的过程要比普通的 HashMap 要长,可是带来的好处是写操做的时候能够只对元素所在的 Segment 进行操做便可,不会影响到其余的 Segment,这样,在最理想的状况下,ConcurrentHashMap 能够最高同时支持 Segment 数量大小的写操做(恰好这些写操做都很是平均地分布在全部的 Segment上),因此,经过这一种结构,ConcurrentHashMap 的并发能力能够大大的提升。
JAVA7以前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操做时,将该Segment锁定,不容许对其进行非查询操做,而在JAVA8以后采用CAS无锁算法,这种乐观操做在完成前进行判断,若是符合预期结果才给予执行,对并发操做提供良好的优化
(3)当两个对象的hashcode相同会发生什么?
由于hashcode相同,因此它们的bucket位置相同,‘碰撞’会发生。由于Map使用LinkedList存储对象,这个Entry(包含有键值对的Map.Entry对象)会存储在LinkedList中。(当向 Map 中添加 key-value 对,由其 key 的 hashCode() 返回值决定该 key-value 对(就是 Entry 对象)的存储位置。当两个 Entry 对象的 key 的 hashCode() 返回值相同时,将由 key 经过 eqauls() 比较值决定是采用覆盖行为(返回 true),仍是产生 Entry 链(返回 false)),此时若你能讲解JDK1.8红黑树引入,面试官或许会另眼相看。
(4)若是两个键的 hashcode 相同,你如何获取值对象?
当咱们调用get()方法,HashMap 会使用键对象的 hashcode 找到 bucket 位置,而后获取值对象。若是有两个值对象储存在同一个 bucket,将会遍历 LinkedList 直到找到值对象。找到 bucket 位置以后,会调用 keys.equals() 方法去找到 LinkedList 中正确的节点,最终找到要找的值对象。(当程序经过 key 取出对应 value 时,系统只要先计算出该 key 的 hashCode() 返回值,在根据该 hashCode 返回值找出该 key 在 table 数组中的索引,而后取出该索引处的 Entry,最后返回该 key 对应的 value 便可)。
(5)若是HashMap的大小超过了负载因子(load factor)定义的容量,怎么办?
当一个map填满了75%的bucket时候,和其它集合类(如ArrayList等)同样,将会建立原来HashMap大小的两倍的bucket数组,来从新调整map的大小,并将原来的对象放入新的bucket数组中。这个过程叫做rehashing,由于它调用hash方法找到新的bucket位置。
(6)你了解从新调整HashMap大小存在什么问题吗?
当从新调整HashMap大小的时候,确实存在条件竞争,由于若是两个线程都发现HashMap须要从新调整大小了,它们会同时试着调整大小。在调整大小的过程当中,存储在LinkedList中的元素的次序会反过来,由于移动到新的bucket位置的时候,HashMap并不会将元素放在LinkedList的尾部,而是放在头部,这是为了不尾部遍历(tail traversing)。若是条件竞争发生了,那么就死循环了。这个时候,你能够质问面试官,为何这么奇怪,要在多线程的环境下使用HashMap呢?
(7)请问ConcurrentHashMap中变量使用final和volatile修饰有什么用呢?其中链表是final的next属性,那么发生删除某个元素,如何实现的?
使用final来实现不变模式(immutable),他是多线程安全里最简单的一种保障方式。由于你拿他没有办法,想改变它也没有机会。不变模式主要经过final关键字来限定的。在JMM中final关键字还有特殊的语义。Final域使得确保初始化安全性(initialization safety)成为可能,初始化安全性让不可变形对象不须要同步就能自由地被访问和共享。
使用volatile来保证某个变量内存的改变对其余线程即时可见,在配合CAS能够实现不加锁对并发操做的支持
remove执行的开始就将table赋给一个局部变量tab,将tab依次复制出来,最后直到该删除位置,将指针指向下一个变量。
(8)描述一下ConcurrentHashMap中remove操做,有什么须要注意的?
须要注意以下几点。第一,当要删除的结点存在时,删除的最后一步操做要将count的值减一。这必须是最后一步操做,不然读取操做可能看不到以前对段所作的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是由于table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写作任何优化,直接屡次访问非volatile实例变量没有多大影响,编译器会作相应优化。
(9)HashTable与ConcurrentHashMap有什么区别,描述锁分段技术。
HashTable容器在竞争激烈的并发环境下表现出效率低下的缘由,是由于全部访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不一样数据段的数据时,线程间就不会存在锁竞争,从而能够有效的提升并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。有些方法须要跨段,好比size()和containsValue(),它们可能须要锁定整个表而而不只仅是某个段,这须要按顺序锁定全部段,操做完毕后,又按顺序释放全部段的锁。这里“按顺序”是很重要的,不然极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,而且其成员变量实际上也是final的,可是,仅仅是将数组声明为final的并不保证数组成员也是final的,这须要实现上的保证。这能够确保不会出现死锁,由于得到锁的顺序是固定的。