ConcurrenHashMap
。下面分享一下我对ConcurrentHashMap
的理解,主要用于我的备忘。若是有不对,请批评。html
HashMap
“严重”的勾起了我对HashMap
家族的好奇心,下面分享一下我对ConcurrentHashMap
的理解,主要用于我的备忘。若是有不对,请批评。java
想要解锁更多新姿式?请访问https://blog.tengshe789.tech/node
HashMap
是咱们平时开发过程当中用的比较多的集合,但它是非线程安全的,在涉及到多线程并发的状况,进行get操做有可能会引发死循环,致使CPU利用率接近100%。 算法
所以须要支持线程安全的并发容器 ConcurrentHashMap
。segmentfault
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node<K,V>[] table;
table
表明整个哈希表。 默认为null,初始化发生在第一次插入操做,默认大小为16的数组,用来存储Node节点数据,扩容时大小老是2的幂次方。数组
/** * The next table to use; non-null only while resizing. */ private transient volatile Node<K,V>[] nextTable;
nextTable
是一个链接表,用于哈希表扩容,默认为null,扩容时新生成的数组,其大小为原数组的两倍。安全
/** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. */ private transient volatile long baseCount;
baseCount
保存着整个哈希表中存储的全部的结点的个数总和,有点相似于 HashMap 的 size 属性。 这个数经过CAS算法更新数据结构
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;
初始化哈希表和扩容 rehash 的过程,都须要依赖sizeCtl
。该属性有如下几种取值:多线程
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//MAXIMUM_CAPACITY = 1 << 30 this.sizeCtl = cap;//ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操做。 } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY;//DEFAULT_CAPACITY = 16 putAll(m); }
构造方法是三个。重点是第二个,带参的构造方法。这个带参的构造方法会调用tableSizeFor()
方法,确保table的大小老是2的幂次方(假设参数为100,最终会调整成256)。算法以下:并发
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
put()
调用putVal()
方法,让咱们看看:
final V putVal(K key, V value, boolean onlyIfAbsent) { //对传入的参数进行合法性判断 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());//计算键所对应的 hash 值 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //若是哈希表还未初始化,那么初始化它 if (tab == null || (n = tab.length) == 0) tab = initTable(); //根据hash值计算出在table里面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若是这个位置没有值 ,那么以CAS无锁式向该位置添加一个节点 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //检测到桶结点是 ForwardingNode 类型,协助扩容(MOVED = -1; // hash for forwarding nodes) else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //桶结点是普通的结点,锁住该桶头结点并试图在该链表的尾部添加一个节点 else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //向普通的链表中添加元素 if (fh >= 0) { binCount = 1; //遍历链表全部的结点 for (Node<K,V> e = f;; ++binCount) { K ek; //若是hash值和key值相同,则修改对应结点的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //若是遍历到了最后一个结点,那么就证实新的节点须要插入链表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //若是这个节点是树节点,就按照树的方式插入值 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //若是链表长度已经达到临界值8,就须要把链表转换为树结构(TREEIFY_THRESHOLD = 8) if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //CAS 式更新baseCount,并判断是否须要扩容 addCount(1L, binCount); return null; }
其实putVal()也多多少少掉用了其余方法,让咱们继续探究一下。
科普compare and swap,解决多线程并行状况下使用锁形成性能损耗的一种机制,CAS操做包含三个操做数——内存位置(V)、预期原值(A)和新值(B)。若是内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。不然,处理器不作任何操做。不管哪一种状况,它都会在CAS指令以前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;若是包含该值,则将B放到这个位置;不然,不要更改该位置,只告诉我这个位置如今的值便可。
首先,第四行出现的int hash = spread(key.hashCode());
这是传统的计算hash的方法。key的hash值高16位不变,低16位与高16位异或做为key的最终hash值。(h >>> 16,表示无符号右移16位,高位补0,任何数跟0异或都是其自己,所以key的hash值高16位不变。)
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
第十行, tab = initTable();
这个方法的亮点是,可让put并发执行,实现table只初始化一次 。
initTable()核心思想就是,只容许一个线程对表进行初始化,若是有其余线程进来了,那么会让其余线程交出 CPU 等待下次系统调度。这样,保证了表同时只会被一个线程初始化。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //若是表为空才进行初始化操做 while ((tab = table) == null || tab.length == 0) { //若是一个线程发现sizeCtl<0,意味着另外的线程执行CAS操做成功,当前线程只须要让出cpu时间片(放弃 CPU 的使用) if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //不然说明还未有线程对表进行初始化,那么本线程就来作这个工做 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //sc 大于零说明容量已经初始化了,不然使用默认容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //计算阈值,等效于 n*0.75 sc = n - (n >>> 2); } } finally { //设置阈值 sizeCtl = sc; } break; } } return tab; }
接下来,第19行。 tab = helpTransfer(tab, f);
这句话。要了解这个,首先须要知道ForwardingNode
这个节点类型。它一个用于链接两个table
的节点类。它包含一个nextTable
指针,用于指向下一张hash表。并且这个节点的key、value、next指针所有为null,它的hash值为MOVED(static final int MOVED
= -1)。
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找 Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
在扩容操做中,咱们须要对每一个桶中的结点进行分离和转移。若是某个桶结点中全部节点都已经迁移完成了(已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。
helpTransfer
什么做用呢?是检测到当前哈希表正在扩容,而后让当前线程去协助扩容 !
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//新的table,nextTab已经存在前提下才能帮助扩容 int rs = resizeStamp(tab.length);//返回一个 16 位长度的扩容校验标识 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {//sizeCtl 若是处于扩容状态的话 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) //前 16 位是数据校验标识,后 16 位是当前正在扩容的线程总数 //这里判断校验标识是否相等,若是校验符不等或者扩容操做已经完成了,直接退出循环,不用协助它们扩容了 break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//sc + 1 标识增长了一个线程进行扩容 transfer(tab, nextTab);//调用扩容方法 break; } } return nextTab; } return table; }
helpTransfer
精髓的是能够调用多个工做线程一块儿帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操做,其余线程就要等待扩容操做完成才能工做。
既然这里涉及到扩容的操做,咱们也一块儿来看看扩容方法transfer()
:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //计算单个线程容许处理的最少table桶首节点个数,不能小于 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //刚开始扩容,初始化 nextTab if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //transferIndex 指向最后一个桶,方便从后向前遍历 transferIndex = n; } int nextn = nextTab.length; //定义 ForwardingNode 用于标记迁移完成的桶 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab //i 指向当前桶,bound 指向当前线程须要处理的桶结点的区间下限 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //遍历当前线程所分配到的桶结点 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; //transferIndex <= 0 说明已经没有须要迁移的桶了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //更新 transferIndex //为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex) else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //当前线程全部任务完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //待迁移桶为空,那么在此位置 CAS 添加 ForwardingNode 结点标识该桶已经被处理过了 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //若是扫描到 ForwardingNode,说明此桶已经被处理过了,跳过便可 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //链表的迁移操做 if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; //整个 for 循环为了找到整个桶中最后连续的 fh & n 不变的结点 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //红黑树的复制算法, else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
至此,put方法讲完了
感谢
结束
此片完了~
想要了解更多精彩新姿式?请访问个人我的博客本篇为原创内容,已经于07-06在我的博客率先发表,随后CSDN,segmentfault,juejin同步发出。若有雷同,