Java7 中的 HashMap 和 ConcurrentHashMap 全解析

阅读建议:四节基本上能够进行独立阅读,建议初学者可按照 Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap 顺序进行阅读,可适当下降阅读门槛。node

阅读前提:本文分析的是源码,因此至少读者要熟悉它们的接口使用,同时,对于并发,读者至少要知道 CAS、ReentrantLock、UNSAFE 操做这几个基本的知识,文中不会对这些知识进行介绍。Java8 用到了红黑树,不过本文不会进行展开,感兴趣的读者请自行查找相关资料。数组

Java7 HashMap安全

HashMap 是最简单的,一来咱们很是熟悉,二来就是它不支持并发操做,因此源码也很是简单。并发

首先,咱们用下面这张图来介绍 HashMap 的结构。ssh

这个仅仅是示意图,由于没有考虑到数组要扩容的状况,具体的后面再说。函数

大方向上,HashMap 里面是一个数组,而后数组中每一个元素是一个单向链表。this

上图中,每一个绿色的实体是嵌套类 Entry 的实例,Entry 包含四个属性:key, value, hash 值和用于单向链表的 next。spa

capacity:当前数组容量,始终保持 2^n,能够扩容,扩容后数组大小为当前的 2 倍。线程

loadFactor:负载因子,默认为 0.75。翻译

threshold:扩容的阈值,等于 capacity * loadFactor

put 过程分析

仍是比较简单的,跟着代码走一遍吧。

public V put(K key, V value) {
    // 当插入第一个元素的时候,须要先初始化数组大小
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    // 若是 key 为 null,感兴趣的能够往里看,最终会将这个 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
    // 1. 求 key 的 hash 值
    int hash = hash(key);
    // 2. 找到对应的数组下标
    int i = indexFor(hash, table.length);
    // 3. 遍历一下对应下标处的链表,看是否有重复的 key 已经存在,
    //    若是有,直接覆盖,put 方法返回旧值就结束了
    for (Entry<K, V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    // 4. 不存在重复的 key,将此 entry 添加到链表中,细节后面说
    addEntry(hash, key, value, i);
    return null;
}

数组初始化

在第一个元素插入 HashMap 的时候作一次数组的初始化,就是先肯定初始的数组大小,并计算数组扩容的阈值。

private void inflateTable(int toSize) {
    // 保证数组大小必定是 2 的 n 次方。
    // 好比这样初始化:new HashMap(20),那么处理成初始数组大小是 32
    int capacity = roundUpToPowerOf2(toSize);
    // 计算扩容阈值:capacity * loadFactor
    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
    // 算是初始化数组吧
    table = new Entry[capacity];
    initHashSeedAsNeeded(capacity); //ignore
}

这里有一个将数组大小保持为 2 的 n 次方的作法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相应的要求,只不过实现的代码稍微有些不一样,后面再看到的时候就知道了。

计算具体数组位置

这个简单,咱们本身也能 YY 一个:使用 key 的 hash 值对数组长度进行取模就能够了。

static int indexFor(int hash, int length) {
    // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
    return hash & (length - 1);
}

这个方法很简单,简单说就是取 hash 值的低 n 位。如在数组长度为 32 的时候,其实取的就是 key 的 hash 值的低 5 位,做为它在数组中的下标位置。

添加节点到链表中

找到数组下标后,会先进行 key 判重,若是没有重复,就准备将新值放入到链表的表头。

void addEntry(int hash, K key, V value, int bucketIndex) {
    // 若是当前 HashMap 大小已经达到了阈值,而且新值要插入的数组位置已经有元素了,那么要扩容
    if ((size >= threshold) && (null != table[bucketIndex])) {
        // 扩容,后面会介绍一下
        resize(2 * table.length);
        // 扩容之后,从新计算 hash 值
        hash = (null != key) ? hash(key) : 0;
        // 从新计算扩容后的新的下标
        bucketIndex = indexFor(hash, table.length);
    }
    // 往下看
    createEntry(hash, key, value, bucketIndex);
}

// 这个很简单,其实就是将新值放到链表的表头,而后 size++
void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K, V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

这个方法的主要逻辑就是先判断是否须要扩容,须要的话先扩容,而后再将这个新的数据插入到扩容后的数组的相应位置处的链表的表头。

数组扩容

前面咱们看到,在插入新值的时候,若是当前的 size 已经达到了阈值,而且要插入的数组位置上已经有元素,那么就会触发扩容,扩容后,数组大小为原来的 2 倍。

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }
    // 新的数组
    Entry[] newTable = new Entry[newCapacity];
    // 将原来数组中的值迁移到新的更大的数组中
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    threshold = (int) Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

扩容就是用一个新的大数组替换原来的小数组,并将原来数组中的值迁移到新的数组中。

因为是双倍扩容,迁移过程当中,会将原来 table[i] 中的链表的全部节点,分拆到新的数组的 newTable[i] 和 newTable[i + oldLength] 位置上。如原来数组长度是 16,那么扩容后,原来 table[0] 处的链表中的全部元素会被分配到新数组中 newTable[0] 和 newTable[16] 这两个位置。代码比较简单,这里就不展开了。

get 过程分析

相对于 put 过程,get 过程是很是简单的。

  1. 根据 key 计算 hash 值。

  2. 找到相应的数组下标:hash & (length – 1)。

  3. 遍历该数组位置处的链表,直到找到相等(==或equals)的 key。

public V get(Object key) {
    // 以前说过,key 为 null 的话,会被放到 table[0],因此只要遍历下 table[0] 处的链表就能够了
    if (key == null)
        return getForNullKey();
    //
    Entry<K, V> entry = getEntry(key);
    return null == entry ? null : entry.getValue();
}

final Entry<K, V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }
    int hash = (key == null) ? 0 : hash(key);
    // 肯定数组下标,而后从头开始遍历链表,直到找到为止
    for (Entry<K, V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
                ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

Java7 ConcurrentHashMap

ConcurrentHashMap 和 HashMap 思路是差很少的,可是由于它支持并发操做,因此要复杂一些。

整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 表明”部分“或”一段“的意思,因此不少地方都会将其描述为分段锁。注意,行文中,我不少地方用了“槽”来表明一个 segment。

简单理解就是,ConcurrentHashMap 是一个 Segment 数组,Segment 经过继承 ReentrantLock 来进行加锁,因此每次须要加锁的操做锁住的是一个 segment,这样只要保证每一个 Segment 是线程安全的,也就实现了全局的线程安全。

concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,因此理论上,这个时候,最多能够同时支持 16 个线程并发写,只要它们的操做分别分布在不一样的 Segment 上。这个值能够在初始化的时候设置为其余值,可是一旦初始化之后,它是不能够扩容的。

再具体到每一个 Segment 内部,其实每一个 Segment 很像以前介绍的 HashMap,不过它要保证线程安全,因此处理起来要麻烦些。

初始化

initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操做的时候须要平均分给每一个 Segment。

loadFactor:负载因子,以前咱们说了,Segment 数组不能够扩容,因此这个负载因子是给每一个 Segment 内部使用的。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 计算并行级别 ssize,由于要保持并行级别是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 咱们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4
    // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // initialCapacity 是设置整个 map 初始的大小,
    // 这里根据 initialCapacity 计算 Segment 数组中每一个位置能够分到的大小
    // 如 initialCapacity 为 64,那么每一个 Segment 或称之为"槽"能够分到 4 个
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,由于这样的话,对于具体的槽上,
    // 插入一个元素不至于扩容,插入第二个的时候才会扩容
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // 建立 Segment 数组,
    // 并建立数组的第一个元素 segment[0]
    Segment<K, V> s0 =
            new Segment<K, V>(loadFactor, (int) (cap * loadFactor),
                    (HashEntry<K, V>[]) new HashEntry[cap]);
    Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
    // 往数组写入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

初始化完成,咱们获得了一个 Segment 数组。

咱们就当是用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:

  • Segment 数组长度为 16,不能够扩容

  • Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是之后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容

  • 这里初始化了 segment[0],其余位置仍是 null,至于为何要初始化 segment[0],后面的代码会介绍

  • 当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码,这两个值立刻就会用到

put 过程分析

咱们先看 put 的主流程,对于其中的一些关键细节操做,后面会进行详细介绍。

public V put(K key, V value) {
    Segment<K, V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 计算 key 的 hash 值
    int hash = hash(key);
    // 2. 根据 hash 值找到 Segment 数组中的位置 j
    //    hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
    //    而后和 segmentMask(15) 作一次与操做,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 刚刚说了,初始化的时候初始化了 segment[0],可是其余位置仍是 null,
    // ensureSegment(j) 对 segment[j] 进行初始化
    if ((s = (Segment<K, V>) UNSAFE.getObject          // nonvolatile; recheck
            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

第一层皮很简单,根据 hash 值很快就能找到相应的 Segment,以后就是 Segment 内部的 put 操做了。

Segment 内部是由 数组+链表 组成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往该 segment 写入前,须要先获取该 segment 的独占锁
    //    先看主流程,后面还会具体介绍这部份内容
    HashEntry<K, V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 这个是 segment 内部的数组
        HashEntry<K, V>[] tab = table;
        // 再利用 hash 值,求应该放置的数组下标
        int index = (tab.length - 1) & hash;
        // first 是数组该位置处的链表的表头
        HashEntry<K, V> first = entryAt(tab, index);
        // 下面这串 for 循环虽然很长,不过也很好理解,想一想该位置没有任何元素和已经存在一个链表这两种状况
        for (HashEntry<K, V> e = first; ; ) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖旧值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 继续顺着链表走
                e = e.next;
            } else {
                // node 究竟是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
                // 若是不为 null,那就直接将它设置为链表表头;若是是null,初始化并设置为链表表头。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K, V>(hash, key, value, first);
                int c = count + 1;
                // 若是超过了该 segment 的阈值,这个 segment 须要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 扩容后面也会具体分析
                else
                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                    // 其实就是将新的节点设置成原链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

总体流程仍是比较简单的,因为有独占锁的保护,因此 segment 内部的操做并不复杂。至于这里面的并发问题,咱们稍后再进行介绍。

到这里 put 操做就结束了,接下来,咱们说一说其中几步关键的操做。

初始化槽: ensureSegment

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其余槽来讲,在插入第一个值的时候进行初始化。

这里须要考虑并发,由于极可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就能够。

private Segment<K, V> ensureSegment(int k) {
    final Segment<K, V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K, V> seg;
    if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 这里看到为何以前要初始化 segment[0] 了,
        // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
        // 为何要用“当前”,由于 segment[0] 可能早就扩容过了
        Segment<K, V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int) (cap * lf);
        // 初始化 segment[k] 内部的数组
        HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap];
        if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u))
                == null) { // 再次检查一遍该槽是否被其余线程初始化了。
            Segment<K, V> s = new Segment<K, V>(lf, threshold, tab);
            // 使用 while 循环,内部用 CAS,当前线程成功设值或其余线程成功设值后,退出
            while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u))
                    == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

总的来讲,ensureSegment(int k) 比较简单,对于并发操做使用 CAS 进行控制。

我没搞懂这里为何要搞一个 while 循环,CAS 失败不就表明有其余线程成功了吗,为何要再进行判断?

获取写入锁: scanAndLockForPut

前面咱们看到,在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,若是失败,那么进入到 scanAndLockForPut 这个方法来获取锁。

下面咱们来具体分析这个方法中是怎么控制加锁的。

private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K, V> first = entryForHash(this, hash);
    HashEntry<K, V> e = first;
    HashEntry<K, V> node = null;
    int retries = -1; // negative while locating node
    // 循环获取锁
    while (!tryLock()) {
        HashEntry<K, V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 进到这里说明数组该位置的链表是空的,没有任何元素
                    // 固然,进到这里的另外一个缘由是 tryLock() 失败,因此该槽存在并发,不必定是该位置
                    node = new HashEntry<K, V>(hash, key, value, null);
                retries = 0;
            } else if (key.equals(e.key))
                retries = 0;
            else
                // 顺着链表往下走
                e = e.next;
        }
        // 重试次数若是超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
        //    lock() 是阻塞方法,直到获取锁后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        } else if ((retries & 1) == 0 &&
                // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
                //     因此这边的策略是,至关于从新走一遍这个 scanAndLockForPut 方法
                (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另外一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。

这个方法就是看似复杂,可是其实就是作了一件事,那就是获取该 segment 的独占锁,若是须要的话顺便实例化了一下 node。

扩容: rehash

重复一下,segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry\[] 进行扩容,扩容后,容量为原来的 2 倍。

首先,咱们要回顾一下触发扩容的地方,put 的时候,若是判断该值的插入会致使该 segment 的元素个数超过阈值,那么先进行扩容,再插值,读者这个时候能够回去 put 方法看一眼。

该方法不须要考虑并发,由于到这里的时候,是持有该 segment 的独占锁的。

private void rehash(HashEntry<K, V> node) {
    HashEntry<K, V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int) (newCapacity * loadFactor);
    // 建立新数组
    HashEntry<K, V>[] newTable =
            (HashEntry<K, V>[]) new HashEntry[newCapacity];
    // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
    // 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
    for (int i = 0; i < oldCapacity; i++) {
        // e 是链表的第一个元素
        HashEntry<K, V> e = oldTable[i];
        if (e != null) {
            HashEntry<K, V> next = e.next;
            // 计算应该放置在新数组中的位置,
            // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只多是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            if (next == null)   // 该位置处只有一个元素,那比较好办
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是链表表头
                HashEntry<K, V> lastRun = e;
                // idx 是当前链表的头结点 e 的新位置
                int lastIdx = idx;
                // 下面这个 for 循环会找到一个 lastRun 节点,这个节点以后的全部元素是将要放到一块儿的
                for (HashEntry<K, V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 将 lastRun 及其以后的全部节点组成的这个链表放到 lastIdx 这个位置
                newTable[lastIdx] = lastRun;
                // 下面的操做是处理 lastRun 以前的节点,
                //    这些节点可能分配在另外一个链表中,也可能分配到上面的那个链表中
                for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K, V> n = newTable[k];
                    newTable[k] = new HashEntry<K, V>(h, p.key, v, n);
                }
            }
        }
    }
    // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

这里的扩容比以前的 HashMap 要复杂一些,代码难懂一点。上面有两个挨着的 for 循环,第一个 for 有什么用呢?

仔细一看发现,若是没有第一个 for 循环,也是能够工做的,可是,这个 for 循环下来,若是 lastRun 的后面还有比较多的节点,那么此次就是值得的。由于咱们只须要克隆 lastRun 前面的节点,后面的一串节点跟着 lastRun 走就是了,不须要作任何操做。

我以为 Doug Lea 的这个想法也是挺有意思的,不过比较坏的状况就是每次 lastRun 都是链表的最后一个元素或者很靠后的元素,那么此次遍历就有点浪费了。不过 Doug Lea 也说了,根据统计,若是使用默认的阈值,大约只有 1/6 的节点须要克隆。

get 过程分析

相对于 put 来讲,get 真的不要太简单。

计算 hash 值,找到 segment 数组中的具体位置,或咱们前面用的“槽”

槽中也是一个数组,根据 hash 找到数组中具体的位置

到这里是链表了,顺着链表进行查找便可

public V get(Object key) {
    Segment<K, V> s; // manually integrate access methods to reduce overhead
    HashEntry<K, V>[] tab;
    // 1. hash 值
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根据 hash 找到对应的 segment
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
        // 3. 找到segment 内部数组相应位置的链表,遍历
        for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
                (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

并发问题分析

如今咱们已经说完了 put 过程和 get 过程,咱们能够看到 get 过程当中是没有加锁的,那天然咱们就须要去考虑并发问题。

添加节点的操做 put 和删除节点的操做 remove 都是要加 segment 上的独占锁的,因此它们之间天然不会有问题,咱们须要考虑的问题就是 get 的时候在同一个 segment 中发生了 put 或 remove 操做。

put 操做的线程安全性。

  • 初始化槽,这个咱们以前就说过了,使用了 CAS 来初始化 Segment 中的数组。

  • 添加节点到链表的操做是插入到表头的,因此,若是这个时候 get 操做在链表遍历的过程已经到了中间,是不会影响的。固然,另外一个并发问题就是 get 操做在 put 以后,须要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。

  • 扩容。扩容是新建立了数组,而后进行迁移数据,最后面将 newTable 设置给属性 table。因此,若是 get 操做此时也在进行,那么也不要紧,若是 get 先行,那么就是在旧的 table 上作查询操做;而 put 先行,那么 put 操做的可见性保证就是 table 使用了 volatile 关键字。

remove 操做的线程安全性。

remove 操做咱们没有分析源码,因此这里说的读者感兴趣的话仍是须要到源码中去求实一下的。

get 操做须要遍历链表,可是 remove 操做会”破坏”链表。

若是 remove 破坏的节点 get 操做已通过去了,那么这里不存在任何问题。

若是 remove 先破坏了一个节点,分两种状况考虑。 一、若是此节点是头结点,那么须要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,可是 volatile 并不能提供数组内部操做的可见性保证,因此源码中使用了 UNSAFE 来操做数组,请看方法 setEntryAt。二、若是要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。

相关文章
相关标签/搜索