ConcurrentHashMap源码阅读java
稍微有点粗糙的图片node
ConcurrentHashMap
是属于Java并发包,能够称之为是线程安全的HashMap.
(如下有简称CHM
)数组
总所周知,HashMap
有良好的存取性能,但并不支持并发环境,HashTable
支持并发环境,而在存取方法上直接加Synchronized
的方式会使性能明显降低,尽管Synchronize
在JDK1.6
以后进行了大量的优化,但依旧不是最优选.安全
在HashMap
中数组+链表/红黑树的结构基础上,区别于HashTable
中的对整个数组对象上锁,ConcurrentHashMap
使用为数组中的每一个桶上锁的机制,不知道还能不能成为分段锁一个桶就是一个段(JDK1.7
采用的是segment,1.8的代码中虽然保留了但很是简短仅为兼容)多线程
仅表明我的意见,有错误欢迎留言,谢谢!并发
1. transient volatile Node<K,V>[] table;
复制代码
实际存储数据的Node数组,volatile
保证可见性。dom
2. private transient volatile Node<K,V>[] nextTable;
复制代码
下一个使用的数组,仅在扩容更新的时候不为空,扩容时会慢慢把数据移动到这个数组.ide
该数组做为扩容的过分,类外没法访问函数
3. private transient volatile long baseCount;
复制代码
在没有发生争用时的元素统计工具
4. private transient volatile int transferIndex;
复制代码
扩容索引值,表示已经分配给扩容线程的table数组索引位置,主要用来协调多个线程间迁移任务的并发安全.
private transient volatile int sizeCtl;
复制代码
重要程度堪比AQS
的state
,是一个在多线程间共享的竞态变量,用于维护各类状态,保存各种信息.
sizeCtl > 0
时可分为两种状况:
sizeCtl
表示初始容量.sizeCtl = -1
: 表示正在初始化或者扩容阶段.
sizeCtl < -1
: sizeCtl
承担起了扩容时标识符(高16位)和参与线程数目(低16位)的存储
在addCount
和helpTransfer
的方法代码中,若是须要帮助扩容,则会CAS替换为sizeCtl+1
在完成当前扩容内容,且没有再分配的区域时,线程会退出扩容,此时会CAS替换为sizeCtl-1
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/** * Virtualized support for map.get(); overridden in subclasses. * 为map.get()提供虚拟化支持;在子类中覆盖. */
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
// 里面就是普通的链表循环,直到拿到相应的值
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
复制代码
ConcurrentHashMap
,HashMap
中Node
的差异
val
和next
使用volatile
关键字修饰,确保多线程之间的可见性.hashCode
方法略有不一样,由于ConcurrentHashMap
不支持key
或value
为NULL值,因此直接使用key.hashCode() ^ val.hashCode()
跳过了为空判断.find()
方法用来在特定时间段帮忙获取节点后的元素.通常做为桶的头节点调用,用来查询桶中元素.转发节点
该类仅仅存活在扩容阶段,做为一个标记节点放在桶的首位,而且指向是nextTable
(扩容的中间数组)
从构造函数可知,ForwardingNode
的hash
为-1,其余为空,是个完彻底全的辅助类.
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
// 构造函数中默认以MOVED:-1为Hash,其它为空
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
// 帮助扩容时的元素查找
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
复制代码
find()
方法实在扩容期间帮助get
方法获取桶中元素.public V put(K key, V value) {
return putVal(key, value, false);
}
复制代码
/** * 方法参数: * 1. key,value 天然不用说就是k/v的两个值 * 2. onlyIfAbsent 若为true,则仅仅在值为空时覆盖 * 返回值: * 返回旧值,如果新增就为null. */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// CHM不支持NULL值的铁证.
if (key == null || value == null) throw new NullPointerException();
// 得到key的Hash,spread能够称之为扰动函数
int hash = spread(key.hashCode());
int binCount = 0;
// 无限循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 在tab为空时负责初始化Table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 使用`(n-1)&hash`肯定了元素的下标位置,获取对应节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 若是对应位置节点为空,直接以当前信息为桶的头节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 若是获取的桶的头结点的`Hash`为`MOVED`,表示该节点是`ForwardingNode`
// 也就表示数组正在进行扩容
else if ((fh = f.hash) == MOVED)
// 帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 上锁保证原子性,volatile仅能保证可见性
// f为key获取到的节点元素,以此为锁对象
synchronized (f) {
// f在上文就是根据`tabAt(tab,i)`获取的
// 此处是再次获取验证有没有被修改
if (tabAt(tab, i) == f) {
// 与else.if比较,得知
// fh >= 0表示当前节点为链表节点,即当前桶结构为链表 ???
if (fh >= 0) {
// 链表中的元素个数统计
binCount = 1;
// 循环遍历整个桶
// 跳出循环的两种状况:
// 1. 找到相同的值,binCount此时表示遍历的节点个数
// 2. 遍历到末尾,binCount就表示桶中的节点个数
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 源码中大量运用了表达式的短路特性,来展现判断的优先级
// 1. 若hash不相等,则直接跳过判断
// 2. hash相等以后,若key的地址相同,则直接进入if
// 3. 地址不一样时在进入判断内容是否相等
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// onlyIfAbsent为true,表示存在时不覆盖内容
if (!onlyIfAbsent)
e.val = value;
// 已经找到肯定的元素了,更新不更新都跳出
break;
}
// 由于e就在同步代码块中,桶已经被上锁,不可能有别的线程改变
// 因此不须要从新获取
Node<K,V> pred = e;
// 1. 若是e为空,则直接将元素挂接到e后面,跳出循环
// 2. e不为空,继续遍历
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 相似HashMap,树节点独立操做.
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 表示进入了上面的同步表达式,对桶进行修改以后
if (binCount != 0) {
// 若是binCount大于树的临界值,就将链表转化为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 若是oldVal部位空,则返回
if (oldVal != null)
return oldVal;
break;
}
}
}
// 添加元素计数,并在binCount大于0时检查是否须要扩容
addCount(1L, binCount);
return null;
}
复制代码
判断并排除key,value非空,ConcurrentHashMap
不支持key或value为空.
获得扰动后的hash,进入tab数组的遍历,若数组为空则进行初始化
经过(n - 1) & hash
的公式获取桶的下标 ,若桶为空则直接填充key,value为桶的头节点
判断桶的头节点hash,若hash == -1
表示数组在扩容并帮助扩容.
进入synchronize
的同步代码块,若是桶的头节点的hash大于0表示桶的结构为链表,接下去就是正常的链表遍历,新增或者覆盖.
若是桶的头节点是TreeBin
类型表示桶的结构为红黑树,按红黑树的操做进行遍历.
退出同步代码块,判断在遍历期间统计的binCount
是否须要转化为红黑树结构.
判断oldVal
是否为空,这步也挺关键的,若是不为空表示时覆盖操做,直接return
就好,不须要检查扩容.
若是oldVal
不为空调用addCount
方法新增元素个数,并检测是否须要扩容.
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 获取hash,并进过扰动
int h = spread(key.hashCode());
// 判断以进入获取方法
// 1. 数组不为空 & 数组长度大于0
// 2. 获取的桶不为空
if ((tab = table) != null && (n = tab.length) > 0 &&
// 获取桶下标的公式都是通用的 `(n -1) & h`
(e = tabAt(tab, (n - 1) & h)) != null)
{// 对于桶中头节点的hash,对比成功就不须要遍历整个列表了
if ((eh = e.hash) == h) {
// 返回匹配的元素value
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 元素hash < 0的状况有如下三种:
// 1. 数组正在扩容,Node的实际类型是ForwardingNode
// 2. 节点为树的root节点,TreeNode
// 3. 暂时保留的Hash, Node
// 不一样的Node都会调用各自的find()方法
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 若是头节点不是所需节点,且Map此时并未扩容
// 直接遍历桶中元素查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
复制代码
key
的hash,在获取以前会先判断tab是否为空以及长度(n -1)& hash
获取的桶下表获取桶.key
的hash和桶的头节点是否相等,相等则直接返回.hash < 0
,表示处于如下三种状态,则是经过调用各自实际节点类型的find
方法获取元素.
ForwardingNode
TreeNode
addCount
的做用:
ConcurrentHashMap
的元素计数/** * 参数: * x -> 具体增长的元素个数 * check -> 若是check<0不检查时都须要扩容, */
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 1. counterCells不为空
// 2. CAS修改baseCount属性成功
if ((as = counterCells) != null ||
// CAS增长baseCOunt
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// 线程争用的状态标记
boolean uncontended = true;
// 1. 计数cell为null,或长度小于1
// 2. 随机去一个数组位置为为空
// 3. CAS替换CounterCell的value失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// CAS增长CounterCell的value值失败会调用fullAddCount方法
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 根据`check >= 0`判断是否须要检查扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 1. 若是元素总数大于sizeCtl,表示达到了扩容阈值
// 2. tab数组不能为空,已经初始化
// 3. table.length小于最大容,有扩容空间
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据数组长度获取一个扩容标志
int rs = resizeStamp(n);
if (sc < 0) {
// 若是sc的低16位不等于rs,表示标识符已经改变. // 待补充
// 若是nextTable为空,表示扩容已经结束
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// CAS替换sc值为sc+1,成功则开始扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 调用transfer开始扩容,此时nextTable已经指定
transfer(tab, nt);
}
// `sc > 0`表示数组此时并不在扩容阶段,更新sizeCtl并开始扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 调用transfer,nextTable待生成
transfer(tab, null);
s = sumCount();
}
}
}
复制代码
/** * 参数: * tab -> 扩容的数组,通常为table * f -> 线程持有的锁对应的桶的头节点 * 调用地方: * 1. `putVal`检测到头节点Hash为MOVED */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 1.参数数组不能为空
// 2.参数f必须为ForwardingNode类型
// 3.f.nextTab不能为空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// resizeStamp一顿位操做打的我头昏脑涨
// 获取扩容的标识
int rs = resizeStamp(tab.length);
// Map仍处在扩容状态的判断
// 1. 判断节点f的nextTable是否和成员变量的nextTable相同
// 2. 判断传入的tab和成员变量的table是否相同
// 3. sizeCtl是否小于0
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 两种不一样的状况判断
// 一. 不须要帮助扩容的状况
// 1. sc的高16位不等于rs
// 2. sc等于rs+1
// 3. sc等于rs+MAX_RESIZERS
// 4. transferIndex <= 0, 这个好理解由于扩容时会分配并减去transferIndex,
// 小于0时表示数组的区域已分配完毕
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 二. CAS `sc+1`并调用transfer帮助扩容.
// 线程在帮助扩容时会对sizeCtl+1,完成时-1,表示标记
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
复制代码
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride为这次须要迁移的桶的数目
// NCPU为当前主机CPU数目
// MIN_TRANSFER_STRIDE为每一个线程最小处理的组数目
// 1. 在多核中stride为当前容量的1/8对CPU数目取整,例如容量为16时,CPU为2时结果是1
// 2. 在单核中stride为n就为当前数组容量
// !!! stride最小为16,被限定死.
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab是扩容的过渡对象,因此必需要先初始化
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// !!! 重点就在这 扩容后的大小为当前的两倍 --> n << 1
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 扩容失败,直接填充int的最大值
sizeCtl = Integer.MAX_VALUE;
// 直接退出
return;
}
// 更新成员变量
nextTable = nextTab;
// transferIndex为数组长度
transferIndex = n;
}
// 记录过渡数组的长度
int nextn = nextTab.length;
// 此处新建了一个ForwardingNode用于后续占位
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
/** * 以上为数据准备部分,初始化过渡数组,记录长度,建立填充节点等操做 * 如下时真正扩容的主要逻辑 */
// 该变量控制迁移的进行,
boolean advance = true;
boolean finishing = false; // 两个变量做用未知 finishing多是这次扩容标记
// 扩容的for循环里面能够分为两部分
// 一. while循环里面肯定须要迁移的桶的区域,以及本次须要迁移的桶的下标
// 这个i就是须要迁移的桶的下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 该while代码块根据if的顺序功能分别是
// --i: 负责迁移区域的向前推荐,i为桶下标
// nextIndex: 在没有获取负责区域时,检查是否还须要扩容
// CAS: 负责获取这次for循环的区域,每次都为stride个桶
while (advance) {
int nextIndex, nextBound;
// 这个`--i`每次都会进行,每次都会向前推动一个位置
if (--i >= bound || finishing)
advance = false;
// 所以若是当transferIndex<=0时,表示扩容的区域分配完
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
// CAS替换transferIndex的值,新值为旧值减去分到的stride
// stride就表示这次的迁移区域,nextIndex就表明了下次起点
// 从这里能够看出扩容是从数组末尾开始向前推动的
}else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// bount为这次扩容的推动终点,下次起点
bound = nextBound;
// i这次扩容开始的桶下表
i = nextIndex - 1;
advance = false;
}
}
// 二. 扩容的逻辑代码
// 1. 此if断定扩容的结果,中间是三种异常值
// 1). i < 0的状况时上面第二个if跳出的线程
// 2). i > 旧数组的长度
// 3). i+n大于新数组的长度
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 此阶段扩容结束后的操做
// 1. 将nextTable置空,
// 2. 将中间过渡的数组赋值给table
// 3. sizeCtl变为1.5倍(2n-0.5n)
if (finishing) {
nextTable = null;
table = nextTab;
// 分别使用有符号左移,无符号右移
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// CAS替换`sizeCtl-1`,表示本线程的扩容任务已经完成
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 表达式成立表示还有别的线程在执行扩容,直接退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 表达式成立,表示已经所有扩容完成.
finishing = advance = true;
// 提交前从新检查
i = n;
}
}
// 2. 扩容时发现负责的区域有空的桶直接使用ForwardingNode填充
// ForwardingNode持有nextTable的引用
else if ((f = tabAt(tab, i)) == null)
// CAS替换
advance = casTabAt(tab, i, null, fwd);
// 3. 表示处理完毕
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 4. 迁移桶的操做
else {
// sync保证原子性和可见性
synchronized (f) {
// 获取数组中的第i个桶的头节点
// 进入synchronized以后从新判断,保证数据的正确性没有在中间被修改
if (tabAt(tab, i) == f) {
// 此处扩容和HashMap有点像,分为了lowNode和highNode两个头结点
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
// true的话会从新
advance = true;
}
// 树的桶迁移操做
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
复制代码
在addCount
方法中间检查元素个数是否达到扩容阈值(0.75 * table.length),超过则触发扩容,调用方法transfer
.
sizeCtl
会被CAS
替换为(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
接下来就是teansfer
的代码:
根据CPU
和当前容量算出每次扩容该分配的区域大小,最小为16,表示为stride
.
若过渡数组nextTab
未初始化,则先初始化数组.并使用transferIndex
记录下旧数组长度,做为扩容长度.
以上扩容须要的数据准备彻底开始具体的扩容操做:
在一个while
循环中获取本次扩容包含的桶的范围,即[transferIndex,transferIndex-stride]
的范围,i
表示当前扩容的桶的下标.
三个判断,四段代码分别完成不一样状况下的操做
i
数值异常,< 0 || >= n || + n > nextn
,表示扩容已完成,且在while
循环中没有分配的扩容任务.
若是此时finishing
参数为true
表示总体扩容完成,且完成结束前的检查.
若是finishing
为false
,则**CAS
替换sizeCtl为sizeCtl-1**,表示一个线程完成扩容任务并须要退出.
替换成功以后还会检查sc
是否等于addCount
进来时的值,不相等就直接return
,表示还有线程未完成扩容任务.
i
对应的桶为空,直接使用ForwardingNode
填充头节点,表示此处正在扩容.并设advance
为true
若是检查到节点hash
为Moved
表示当前节点为ForwardingNode
,advance
为true
.
排除了上面三种状况,就是对应的桶的迁移工做,和HashMap
有点像.结束后设置advance
为true
以后会再回到第4步.
putVal
等元素操做方法中,发现获取的桶头节点为ForwdingNode
就表示ConcurrentHashMap
当前正在扩容,会立刻调用helpTransfer
帮助扩容.helpTransfer
中会有各类正确性判断,只有在如下三个条件都都知足时才会帮助扩容.
tab
是否不空ForwardingNode
nextTable
是否初始化.sc >> 16 != rs
- 标识符已经改变.sc == rs+1
- 触发扩容的线程已退出,扩容已经完成sc == rs+MAX_RESIZER
- 参与扩容的线程达最大值transferIndex <= 0
- 扩容区域已经分配完transfer
帮助扩容.addCount
-> sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
helpTransfer
中会有判断sizeCtl
高16位的操做,resizeStamp(n)
的值推高16位,赋值给sizeCtl
,而低16位则保存了这个2.也就是说在扩容的时候sizeCtl
的高16为保存了标识符,而低16位保存了参与线程数目.有线程参与扩容 -> sizeCtl = sizeCtl - 1
线程退出扩容 -> sizeCtl = sizeCtl + 1
扩容完成 -> sizeCtl = nextTab.length * 0.75
HashMap
同样,ConcurrentHashMap
并非在构造函数中就直接初始化底层的数组,而是在put
等存方法中,判断是否须要扩容.private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// `sizeCtl`表示有别的数组正在初始化,让出CPU时间
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS操做,以-1置换`sizeCtl`的值
// 能够看出 `sizeCtl==-1`时,表示数组正在某个线程初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 置换以后须要从新检测数组是否未初始化
if ((tab = table) == null || tab.length == 0) {
// sc就是置换以前的sizeCtl.
// 此时sizeCtl做为初始容量.
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 初始化结束以后sc变为0.75n,是扩容阈值
sc = n - (n >>> 2);
}
} finally {
// 为避免异常退出致使sizeCtl永久为-1,此处强制赋值.
sizeCtl = sc;
}
break;
}
}
// 返回了新建的数组地址
return tab;
}
复制代码
initTable
方法时在putVal
而非构造函数,也算是CHM
中的一种懒加载机制.sizeCtl
经过CAS
置换为-1,表示正在初始化sizeCtl
以前的值为初始容量,sizeCtl
<=0时使用默认容量16sizeCtl
赋值为0.75*数组容量(sizeCtl贯穿全篇,真的很重要)static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
复制代码
Integer.numberOfLeadingZeros(n)
返回的是n的32位二进制形式前面的0的个数,例如值位16的int(32位)
类型二进制表示为000000...0010000
,1前面的就有27个0,返回就是27.|
操做如今此处能够简单理解为加法。static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
复制代码
HashMap
中的hash()
方法功能相似.CHM
中的扰动函数除了将高16位于低16位异或以外又与上HASH_BITS,能够有效下降哈希冲突的几率,使元素分散更加均匀.@SuppressWarnings("unchecked")
// tab: 数组 i : 下标
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
复制代码
CAS
形式替换数组元素// tab: 原始数组 i:下标 c:对比元素 v:替换元素
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
复制代码
// tab:原始数组 i:下标 v:替换元素
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
复制代码
Unsafe
是一块Java开发人员都不多接触的区域,但这里仍是简单了解一下private static final sun.misc.Unsafe U;
// sizeCtl属性的偏移地址
private static final long SIZECTL;
// transferIndex属性的偏移地址
private static final long TRANSFERINDEX;
// baseCount的偏移地址
private static final long BASECOUNT;
// cellsBusy的偏移地址
private static final long CELLSBUSY;
// CounterCell类中value的偏移地址
private static final long CELLVALUE;
// Node数组第一个元素的偏移地址
private static final long ABASE;
// Node数组中元素的增量地址,与ABASE配合使用能访问到数组的各元素
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
// 先经过反射获取到对应的属性值,再经过Unsafe类获取属性的偏移地址
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
// 获取数组中第一个元素的偏移地址
ABASE = U.arrayBaseOffset(ak);
// 获取数组的增量地址
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
复制代码