图解ConcurrentHashMap

概述

上篇文章介绍了 HashMap 在多线程并发状况下是不安全的,多线程并发推荐使用 ConcurrentHashMap ,那么 ConcurrentHashMap 是什么?它的设计思想是什么,源码是怎么实现的?node

ConcurrentHashMap是什么

Concurrent翻译过来是并发的意思,字面理解它的做用是处理并发状况的 HashMap,在介绍它以前先回顾下以前的知识。经过前面两篇学习,咱们知道多线程并发下 HashMap 是不安全的(如死循环),更广泛的是多线程并发下,因为堆内存对于各个线程是共享的,而 HashMap 的 put 方法不是原子操做,假设Thread1先 put 值,而后 sleep 2秒(也能够是系统时间片切换失去执行权),在这2秒内值被Thread2改了,Thread1“醒来”再 get 的时候发现已经不是原来的值了,这就容易出问题。数组

那么如何避免这种多线程“奥迪变奥拓”的状况呢?常规思路就是给 HashMap 的 put 方法加锁(synchronized),保证同一个时刻只容许一个线程拥有对 hashmap 有写的操做权限便可。然而假如线程1中操做耗时,占着茅坑半天不出来,其余须要操做该 hashmap 的线程就须要在门口排队半天,严重影响用户体验(HashTable 就是这么干的)。举个生活中的例子,不少银行除了存取钱,还支持存取贵重物品,贵重物品都放在保险箱里,把 HashMap 和 HashTable 比做银行,结构:安全

把线程比做人,对应的状况以下:bash

  • HashMap牌银行:咱们的服务宗旨是不用排队,同一时间多人都有机会修改保险柜里的东西,你觉得你存的是美圆?取出来的实际上是日元,破产就在一瞬间,刺不刺激。
  • HashTable牌银行:咱们的服务宗旨是要排队,同一时间只有一我的有机会修改保险柜里的东西,其他的人只能看不能动手改,保你存的是美圆取得仍是美圆。什么?你说若是那人在里面睡着了不出来怎么办?不要着急,来,坐下来打会麻将等他出来。

多线程下用 HashMap 不肯定性过高,有破产的风险,不能选;用 HashTable 不会破产,可是用户体验不太好,那么怎样才能作到多人存取既不影响他人存值,又不用排队呢?有人提议搞个「银行者联盟」,多开几个像HashTable 这种「带锁」的银行就行了,有多少人办理业务,就开多少个银行,一对一服务,这个区都是大老板,开银行的成本都是小钱,因而「银行者联盟」成立了。多线程

接下来的状况是这样的:好比盖伦和亚索一块儿去银行存他们的大宝剑,这个「银行者联盟」一顿操做,而后对盖伦说,1号银行如今没人,你能够去那存,不用排队,而后盖伦就去1号银行存他的大宝剑,1号银行把盖伦接进门,立刻拉闸,一顿操做,而后把盖伦的大宝剑放在第x行第x个保险箱,等盖伦办妥离开后,再开闸;一样「银行者联盟」对亚索说,2号银行如今没人,你能够去那存,不用排队,而后亚索去2号银行存他的大宝剑,2号银行把亚索接进门,立刻拉闸,一顿操做把亚索的大宝剑放在第x行第x号保险箱,等亚索离开后再开闸,此时无论盖伦和亚索在各自银行里面待多久都不会影响到彼此,不用担忧本身的大宝剑被人偷换了。这就是ConcurrentHashMap的设计思路,用一个图来理解并发

从上图能够看出,此时锁的是对应的单个银行,而不是整个「银行者联盟」。分析下这种设计的特色:ssh

  • 多个银行组成的「银行者联盟」
  • 当有人来办理业务时,「银行者联盟」须要肯定这我的去哪一个银行
  • 当此人去到指定银行办理业务后,该银行上锁,其余人不能同时执行修改操做,直到此人离开后解锁

由这几点基本思想能够引起一些思考,好比:函数

1.成立「银行者联盟」时初识银行数是多少?怎么设计合理?源码分析

上面这张图没有给出是否须要排队的结论,这是由于须要结合实际状况分析,好比初识化有16个银行,只有两我的来办理业务,那天然不须要排队;若是如今16个银行都有人在办理业务,这时候来了第17我的,那么他仍是须要排队的。因为「银行者联盟」事先没法得知会有多少人来办理业务,因此在它创立的时候须要制定一个「标准」,即初始银行数量,人多的状况「银行者联盟」应该多开几家银行,避免别人排队;人少的状况应该少开,避免浪费钱(什么,你说不差钱?那也不行)性能

2.当有人来办理业务的时候,「银行者联盟」怎么肯定此人去哪一个银行?

正常状况下,若是全部银行都是未上锁状态,那么有人来办理业务去哪都不用排队,当其中有些银行已经上锁,那么后续「银行者联盟」给人推荐的时候就不能把客户往上锁的银行引了,不然分分钟给人锤成麻瓜。所以「银行者联盟」须要时刻保持清醒的头脑,对本身的银行空闲状况了如指掌,每次给用户推荐都应该是最好的选择。

3.「银行者联盟」怎么保证同一时间不会有两我的在同一个银行拥有存权限?

经过对指定银行加锁/解锁的方式实现。

源码分析

Java7 源码分析

经过 Java7 的源码分析下代码实现,先看下一些重要的成员

//默认的数组大小16(HashMap里的那个数组)
static final int DEFAULT_INITIAL_CAPACITY = 16;

//扩容因子0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;
 
//ConcurrentHashMap中的数组
final Segment<K,V>[] segments

//默认并发标准16
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//Segment是ReentrantLock子类,所以拥有锁的操做
 static final class Segment<K,V> extends ReentrantLock implements Serializable {
  //HashMap的那一套,分别是数组、键值对数量、阈值、负载因子
  transient volatile HashEntry<K,V>[] table;
  transient int count;
  transient int threshold;
  final float loadFactor;

  Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
 }
 
 //换了马甲仍是认识你!!!HashEntry对象,存key、value、hash值以及下一个节点
 static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
 }
//segment中HashEntry[]数组最小长度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//用于定位在segments数组中的位置,下面介绍
final int segmentMask;
final int segmentShift;
复制代码

上面这些一下出来有点接受不了不要紧,下面都会介绍到。

接下来从最简单的初识化开始分析

ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
复制代码

默认构造函数会调用带三个参数的构造函数

public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        //步骤① start
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //步骤① end
        //步骤② start
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        //步骤② end
        // create segments and segments[0]
        //步骤③ start
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
        //步骤③ end
    }

复制代码

上面定义了许多临时变量,注释写的又少,第一次看名字根本不知道这鬼东西表明什么意思,不过咱们能够把已知的数据代进去,算出这些变量的值,再分析能不能找出一些猫腻。假设这是第一次默认建立:

  • 步骤① concurrencyLevel = 16 ,能够计算出 sshift = 4,ssize = 16,segmentShift = 28,segmentMask = 15;
  • 步骤② c = 16/16 = 1,cap = 2;
  • 步骤③有句注释,建立 Segment 数组 segments 并初始化 segments [0] ,因此 s0 初始化后数组长度为2,负载因子0.75,阈值为1;再看这里的ss的初始化(重点,圈起来要考!!!), ssize 此时为16,因此默认数组长度16,给人一种感受正好和咱们传的 concurrencyLevel 同样?看下下面的例子
例子1 例子2
ssize = 1,concurrencyLevel = 10 ssize = 1,concurrencyLevel = 8
ssize <<= 1 —> 2<10 知足 ssize <<= 1 —> 2<10 知足
ssize <<= 1 —> 4<10 知足 ssize <<= 1 —> 4<10 知足
ssize <<= 1 —> 8<10 知足 ssize <<= 1 —> 8<10 不知足 ssize = 8
ssize <<= 1 —> 16<10 不知足 ssize = 16

因此咱们传 concurrencyLevel 不必定就是最后数组的长度,长度的计算公式:

长度 = 2的n次方(2的n次方 >= concurrencyLevel)

到这里只是建立了一个长度为16的Segment 数组,并初始化数组0号位置,segmentShift和segmentMask还没派上用场,画图存档:

接着看 put 方法

public V put(K key, V value) {
        Segment<K,V> s;
        //步骤①注意valus不能为空!!!
        if (value == null)
            throw new NullPointerException();
        //根据key计算hash值,key也不能为null,不然hash(key)报空指针
        int hash = hash(key);
        //步骤②派上用场了,根据hash值计算在segments数组中的位置
        int j = (hash >>> segmentShift) & segmentMask;
        //步骤③查看当前数组中指定位置Segment是否为空
        //若为空,先建立初始化Segment再put值,不为空,直接put值。
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
复制代码

步骤①能够看到和 HashMap 的区别,这里的 key/value 为空会报空指针异常;步骤②先根据 key 值计算 hash 值,再和前面算出来的两个变量计算出这个 key 应该放在哪一个Segment中(具体怎么计算的有兴趣能够去研究下,先高位运算再取与),假设咱们算出来该键值对应该放在5号,步骤③判断5号为空,看下 ensureSegment() 方法

private Segment<K,V> ensureSegment(int k) {
        //获取segments
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            //拷贝一份和segment 0同样的segment
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            //大小和segment 0一致,为2
            int cap = proto.table.length;
            //负载因子和segment 0一致,为0.75
            float lf = proto.loadFactor;
            //阈值和segment 0一致,为1
            int threshold = (int)(cap * lf);
            //根据大小建立HashEntry数组tab
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            //再次检查
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                根据已有属性建立指定位置的Segment
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }
复制代码

该方法重点在于拷贝了segments[0],所以新建立的Segment与segment[0]的配置相同,因为多个线程都会有可能执行该方法,所以这里经过UNSAFE的一些原子性操做的方法作了屡次的检查,到目前为止画图存档:

如今“舞台”也有了,请开始你的表演,看下 Segment 的put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //步骤① start
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            //步骤① end
            V oldValue;
            try {
                //步骤② start
                //获取Segment中的HashEntry[]
                HashEntry<K,V>[] tab = table;
                //算出在HashEntry[]中的位置
                int index = (tab.length - 1) & hash;
                //找到HashEntry[]中的指定位置的第一个节点
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    //若是不为空,遍历这条链
                    if (e != null) {
                        K k;
                        //状况① 以前已存过,则替换原值
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        //状况② 另外一个线程的准备工做
                        if (node != null)
                            //链表头插入方式
                            node.setNext(first);
                        else //状况③ 该位置为空,则新建一个节点(注意这里采用链表头插入方式)
                            node = new HashEntry<K,V>(hash, key, value, first);
                        //键值对数量+1
                        int c = count + 1;
                        //若是键值对数量超过阈值
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            //扩容
                            rehash(node);
                        else //未超过阈值,直接放在指定位置
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        //插入成功返回null
                        oldValue = null;
                        break;
                    }
                }
            //步骤② end
            } finally {
                //步骤③
                //解锁
                unlock();
            }
            //修改为功,返回原值
            return oldValue;
        }
复制代码

上面的 put 方法其实和 Java7 HashMap里大体是同样的,只是多了加锁/解锁两步,也正由于这样才保证了同一时刻只有一个线程拥有修改的权限。按步骤分析下上面的流程:

  • 步骤① 执行 tryLock 方法获取锁,拿到锁返回null,没拿到锁执行 scanAndLockForPut 方法;
  • 步骤② 和 HashMap 里的那一套思路是同样的,不理解能够看下以前的文章介绍(状况②下面介绍);
  • 步骤③ 执行 unLock 方法解锁

假设如今Thread1进来存值,前面没人来过,它能够成功拿到锁,根据计算,得出它要存的键值对应该放在HashEntry[] 的0号位置,0号位置为空,因而新建一个 HashEntry,并经过 setEntryAt() 方法,放在0号位置,然而还没等 Thread1 释放锁,系统的时间片切到了 Thread2 ,先画图存档

Thread2 也来存值,经过前面的计算,刚好 Thread2 也被定位到 segments[5],接下来 Thread2 尝试获取锁,没有成功(Thread1 还未释放),执行 scanAndLockForPut() 方法:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            //经过Segment和hash值寻找匹配的HashEntry
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            //重试次数
            int retries = -1; // negative while locating node
            //循环尝试获取锁
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                //步骤①
                if (retries < 0) {
                    //状况① 没找到,以前表中不存在
                    if (e == null) {
                        if (node == null) // speculatively create node
                            //新建 HashEntry 备用,retries改为0
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    //状况② 找到,恰好第一个节点就是,retries改为0
                    else if (key.equals(e.key))
                        retries = 0;
                    //状况③ 第一个节点不是,移到下一个,retries仍是-1,继续找
                    else
                        e = e.next;
                }
                //步骤②
                //尝试了MAX_SCAN_RETRIES次还没拿到锁,简直B了dog!
                else if (++retries > MAX_SCAN_RETRIES) {
                    //泉水挂机
                    lock();
                    break;
                }
                //步骤③
                //在MAX_SCAN_RETRIES次过程当中,key对应的entry发生了变化,则从头开始
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }
复制代码

经过上面的注释分析能够看出,Thread2 虽然此刻没有权限修改,可是它也没闲着,利用等锁的这个时间,把本身要放的键值对在数组中哪一个位置计算出来了,这样当 Thread2 一拿到锁就能够立马定位到具体位置操做,节省时间。上面的步骤③稍微解释下,好比 Thread2 经过查找得知本身要修改的值在0号位置,但在 Thread1 里面又把该值改到了1号位置,若是它还去0号操做那确定出问题了,因此须要从新肯定。

假设 Thread2 put 值为("亚索",“98”),对应1号位置,那么在 scanAndLockForPut 方法中对应状况①,画图存档:

再回到 Segment put 方法中的状况②,当 Thread1 释放锁后,Thread2 持有锁,并准备把亚索放在1号位置,然而此时 Segment[5] 里的键值对数量2 > 阈值1,因此调用 rehash() 方法扩容,

private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ //旧数组引用 HashEntry<K,V>[] oldTable = table; //旧数组长度 int oldCapacity = oldTable.length; //新数组长度为旧数组的2倍 int newCapacity = oldCapacity << 1; //修改新的阈值 threshold = (int)(newCapacity * loadFactor); //建立新表 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; //遍历旧表 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; //肯定在新表中的位置 int idx = e.hash & sizeMask; //状况① 链表只有一个节点,指定转移到新表指定位置 if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { //状况② 扩容先后位置发生改变 int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } //将改变的键值对放到新表的对应位置 newTable[lastIdx] = lastRun; // Clone remaining nodes //状况③ 把链表中剩下的节点拷到新表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } //添加新的节点(链表头插入方式) int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } 复制代码

一样是扩容转移,这里的代码比 HashMap 中的 transfer 多了一些操做,在上上篇学习 HashMap 扩容可知,扩容后键值对的新位置要么和原位置同样,要么等于原位置+旧数组的长度,因此画个图来理解下上面代码这么写的缘由:

前提:当前 HashEntry[] 长度为8,阈值为 8*0.75 = 6,因此 put 第7个键值对须要扩容 ,盖伦和亚索扩容先后位置不变,妖姬和卡特扩容后位置须要加上原数组长度,因此执行上面代码流程:

上面的代码先找出扩容先后须要转移的节点,先执行转移,而后再把该条链上剩下的节点转移,之因此这么写是起到复用的效果,注释中也说了,在使用默认阈值的状况下,只有大约 1/6 的节点须要被 clone 。注意到目前为止,能够看到不管是扩容转移仍是新增节点,Java7都是采用的头插入方式,流程图以下:

相比之下,get 方法没有加锁/解锁的操做,代码比较简单就不分析了。

稍微说下Java8

Java8 对比Java7有很大的不一样,好比取消了Segments数组,容许并发扩容。

先看下ConcurrentHashMap的初始化

public ConcurrentHashMap() {
}
复制代码

和Java7不同,这里是个空方法,那么它具体的初始化操做呢?直接看下 put 方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key/value不能为空!!!
    if (key == null || value == null) throw new NullPointerException();
    //计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //注释① 表为null则初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //CAS方法判断指定位置是否为null,为空则经过建立新节点,经过CAS方法设置在指定位置
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //当前节点正在扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //指定位置不为空
        else {
            V oldVal = null;
            //注释② 加锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //节点是链表的状况
                    if (fh >= 0) {
                        binCount = 1;
                        //遍历总体链
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //若是已存在,替换原值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //若是是新加节点,则以尾部插入实现添加
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //节点是红黑树的状况
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //遍历红黑树
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                //链表中节点个数超过8转成红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //注释③ 添加节点
    addCount(1L, binCount);
    return null;
}
复制代码

代码有点长,第一次看颇有可能引发身体不适,主要是由于引入了红黑树的判断和操做,以及线程安全的操做。一样key/value 为空会报空指针异常,这也是和 HashMap 一个明显的区别。

注释①

调用 initTable 初始化数组

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl小于0,当前线程让出执行权
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //CAS 操做将 sizeCtl 值改成-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //默认建立大小为16的数组
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                //初始化完再改回来
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
复制代码

put方法并无加锁,那么它是如何保证建立新表的时候并发安全呢?答案就是这里的 sizeCtl ,sizeCtl 默认值为0,当一个线程初始化数组时,会将 sizeCtl 改为 -1,因为被 volatile 修饰,对于其余线程来讲这个变化是可见的,上面代码看到后续线程判断 sizeCtl 小于0 就会让出执行权。

注释②

Java8 摒弃了Segment,而是对数组中单个位置加锁。当指定位置节点不为 null 时,状况与 Java8 HashMap 操做相似,新节点的添加仍是尾部插入方式。

注释③

无论是链表的仍是红黑树,肯定以后总的节点数会加1,可能会引发扩容,Java8 ConcunrrentHashMap 支持并发扩容,以前扩容老是由一个线程将旧数组中的键值对转移到新的数组中,支持并发的话,转移所须要的时间就能够缩短了,固然相应的并发处理控制逻辑也就更复杂了,扩容转移经过 transfer 方法完成,Java8中该方法很长,感兴趣的能够看下源码。。。

用一个图来表示 Java8 ConcurrentHashMap的样子

总结

经过分析源码对比了 HashMap 与 ConcurrentHashMap的差异,以及Java7和Java8上 ConcurrentHashMap 设计的不一样,固然还有不少坑没有填,好比其中调用了不少UNSAFE的CAS方法,能够减小性能上的消耗,平时不多用,了解的比较少;以及红黑树的具体原理和实现,后续慢慢填。。。

相关文章
相关标签/搜索