Java并发5:ConcurrentHashMap

为何要使用 ConcurrentHashMap

HashMap 是非线程安全的,put操做可能致使死循环。其解决方案有 HashTable 和 Collections.synchronizedMap(hashMap) 。这两种方案都是对读写加锁,独占式,效率比较低下。java

HashMap 在并发执行put操做时会引发死循环,由于多线程致使 HashMap 的 Entry 链表造成环形数据结构,则 Entry 的 next 节点永远不为空,会死循环获取 Entry。算法

HashTable 使用 synchronized 来保证线程安全,可是在线程竞争激烈的状况下,效率很是低。其缘由是全部访问该容器的线程都必须竞争一把锁。编程

针对上述问题,ConcurrentHashMap 使用锁分段技术,容器里有多把锁,每一把锁用于其中一部分数据,当多线程访问不一样数据段的数据时,线程间就不会存在锁的竞争。数组

ConcurrentHashMap 实现(JDK 1.7)

在JDK 1.7中,ConcurrentHashMap 采用了 Segment 数组和 HashEntry 数组的方式进行实现。其中 Segment 是一种可重入锁(ReentrantLock),扮演锁的角色。而 HashEntry 则是用于存储键值对的数据。结构以下图所示:安全

一个 Segment 包含一个 HashEntry 数组,每一个 HashEntry 是一个链表结构的元素。每一个 Segment 守护一个 HashEntry 数组的元素。数据结构

初始化

初始化时,计算出 Segment 数组的大小 ssize 和每一个 Segment 中 HashEntry 数组的大小 cap,并初始化 Segment 数组的第一个元素。其中 ssize 为2的幂次方,默认为16,cap 大小也是2的幂次方,最小值为2。最终结果根据初始化容量 initialCapacity 计算。多线程

//计算segment数组长度
if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //初始化segmentShift和SegmentMask
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //计算每一个Segment中HashEntry数组大小cap
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // 初始化segment数组和segment[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
复制代码

首先,初始化了 segments 数组,其长度 ssize 是经过 concurrencyLevel 计算得出的。须要保证ssize的长度是2的N次方,segments 数组的长度最大是65536。并发

而后初始化了 segmentShift 和 segmentMask 这两个全局变量,用于定位 segment 的散列算法。segmentShift 是用于散列运算的位数, segmentMask 是散列运算的掩码。ssh

以后根据 initialCapaicity 和 loadfactor 这两个参数来计算每一个 Segment 中 HashEntry 数组的大小 cap。函数

最后根据以上肯定的参数,初始化了 segment 数组以及 segment[0]。

get操做

整个 get 操做过程都不须要加锁,所以很是高效。首先将 key 通过 Hash 以后定位到 Segment,而后再经过一个 Hash 定位到具体元素。不须要加锁是由于 get 方法将须要使用的共享变量都定义成 volatile 类型,所以能在线程之间保持可见性,在多线程同时读时能保证不会读到过时的值。

put操做

put 方法须要对共享变量进行写入操做,为了线程安全,必须加锁。 put 方法首先定位到 Segment,而后在 Segment 里进行插入操做。插入操做首先要判断是否须要对 Segment 里的 HashEntry 数组进行扩容,而后定位添加元素的位置,将其放入到 HashEntry 数组。

Segment 的扩容比 HashMap 更恰当,由于后者是插入元素后判断是否已经到达容量,若是到达了就扩容,可是可能扩容后没有插入,进行了无效的扩容。Segment 在扩容时,首先建立一个原来容量两倍的数组,而后将原数组里的元素进行再散列后插入到新的数组。同时为了高效, ConcurrentHashMap 不会对整个容器进行扩容,而是只对某个 segment 进行扩容。

size方法

每一个 Segment 都有一个 volatile 修饰的全局变量 count ,求整个 ConcurrentHashMap 的 size 时很明显就是将全部的 count 累加便可。可是 volatile 修饰的变量却不能保证多线程的原子性,全部直接累加很容易出现并发问题。可是若是在调用 size 方法时锁住其他的操做,效率也很低。

ConcurrentHashMap 的作法是先尝试两次经过不加锁的方式进行计算,若是两次结果相同,说明结果正确。若是计算结果不一样,则给每一个 Segment 加锁,进行统计。

ConcurrentHashMap 实现(JDK 1.8)

在JDK 1.8中,改变了分段锁的思路,采用了 Node数组 + CAS + Synchronized 来保证并发安全。底层仍然采用了数组+链表+红黑树的存储结构。

Node

在JDK 1.8中,使用 Node 替换 HashEntry,二者做用相同。在 Node 中, val 和 next 两个变量都是 volatile 修饰的,保证了可见性。

使用 table 变量存放 Node 节点数组,默认为 null, 默认大小为16,且每次扩容时大小老是2的幂次方。在扩容时,使用 nextTable 存放新生成的数据,数组为 table 的两倍。

ForwardingNode 是一个特殊的 Node 节点,hash 值为-1,存储了 nextTable 的引用。只有table 发生扩容时,其发生做用,做为占位符放在 table 中表示当前节点为null或者已经被移动。

TreeNode

在 HashMap 中,其核心的数据结构是链表。而在 ConcurrentHashMap 中,若是链表的数据过长会转换为红黑树来处理。经过将链表的节点包装成 TreeNode 放在 TreeBin 中,而后经由 TreeBin 完成红黑树的转换。

TreeBin

TreeBin 不负责键值对的包装,用于在链表转换为红黑树时包装 TreeNode 节点,用来构建红黑树。

初始化:initTable()

在构造函数中,ConcurrentHashMap 仅仅设置了一些参数。当首次插入元素时,才经过 initTable() 方法进行了初始化。

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //有其余线程在初始化,挂起当前线程
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
        //得到了初始化的权利,使用CAS将sizeCtl设置为-1,表示本线程正在初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            //进行初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //下次扩容的大小,至关于0.75*n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
复制代码

该方法的关键为sizeCtl。

sizeCtl:控制标识符,用来控制table初始化和扩容操做的,在不一样的地方有不一样的用途,其值也不一样,所表明的含义也不一样:

  • 负数表明正在进行初始化或扩容操做
  • -1表明正在初始化
  • -N 表示有N-1个线程正在进行扩容操做
  • 正数或0表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小

sizeCtl 默认为0。若是该值小于0,表示有其余线程在初始化,须要暂停该线程。若是该线程获取了初始化的权利,先将其设置为-1。最后将 sizeCtl 设置为 0.75*n,表示扩容的阈值。

put操做

put操做的核心思想依然是根据 hash 计算节点插入在 table 的位置,若是为空,直接插入,不然插入到链表或树中。

首先计算hash值,而后进入循环中遍历table,尝试插入。

int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
//详细代码接下来分别讲述
}
复制代码

首先判断 table 是否为空,若是为空或者是 null,则先进行初始化操做。

if (tab == null || (n = tab.length) == 0)
                tab = initTable();
复制代码

若是已经初始化过,且插入的位置没有节点,直接插入该节点。使用CAS尝试插入该节点。

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
复制代码

若是有线程在扩容,先帮助扩容。

//当前位置的hashcode等于-1,须要扩容
 else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
复制代码

若是都不知足,使用 synchronized 锁写入数据。根据数据结构的不一样,若是是链表则插入尾部;若是是树节点,使用树的插入操做。

else {
V oldVal = null;
//对该节点进行加锁处理(hash值相同的链表的头节点)
synchronized (f) {
    if (tabAt(tab, i) == f) {
        //fh > 0 表示为链表,将该节点插入到链表尾部
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                //hash 和 key 都同样,替换value
                if (e.hash == hash &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    //putIfAbsent()
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                //链表尾部 直接插入
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                            value, null);
                    break;
                }
            }
        }
        //树节点,按照树的插入操做进行插入
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                    value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}
复制代码

在for循环的最后,判断链表的长度是否须要链表转换为树结构。

if (binCount != 0) {
    // 若是链表长度已经达到临界值8,把链表转换为树结构
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}
复制代码

最后,若是是更新节点,前边已经返回了 oldVal,不然就是插入新的节点。还须要使用 addCount() 方法,为 size 加一。

总结步骤以下:

  1. 判断 key 和 value 是否为 null,若是是则抛出异常
  2. 计算 hash
  3. 遍历 table,插入节点
    • 若是 table 为空,进行初始化
    • 插入位置为空,直接插入,无需加锁
    • 若是是 ForwardingNode 节点,表示有其余线程正在扩容,帮助线程一块儿进行扩容。
    • 若是是链表结构,遍历链表,若是存在 key 则更新 value,不然插入到链表尾部;若是是 TreeBin 节点,按照红黑树的方法更新或增长节点。
    • 若是完成后发现链表长度大于设定的阈值,将其装换为红黑树
  4. 若是是更新,返回oddVal;若是是插入,使用 addCount() 方法,增长 size, 返回 null。

get方法

  1. 计算 hash 值
  2. 判断 table 是否为空,为空返回 null
  3. 根据 hash 获取 Node 节点,若是是该节点则返回 value
  4. 根据链表或红黑树查找到对应节点,返回 value
  5. 找不到则返回 null

参考资料

相关文章
相关标签/搜索