并发编程——ConcurrentHashMap#helpTransfer() 分析

前言

ConcurrentHashMap 鬼斧神工,并发添加元素时,若是 map 正在扩容,其余线程甚至于还会帮助扩容,也就是多线程扩容。就这一点,就能够写一篇文章好好讲讲。今天一块儿来看看。java

源码分析

为何帮助扩容?node

在 putVal 方法中,若是发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位有值了,且若是这个值是 -1 (MOVED),说明 Map 正在扩容。那么就帮助 Map 进行扩容。以加快速度。多线程

具体代码以下:并发

int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        if (casTabAt(tab, i, null,
                     new Node<K,V>(hash, key, value, null)))
            break;                   // no lock when adding to empty bin
    } // 若是对应槽位不为空,且他的 hash 值是 -1,说明正在扩容,那么就帮助其扩容。以加快速度
    else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);
复制代码

传入的参数是:成员变量 table 和 对应槽位的 f 变量。源码分析

怎么验证 hash 值是 MOVED 就是正在扩容呢?this

在 Cmap(ConcurrentHashMap 简称) 中,定义了一堆常量,其中:spa

static final int MOVED     = -1; // hash for forwarding nodes
复制代码

hash for forwarding nodes,说明这个为了移动节点而准备的常量。线程

在 Node 的子类 ForwardingNode 的构造方法中,能够看到这个变量做为 hash 值进行了初始化。code

ForwardingNode(Node<K,V>[] tab) {
    super(MOVED, null, null, null);
    this.nextTable = tab;
}
复制代码

而这个构造方法只在一个地方调用了,即 transfer(扩容) 方法。cdn

点到为止。

关于扩容后面再开一篇。

好了,如何帮助扩容呢?那要看看 helpTransfer 方法的实现。

/** * Helps transfer if a resize is in progress. */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 若是 table 不是空 且 node 节点是转移类型,数据检验
    // 且 node 节点的 nextTable(新 table) 不是空,一样也是数据校验
    // 尝试帮助扩容
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
    	// 根据 length 获得一个标识符号
        int rs = resizeStamp(tab.length);
    	// 若是 nextTab 没有被并发修改 且 tab 也没有被并发修改
    	// 且 sizeCtl < 0 (说明还在扩容)
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
        	// 若是 sizeCtl 无符号右移 16 不等于 rs ( sc前 16 位若是不等于标识符,则标识符变化了)
        	// 或者 sizeCtl == rs + 1 (扩容结束了,再也不有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
        	// 或者 sizeCtl == rs + 65535 (若是达到最大帮助线程的数量,即 65535)
        	// 或者转移下标正在调整 (扩容结束)
        	// 结束循环,返回 table
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 若是以上都不是, 将 sizeCtl + 1, (表示增长了一个线程帮助其扩容)
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
            	// 进行转移
                transfer(tab, nextTab);
                // 结束循环
                break;
            }
        }
        return nextTab;
    }
    return table;
}
复制代码

关于 sizeCtl 变量:

-1 :表明table正在初始化,其余线程应该交出CPU时间片 -N: 表示正有N-1个线程执行扩容操做(高 16 位是 length 生成的标识符,低 16 位是扩容的线程数) 大于 0: 若是table已经初始化,表明table容量,默认为table大小的0.75,若是还未初始化,表明须要初始化的大小

代码步骤:

  1. 判 tab 空,判断是不是转移节点。判断 nextTable 是否更改了。
  2. 更加 length 获得标识符。
  3. 判断是否并发修改了,判断是否还在扩容。
  4. 若是还在扩容,判断标识符是否变化,判断扩容是否结束,判断是否达到最大线程数,判断扩容转移下标是否在调整(扩容结束),若是知足任意条件,结束循环。
  5. 若是不知足,并发转移。

这里有一个花费了很长时间纠结的地方:

sc == rs + 1
复制代码

这个判断能够在 addCount 方法中找到答案:默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1。

若是 sizeCtl == 标识符 + 1 ,说明库容结束了,没有必要再扩容了。

总结一下:

当 Cmap put 元素的时候,若是发现这个节点的元素类型是 forward 的话,就帮助正在扩容的线程一块儿扩容,提升速度。其中, sizeCtl 是关键,该变量高 16 位保存 length 生成的标识符,低 16 位保存并发扩容的线程数,经过这连个数字,能够判断出,是否结束扩容了。

以下图:

相关文章
相关标签/搜索