ConcurrentHashMap原理分析

曾经在 [高并发Java 五] JDK并发包1 中提到过ConcurrentHashMap,只是简单的提到了下ConcurrentHashMap的优势,以及大概的实现原理。html

而本文则重点介绍ConcurrentHashMap实现的细节。node

HashMap就不介绍了,具体请查看JDK7与JDK8中HashMap的实现算法

HashTable是一个线程安全的类,它使用synchronized来锁住整张Hash表来实现线程安全,即每次锁住整张表让线程独占。ConcurrentHashMap容许多个修改操做并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不一样部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不一样的部分,每一个段其实就是一个小的Hashtable,它们有本身的锁。只要多个修改操做发生在不一样的段上,它们就能够并发进行。数组

有些方法须要跨段,好比size()和containsValue(),它们可能须要锁定整个表而而不只仅是某个段,这须要按顺序锁定全部段,操做完毕后,又按顺序释放全部段的锁。这里“按顺序”是很重要的,不然极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,而且其成员变量实际上也是final的,可是,仅仅是将数组声明为final的并不保证数组成员也是final的,这须要实现上的保证。这能够确保不会出现死锁,由于得到锁的顺序是固定的。安全

1. 实现原理

ConcurrentHashMap使用分段锁技术,将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问,可以实现真正的并发访问。以下图是ConcurrentHashMap的内部结构图:多线程

 

从图中能够看到,ConcurrentHashMap内部分为不少个Segment,每个Segment拥有一把锁,而后每一个Segment(继承ReentrantLock并发

static final class Segment<K,V> extends ReentrantLock implements Serializable

Segment继承了ReentrantLock,代表每一个segment均可以当作一个锁。(ReentrantLock前文已经提到,不了解的话就把当作synchronized的替代者吧)这样对每一个segment中的数据须要同步操做的话都是使用每一个segment容器对象自身的锁来实现。只有对全局须要改变时锁定的是全部的segment。app

Segment下面包含不少个HashEntry列表数组。对于一个key,须要通过三次(为何要hash三次下文会详细讲解)hash操做,才能最终定位这个元素的位置,这三次hash分别为:ssh

  1. 对于一个key,先进行一次hash操做,获得hash值h1,也即h1 = hash1(key);
  2. 将获得的h1的高几位进行第二次hash,获得hash值h2,也即h2 = hash2(h1高几位),经过h2可以肯定该元素的放在哪一个Segment;
  3. 将获得的h1进行第三次hash,获得hash值h3,也即h3 = hash3(h1),经过h3可以肯定该元素放置在哪一个HashEntry。

ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图能够看出之间的关系async

/** 
* The segments, each of which is a specialized hash table 
*/  
final Segment<K,V>[] segments;

不变(Immutable)和易变(Volatile)ConcurrentHashMap彻底容许多个读操做并发进行,读操做并不须要加锁。若是使用传统的技术,如HashMap中的实现,若是容许能够在hash链的中间添加或删除元素,读操做不加锁将获得不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry表明每一个hash链中的一个节点,其结构以下所示:

static final class HashEntry<K,V> {  
     final K key;  
     final int hash;  
     volatile V value;  
     volatile HashEntry<K,V> next;  
 }

在JDK 1.6中,HashEntry中的next指针也定义为final,而且每次插入将新添加节点做为链的头节点(同HashMap实现),并且每次删除一个节点时,会将删除节点以前的全部节点 拷贝一份组成一个新的链,而将当前节点的上一个节点的next指向当前节点的下一个节点,从而在删除之后 有两条链存在,于是能够保证即便在同一条链中,有一个线程在删除,而另外一个线程在遍历,它们都能工做良好,由于遍历的线程能继续使用原有的链。于是这种实现是一种更加细粒度的happens-before关系,即若是遍历线程在删除线程结束后开始,则它能看到删除后的变化,若是它发生在删除线程正在执行中间,则它会使用原有的链,而不会等到删除线程结束后再执行,即看不到删除线程的影响。若是这不符合你的需求,仍是乖乖的用Hashtable或HashMap的synchronized版本,Collections.synchronizedMap()作的包装。

而HashMap中的Entry只有key是final的

static class Entry<K,V> implements Map.Entry<K,V> {
        final K key;
        V value;
        Entry<K,V> next;
        int hash;

不变 模式(immutable)是多线程安全里最简单的一种保障方式。由于你拿他没有办法,想改变它也没有机会。
不变模式主要经过final关键字来限定的。在JMM中final关键字还有特殊的语义。Final域使得确保初始化安全性(initialization safety)成为可能,初始化安全性让不可变形对象不须要同步就能自由地被访问和共享。

1.1 初始化

先看看ConcurrentHashMap的初始化作了哪些事情,构造函数的源码以下:

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

传入的参数有initialCapacity,loadFactor,concurrencyLevel这三个。

  • initialCapacity表示新建立的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的Entry数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16;
  • loadFactor表示负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就须要rehash,扩容。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f;
  • concurrencyLevel表示并发级别,这个值用来肯定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。好比,若是concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想状况下ConcurrentHashMap的真正的并发访问量可以达到concurrencyLevel,由于有concurrencyLevel个Segment,假若有concurrencyLevel个线程须要访问Map,而且须要访问的数据都刚好分别落在不一样的Segment中,则这些线程可以无竞争地自由访问(由于他们不须要竞争同一把锁),达到同时访问的效果。这也是为何这个参数起名为“并发级别”的缘由。

初始化的一些动做:

  1. 验证参数的合法性,若是不合法,直接抛出异常。
  2. concurrencyLevel也就是Segment的个数不能超过规定的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,若是超过这个值,设置为这个值。
  3. 而后使用循环找到大于等于concurrencyLevel的第一个2的n次方的数ssize,这个数就是Segment数组的大小,并记录一共向左按位移动的次数sshift,并令segmentShift = 32 - sshift,而且segmentMask的值等于ssize - 1,segmentMask的各个二进制位都为1,目的是以后能够经过key的hash值与这个值作&运算肯定Segment的索引。
  4. 检查给的容量值是否大于容许的最大容量值,若是大于该值,设置为该值。最大容量值为static final int MAXIMUM_CAPACITY = 1 << 30;。
  5. 而后计算每一个Segment平均应该放置多少个元素,这个值c是向上取整的值。好比初始容量为15,Segment个数为4,则每一个Segment平均须要放置4个元素。
  6. 最后建立一个Segment实例,将其当作Segment数组的第一个元素。

1.2 put操做

put操做的源码以下:

public V put(K key, V value) {
      Segment<K,V> s;
      if (value == null)
          throw new NullPointerException();
      int hash = hash(key);
      int j = (hash >>> segmentShift) & segmentMask;
      if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
           (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
          s = ensureSegment(j);
      return s.put(key, hash, value, false);
  }

操做步骤以下:

  1. 判断value是否为null,若是为null,直接抛出异常。
  2. key经过一次hash运算获得一个hash值。(这个hash运算下文详说)
  3. 将获得hash值向右按位移动segmentShift位,而后再与segmentMask作&运算获得segment的索引j。
    在初始化的时候咱们说过segmentShift的值等于32-sshift,例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子:
    0000 0000 0000 0000 0000 0000 0000 xxxx,而后再用这个值与segmentMask作&运算,也就是取最后四位的值。这个值肯定Segment的索引。
  4. 使用Unsafe的方式从Segment数组中获取该索引对应的Segment对象。
  5. 向这个Segment对象中put值,这个put操做也基本是同样的步骤(经过&运算获取HashEntry的索引,而后set)。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

put操做是要加锁的。

1.3 get操做

get操做的源码以下:

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

操做步骤为:

  1. 和put操做同样,先经过key进行两次hash肯定应该去哪一个Segment中取数据。
  2. 使用Unsafe获取对应的Segment,而后再进行一次&运算获得HashEntry链表的位置,而后从链表头开始遍历整个链表(由于Hash可能会有碰撞,因此用一个链表保存),若是找到对应的key,则返回对应的value值,若是链表遍历完都没有找到对应的key,则说明Map中不包含该key,返回null。

值得注意的是,get操做是不须要加锁的(若是value为null,会调用readValueUnderLock,只有这个步骤会加锁,经过前面提到的volatile和final来确保数据安全。

1.4 size操做

size操做与put和get操做最大的区别在于,size操做须要遍历全部的Segment才能算出整个Map的大小,而put和get都只关心一个Segment。假设咱们当前遍历的Segment为SA,那么在遍历SA过程当中其余的Segment好比SB可能会被修改,因而这一次运算出来的size值可能并非Map当前的真正大小。因此一个比较简单的办法就是计算Map大小的时候全部的Segment都Lock住,不能更新(包含put,remove等等)数据,计算完以后再Unlock。这是普通人可以想到的方案,可是牛逼的做者还有一个更好的Idea:先给3次机会,不lock全部的Segment,遍历全部Segment,累加各个Segment的大小获得整个Map的大小,若是某相邻的两次计算获取的全部Segment的更新的次数(每一个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,经过这个值能够获得每一个Segment的更新操做的次数)是同样的,说明计算过程当中没有更新操做,则直接返回这个值。若是这三次不加锁的计算过程当中Map的更新次数有变化,则以后的计算先对全部的Segment加锁,再遍历全部Segment计算Map大小,最后再解锁全部Segment。源代码以下:

public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

举个例子:

一个Map有4个Segment,标记为S1,S2,S3,S4,如今咱们要获取Map的size。计算过程是这样的:第一次计算,不对S1,S2,S3,S4加锁,遍历全部的Segment,假设每一个Segment的大小分别为1,2,3,4,更新操做次数分别为:2,2,3,1,则此次计算能够获得Map的总大小为1+2+3+4=10,总共更新操做次数为2+2+3+1=8;第二次计算,不对S1,S2,S3,S4加锁,遍历全部Segment,假设此次每一个Segment的大小变成了2,2,3,4,更新次数分别为3,2,3,1,由于两次计算获得的Map更新次数不一致(第一次是8,第二次是9)则能够判定这段时间Map数据被更新,则此时应该再试一次;第三次计算,不对S1,S2,S3,S4加锁,遍历全部Segment,假设每一个Segment的更新操做次数仍是为3,2,3,1,则由于第二次计算和第三次计算获得的Map的更新操做的次数是一致的,就能说明第二次计算和第三次计算这段时间内Map数据没有被更新,此时能够直接返回第三次计算获得的Map的大小。最坏的状况:第三次计算获得的数据更新次数和第二次也不同,则只能先对全部Segment加锁再计算最后解锁。

1.5 containsValue操做

containsValue操做采用了和size操做同样的想法:

public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0 ; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
    }

2. 关于hash

看看hash的源代码:

private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

源码中的注释是这样的:

Applies a supplemental hash function to a given hashCode, which defends against poor quality hash functions. This is critical because ConcurrentHashMap uses power-of-two length hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits.

这里用到了Wang/Jenkins hash算法的变种,主要的目的是为了减小哈希冲突,使元素可以均匀的分布在不一样的Segment上,从而提升容器的存取效率。假如哈希的质量差到极点,那么全部的元素都在一个Segment中,不只存取元素缓慢,分段锁也会失去意义。

举个简单的例子:

System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);

这些数字获得的hash值都是同样的,全是15,因此若是不进行第一次预hash,发生冲突的概率仍是很大的,可是若是咱们先把上例中的二进制数字使用hash()函数先进行一次预hash,获得的结果是这样的:

0100|0111|0110|0111|1101|1010|0100|1110 1111|0111|0100|0011|0000|0001|1011|1000 0111|0111|0110|1001|0100|0110|0011|1110 1000|0011|0000|0000|1100|1000|0001|1010

上面这个例子引用自:  InfoQ

能够看到每一位的数据都散开了,而且ConcurrentHashMap中是使用预hash值的高位参与运算的。好比以前说的先将hash值向右按位移动28位,再与15作&运算,获得的结果都别为:4,15,7,8,没有冲突!

3. 注意事项

  • ConcurrentHashMap中的key和value值都不能为null,HashMap中key能够为null,HashTable中key不能为null。
  • ConcurrentHashMap是线程安全的类并不能保证使用了ConcurrentHashMap的操做都是线程安全的!
  • ConcurrentHashMap的get操做不须要加锁,put操做须要加锁

Reference:

1. http://www.cnblogs.com/ITtangtang/p/3948786.html

2. http://qifuguang.me/2015/09/10/[Java%E5%B9%B6%E5%8F%91%E5%8C%85%E5%AD%A6%E4%B9%A0%E5%85%AB]%E6%B7%B1%E5%BA%A6%E5%89%96%E6%9E%90ConcurrentHashMap/

3. http://www.cnblogs.com/yydcdut/p/3959815.html

相关文章
相关标签/搜索