Java7 HashMap和ConcurrentHashMap源码阅读

首先来看下HashMap的类继承结构:

public class HashMap extends AbstractMap<K,V> impement Map<K,V>,Coloneable,Serializable{java

}

能够看出HashMap实现了Map接口。其里面的方法都是非线程安全的,且不支持并发操做。
对于HashMap主要看的是get/put方法实现,其在jdk1.7,及1.8在解决哈希冲突的上有所不一样。
1、Java7 HashMap
Java7 HashMap和ConcurrentHashMap源码阅读node

从上面的结构图中,能够大体看出,HashMap由数组:transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;没个元素对应为一个单向链表,链表数据结构以下:
    static class Entry<K,V> implements Map.Entry<K,V> {
    final K key;
    V value;
    Entry<K,V> next;
    int hash; 
        }

在HashMap中定义的成员变量:
capacity:当前数组容量,始终保持 2^n,能够扩容,扩容后数组大小为当前的 2 倍。
loadFactor:负载因子,默认为 0.75。
threshold:扩容的阈值,等于 capacity * loadFactor,当容量超过这个值时,数组将扩容。
transient int modCount; //HashMap修改次数,这个值用于和expectedModCount指望修改次数比较。数组

一、put方法解析:
public V put(K key, V value) {
//1.当插入第一个元素时,须要建立并初始化指定大小的数组
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}缓存

//2.若是 key 为 null,循环遍历table[0]上的链表,最终会将这个 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
            //3.计算key的哈希值
    int hash = hash(key);
            //四、经过h & (length-1)即h%length求模找到键值对放在哪一个位置。
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
              //循环对应位置的单向链表,逐个查找每一个链表节点上是否有整个键了。
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {//hash值为整数,比较性能比equals高;另外短路运算,哈希值系统了就不必在比较equals。
            V oldValue = e.value;//先将当前节点的键对应的值取出来。
            e.value = value; //替换为新值。
            e.recordAccess(this);
            return oldValue;
        }
    }

    modCount++; //容器修改次数加1
    addEntry(hash, key, value, i); //在指定的位置上添加数据,若空间不够则动态扩充,当前容量乘以2,新建一个数组,长度为capacity*2;并将原来的数组拷贝过来,更新对应变量。
    return null;
}

    数组初始化:
        private void inflateTable(int toSize) {
    // Find a power of 2 >= toSize
    int capacity = roundUpToPowerOf2(toSize); //指定数组容量,默认为16

    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
    table = new Entry[capacity]; //改变数组的引用,指向新建立的数组
    initHashSeedAsNeeded(capacity);
}

    计算键值对的位置:
        static int indexFor(int h, int length) {
    // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
    return h & (length-1); //等同于求模:h%length
}

    添加节点到链表中
        void addEntry(int hash, K key, V value, int bucketIndex) {
            //假如map的元素个数大于等于阈值,而且bucketIndex的位置已经元素,则须要动态扩容
    if ((size >= threshold) && (null != table[bucketIndex])) {
                //扩容
        resize(2 * table.length);
        hash = (null != key) ? hash(key) : 0;
                    //从新计算应该存储下标
        bucketIndex = indexFor(hash, table.length);
    }

            //建立元素及链表节点
    createEntry(hash, key, value, bucketIndex);
}
   //新建一个Entry对象,插入单向链表表头,并增长size(不管是否扩容这一步都要进行)
   void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

    数组扩容:
        void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }

            //新建一个容量扩充2倍的数组
    Entry[] newTable = new Entry[newCapacity];
            //调用transfer方法将旧数组中的键值对拷贝过来
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
            //旧数组原来的堆空间设置为引用切断,指向新数组
    table = newTable;
            //从新计算阈值
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

    键值对移植:
    /**
 * Transfers all entries from current table to newTable.
 */
void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
        while(null != e) {
            Entry<K,V> next = e.next;
                            //是否从新计算key的哈希
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
                            //从新计算元素位置
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];
            newTable[i] = e;
            e = next;
        }
    }
}

    以上就是保存键值对的主要代码,基本步骤:
    1)、计算key的哈希值;
    2)、根据哈希值计算数组元素的保存位置(h&(length-1)或h%length);
    3)、根据须要扩充数组大小;
    4)、将键值对插入到对应的链表头部或更新已有值;

    二、get方法解析
        public V get(Object key) {
            //若是key为空则直接,在存放元素时是直接存放到table[0],因此直接调用getForNullKey方法遍历对应链表便可。
    if (key == null)
        return getForNullKey();
    Entry<K,V> entry = getEntry(key);

    return null == entry ? null : entry.getValue();
}
    遍历table[0]位置的链表,返回对应key==null的值,若果返回null,则有两种状况,要么没有key==null的键值对,要么对应位置上的值为null。
private V getForNullKey() {
    if (size == 0) {
        return null;
    }
    for (Entry<K,V> e = table[0]; e != null; e = e.next) {
        if (e.key == null)
            return e.value;
    }
    return null;
}

    key值不为空,则调用返回对应的值:
        final Entry<K,V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }

    int hash = (key == null) ? 0 : hash(key);
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

    总结基本流程:
    一、计算键的哈希值;
    二、根据哈希值找到数组中对于的链表;
    三、遍历链表,查找对应key的值;
    四、在比较查找的过程当中,先快速比较哈希值,hash相同则再继续经过equals比较;

2、java7 ConcurrentHashMap
在java7 下ConcurrentHashMap结构以下:
Java7 HashMap和ConcurrentHashMap源码阅读安全

ConcurrentHashMap是并发版的HashMap,支持复杂的并发操做,经过下降锁的粒度和cas等实现了高并发,支持原子条件的更新操做,不会抛出ConcurrentModificationException,实现了弱一致性。
    ConCurrentHashMap是一个Segment数组,每一个segment元素对应一个哈希表(结构相似于HashMap),每一个Segment段的数组一经构造方法初始化,不可再扩容。而Segment[i]段上的HashEntry[]数组则能够作相似HashMap结构同样扩容,只是操做比HashMap复杂。
    初始化:
    initialCapacity:初始容量,这里指的是ConcurrentHashMap容量,会平均分配给每一个Segment。
    loadFactor:负载因子,因为Segment[]数组不能够扩展,因此这个参数是给Segment[i]内部使用的.
        concurrencyLevel:并发级别,默认为16,表示ConcurrentHashMap被分红16段。若是初始化时传入的不是2的n次幂,则将会取最近最大的一个2的幂,好比14则扩展为16,28扩展为32。
    @SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
            //参数检查
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    //MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0; //位移位数
    int ssize = 1; //segment数组长度,
    while (ssize < concurrencyLevel) {
        ++sshift;   //左移一位
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1; //掩吗
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
            //根据ConcurrentHashMap容量,计算segment每一个段中数组的长度,也必定是2的n次
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    //segment[i]中数组容量最小为2,这样不至于插入一个元素时,不会发生扩容。只有插入第二个的时候才会扩容。
    int cap = MIN_SEGMENT_TABLE_CAPACITY; 
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]  建立segment数组并初始化第一个元素,其他的延时初始化
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
            //Class sc = Segment[].class;
            //SBASE = UNSAFE.arrayBaseOffset(sc);获取数组中第一个元素的地址偏移量
            //putOrderedObject是putObjectVolitle的volitile版本,
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
Segment数组的大小ssize是由concurrentLevel来决定的,可是却不必定等于concurrentLevel,ssize必定是大于或等于concurrentLevel的最小的2的次幂。

put方法解析:
  @SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K,V> s;
    //这地方明确说明了ConcurrentHashMap不容许null的key/value
    if (value == null)
        throw new NullPointerException();
            //hash方法不是简单的取得hashCode,是再hashCode的基础上进行了位运算,保证散列均匀
    int hash = hash(key);
            //返回的hashcode先无符号位右移segmentShift位,在段掩码位运算
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        //ensureSegment(j) 对 segment[j] 进行初始化
        s = ensureSegment(j);
    //插入key、value到s段中
    return s.put(key, hash, value, false);
}
    此ConcurrentHashMap的put方法就作了两件事情:
    一、定位segment段,并初始化segment[j]。
    二、调用segment段上的put方法。

    接下来,接着看ConcurrentHashMap代理到Segment上的put方法,这里就须要考虑并发,只是锁的粒度被细化了而已。
            final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                    //写入前先得到独占锁,tryLock不成功时会遍历定位到的HashEnry位置的链表(遍历主要是为了使CPU缓存链表),若找不到,则建立HashEntry。tryLock必定次数后(MAX_SCAN_RETRIES变量决定,单个1,多核64),则lock等待阻塞。若遍历过程当中,因为其余线程的操做致使链表头结点变化,则须要从新遍历。
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;// // 这个是 segment 内部的数组
            int index = (tab.length - 1) & hash;  //和HashMap相似,取模,定位HashEntry,能够看到,这个hash值在定位Segment时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位(无符号右移)。
            HashEntry<K,V> first = entryAt(tab, index); //取得制定位置的节点 , first 是数组该位置处的链表的表头
                            //循环遍历链表,考虑到链表是否存在
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount; //修改次数
                        }
                        break;
                    }
                    e = e.next; //不存在这样的key,则指针指向下一个节点
                }
                else {
                                    //若是不为 null,那就直接将它设置为链表表头;若是是null,初始化并设置为链表表头。
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                                            //判断是否须要扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                // 其实就是将新的节点设置成原链表的表头
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;  //告诉GC能够回收
                    break;
                }
            }
        } finally {
            unlock(); //释放锁
        }
        return oldValue;
    }

            初始化segment段:
            private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;  //之因此强调是“当前”,是由于在ConcurrentHashMap构造时,初始化了segment[0].
    long u = (k << SSHIFT) + SBASE; // raw offset  
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck  //之因此DCL是为了不其余线程已经初始化了。
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                             // 使用 while 循环,内部用 CAS,当前线程成功设值或其余线程成功设值后,退出
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

    获取写入锁:在往segment[i]上进行put操做的时候,HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);先是经过快速tryLock获取独占锁,若是获取失败,则通scanAndLockForPut获取。
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
                    //循环获取锁,再次使用trylock快速的去获取锁。
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) {
                    //链表为空,则再次判断node是否为空,避免其余线程获取锁后初始化了node 节点
                    if (node == null) // speculatively create node
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }//重试次数超过MAX_SCAN_RETRIES(单核1次,多核64次)后,则进入阻塞队列等待锁
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();//lock是阻塞方法,直到得到锁后才返回。
                break;
            }
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                            //此时有新元素进入了链表中,成为了表头,这里处理方法是从新遍历链表,在走一遍scanAndLockForPut
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }

            这个方法作了两件事情:1:、得到segment独占锁,二、若是链表为空则初始化node节点

    扩容:rehash
    首先须要注意segment[]一旦被初始化是不能够被扩容的,能扩容的只是segment[i]位置上的HashEntry[]数组,当数组实际容量超过阈值,则须要扩容,容量*2。这个过程当中不用再考虑并发的问题了,由于这个时候必定得到了独占锁。
      //node为本次扩容要添加到链表的节点
    private void rehash(HashEntry<K,V> node) {
        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        int newCapacity = oldCapacity << 1; //容量*2
        threshold = (int)(newCapacity * loadFactor);
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];  //建立新数组
        int sizeMask = newCapacity - 1; //新的掩码
                    // 遍历原数组,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
        for (int i = 0; i < oldCapacity ; i++) {
                    //e是链表的第一个元素
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                    // 计算应该放置在新数组中的位置,
        // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只多是 3 或者是 3 + 16 = 19
                int idx = e.hash & sizeMask;
                if (next == null)   //  Single node on list
                    newTable[idx] = e; //只有一个元素,直接放到idx位置
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;
                       // idx 是当前链表的头结点 e 的新位置
                    int lastIdx = idx;
                    //下面这个 for 循环会找到一个 lastRun 节点,这个节点以后的全部元素是将要放到一块儿的
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    // 将 lastRun 及其以后的全部节点组成的这个链表放到 lastIdx 这个位置
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                                      // 下面的操做是处理 lastRun 以前的节点,
            //    这些节点可能分配在另外一个链表中,也可能分配到上面的那个链表中
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
                // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }   

 get方法分析:
 public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead  手工集成访问方法以减小开销
    HashEntry<K,V>[] tab;
    //计算hash值
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    //根据hash值找到对应位置的segment段
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
    //找到segment内部数组对应位置上的链表,遍历
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
    其基本流程为:
    一、计算hash值,根据hash找到对应的segment段。
    二、找到segment中数组的具体位置。
    三、遍历链表查找。

    并发问题分析

如今咱们已经说完了 put 过程和 get 过程,咱们能够看到 get 过程当中是没有加锁的,那天然咱们就须要去考虑并发问题。数据结构

添加节点的操做 put 和删除节点的操做 remove 都是要加 segment 上的独占锁的,因此它们之间天然不会有问题,咱们须要考虑的问题就是 get 的时候在同一个 segment 中发生了 put 或 remove 操做。并发

put 操做的线程安全性。
初始化槽,这个咱们以前就说过了,使用了 CAS 来初始化 Segment 中的数组。
添加节点到链表的操做是插入到表头的,因此,若是这个时候 get 操做在链表遍历的过程已经到了中间,是不会影响的。固然,另外一个并发问题就是 get 操做在 put 以后,须要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
扩容。扩容是新建立了数组,而后进行迁移数据,最后面将 newTable 设置给属性 table。因此,若是 get 操做此时也在进行,那么也不要紧,若是 get 先行,那么就是在旧的 table 上作查询操做;而 put 先行,那么 put 操做的可见性保证就是 table 使用了 volatile 关键字。ssh

remove 操做的线程安全性。
get 操做须要遍历链表,可是 remove 操做会”破坏”链表。
若是 remove 破坏的节点 get 操做已通过去了,那么这里不存在任何问题。
若是 remove 先破坏了一个节点,分两种状况考虑。 一、若是此节点是头结点,那么须要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,可是 volatile 并不能提供数组内部操做的可见性保证,因此源码中使用了 UNSAFE 来操做数组,请看方法 setEntryAt。二、若是要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。ide

参考:https://javadoop.com/post/hashmap高并发

相关文章
相关标签/搜索