HashMap 是非线程安全的,put操做可能致使死循环。其解决方案有 HashTable 和 Collections.synchronizedMap(hashMap) 。这两种方案都是对读写加锁,独占式,效率比较低下。java
HashMap 在并发执行put操做时会引发死循环,由于多线程致使 HashMap 的 Entry 链表造成环形数据结构,则 Entry 的 next 节点永远不为空,会死循环获取 Entry。算法
HashTable 使用 synchronized 来保证线程安全,可是在线程竞争激烈的状况下,效率很是低。其缘由是全部访问该容器的线程都必须竞争一把锁。编程
针对上述问题,ConcurrentHashMap 使用锁分段技术,容器里有多把锁,每一把锁用于其中一部分数据,当多线程访问不一样数据段的数据时,线程间就不会存在锁的竞争。数组
在JDK 1.7中,ConcurrentHashMap 采用了 Segment 数组和 HashEntry 数组的方式进行实现。其中 Segment 是一种可重入锁(ReentrantLock),扮演锁的角色。而 HashEntry 则是用于存储键值对的数据。结构以下图所示:安全
一个 Segment 包含一个 HashEntry 数组,每一个 HashEntry 是一个链表结构的元素。每一个 Segment 守护一个 HashEntry 数组的元素。数据结构
初始化时,计算出 Segment 数组的大小 ssize 和每一个 Segment 中 HashEntry 数组的大小 cap,并初始化 Segment 数组的第一个元素。其中 ssize 为2的幂次方,默认为16,cap 大小也是2的幂次方,最小值为2。最终结果根据初始化容量 initialCapacity 计算。多线程
//计算segment数组长度
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//初始化segmentShift和SegmentMask
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
//计算每一个Segment中HashEntry数组大小cap
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 初始化segment数组和segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
复制代码
首先,初始化了 segments 数组,其长度 ssize 是经过 concurrencyLevel 计算得出的。须要保证ssize的长度是2的N次方,segments 数组的长度最大是65536。并发
而后初始化了 segmentShift 和 segmentMask 这两个全局变量,用于定位 segment 的散列算法。segmentShift 是用于散列运算的位数, segmentMask 是散列运算的掩码。ssh
以后根据 initialCapaicity 和 loadfactor 这两个参数来计算每一个 Segment 中 HashEntry 数组的大小 cap。函数
最后根据以上肯定的参数,初始化了 segment 数组以及 segment[0]。
整个 get 操做过程都不须要加锁,所以很是高效。首先将 key 通过 Hash 以后定位到 Segment,而后再经过一个 Hash 定位到具体元素。不须要加锁是由于 get 方法将须要使用的共享变量都定义成 volatile 类型,所以能在线程之间保持可见性,在多线程同时读时能保证不会读到过时的值。
put 方法须要对共享变量进行写入操做,为了线程安全,必须加锁。 put 方法首先定位到 Segment,而后在 Segment 里进行插入操做。插入操做首先要判断是否须要对 Segment 里的 HashEntry 数组进行扩容,而后定位添加元素的位置,将其放入到 HashEntry 数组。
Segment 的扩容比 HashMap 更恰当,由于后者是插入元素后判断是否已经到达容量,若是到达了就扩容,可是可能扩容后没有插入,进行了无效的扩容。Segment 在扩容时,首先建立一个原来容量两倍的数组,而后将原数组里的元素进行再散列后插入到新的数组。同时为了高效, ConcurrentHashMap 不会对整个容器进行扩容,而是只对某个 segment 进行扩容。
每一个 Segment 都有一个 volatile 修饰的全局变量 count ,求整个 ConcurrentHashMap 的 size 时很明显就是将全部的 count 累加便可。可是 volatile 修饰的变量却不能保证多线程的原子性,全部直接累加很容易出现并发问题。可是若是在调用 size 方法时锁住其他的操做,效率也很低。
ConcurrentHashMap 的作法是先尝试两次经过不加锁的方式进行计算,若是两次结果相同,说明结果正确。若是计算结果不一样,则给每一个 Segment 加锁,进行统计。
在JDK 1.8中,改变了分段锁的思路,采用了 Node数组 + CAS + Synchronized 来保证并发安全。底层仍然采用了数组+链表+红黑树的存储结构。
在JDK 1.8中,使用 Node 替换 HashEntry,二者做用相同。在 Node 中, val 和 next 两个变量都是 volatile 修饰的,保证了可见性。
使用 table 变量存放 Node 节点数组,默认为 null, 默认大小为16,且每次扩容时大小老是2的幂次方。在扩容时,使用 nextTable 存放新生成的数据,数组为 table 的两倍。
ForwardingNode 是一个特殊的 Node 节点,hash 值为-1,存储了 nextTable 的引用。只有table 发生扩容时,其发生做用,做为占位符放在 table 中表示当前节点为null或者已经被移动。
在 HashMap 中,其核心的数据结构是链表。而在 ConcurrentHashMap 中,若是链表的数据过长会转换为红黑树来处理。经过将链表的节点包装成 TreeNode 放在 TreeBin 中,而后经由 TreeBin 完成红黑树的转换。
TreeBin 不负责键值对的包装,用于在链表转换为红黑树时包装 TreeNode 节点,用来构建红黑树。
在构造函数中,ConcurrentHashMap 仅仅设置了一些参数。当首次插入元素时,才经过 initTable() 方法进行了初始化。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//有其余线程在初始化,挂起当前线程
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//得到了初始化的权利,使用CAS将sizeCtl设置为-1,表示本线程正在初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//下次扩容的大小,至关于0.75*n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
该方法的关键为sizeCtl。
sizeCtl:控制标识符,用来控制table初始化和扩容操做的,在不一样的地方有不一样的用途,其值也不一样,所表明的含义也不一样:
sizeCtl 默认为0。若是该值小于0,表示有其余线程在初始化,须要暂停该线程。若是该线程获取了初始化的权利,先将其设置为-1。最后将 sizeCtl 设置为 0.75*n,表示扩容的阈值。
put操做的核心思想依然是根据 hash 计算节点插入在 table 的位置,若是为空,直接插入,不然插入到链表或树中。
首先计算hash值,而后进入循环中遍历table,尝试插入。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//详细代码接下来分别讲述
}
复制代码
首先判断 table 是否为空,若是为空或者是 null,则先进行初始化操做。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
复制代码
若是已经初始化过,且插入的位置没有节点,直接插入该节点。使用CAS尝试插入该节点。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
复制代码
若是有线程在扩容,先帮助扩容。
//当前位置的hashcode等于-1,须要扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
复制代码
若是都不知足,使用 synchronized 锁写入数据。根据数据结构的不一样,若是是链表则插入尾部;若是是树节点,使用树的插入操做。
else {
V oldVal = null;
//对该节点进行加锁处理(hash值相同的链表的头节点)
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh > 0 表示为链表,将该节点插入到链表尾部
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//hash 和 key 都同样,替换value
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
//putIfAbsent()
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//链表尾部 直接插入
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//树节点,按照树的插入操做进行插入
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
复制代码
在for循环的最后,判断链表的长度是否须要链表转换为树结构。
if (binCount != 0) {
// 若是链表长度已经达到临界值8,把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
复制代码
最后,若是是更新节点,前边已经返回了 oldVal,不然就是插入新的节点。还须要使用 addCount() 方法,为 size 加一。
总结步骤以下:
参考资料