在写本篇以前,JDK10已经出来了,好像吐槽我连8都没看完呢。奔跑不停的程序猿,努力的前进吧!数组
在多线程环境下,使用HashMap进行put操做时存在丢失数据的状况,为了不这种bug的隐患,强烈建议使用ConcurrentHashMap代替HashMap,为了对ConcurrentHashMap有更深刻的了解,本文将对ConcurrentHashMap1.7和1.8的不一样实现进行分析。安全
jdk1.7中采用Segment + HashEntry的方式进行实现,结构以下:数据结构
ConcurrentHashMap初始化时,计算出Segment数组的大小ssize和每一个Segment中HashEntry数组的大小cap,并初始化Segment数组的第一个元素;其中ssize大小为2的幂次方,默认为16,cap大小也是2的幂次方,最小值为2,最终结果根据根据初始化容量initialCapacity进行计算,计算过程以下:多线程
if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1;
其中Segment在实现上继承了ReentrantLock,这样就自带了锁的功能。并发
当执行put方法插入数据时,根据key的hash值,在Segment数组中找到相应的位置,若是相应位置的Segment还未初始化,则经过CAS进行赋值,接着执行Segment对象的put方法经过加锁机制插入数据,实现以下:dom
场景:线程A和线程B同时执行相同Segment对象的put方法this
由于ConcurrentHashMap是能够并发插入数据的,因此在准确计算元素时存在必定的难度,通常的思路是统计每一个Segment对象中的元素个数,而后进行累加,可是这种方式计算出来的结果并不同的准确的,由于在计算后面几个Segment的元素个数时,已经计算过的Segment同时可能有数据的插入或则删除,在1.7的实现中,采用了以下方式:线程
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } }
先采用不加锁的方式,连续计算元素的个数,最多计算3次:设计
1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构以下:code
只有在执行第一次put方法时才会调用initTable()初始化Node数组,实现以下:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
当执行put方法插入数据时,根据key的hash值,在Node数组中找到相应的位置,实现以下:
一、若是相应位置的Node还未初始化,则经过CAS插入相应的数据;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
二、若是相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁,若是该节点的hash不小于0,则遍历链表更新节点或插入新节点;
if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }
三、若是该节点是TreeBin类型的节点,说明是红黑树结构,则经过putTreeVal方法往红黑树中插入节点;
else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } }
四、若是binCount不为0,说明put操做对数据产生了影响,若是当前链表的个数达到8个,则经过treeifyBin方法转化为红黑树,若是oldVal不为空,说明是一次更新操做,没有对元素个数产生影响,则直接返回旧值;
if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
五、若是插入的是一个新节点,则执行addCount()方法尝试更新元素个数baseCount;
1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会经过addCount()方法更新baseCount,实现以下:
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); }
一、初始化时counterCells为空,在并发量很高时,若是存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;
二、若是CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,经过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,实现以下:
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
三、若是经过CAS设置cellsBusy字段失败的话,则继续尝试经过CAS修改baseCount字段,若是修改baseCount字段成功的话,就退出循环,不然继续循环插入CounterCell对象;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break;
因此在1.8中的size实现比1.7简单多,由于元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现以下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
经过累加baseCount和CounterCell数组中的数量,便可获得元素的总个数;