《HashMap 源码详细分析(JDK1.8)》:http://www.javashuo.com/article/p-hbiddcap-mp.htmljava
Java7 整个 ConcurrentHashMap 是一个 Segment 数组,Segment 经过继承 ReentrantLock 来进行加锁,因此每次须要加锁的操做锁住的是一个 segment,这样只要保证每一个 Segment 是线程安全的,也就实现了全局的线程安全。因此不少地方都会将其描述为分段锁。segmentfault
Java8 对 ConcurrentHashMap 进行了比较大的改动。结构上和 Java8 的 HashMap 基本上同样,不过它要保证线程安全性,因此在源码上确实要复杂一些。数组
ConcurrentHashMap 重点关注它是如何保证线程安全的,和 HashMap 相似的地方就再也不赘述。安全
// 这构造函数里,什么都不干 public ConcurrentHashMap() { } // HashMap 初始容量 public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
这个初始化方法有点意思,经过提供初始容量,计算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),而后向上取最近的 2 的 n 次方】。如 initialCapacity 为 10,那么获得 sizeCtl 为 16,若是 initialCapacity 为 11,获得 sizeCtl 为 32。多线程
在 HashMap 中初始容量直接使用的是 tableSizeFor(initialCapacity),不知道为何在 ConcurrentHashMap 改在 1.5 * initialCapacity,至于加 1 估计是考虑 initialCapacity=0 的状况。并发
sizeCtl 这个属性使用的场景不少,这里为第一个使用场景:ConcurrentHashMap 初始化。sizeCtl=0(也就是无参构造器) 表示使用默认的初始化大小,不然使用自定义的容量。less
由 put 入手分析 ConcurrentHashMap 中可能出现的线程安全性问题。dom
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 1. 计算 hash 值,(h ^ (h >>> 16)) & HASH_BITS int hash = spread(key.hashCode()); int binCount = 0; // 2. 经过自旋保证新添加的元素必定会成功添加到 HashMap 中 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 2.1 若是数组"空",进行数组初始化。如何保证数组扩容的线程安全(重点) if (tab == null || (n = tab.length) == 0) tab = initTable(); // 2.2 该 hash 对应的槽位(也叫桶bucket) 为空,直接将这个新值放入其中便可 // 数组 tab 是 volatile 修辞的,并不能说明其元素是 volatile 的。 U.getObjectVolatile else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // CAS 操做将这个新值放入该槽位,若是成功就结束了 // 若是 CAS 失败,那就是有并发操做,则走第 3 或 4 步 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 2.3 只有扩容时 f.hash==MOVED,该线程先帮助扩容才添加值 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 2.4 锁住对应的槽位,剩下的操做和 HashMap 就差很少 else { V oldVal = null; // 下面的操做都是线程安全的了 synchronized (f) { // f 加锁后对 f 的全部操做都是线程安全的,但 tab 自己并非线程安全的 // 也就是说 tab[i] 可能发生变化 if (tabAt(tab, i) == f) { // 2.4.1 头结点的 hash>=0,说明是链表 if (fh >= 0) { ... } // 2.4.2 表示红黑树 else if (f instanceof TreeBin) { ... } } } // 链表binCount表示添加元素后的长度,红黑树binCount=2不能进行treeifyBin方法 if (binCount != 0) { // 判断是否要将链表转换为红黑树,临界值和 HashMap 同样,也是 8 if (binCount >= TREEIFY_THRESHOLD) // 这个方法和 HashMap 中稍微有一点点不一样,那就是它不是必定会进行红黑树转换, // 若是当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 3. 元素个数加1,并判断是否扩容,如何保证线程安全 addCount(1L, binCount); return null; }
put 的主流程看完了,可是在下面几个过程当中是如何保证线程安全的:ide
// 总之就一个目的,让节点在 HashMap 中分布更均匀 static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
h 是对应 key 的 hashcode,计算节点的槽位是 key.hashcode 对数组的长度取余(hash%length),但若是数组长度为 2^n,则能够直接使用位运算 hash&(length-1)。这个运算实际上只有 hash 的后几位参与运算,为了让 hash 散列的更均匀,也就是 hash 更随机,让 hash 的高 16 和低 16 进行异或运算,这样 hash 的后 16 位就更不容易重复了。函数
注意此时 hashcode 可能为负值,负数在 ConcurrentHashMap 中有特殊的含义,为了保证计算的 hash 必定是正数,能够对于计算获得的 hash 值,强制把符号位去掉,保证结果只在正数区间。 更多参考 hashcode 可能为负值
hash = key.hashCode() & Integer.MAX_VALUE;
初始化方法中的并发问题是经过对 sizeCtl 进行一个 CAS 操做来控制的。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // sizeCtl=-1 表示数组正在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { // sizeCtl=0(也就是无参构造器) 表示使用默认的初始化大小,不然使用自定义的容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // n-n/4,即 0.75*n,和 HashMap 中阈值的计算方式同样,只是这里使用位运算 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
sizeCtl 这个属性以前在 ConcurrentHashMap 初始化化时已经提到过,这里引出了 sizeCtl 另外的使用场景:
前面咱们在 put 源码分析也说过,treeifyBin 不必定就会进行红黑树转换,也多是仅仅作数组扩容。咱们仍是进行源码分析吧。
// index是须要链表转红黑树的节点索引 private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { // MIN_TREEIFY_CAPACITY=64,当数组长度小于64时优先扩容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 后面咱们再详细分析这个方法 tryPresize(n << 1); // 加锁后链表转红黑树,这个和 HashMap 同样 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
若是说 Java8 ConcurrentHashMap 的源码不简单,那么说的就是扩容操做和迁移操做。数组扩容后真正进行数据迁移的其实是 transfer 方法,读者应该提早知道这点。这里的扩容也是作翻倍扩容的,扩容后数组容量为原来的 2 倍。
// tryPresize 有两个方法调用 putAll 或 putVal,不论是那个方法 size 都是扩容后的长度 private final void tryPresize(int size) { // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 1. 数组没有初始化先初始化,和数组 initTable 相似 // putAll 时可能数组还未初始化 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } // 2. 数组容量已经最大或足够了,不须要扩容 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; // 3. 真正开始扩容 else if (tab == table) { // 每次扩容时都会生成一个相似时间戳的校验标记,上一次扩容和下一次扩容都不同 // 这个值由当前数组的长度决定,格式必定为 0000 0000 100x xxxx // 高 16 所有是0,低 16 的第一位是 1,后五位由 n 决定(n转二进制后最高位1以前的0个数,最大32) int rs = resizeStamp(n); // sc<0 表示其它线程正在扩容,帮助扩容(sc=-1表示数组初始化,已经处理过了) // 扩容时 sizeCtl 高 16 位表示扩容戳(校验标记),低 16 位表示(正在参与扩容的线程数+1) if (sc < 0) { Node<K,V>[] nt; // 3.1 若是 sc 的高 16 位不等于标识符(说明sizeCtl已经改变,扩容已经结束) // 3.2 若是 sc == 标识符 + 1 (扩容结束了,再也不有线程进行扩容) // 第一个线程设置 sc = rs << 16 + 2 // 第二个线程设置 sc = sc + 1 // 当一个线程扩容结束 sc = sc -1 // 最后一个线程扩容结束后 sc == rs + 1(为何是sc==rs+1,不该该是sc==(rs<<16)+1吗????) // 3.3 若是 sc == 标识符 + 65535(辅助扩容线程数已经达到最大,问题同上????) // 3.4 若是 nextTable == null(结束扩容了,或者扩容还未开始) // 3.5 若是 transferIndex <= 0 (转移状态变化了) if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 开始扩容(网上说sizeCtl=-1表示数组初始化,所以这里直接+2) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
首先要注意方法名为 tryPresize,既然方法名中包含了 try 就说明扩容有多是不成功的。事实上也正是如此,扩容的条件是 sizeCtl>=0,也就是说此时没有线程在进行数组初始化或扩容的操做才会进行扩容。
真正的扩容是由方法 transfer 实现的,这个方法的第二个参数表示扩容后的数组。若是是由当前线程发起的扩容第二个参数为 null,若是其它线程已经在扩容,则当前线程也加入到扩容中去,扩容后的数组已经存在 nextTable。
sizeCtl
前面已经讲解了 sizeCtl 在数组初始化前中后值的变化,这里须要重点关注一下 sizeCtl 在扩容中的使用。sizeCtl 分为两部分,高 16 位表示扩容戳(校验标记),低 16 位表示正在参与扩容的线程数(线程数+1)。
高 16 位表示扩容戳(校验标记)
static final int resizeStamp(int n) { // Integer.numberOfLeadingZeros(n) 表示 n 转二进制后最高位 1 以前的 0 个数,这个数必定小于 32 // 1 << (RESIZE_STAMP_BITS - 1) 表示 0x7fff,也就是将低 16 位的第一位改为 1 // 最终结果为 0000 0000 100x xxxx return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
这个值由当前数组的长度决定,格式必定为 0000 0000 100x xxxx,这个数 rs<<16 后最高位必定是 1,也就是说 sizeCtl 是一个负数。
低 16 位正在扩容的线程数 + 1
初始化时直接 +2,以后每多一个线程参与扩容 +1,这个线程扩容线束则 -1,最终扩容完成则是 1。
putVal 时发现节点为节点的 f.hash=MOVED,说明有其它线程在对数组进行扩容则会调用 helpTransfer,也就是当前线程先帮助数组扩容后再添加元素。
// 和 tryPresize 差很少,最复杂的部分仍是这个if条件判断 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 即然节点已经修改成 ForwardingNode 则说明扩容后的数组已经建立 // 因此这里的条件判断少了一个 nextTable=null if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
阅读源码以前,先要理解并发操做的机制。原数组长度为 n,因此咱们有 n 个迁移任务,让每一个线程每次负责一个小任务是最简单的,每作完一个任务再检测是否有其余没作完的任务,帮助迁移就能够了,而 Doug Lea 使用了一个 stride,简单理解就是步长,每一个线程每次负责迁移其中的一部分,如每次迁移 16 个小任务。因此,咱们就须要一个全局的调度者来安排哪一个线程执行哪几个任务,这个就是属性 transferIndex 的做用。
第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,而后从后往前的 stride 个任务属于第一个线程,而后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。固然,这里说的第二个线程不是真的必定指代了第二个线程,也能够是同一个线程,这个读者应该能理解吧。其实就是将一个大的迁移任务分为了一个个任务包。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 1. stride 能够理解为”步长“,有 n 个位置是须要进行迁移的 // 将这 n 个任务分为多个任务包,每一个任务包有 stride 个任务 // stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 2. 若是 nextTab 为 null,先进行一次初始化,为何是线程安全的???? // 前面咱们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab=null // 以后参与迁移的线程调用此方法时,nextTab!=null if (nextTab == null) { // initiating try { Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 3. ForwardingNode 是占位用的,标记该节点已经处理过了 // 这个构造方法会生成一个 Node,key、value 和 next 都为 null,关键是 hash 为 MOVED // 后面咱们会看到,原数组中位置 i 处的节点完成迁移工做后, // 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其余线程该位置已经处理过了 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // advance=true 表示一个节点的数据已经处理完了,准备获取下一个节点 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab // i 每次任务的上边界,bound 是下边界,注意是从后往前 // --i < bound 时就领取一次的任务,直到任务处理完毕 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 4. advance 为 true 表示能够进行下一个位置的迁移了 // 第一次while循环:当前线程领取任务,走第三个else(一直自旋尝试领取任务) // 最终的结果为:i 指向了 transferIndex,bound 指向了 transferIndex-stride // 以后每处理完一个节点:走第一个if,处理的下一个槽位的节点,直到当前线程领取的任务处理完毕 // 再次走第三个else,领取步长stride的任务直到transferIndex<=0 while (advance) { int nextIndex, nextBound; // 4.1 --i表示处理一下槽位的节点 if (--i >= bound || finishing) advance = false; // 4.2 transferIndex每领取一次任务减去一个步长stride // transferIndex初始值为table.length else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 4.3 自旋尝试领取步长stride的任务 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // nextBound 是此次迁移任务的下边界,注意,是从后往前 bound = nextBound; // i 是此次迁移任务的上边界 i = nextIndex - 1; advance = false; } } // 5. 若是 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束) // 若是 i >= tab.length(不知道为何这么判断) // 若是 i + tab.length >= nextTable.length(不知道为何这么判断) if (i < 0 || i >= n || i + n >= nextn) { int sc; // 5.1 完成扩容 if (finishing) { // 完成扩容 nextTable = null; table = nextTab; // 更新 table sizeCtl = (n << 1) - (n >>> 1); // 更新阈值 return; } // 5.2 sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2 // 而后,每有一个线程参与迁移就会将 sizeCtl 加 1 // 这里使用 CAS 操做对 sizeCtl 进行减 1,表明作完了属于本身的任务 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 不相等说明有其它线程在辅助扩容,当前线程直接返回,注意 sc 是-1以前的值 // 还有其它线程在参与扩容,也就是说扩容还未结束,当前线程直接返回 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 相等说明没有线程在帮助他们扩容了。也就是说,扩容结束了。 finishing = advance = true; i = n; // recheck before commit } } // 6. 若是位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“ else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 7. 该位置处是一个 ForwardingNode,表明该位置已经迁移过了 else if ((fh = f.hash) == MOVED) advance = true; // already processed // 8. 数组迁移,原理和 HashMap 同样 else { // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工做 synchronized (f) { ... } } } }
上面这部分代码看懂了,下面这部分数据迁移的代码能够先不看,和 HashMap 是同样的。
只要对 ConcurrentHashMap 中的元素进行增删,元素的个数就会发生变化,这时就须要调用 addCount 方法。在看这个方法以前先看一下 ConcurrentHashMap 是如何记录元素的个数的。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } // sumCount 用于统计当前的元素个数 final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
能够看到元素的个数分为两部分,一是 baseCount;二是 counterCells 数组中的元素累加。为何这么复杂?弄个 int 自增,或者多线程环境使用 AtomicInteger 不就能够了?
AtomicLong缺点
咱们都知道 AtomicLong 是经过 CAS 自旋的方法去设置 value,直到成功为止。那么当并发数比较多时,就会致使 CAS 的失败机率变高,重试次数更多,越多的线程重试,CAS 失败的机率越高,造成恶性循环,从而下降了效率。
《LongAddr源码解析》:https://www.jianshu.com/p/d9d4be67aa56
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 1. 统计元素个数 // 1.1 尝试直接使用 cas 修改 baseCount 的值,若是不存在锁竞争,元素个数修改为功,直接结束 // 若是失败则存在锁竞争,使用更小粒度的 cas 操做 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 1.1 uncontended表示是否存在锁竞争 boolean uncontended = true; // 1.2 将线程分流到 CounterCell[] 数组中,使用更小粒度的 cas 操做 // (1) CounterCell[]未初始化 // (2) CounterCell[]中的节点未初始化 // (3) 对 as[random] 进行 cas 操做,失败则存在锁竞争问题uncontended=false,成功直接返回 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; // 1.3 统计当前元素的个数 s = sumCount(); } // 2. 扩容操做,见 2.3 节 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // sc<0 表示有其它线程在扩容 if (sc < 0) { // 扩容已经结束 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 加入到扩容中,扩容的线程数+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 当前线程发起扩容,注意线程数是直接+2 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
addCount 方法分为两部分:一是统计元素个数,二是查看更不须要扩容。这里只关注第一部分:
//基础值,没有竞争时会使用这个值 private transient volatile long baseCount; //存放Cell的hash表,大小为2的幂 private transient volatile CounterCell[] counterCells; // 经过cas实现的锁,0无锁,1得到锁 private transient volatile int cellsBusy;
private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 1. counterCells 已经初始化 if ((as = counterCells) != null && (n = as.length) > 0) { // 1.1 counterCells 子节点未初始化 if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } // 1.2 执行到这里说明 cell 位置不为空 // wasUncontended=false时表示有锁竞争,直接空轮询一次???? else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 1.3 经过cas将x值加到a的value上 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // 1.4 counterCells表大于可用cpu数量,或者as数组过期;设置碰撞标识为false else if (counterCells != as || n >= NCPU) collide = false; // 1.5 collide=false则空轮询一次 else if (!collide) collide = true; // 1.6 counterCells扩容一倍 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); } // 2. counterCells 初始化长度为 2 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // 3. 再次尝试修改 baseCount,失败就一直自旋直至成功 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
天天用心记录一点点。内容也许不重要,但习惯很重要!