原文地址java
最近在看阿里的 Sentinel 的源码的时候。发现使用了一个类 LongAdder 来在并发环境中计数。这个时候就提出了疑问,JDK 中已经有 AtomicLong 了,为啥还要使用 LongAdder ? AtomicLong 已是基于 CAS 的无锁结构,已经有很好的并发表现了,为啥还要用 LongAdder ?因而赶快找来源码一探究竟。git
你们能够阅读我以前写的 JAVA 中的 CAS 详细了解 AtomicLong 的实现原理。须要注意的一点是,AtomicLong 的 Add() 是依赖自旋不断的 CAS 去累加一个 Long 值。若是在竞争激烈的状况下,CAS 操做不断的失败,就会有大量的线程不断的自旋尝试 CAS 会形成 CPU 的极大的消耗。github
经过阅读 LongAdder 的 Javadoc 咱们了解到:数组
This class is usually preferable to {@link AtomicLong} when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.微信
大概意思就是,LongAdder 功能相似 AtomicLong ,在低并发状况下两者表现差很少,在高并发状况下 LongAdder 的表现就会好不少。并发
LongAdder 到底用了什么黑科技能作到高性比 AtomicLong 还要好呢呢?对于一样的一个 add() 操做,上文说到 AtomicLong 只对一个 Long 值进行 CAS 操做。而 LongAdder 是针对 Cell 数组的某个 Cell 进行 CAS 操做 ,把线程的名字的 hash 值,做为 Cell 数组的下标,而后对 Cell[i] 的 long 进行 CAS 操做。简单粗暴的分散了高并发下的竞争压力。app
虽然原理简单粗暴,可是代码写得却至关细致和精巧。less
在 java.util.concurrent.atomic
包下面咱们能够看到 LongAdder 的源码。首先看 add() 方法的源码。dom
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
复制代码
看到这个 add() 方法,首先须要了解 Cell 是什么?ide
Cell 是 java.util.concurrent.atomic
下 Striped64
的一个内部类。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// unsafe 机制
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
复制代码
首先 Cell 被 @sun.misc.Contended 修饰。意思是让Java编译器和JRE运行时来决定如何填充。不理解没关系,不影响理解。
其实一个 Cell 的本质就是一个 volatile 修饰的 long 值,且这个值可以进行 cas 操做。
回到咱们的 add() 方法。
这里涉及四个额外的方法 casBase() , getProbe() , a.cas() , longAccumulate();
咱们看名字就知道 casBase() 和 a.cas() 都是对参数的 cas 操做。
getProbe() 的做用,就是根据当前线程 hash 出一个 int 值。
longAccumlate() 的做用比较复杂,以后咱们会讲解。
因此这个 add() 操做概括之后就是:
可见,操做的核心思想仍是基于 cas。可是 cas 失败后,并非傻乎乎的自旋,而是逐渐升级。升级的 cas 都无论用了则进入 longAccumulate() 这个方法。
下面就开始揭开 longAccumulate 的神秘面纱。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//若是操做的cell 为空,double check 新建 cell
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// cas 失败 继续循环
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 若是 cell cas 成功 break
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 若是 cell 的长度已经大于等于 cpu 的数量,扩容意义不大,就不用标记冲突,重试
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
// 获取锁,上锁扩容,将冲突标记为否,继续执行
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 无法获取锁,重散列,尝试其余槽
h = advanceProbe(h);
}
// 获取锁,初始化 cell 数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 表未被初始化,可能正在初始化,回退使用 base。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
复制代码
longAccumulate 看上去比较复杂。咱们慢慢分析。
回忆一下,什么状况会进入到这个 longAccumulate 方法中,
在 longAccumulate 中有几个标记位,咱们也先理解一下
cellsBusy
cells 的操做标记位,若是正在修改、新建、操做 cells 数组中的元素会,会将其 cas 为 1,不然为0。wasUncontended
表示 cas 是否失败,若是失败则考虑操做升级。collide
是否冲突,若是冲突,则考虑扩容 cells 的长度。整个 for(;;) 死循环,都是以 cas 操做成功而了结。不然则会修改上述描述的几个标记位,从新进入循环。
因此整个循环包括以下几种状况:
cells 不为空
cells 为空且获取到 cellsBusy ,init cells 数组,而后赋值退出。
cellsBusy 获取失败,则进行 baseCas ,操做成功退出,不成功则重试。
至此 longAccumulate 就分析完了。之因此这个方法那么复杂,我认为有两个缘由
最后说说 LongAddr 的 sum() 方法,这个就很简单了。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
复制代码
就是遍历 cell 数组,累加 value 就行。LongAdder 余下的方法就比较简单,没有什么能够讨论的了。
看上去 LongAdder 性能全面超越了 AtomicLong。为何 jdk 1.8 中仍是保留了 AtomicLong 的实现呢?
其实咱们能够发现,LongAdder 使用了一个 cell 列表去承接并发的 cas,以提高性能,可是 LongAdder 在统计的时候若是有并发更新,可能致使统计的数据有偏差。
若是用于自增 id 的生成,就不适合使用 LongAdder 了。这个时候使用 AtomicLong 就是一个明智的选择。
而在 Sentinel 中 LongAdder 承担的只是统计任务,且容许偏差。
LongAdder 使用了一个比较简单的原理,解决了 AtomicLong 类,在极高竞争下的性能问题。可是 LongAdder 的具体实现却很是精巧和细致,分散竞争,逐步升级竞争的解决方案,至关漂亮,值得咱们细细品味。
欢迎关注个人微信公众号: