对LongAdder
的最初了解是从Coolshell上的一篇文章中得到的,可是一直都没有深刻的了解过其实现,只知道它相较于AtomicLong
来讲,更加适合写多读少的并发情景。今天,咱们就研究一下LongAdder
的原理,探究一下它如此高效的缘由。算法
Java有不少并发控制机制,好比说以AQS为基础的锁或者以CAS为原理的自旋锁。不了解AQS的朋友能够阅读我以前的AQS源码解析文章。通常来讲,CAS适合轻量级的并发操做,也就是并发量并很少,并且等待时间不长的状况,不然就应该使用普通锁,进入阻塞状态,避免CPU空转。shell
因此,若是你有一个Long类型的值会被多线程修改,那么使用CAS进行并发控制比较好,可是若是你是须要锁住一些资源,而后进行数据库操做,那么仍是使用阻塞锁比较好。数据库
第一种状况下,咱们通常都使用AtomicLong
。AtomicLong
是经过无限循环不停的采起CAS的方法去设置内部的value,直到成功为止。那么当并发数比较多或出现更新热点时,就会致使CAS的失败机率变高,重试次数更多,越多的线程重试,CAS失败的机率越高,造成恶性循环,从而下降了效率。编程
而LongAdder的原理就是下降对value更新的并发数,也就是将对单一value的变动压力分散到多个value值上,下降单个value的“热度”。数组
咱们知道LongAdder
的大体原理以后,再来详细的了解一下它的具体实现,其中也有不少值得借鉴的并发编程的技巧。bash
LongAdder
是Striped64
的子类,其有三个比较重要的成员函数,在以后的函数分析中须要使用到,这里先说明一下。多线程
// CPU的数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Cell对象的数组,长度通常是2的指数
transient volatile Cell[] cells;
// 基础value值,当并发较低时,只累加该值
transient volatile long base;
// 建立或者扩容Cells数组时使用的自旋锁变量
transient volatile int cellsBusy;
复制代码
cells
是LongAdder
的父类Striped64
中的Cell
数组类型的成员变量。每一个Cell
对象中都包含一个value值,并提供对这个value值的CAS操做。并发
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
}
复制代码
咱们首先来看一下LongAdder
的add
函数,其会屡次尝试CAS操做将值进行累加,若是成功了就直接返回,失败则继续执行。代码比较复杂,并且涉及的状况比较多,咱们就以梳理历次尝试CAS操做为主线,讲清楚这些CAS操做的前提条件和场景。app
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 当cells数组为null时,会进行第一次cas操做尝试。
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 当cells数组不为null,而且经过getProbe() & m
// 定位的Cell对象不为null时进行第二次CAS操做。
// 若是执行不成功,则进入longAccumulate函数。
longAccumulate(x, null, uncontended);
}
}
复制代码
当并发量较少时,cell数组还没有初始化,因此只调用casBase
函数,对base变量进行CAS累加。dom
咱们来看一下casBase
函数相关的源码吧。咱们能够认为变量base
就是第一个value值,也是基础value变量。先调用casBase函数来cas一下base变量,若是成功了,就不须要在进行下面比较复杂的算法,
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
复制代码
当并发量逐渐提升时,casBase
函数会失败。若是cells数组为null或为空,就直接调用longAccumulate
方法。由于cells为null或在为空,说明cells未初始化,因此调用longAccumulate
进行初始化。不然继续判断。 若是cells中已经初始化,就继续进行后续判断。咱们先来理解一下getProbe() & m
的这个操做吧,能够把这个操做看成一次计算"hash"值,而后将cells中这个位置的Cell对象赋值给变量a。若是变量a不为null,那么就调用该对象的cas方法去设置其value值。若是a为null,或在cas赋值发生冲突,那么调用longAccumulate
方法。
longAccumulate
函数比较复杂,带有个人注释的代码已经贴在了文章后边,这里咱们就只讲一下其中比较关键的一些技巧和思想。
首先,咱们都知道只有当对base
的cas操做失败以后,LongAdder
才引入Cell
数组.因此在longAccumulate
中就是对Cell
数组进行操做,分别涉及了数组的初始化,扩容和设置某个位置的Cell对象等操做。
在这段代码中,关于cellBusy
的cas操做构成了一个SpinLock,这就是经典的SpinLock的编程技巧,你们能够学习一下。
咱们先来看一下longAccumulate
的主体代码,首先是一个无限for循环,而后根据cells数组的状态来判断是要进行cells数组的初始化,仍是进行对象添加或者扩容。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
//获取PROBE变量,探针变量,与当前运行的线程相关,不一样线程不一样
ThreadLocalRandom.current();
//初始化PROBE变量,和getProbe都使用Unsafe类提供的原子性操做。
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) { //cas经典无限循环,不断尝试
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// cells不为null,而且数组size大于0,表示cells已经初始化了
// 初始化Cell对象并设置到数组中或者进行数组扩容
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//cells数组未初始化,得到cellsBusy lock,进行cells数组的初始化
// cells数组初始化操做
}
//若是初始化数组失败了,那就再次尝试一下直接cas base变量,
// 若是成功了就直接返回,这是最后一个进行CAS操做的地方。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}
复制代码
进行Cell数组代码以下所示,它首先调用casCellsBusy
函数获取了cellsBusy
‘锁’,而后进行数组的初始化操做,最后将cellBusy
'锁'释放掉。
// 注意在进入这段代码以前已经casCellsBusy得到cellsBusy这个锁变量了。
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x); //设置x的值为cell对象的value值
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
复制代码
若是Cell数组已经初始化过了,那么就进行Cell数组的设置或者扩容。这部分代码有一系列的if else的判断,若是前一个条件不成立,才会进入下一条判断。
首先,当Cell数组中对应位置的cell对象为null时,代表该位置的Cell对象须要进行初始化,因此使用casCellsBusy
函数获取'锁',而后初始化Cell对象,而且设置进cells数组,最后释放掉'锁'。
当Cell数组中对应位置的cell对象不为null,则直接调用其cas操做进行累加。
当上述操做都失败后,认为多个线程在对同一个位置的Cell对象进行操做,这个Cell对象是一个“热点”,因此Cell数组须要进行扩容,将热点分散。
if ((a = as[(n - 1) & h]) == null) { //经过与操做计算出来须要操做的Cell对象的坐标
if (cellsBusy == 0) { //volatile 变量,用来实现spinLock,来在初始化和resize cells数组时使用。
//当cellsBusy为0时,表示当前能够对cells数组进行操做。
Cell r = new Cell(x);//将x值直接赋值给Cell对象
if (cellsBusy == 0 && casCellsBusy()) {//若是这个时候cellsBusy仍是0
//就cas将其设置为非0,若是成功了就是得到了spinLock的锁.能够对cells数组进行操做.
//若是失败了,就会再次执行一次循环
boolean created = false;
try {
Cell[] rs; int m, j;
//判断cells是否已经初始化,而且要操做的位置上没有cell对象.
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; //将以前建立的值为x的cell对象赋值到cells数组的响应位置.
created = true;
}
} finally {
//经典的spinLock编程技巧,先得到锁,而后try finally将锁释放掉
//将cellBusy设置为0就是释放锁.
cellsBusy = 0;
}
if (created)
break; //若是建立成功了,就是使用x建立了新的cell对象,也就是新建立了一个分担热点的value
continue;
}
}
collide = false; //未发生碰撞
}
else if (!wasUncontended)//是否已经发生过一次cas操做失败
wasUncontended = true; //设置成true,以便第二次进入下一个else if 判断
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
 //fn是操做类型,若是是空,就是相加,因此让a这个cell对象中的value值和x相加,而后在cas设置,若是成果
//就直接返回
break;
else if (n >= NCPU || cells != as)
  //若是cells数组的大小大于系统的可得到处理器数量或在as再也不和cells相等.
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
  //再次得到cellsBusy这个spinLock,对数组进行resize
try {
if (cells == as) {//要再次检测as是否等于cells以避免其余线程已经对cells进行了操做.
Cell[] rs = new Cell[n << 1]; //扩容一倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;//赋予cells一个新的数组对象
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);//因为使用当前探针变量没法操做成功,因此从新设置一个,再次尝试
复制代码
本篇文章写的不是很好,我写完以后又看了一遍coolshell上的关于LongAdder
的文章,感受本身没有人家写的那么简洁明了。我对代码细节的注释和投入太多了。其实不少代码你们均可以看懂,并不须要大量的代码片断加注释。之后要注意一下。以后会接着研究一下JUC包中的其余类,但愿你们多多关注。