深刻剖析 Java 7 中的 HashMap 和 ConcurrentHashMap

本文将深刻剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的源码,解析 HashMap 线程不安全的原理以及解决方案,最后以测试用例加以验证。java

1 Java7 HashMap

HashMap 的数据结构:node

从上图中能够看出,HashMap 底层就是一个数组结构,数组中的每一项又是一个链表。linux

经过查看 JDK 中的 HashMap 源码,能够看到其构造函数有一行代码:数组

public HashMap(int initialCapacity, float loadFactor) {
    ...
    table = new Entry[capacity];
    ...
}

即建立了一个大小为 capacity 的 Entry 数组,而 Entry 的结构以下:安全

static class Entry<K,V> implements Map.Entry<K,V> {
    final K key;
    V value;
    Entry<K,V> next;
    final int hash;
    ……
}

能够看到,Entry 是一个 static class,其中包含了 key 和 value ,也就是键值对,另外还包含了一个 next 的 Entry 指针。数据结构

  • capacity:当前数组容量,始终保持 2^n,能够扩容,扩容后数组大小为当前的 2 倍。默认初始容量为 16。
  • loadFactor:负载因子,默认为 0.75。
  • threshold:扩容的阈值,等于 capacity * loadFactor

1.1 put过程分析

public V put(K key, V value) {
    // 当插入第一个元素的时候,须要先初始化数组大小
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    // 若是 key 为 null,则这个 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
    // key 的 hash 值
    int hash = hash(key);
    // 找到对应的数组下标
    int i = indexFor(hash, table.length);
    // 遍历一下对应下标处的链表,看是否有重复的 key 已经存在,
    // 若是有,直接覆盖,put 方法返回旧值就结束了
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
 
    modCount++;
    // 不存在重复的 key,将此 entry 添加到链表中
    addEntry(hash, key, value, i);
    return null;
}

这里对一些方法作深刻解析。多线程

  • 数组初始化
private void inflateTable(int toSize) {
    // 保证数组大小必定是 2^n
    int capacity = roundUpToPowerOf2(toSize);
    // 计算扩容阈值
    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
    // 初始化数组
    table = new Entry[capacity];
    initHashSeedAsNeeded(capacity);
}
  • 找到对应的数组下标
static int indexFor(int hash, int length) {
    // 做用等价于取模运算,但这种方式效率更高
    return hash & (length-1);
}

由于HashMap的底层数组长度老是 2^n,当 length 为 2 的 n 次方时,hash & (length-1) 就至关于对length取模,并且速度比直接取模要快的多。并发

  • 添加节点到链表中
void addEntry(int hash, K key, V value, int bucketIndex) {
    // 若是当前 HashMap 大小已经达到了阈值,而且新值要插入的数组位置已经有元素了,那么要扩容
    if ((size >= threshold) && (null != table[bucketIndex])) {
        // 扩容
        resize(2 * table.length);
        // 从新计算 hash 值
        hash = (null != key) ? hash(key) : 0;
        // 计算扩容后的新的下标
        bucketIndex = indexFor(hash, table.length);
    }
    createEntry(hash, key, value, bucketIndex);
}
// 永远都是在链表的表头添加新元素
void createEntry(int hash, K key, V value, int bucketIndex) {
    // 获取指定 bucketIndex 索引处的 Entry
    Entry<K,V> e = table[bucketIndex];
    // 将新建立的 Entry 放入 bucketIndex 索引处,并让新的 Entry 指向原来的 Entry
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

当系统决定存储 HashMap 中的 key-value 对时,彻底没有考虑 Entry 中的 value,仅仅只是 根据 key 来计算并决定每一个 Entry 的存储位置 。咱们彻底能够把 Map 集合中的 value 当成 key 的附属,当系统决定了 key 的存储位置以后,value 随之保存在那里便可。app

  • 数组扩容

随着 HashMap 中元素的数量愈来愈多,发生碰撞的几率将愈来愈大,所产生的子链长度就会愈来愈长,这样势必会影响 HashMap 的存取速度。为了保证 HashMap 的效率,系统必需要在某个临界点进行扩容处理,该临界点 threshold。而在 HashMap 数组扩容以后,最消耗性能的点就出现了:原数组中的数据必须从新计算其在新数组中的位置,并放进去,这就是 resize。ssh

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 若 oldCapacity 已达到最大值,直接将 threshold 设为 Integer.MAX_VALUE
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return; // 直接返回
    }
    // 不然,建立一个更大的数组
    Entry[] newTable = new Entry[newCapacity];
    //将每条Entry从新哈希到新的数组中
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    // 从新设定 threshold
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

1.2 get过程分析

public V get(Object key) {
    // key 为 null 的话,会被放到 table[0],因此只要遍历下 table[0] 处的链表就能够了
    if (key == null)
        return getForNullKey();
    // key 非 null 的状况,详见下文
    Entry<K,V> entry = getEntry(key);
 
    return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
    // The number of key-value mappings contained in this map.
    if (size == 0) {
        return null;
    }
 
    // 根据该 key 的 hashCode 值计算它的 hash 码 
    int hash = (key == null) ? 0 : hash(key);
    // 肯定数组下标,而后从头开始遍历链表,直到找到为止
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        //若搜索的key与查找的key相同,则返回相对应的value
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

2 Java7 ConcurrentHashMap

ConcurrentHashMap 的成员变量中,包含了一个 Segment 数组 final Segment<K,V>[] segments;,而 Segment 是ConcurrentHashMap 的内部类。

而后在 Segment 这个类中,包含了一个 HashEntry 的数组transient volatile HashEntry<K,V>[] table,而 HashEntry 也是 ConcurrentHashMap 的内部类。

HashEntry 中,包含了 key 和 value 以及 next 指针(相似于 HashMap 中的 Entry),因此 HashEntry 能够构成一个链表。

2.1 成员变量及构造函数

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable { 
    ...
    //初始的容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //初始的加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //初始的并发等级,表示当前更新线程的估计数
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小的segment数量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大的segment数量
    static final int MAX_SEGMENTS = 1 << 16; 
    //
    static final int RETRIES_BEFORE_LOCK = 2;
    // segments 的掩码值, key 的散列码的高位用来选择具体的 segment
    final int segmentMask; 
    // 偏移量
    final int segmentShift; 
    final Segment<K,V>[] segments; 
    ...
    // 建立一个带有指定初始容量、加载因子和并发级别的新的空映射
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 寻找最佳匹配参数(不小于给定参数的最接近的 2^n)
        int sshift = 0; // 用来记录向左按位移动的次数
        int ssize = 1; // 用来记录Segment数组的大小
        // 计算并行级别 ssize,由于要保持并行级别是 2^n
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 若为默认值,concurrencyLevel 为 16,sshift 为 4
        // 那么计算出 segmentShift 为 28,segmentMask 为 15
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 记录每一个 Segment 上要放置多少个元素
        int c = initialCapacity / ssize;
        // 假若有余数,则Segment数量加1
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

当用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:

  • Segment 数组长度为 16,不能够扩容
  • Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是之后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
  • 这里初始化了 segment[0],其余位置仍是 null,至于为何要初始化 segment[0],后面的代码会介绍
  • 当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码,这两个值立刻就会用到

2.2 put过程分析

根据 hash 值很快就能找到相应的 Segment,以后就是 Segment 内部的 put 操做。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 根据 hash 值找到 Segment 数组中的位置 j
    // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
    // 而后和 segmentMask(15) 作一次与操做,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 刚刚说了,初始化的时候初始化了 segment[0],可是其余位置仍是 null,
    // ensureSegment(j) 对 segment[j] 进行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

Segment 内部是由 数组+链表 组成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先获取该 segment 的独占锁
    // 每个Segment进行put时,都会加锁
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 内部的数组
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求应该放置的数组下标
        int index = (tab.length - 1) & hash;
        // 数组该位置处的链表的表头
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            // 若是链头不为 null
            if (e != null) {
                K k;
                //若是在该链中找到相同的key,则用新值替换旧值,并退出循环
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                //若是没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else
                e = e.next;
            }
            else {
                // node 究竟是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
                // 若是不为 null,那就直接将它设置为链表表头;若是是null,初始化并设置为链表表头。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 若是超过了该 segment 的阈值,这个 segment 须要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                    // 其实就是将新的节点设置成原链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

2.3 初始化Segment

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其余槽来讲,在插入第一个值的时候进行初始化。

这里须要考虑并发,由于极可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就能够。

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 这里看到为何以前要初始化 segment[0] 了,
        // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
        // 为何要用“当前”,由于 segment[0] 可能早就扩容过了
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        // 初始化 segment[k] 内部的数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck Segment[k] 是否被其它线程初始化了
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 循环,内部用 CAS,当前线程成功设值或其余线程成功设值后,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

2.4 get过程分析

比较简单,先找到 Segment 数组的位置,而后找到 HashEntry 数组的位置,最后顺着链表查找便可。

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

3 线程不安全

3.1 哈希碰撞

多个线程同时使用 put() 方法添加元素,若存在两个或多个 put() 的 key 发生了碰撞,那么有可能其中一个线程的数据被覆盖。

3.2 扩容

当数据要插入 HashMap 时,都会检查容量有没有超过设定的 thredhold,若是超过,则须要扩容。而多线程会致使扩容后的链表造成环形数据结构,一旦造成环形数据结构,Entry 的 next 的节点永远不为 null,就会在获取 Entry 时产生死循环。

例子可见文章《HashMap多线程死循环问题》。

不过要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 添加节点到链表中是先扩容后再添加,而例子中的源码是:

void addEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    // 先添加节点
    table[bucketIndex] = new Entry<K,V>(hash, key, value, e);
    
    // 而后扩容
    if (size++ >= threshold)
        resize(2 * table.length);
}

因此在 Java7 中此例子无效。而在 Java8 中,经过确保建新链与旧链的顺序是相同的,便可避免产生死循环。

4 HashMap遍历方式

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class HashMapTest {
    private final static Map<Integer, Object> map = new HashMap<Integer, Object>(10000);
    private static final Object PRESENT = new Object();
    public static void main(String args[]) {
        long startTime;
        long endTime;
        long totalTime;
        for (int i = 0; i < 7500; i++) {
            map.put(i, PRESENT);
        }
        // 方法一
        startTime = System.nanoTime();
        Iterator iter1 = map.entrySet().iterator();
        while (iter1.hasNext()) {
            Map.Entry<Integer, Object> entry = (Map.Entry) iter1.next();
            Integer key = entry.getKey();
            Object val = entry.getValue();
        }
        endTime = System.nanoTime();
        totalTime = endTime - startTime;
        System.out.println("methor1 pays " + totalTime + " ms");
        
        // 方法二
        startTime = System.nanoTime();
        Iterator iter2 = map.keySet().iterator();
        while (iter2.hasNext()) {
            Object key = iter2.next();
            Object val = map.get(key);
        }
        endTime = System.nanoTime();
        totalTime = endTime - startTime;
        System.out.println("methor2 pays " + totalTime + " ms");
    }
}

运行结果:

5 性能对比

线程安全的使用 HashMap 有三种方式,分别为 Hashtable、SynchronizedMap()、ConcurrentHashMap。

Hashtable

使用 synchronized 来保证线程安全,几乎全部的 public 的方法都是 synchronized 的,而有些方法也是在内部经过 synchronized 代码块来实现。

synchronizedMap()

经过建立一个线程安全的 Map 对象,并把它做为一个封装的对象来返回。

ConcurrentHashMap

支持多线程对 Map 作读操做,而且不须要任何的 blocking 。这得益于 CHM 将 Map 分割成了不一样的部分,在执行更新操做时只锁住一部分。根据默认的并发级别, Map 被分割成 16 个部分,而且由不一样的锁控制。这意味着,同时最多能够有 16个 写线程操做 Map 。试想一下,由只能一个线程进入变成同时可由 16 个写线程同时进入(读线程几乎不受限制),性能的提高是显而易见的。但因为一些更新操做,如 put(), remove(), putAll(), clear()只锁住操做的部分,因此在检索操做不能保证返回的是最新的结果。

在迭代遍历 CHM 时, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不会返回某些最近的改变,而且在遍历过程当中,若是已经遍历的数组上的内容变化了,不会抛出 ConcurrentModificationExceptoin 的异常。

何时使用 ConcurrentHashMap ?

CHM 适用于读者数量超过写者时,当写者数量大于等于读者时,CHM 的性能是低于 Hashtable 和 synchronizedMap 的。这是由于当锁住了整个 Map 时,读操做要等待对同一部分执行写操做的线程结束。

CHM 适用于作 cache ,在程序启动时初始化,以后能够被多个请求线程访问。

CHM 是Hashtable一个很好的替代,但要记住, CHM 的比 Hashrable 的同步性稍弱。

6 拓展:Java8 HashMap & ConcurrentHashMap

Java8 对 HashMap 和 ConcurrentHashMap 作了一些修改:

  • 两者均利用了红黑树,因此其数据结构由 数组 + 链表 + 红黑树 组成。咱们知道,链表上的数据须要一个一个比较下去才能找到咱们须要的,时间复杂度取决于链表的长度,为 O(n)。为了下降这一部分的开销,在 Java8 中,当链表中的元素超过了 8 个之后,会将链表转换为红黑树,这个时候时间复杂度就降为了 O(logN)
  • Java8 中 ConcurrentHashMap 摒弃 Java7 中的 Segment 的概念,使用了另外一种方式实现保证线程安全。

Linux公社的RSS地址: https://www.linuxidc.com/rssFeed.aspx

本文永久更新连接地址: https://www.linuxidc.com/Linux/2018-09/154133.htm

相关文章
相关标签/搜索