ConcurrentHashMap:源码分析到面试题

在多线程状况下,咱们的HashMap在JDK1.8以前最大的问题就是会形成环链,在JDK1.8开始以后虽然解决了环链,可是仍是会由于并发的状况下,致使数据覆盖而丢失。虽然咱们有HashTable和Collections下的同步器能够解决这个问题,可是这两种方案都不能算是一个优秀的解决方案,因此就有了咱们要介绍的ConcurrentHashMap。本文主要是针对JDK1.8的源码进行分析,可是在介绍以前也会简单提一下,1.8以前是如何设计的!html

在了解ConcurrentHashMap不妨先了解一下HashMapjava

1 JDK1.7

咱们在JDK1.8以前采用的是SegmentHashEntry的方式实现的。结构以下:node

咱们是采用分段锁来实现并发的更新。Segment是继承自咱们的ReentrantLock来充当锁的角色,每个Segment都对应一个锁。从图中咱们也能够看到,咱们的每个Segment对象都对应了哈希表的若干个哈希桶,至关于一小段哈希表!面试

这样咱们在实现并发更新的时候,就不会锁住这个哈希表,而是锁住Segment对应的那一个对象那一部分,就会提升了咱们的性能和效率。具体的源码这里就不分析了,由于咱们主要是介绍1.8的ConcurrentHashMap。数组

2 JDK1.8

咱们的ConcurrentHashMap在1.8以后就放弃了分段锁的解决方案,而是采用了CAS+Synchronized来保证并发更新的安全。底层和咱们的HashMap同样,采用的是数组+链表+红黑树的存储结构!安全

咱们在上面说到了1.8是采用CAS+Synchronized来保证并发安全,因此在若是对CAS还不了解的话,能够先看个人关于CAS的博客。(点击跳转数据结构

好了接下来咱们就开始对源码进行分析了。多线程

2.1 基本属性

ConcurrentHashMap不少基本属性都和咱们的HashMap同样,因此这里我只介绍几个不同的,并且后面咱们分析源码会用到的。并发

//咱们的哈希表,但是使用迭代器来进行迭代
transient volatile Node<K,V>[] table;
//默认为null,扩容的时候新生成的数组,其大小为原数组的两倍。
private transient volatile Node<K,V>[] nextTable;
//基础计数器,经过CAS来进行更新
private transient volatile long baseCount;
/*
*默认为0,用来控制table的初始化和扩容操做的
*当为负数时,它正在进行初始化或者在扩容:
*-1,表示正在进行初始化;-N表示N-1个线程在进行扩容
*当为正数的时候:
*若是table未初始化,表示须要初始化的大小;
*若是table初始化完成,表示table的容量,默认是table的0.75倍,
*/
private transient volatile int sizeCtl;

还有就是对比咱们的HashMap,咱们的Node也进行了重写,将咱们的值和下一个结点都用了Volatile来修饰,线程修改后马上刷回主存,增长了内存的可见性。源码分析

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
...部分代码省略...
}

2.2 构造方法

ConcurrentHashMap有五个构造方法,其中四个与HashMap相似,因此咱们主要介绍这个多了一个参数的构造方法

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

咱们第一个参数是容量大小,能够指定;第二个参数是咱们的负载装载因子;第三个是指定咱们的更新的并发线程数量;而后进行一些边界处理和赋值处理。最后就将咱们的要扩容的大小赋值给了sizeCtl(上面介绍了,咱们下次要扩容的大小),注意这里咱们并无进行初始化table,而是在第一次put的时候才会进行初始化,下面会讲到。

咱们一样会在上面的构造方法里面看到一个方法tableSizeFor,咱们点进去看,原来和咱们的HashMap的那个设计容量为2的整数次幂方法同样,至于为何要设置成2的整数次幂,我在HashMap方法里面也提到了。

private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

2.3 put方法

咱们知道在进行第一次put的时候会进行扩容,那么若是有多个线程同时进来,咱们是如何保证只有一个线程成功的进行了扩容呢?咱们在第一次put的时候putVal方法里面有这么一行代码

if (tab == null || (n = tab.length) == 0)
      tab = initTable();

咱们调用了initTable方法,在下面注释上给出解析

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //若是sizectl(sc)小于0,说明已经有线程进行在初始化了,咱们的其余进来的线程做罢
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //使用cas操做,将咱们的sc更新为-1,表明在进行初始化了
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //右移两位再操做,至关于0.75*n,设置了一个扩容的阈值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

回来咱们继续看一下咱们的完整的putVal方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
     //key的散列,获取哈希值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //若是表没有进行初始化,则进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //若是能够直接将咱们的哈希值插入数组,则直接存进去,不用加锁
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //若是插入的点是咱们的table的链接点,说明在扩容,咱们就帮助当前线程扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //而后在进行具体的增长操做的时候,加锁
                synchronized (f) {
                    //肯定f在tab中是链表的头结点
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //若是哈希桶是树,按照树的方式插入新值
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //若是节点大于等于8,进行变换红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        //调用生成树的方法
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
     //能执行到这一步,说明节点不是被替换的,是被插入的,因此要将map的元素数量加1
        addCount(1L, binCount);
        return null;
    }

当table容量不足的时候,即table的元素数量达到容量阈值sizeCtl,须要对table进行扩容。
整个扩容分为两部分:

  1. 构建一个nextTable,大小为table的两倍。
  2. 把table的数据复制到nextTable中。

这两个过程在单线程下实现比较简单,可是在多线程下比较复杂。咱们的ConcurrentHashMap是支持并发插入的,这里用图文简单分析一下:

多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另外一个线程看到forward,就向后遍历。这样交叉就完成了复制工做。

(这里具体的addCount方法和transfer方法暂时看的不是大懂,后面会补上!)

2.4 get方法

get方法比较简单,就是若是是在桶第一个就返回;若是是树的结构调用树的方法去遍历查找;若是是链表就遍历下去查找;若是都没找到就返回null;

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法这么简单贴上来只是为了说明,咱们的get方法是没有加锁的,无阻塞的。之因此可以正确的读取值是由于咱们在上面也说到了,重写了node,里面的变量都用了volatile关键字来进行修饰。并且经过代码能够得出ConcurrentHashMap的key和Value都不能为null。

3 面试题分析

一样的,再进行了稍微的源码分析,咱们试着来解决一些面试题。

一、ConcurrentHashMap使用什么技术来保证线程安全?

咱们在上面分析过了,1.7的时候采用的Segment分段锁来实现,1.8采用的是CAS+Synchronized来实现的。具体实现细节,balabala简单描述一下。

二、ConcurrentHashMap的get方法是否要加锁,为何?

不用,咱们说过了,get方法是无阻塞不加锁的。由于咱们重写了node类,里面的变量都用了volatile关键字来进行修饰,能够保证最新值的获取!

三、ConcurrentHashMap1.7和1.8的区别?

数据结构

  • 1.7:SegmentHashEntry
  • 1.8:数组+链表+红黑树

并发安全实现

  • 1.7:分段式锁(锁的对象是一个Segment)
  • 1.8:CAS+Synchronized(下降了锁的粒度,对象是一个Node)

其余的面试题,无非与HashMap大径类似,能够看看个人HashMap分析,里面也有面试题详解。(点击跳转)

4 总结

关于源码其实还有不少都没有分析,由于这比HashMap要复杂也难。因此挑一些高频考点来进行分析。感谢下面的参考资料!

5 参考资料

https://www.jianshu.com/p/e694f1e868ec

公众号《Java3y》多线程系列文章

http://www.javashuo.com/article/p-exjabbki-ed.html

相关文章
相关标签/搜索