java基础提升之ConcurrentHashMap

        这一篇咱们来讲map系列的最后一个--ConcurrentHashMap。jdk1.7与jdk1.8中此类的实现有很大差别,因为笔者使用jdk1.8,因此如下内容均为jdk1.8版本。html

简介

熟悉的套路,再来一次!java

  1. ConcurrentHashMap 支持检索的彻底并发和更新的高预期并发性。换句话说,ConcurrentHashMap是同步容器。即便全部操做都是线程安全的,可是检索操做不须要锁,而且不支持以阻止全部访问操做的方式锁住全表。因此说ConcurrentHashMap性能是很是高的。数组

  2. 检索操做包括get通常来讲不会阻塞,因此可能会与一些更新操做重叠。检索操做的结果反映了最近完成的更新操做,换句话说,对一个给定key的更新操做happen-before 对这个key的非null检索操做。可是对于聚合操做,好比putAll以及clear,并发的检索可能只会表现出一部分元素的插入或者删除。一样,Iterators、Spliterators、Enumerations返回的元素反映了哈希表某一刻或者说iterator以及enumeration建立的时刻的状态。而且不会抛出ConcurrentModificationException。iterators被设计为一次只能由一个线程使用。请记住,包括size、isEmpty以及containsValue这样的聚合状态方法的结果只在map并无在其余线程中被同步更新的状况下有用,不然,这些结果可能反映了足够用来监视或者估计目的的瞬态,可是不能用来做为程序控制。安全

  3. 在任何状况下,可以估计ConcurrentHashMap中将要存放键值对的数量并在构造函数中将这个值体如今initialCapacity中的作法可以避免因为扩容带来的性能的下降。多线程

  4. ConcurrentHashMap能够用使用java.util.concurrent.atomic.LongAdder来作频率记录,好比在ConcurrentHashMap<String,LongAdder> freqs中增长一个数可使用freqs.computeIfAbsent(Key,k -> new LongAdder()).increment();并发

  5. 像Hashtable但不像HashMap,它不容许null值用来作key或者value。app

  6. ConcurrentHashMaps支持一组顺序和并行批量操做,与大多数 Stream方法不一样,它们被设计为安全且一般合理地应用,即便是由其余线程同时更新的映射;这些批处理操做容许一个parallelismThreshold参数来决定是否进行并行进行操做,好比使用Long.MAX_VALUE能够抑制全部的并行性,使用1的话能够充分利用用于并行计算的ForkJoinPoll的commonPool;。一般来讲,在实际使用中,咱们经过这二者在其之间找寻一个最佳性能的值。dom

CAS

有人说java.util.concurrent的实现彻底依赖于CAS,那啥是CAS? CAS(Compare and Swap)比较与交换,是一个乐观锁,采用自旋的方式去更新值,能高效的完成原子操做。CAS有3个操做数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改成B,不然什么都不作。函数

  1. ABA问题。由于CAS须要在操做值的时候检查下值有没有发生变化,若是没有发生变化则更新,可是若是一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,可是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。 从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法做用是首先检查当前引用是否等于预期引用,而且当前标志是否等于预期标志,若是所有相等,则以原子方式将该引用和该标志的值设置为给定的更新值。 关于ABA问题参考文档: blog.hesey.net/2011/09/res…性能

  2. 循环时间长开销大。自旋CAS若是长时间不成功,会给CPU带来很是大的执行开销。若是JVM能支持处理器提供的pause指令那么效率会有必定的提高,pause指令有两个做用,第一它能够延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它能够避免在退出循环的时候因内存顺序冲突(memory order violation)而引发CPU流水线被清空(CPU pipeline flush),从而提升CPU的执行效率。

  3. 只能保证一个共享变量的原子操做。当对一个共享变量执行操做时,咱们可使用循环CAS的方式来保证原子操做,可是对多个共享变量操做时,循环CAS就没法保证操做的原子性,这个时候就能够用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操做。好比有两个共享变量i=2,j=a,合并一下ij=2a,而后用CAS来操做ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你能够把多个变量放在一个对象里来进行CAS操做。

【参考连接】www.jianshu.com/p/450925729…

存储结构

其实单看ConcurrentHashMap的存储结构来讲,跟HashMap的很像能够说同样,都是数组+链表或者数组+红黑树的方式。因此不作多解释。

基础操做中看原理

咱们以putVal为例查看,看一下源码:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
复制代码
  1. 咱们先将一下插入的流程,首先先检查key以及value是否为空,以后根据spread方法计算出这个key的hash值
  2. 对table进行for循环,这里说对table进行for循环其实表达有些不正确,前面咱们说过CAS经过自旋进行更新,这里的for循环在我看来就是进行自旋操做的。若是更新了就跳出循环,没有的话再来一次。
  3. 循环中,首先查看是否是空表,是的话调用initTable进行表的初始化。
  4. 不是空表的话则根据计算出的hash值找出对应的索引,若是此索引位置为空,则调用casTabAt方法尝试更新,此时不加锁。注意,这里就使用到了CAS,成功了则跳出循环。看一下具体代码:
  5. 若是fh的值==MOVED 说明此时正在进行扩容,则调用helpTransfer方法帮助扩容,扩容完成后返回新的table并进行下一次的循环尝试。这里也能够看出来,ConcurrentHashMap是能够多线程进行扩容的。
  6. 一次循环中最后一种状况,即既没有在扩容中,当前索引位置也不为空,则须要对位于这个索引位置的第一个元素进行加锁,并在进入真正插入操做时进行对加锁的这个第一个元素使用CAS方式进行验证,确认加锁的这个第一个元素在加锁前的这一时间段中没有被更改。更改的话则进入自旋操做,对上面所讲的过程在进行一遍。没有更改的话则进入下面的流程。
  7. 注意在后面再也不使用CAS方式进行更新了,由于对第一个元素进行了加锁,因此能到这一步骤的只有一个线程。这里以后的过程就跟HashMap中的很相似了,就是普通的比较决定是否更新、记录数量决定是否树化。

扩容

前面咱们说到了扩容,也提到了扩容的话,ConcurrentHashMap是支持多线程的。那咱们具体来看一下: 在putVal操做中是从下面这个方法中进入扩容机制的。

咱们看一下addCount的源码:

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
复制代码

看到用了不少的私有属性,咱们看看这些属性表明啥意思: 捡着重要的说,

  1. sizeCtl,哈希表初始化以及扩容控制。当为负数时,table正在初始化或者库容。-1表明初始化,-(1+正在帮助扩容的线程数量)表明扩容中。不是负数的话,当表是空的时也就是为初始化时,存着初始表的容量或者默认为0;初始化以后,保存下一个元素计数值,在该值上调整表的大小,此时做用相似于阈值。

  1. nextTable,用来帮助扩容的数组,只有在扩容时才不为空。其大小为源数组的两倍。

  1. table,初始化发生在第一次插入操做,默认大小为16的数组,用来存储Node节点数据,扩容时大小老是2的幂次方。

  1. transferIndex, 扩容时要分割的下一个表的索引(须要加1)

  1. baseCount,ConcurrentHashMap的元素个数=baseCount+SUM(counterCells)元素基础个数,经过CAS更新,当CAS失败则将要加的值加到counterCells数组

  1. counterCells,同上面解释的。

如今咱们开始讲讲addCount的流程:

  1. addCount中有两个大if分支,第一个if分支做用是将增长的元素数量增长到baseCount上,若是CAS失败则添加到counterCells上。

  2. 第二个分支则是扩容的主要步骤,固然只有check>=0的时候进行扩容判断。

  3. 在第二个判断中首先判断当前元素数量是否已经超出sizeCtl而且table的值不为null且table的长度不超过默认最大的容量。若是是则进行下面的扩容判断。

  4. 以后进行sc判断,小于0的话说明正在进行扩容,则判断是否扩容完成,若是完成的话则break出去结束,没有的话则调用transfer方法帮助扩容。并使用CAS更新正在扩容的线程数。

  5. 若是sc>0说明本身是第一个发起扩容的线程,则调用transfer进行扩容。

总结

  1. ConcurrentHashMap 是同步容器,采用CAS+synchronized方式保证线程安全。与java1.7不能1.7中采用分段锁的方式。

  2. ConcurrentHashMap扩容能够多线程进行协助扩容。

  3. 存储结构也是数组+链表以及数组+红黑树

  4. 扩容时默认增加为原来的二倍

  5. 扩容时同HashMap同样,也会将一个链表上的元素分红两个链表并插入到新数组的这个索引处以及(这个索引+旧数组的长度)索引处。

相关文章
相关标签/搜索