AtomicLong是在高并发下对单一变量进行CAS操做,从而保证其原子性。java
public final long getAndAdd(long delta) { return unsafe.getAndAddLong(this, valueOffset, delta); }
在Unsafe类中,若是有多个线程进入,只有一个线程能成功CAS,其余线程都失败。失败的线程会重复进行下一轮的CAS,可是下一轮仍是只有一个线程成功。数组
public final long getAndAddLong(Object o, long offset, long delta) { long v; do { v= this.getLongVolatile(o,offset); } while(!this.compareAndSwapLong(o,offset, v, v+delta)); return v; }
即在高并发下,AtomicLong的性能会愈来愈差劲。多线程
所以,引入了替代方案,LongAdder。并发
LongAdder是一种以空间换时间的解决方案。其内部维护了一个值base,和一个cell数组,当线程写base有冲突时,将其写入数组的一个cell中。将base和全部cell中的值求和就获得最终LongAdder的值了。
Method sum() (or, equivalently, longValue()) returns the current total combined across the variables maintaining the sum.app
public long longValue() { return sum(); } public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
LongAdder类继承了Striped64类,其中,class Striped64维护有有 Cell的内部类,Base,Cell数组等相关成员变量。
NCPU:表示当前计算机CPU数量,用于控制cells数组长度。由于一个CPU同一时间只能执行一个线程,若是cells数组长度 大于 CPU数量,并不能提升并发数,且形成空间的浪费。
cells:存放Cell的数组。
base:在没有发生过竞争时,数据会累加到base上。 或者,当cells扩容时,是须要将数据写到base中的。
cellsBusy:锁。0表示无锁状态,1表示其余线程已经持有锁。初始化cells,建立Cell,扩容cells都须要获取锁。less
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; // 当前value基于当前对象的内存偏移 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } //表示当前计算机CPU数量,控制cells数组长度 static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; //在没有发生过竞争时,数据会累加到base上, 或者 当cells扩容时,须要将数据写到base中 transient volatile int cellsBusy; // 初始化cells或者扩容cells都须要获取锁,0表示无锁状态,1表示其余线程已经持有锁
add(long x):加上给定的x。
1.一开始只加给base,那么此时cells必定没有初始化,此时只会casBase,成功则返回。
2.casBase失败,意味着多线程写base发生竞争,进入longAccumulate(x, null, uncontended = true)重试或者初始化cells。dom
3.若是cells已经初始化过了,可是,当前线程对应下标的cell为空,须要建立。进入longAccumulate(x, null, uncontended = true)建立对应cell。ide
4.若是cells已经初始化过了,同时,当前线程对应的cell 不为空,cas给当前cell赋值,成功则返回。失败,意味着当前线程对应的cell 有竞争,进入longAccumulate(x, null, uncontended = false) 重试或者扩容cells。函数
public void add(long x) { //as 表示cells引用 //b 表示获取的base值 //v 表示 指望值 //m 表示 cells 数组的长度 //a 表示当前线程命中的cell单元格 Cell[] as; long b, v; int m; Cell a; //条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中 // false->表示cells未初始化,当前全部线程应该将数据写到base中 //条件二:false->表示当前线程cas替换数据成功, // true->表示发生竞争了,可能须要重试 或者 扩容 if ((as = cells) != null || !casBase(b = base, b + x)) { //何时会进来? //1.true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中 //2.true->表示发生竞争了,可能须要重试 或者 扩容 boolean uncontended = true; //true -> 未竞争 false->发生竞争 //条件一:true->说明 cells 未初始化,也就是多线程写base发生竞争了 // false->说明 cells 已经初始化了,当前线程应该是 找本身的cell 写值 //条件二:getProbe() 获取当前线程的hash值 m表示cells长度-1 cells长度 必定是2的次方数 15= b1111 // true-> 说明当前线程对应下标的cell为空,须要建立 longAccumulate 支持 // false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。 //条件三:true->表示cas失败,意味着当前线程对应的cell 有竞争 // false->表示cas成功 if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) //都有哪些状况会调用? //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells] //2.true-> 说明当前线程对应下标的cell为空,须要建立 longAccumulate 支持 //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容] longAccumulate(x, null, uncontended); } }
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
根据LongAdder的add方法可知,参数x是add函数的传入参数,即要增长的数;
LongBinaryOperator是一个接口可扩展,重写applyAsLong方法用于处理cell中值与参数x的关系,此处传null;
wasUncontended只有在 【cells已经初始化过了,同时,当前线程对应的cell 不为空,cas给当前cell赋值,竞争修改失败】的状况下为false,其余为true。高并发
第一种状况:写base发生竞争,此时cells没有初始化,因此才会写到base,不走CASE1;
走Case2,判断有没有锁,没有锁的话,尝试加锁,成功加锁后执行初始化cells的逻辑。若是没有拿到锁,表示其它线程正在初始化cells,因此当前线程将值累加到base。
第二种状况:当前线程对应下标的cell为空,知足CASE1,到达CASE1.1中,建立一个Cell,加锁,若是成功,对应的位置其余线程没有设置过cell,将建立的cell插入相应位置。
第三种状况:当前线程对应下标的cell已经建立成功,但写入cell时发生竞争,到达CASE1.2,wasUncontended = true,把发生竞争线程的hash值rehash。
重置后走若CASE1.1,CASE1.2均不知足,到达CASE1.3【当前线程rehash过hash值,而后新命中的cell不为空】重试cas赋值+x一次,成功则退出。失败,扩容意向设置成true,rehash当前线程的hash值,再到1.3重试,还失败走CASE1.6扩容。
注意:CASE1.4要求cells数组长度不能超过cpu数量,由于一个CPU同一时间只能执行一个线程,若是cells数组长度 大于 CPU数量,并不能提升并发数,且形成空间的浪费。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //h 表示线程hash值 int h; //条件成立:说明当前线程 还未分配hash值; getProbe()获取当前线程的Hash值 if ((h = getProbe()) == 0) { //给当前线程分配hash值 ThreadLocalRandom.current(); // force initialization //取出当前线程的hash值 赋值给h h = getProbe(); //为何? 由于默认状况下 当前线程hash为0, 确定是写入到了 cells[0] 位置。 不把它当作一次真正的竞争 wasUncontended = true; } //表示扩容意向 false 必定不会扩容,true 可能会扩容。 boolean collide = false; // True if last slot nonempty //自旋 for (;;) { //as 表示cells引用 //a 表示当前线程命中的cell //n 表示cells数组长度 //v 表示 指望值 Cell[] as; Cell a; int n; long v; //CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中 if ((as = cells) != null && (n = as.length) > 0) { // 如下两种状况会进入Case1: //2.true-> 说明当前线程对应下标的cell为空,须要建立 longAccumulate 支持 //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容] //CASE1.1:true->表示当前线程对应的下标位置的cell为null,须要建立new Cell if ((a = as[(n - 1) & h]) == null) { //true->表示当前锁 未被占用 false->表示锁被占用 if (cellsBusy == 0) { // Try to attach new Cell //拿当前的x建立Cell Cell r = new Cell(x); // Optimistically create //条件一:true->表示当前锁 未被占用 false->表示锁被占用 //条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败.. if (cellsBusy == 0 && casCellsBusy()) { //是否建立成功 标记 boolean created = false; try { // Recheck under lock //rs 表示当前cells 引用 //m 表示cells长度 //j 表示当前线程命中的下标 Cell[] rs; int m, j; //条件一 条件二 恒成立 //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过该位置,而后当前线程再次初始化该位置 //致使丢失数据 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } //扩容意向 强制改成了false collide = false; } // CASE1.2: // wasUncontended:只有cells初始化以后,而且当前线程 竞争修改失败,才会是false else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //CASE 1.3:当前线程rehash过hash值,而后新命中的cell不为空 //true -> 写成功,退出循环 //false -> 表示rehash以后命中的新的cell 也有竞争 重试1次 再重试1次 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //CASE 1.4: //条件一:n >= NCPU true->扩容意向 改成false,表示不扩容了 false-> 说明cells数组还能够扩容 //条件二:cells != as true->其它线程已经扩容过了,当前线程rehash以后重试便可 else if (n >= NCPU || cells != as) //扩容意向 改成false,表示不扩容了 collide = false; // At max size or stale //CASE 1.5: //!collide = true 设置扩容意向 为true 可是不必定真的发生扩容 else if (!collide) collide = true; //CASE 1.6:真正扩容的逻辑 //条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程能够去竞争这把锁 //条件二:casCellsBusy true->表示当前线程 获取锁 成功,能够执行扩容逻辑 // false->表示当前时刻有其它线程在作扩容相关的操做。 else if (cellsBusy == 0 && casCellsBusy()) { try { //cells == as if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //释放锁 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //重置当前线程Hash值 h = advanceProbe(h); } //CASE2:前置条件cells还未初始化 as 为null //条件一:true 表示当前未加锁 //条件二:cells == as?由于其它线程可能会在你给as赋值以后修改了 cells //条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table //cells == as? 防止其它线程已经初始化了,当前线程再次初始化 致使丢失数据 if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } //CASE3: //1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,因此当前线程将值累加到base //2.cells被其它线程初始化后,当前线程须要将数据累加到base else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
官方文档是这样介绍的
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
LongAdder在多个线程更新一个用于收集统计信息的而不是追求同步的公共和的状况下,是优于AtomicLong类的。在并发度小,低竞争状况下,两个类具备类似的性能。可是在高争用状况下,LongAdder的预期吞吐量要高得多,代价是更高的空间消耗。
最后,咱们再来看一下sum方法的注释
Returns the current sum. The returned value is NOT an atomic snapshot; invocation in the absence of concurrent updates returns an accurate result, but concurrent updates that occur while the sum is being calculated might not be incorporated.
sum方法返回值只是一个接近值,并非一个准确值。它在计算总和时,并发的更新并不会被合并在内。
总结: