【转载】jdk1.8 LongAdder源码学习

本文转自 https://blog.csdn.net/u011392897/article/details/60480108 java

LongAdder是jdk8新增的用于并发环境的计数器,目的是为了在高并发状况下,代替AtomicLong/AtomicInt,成为一个用于高并发状况下的高效的通用计数器。

高并发下计数,通常最早想到的应该是AtomicLong/AtomicInt,AtmoicXXX使用硬件级别的指令 CAS 来更新计数器的值,这样能够避免加锁,机器直接支持的指令,效率也很高。可是AtomicXXX中的 CAS 操做在出现线程竞争时,失败的线程会白白地循环一次,在并发很大的状况下,由于每次CAS都只有一个线程能成功,竞争失败的线程会很是多。失败次数越多,循环次数就越多,不少线程的CAS操做愈来愈接近 自旋锁(spin lock)。计数操做原本是一个很简单的操做,实际须要耗费的cpu时间应该是越少越好,AtomicXXX在高并发计数时,大量的cpu时间都浪费会在 自旋 上了,这很浪费,也下降了实际的计数效率。数组

// 很简单的一个类,这个类能够当作是一个简化的AtomicLong
// 经过cas操做来更新value的值
// @sun.misc.Contended是一个高端的注解,表明使用缓存行填来避免伪共享,能够本身网上搜下,这个我就不细说了
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
 
    // Unsafe mechanics Unsafe相关的初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
说LongAdder比在高并发时比AtomicLong更高效,这么说有什么依据呢?LongAdder是根据ConcurrentHashMap这类为并发设计的类的基本原理——锁分段,来实现的,它里面维护一组按需分配的计数单元,并发计数时,不一样的线程能够在不一样的计数单元上进行计数,这样减小了线程竞争,提升了并发效率。本质上是用空间换时间的思想,不过在实际高并发状况中消耗的空间能够忽略不计。
如今,在处理高并发计数时,应该优先使用LongAdder,而不是继续使用AtomicLong。固然,线程竞争很低的状况下进行计数,使用Atomic仍是更简单更直接,而且效率稍微高一些。
其余状况,好比序号生成,这种状况下须要准确的数值,全局惟一的AtomicLong才是正确的选择,此时不该该使用LongAdder。
 
下面简要分析下LongAdder的源码,有了ConcurrentHashMap(LongAdder比较像1.6和1.7的,能够看下1.7的)的基础,这个类的源码看起来也不复杂。
1、类的关系

 

 

公共父类Striped64是实现中的核心,它实现一些核心操做,处理64位数据,很容易就能转化为其余基本类型,是个通用的类。二元算术运算累积,指的是你能够给它提供一个二元算术方式,这个类按照你提供的方式进行算术计算,并保存计算结果。二元运算中第一个操做数是累积器中某个计数单元当前的值,另一个值是外部提供的。
举几个例子:
假设每次操做都须要把原来的数值加上某个值,那么二元运算为 (x, y) -> x+y,这样累积器每次都会加上你提供的数字y,这跟LongAdder的功能基本上是同样的;
假设每次操做都须要把原来的数值变为它的某个倍数,那么能够指定二元运算为 (x, y) -> x*y,累积器每次都会乘以你提供的数字y,y=2时就是一般所说的每次都翻一倍;
假设每次操做都须要把原来的数值变成它的5倍,再加上3,再除以2,再减去4,再乘以你给定的数,最后还要加上6,那么二元运算为 (x, y) -> ((x*5+3)/2 - 4)*y +6,累积器每次累积操做都会按照你说的作;
......
LongAccumulator是标准的实现类,LongAdder是特化的实现类,它的功能等价于LongAccumulator((x, y) -> x+y, 0L)。它们的区别很简单,前者能够进行任何二元算术操做,后者只能进行加减两种算术操做。
Double版本是Long版本的简单改装,相对Long版本,主要的变化就是用Double.longBitsToDouble 和Double.doubleToRawLongBits对底层的8字节数据进行long <---> double转换,存储的时候使用long型,计算的时候转化为double型。这是由于CAS是sun.misc.Unsafe中提供的操做,只对int、long、对象类型(引用或者指针)提供了这种操做,其余类型都须要转化为这三种类型才能进行CAS操做。这里的long型也能够认为是8字节的原始类型,由于把它视为long类型是无心义的。java中没有C语言中的 void* 无类型(或者叫原始类型),只能用最接近的long类型来代替。
 

四个实现类的区别就上面这两句话,这里只讲LongAdder一个类。缓存

 

2、核心实现Striped64多线程

四个类的核心实现都在Striped64中,这个类使用分段的思想,来尽可能平摊并发压力。相似1.7及之前版本的ConcurrentHashMap.Segment,Striped64中使用了一个叫Cell的类,是一个普通的二元算术累积单元,线程也是经过hash取模操做映射到一个Cell上进行累积。为了加快取模运算效率,也把Cell数组的大小设置为2^n,同时大量使用Unsafe提供的底层操做。基本的实现桶1.7的ConcurrentHashMap很是像,并且更简单。
 
一、累积单元Cell
看到这里我想了一个看似简单的问题:既然Cell这么简单,只有一个long型变量,为何不直接用long value?
首先声明下,Unsafe提供的操做很强大,也能对数组的元素进行volatile读写,同时数组计算某个元素的offset偏移量自己就很简单,所以volatile、cas这种站不住脚。这个问题下面一点再进行解答。
// 很简单的一个类,这个类能够当作是一个简化的AtomicLong
// 经过cas操做来更新value的值
// @sun.misc.Contended是一个高端的注解,表明使用缓存行填来避免伪共享,能够本身网上搜下,这个我就不细说了
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
 
    // Unsafe mechanics Unsafe相关的初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

 

二、Striped64主体代码并发

abstract class Striped64 extends Number {
    @sun.misc.Contended static final class Cell { ... }
 
    /** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
 
    // cell数组,长度同样要是2^n,能够类比为jdk1.7的ConcurrentHashMap中的segments数组
    transient volatile Cell[] cells;
 
    // 累积器的基本值,在两种状况下会使用:
    // 一、没有遇到并发的状况,直接使用base,速度更快;
    // 二、多线程并发初始化table数组时,必需要保证table数组只被初始化一次,所以只有一个线程可以竞争成功,这种状况下竞争失败的线程会尝试在base上进行一次累积操做
    transient volatile long base;
 
    // 自旋标识,在对cells进行初始化,或者后续扩容时,须要经过CAS操做把此标识设置为1(busy,忙标识,至关于加锁),取消busy时能够直接使用cellsBusy = 0,至关于释放锁
    transient volatile int cellsBusy;
 
    Striped64() {
    }
 
    // 使用CAS更新base的值
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
 
    // 使用CAS将cells自旋标识更新为1
    // 更新为0时能够不用CAS,直接使用cellsBusy就行
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
 
    // 下面这两个方法是ThreadLocalRandom中的方法,不过由于包访问关系,这里又从新写一遍
 
    // probe翻译过来是探测/探测器/探针这些,很差理解,它是ThreadLocalRandom里面的一个属性,
    // 不过并不影响对Striped64的理解,这里能够把它理解为线程自己的hash值
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
 
    // 至关于rehash,从新算一遍线程的hash值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }
 
    /**
     * 核心方法的实现,此方法建议在外部进行一次CAS操做(cell != null时尝试CAS更新base值,cells != null时,CAS更新hash值取模后对应的cell.value)
     * @param x the value 前面我说的二元运算中的第二个操做数,也就是外部提供的那个操做数
     * @param fn the update function, or null for add (this convention avoids the need for an extra field or function in LongAdder).
     *     外部提供的二元算术操做,实例持有而且只能有一个,生命周期内保持不变,null表明LongAdder这种特殊可是最经常使用的状况,能够减小一次方法调用
     * @param wasUncontended false if CAS failed before call 若是为false,代表调用者预先调用的一次CAS操做都失败了
     */
    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        int h;
        // 这个if至关于给线程生成一个非0的hash值
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false; // True if last slot nonempty 若是hash取模映射获得的Cell单元不是null,则为true,此值也能够看做是扩容意向,感受这个更好理解
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { // cells已经被初始化了
                if ((a = as[(n - 1) & h]) == null) { // hash取模映射获得的Cell单元还为null(为null表示尚未被使用)
                    if (cellsBusy == 0) {       // Try to attach new Cell 若是没有线程正在执行扩容
                        Cell r = new Cell(x);   // Optimistically create 先建立新的累积单元
                        if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁
                            boolean created = false;
                            try {               // Recheck under lock 在有锁的状况下再检测一遍以前的判断
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 考虑别的线程可能执行了扩容,这里从新赋值从新判断
                                    rs[j] = r; // 对没有使用的Cell单元进行累积操做(第一次赋值至关因而累积上一个操做数,求和时再和base执行一次运算就获得实际的结果)
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0; 清空自旋标识,释放锁
                            }
                            if (created) // 若是本来为null的Cell单元是由本身进行第一次累积操做,那么任务已经完成了,因此能够退出循环
                                break;
                            continue;           // Slot is now non-empty 不是本身进行第一次累积操做,重头再来
                        }
                    }
                    collide = false; // 执行这一句是由于cells被加锁了,不能往下继续执行第一次的赋值操做(第一次累积),因此还不能考虑扩容
                }
                else if (!wasUncontended) // CAS already known to fail 前面一次CAS更新a.value(进行一次累积)的尝试已经失败了,说明已经发生了线程竞争
                    wasUncontended = true; // Continue after rehash 状况失败标识,后面去从新算一遍线程的hash值
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 尝试CAS更新a.value(进行一次累积) ------ 标记为分支A
                    break; // 成功了就完成了累积任务,退出循环
                else if (n >= NCPU || cells != as) // cell数组已是最大的了,或者中途发生了扩容操做。由于NCPU不必定是2^n,因此这里用 >=
                    collide = false; // At max size or stale 长度n是递增的,执行到了这个分支,说明n >= NCPU会永远为true,下面两个else if就永远不会被执行了,也就永远不会再进行扩容
                                     // CPU可以并行的CAS操做的最大数量是它的核心数(CAS在x86中对应的指令是cmpxchg,多核须要经过锁缓存来保证总体原子性),当n >= NCPU时,再出现几个线程映射到同一个Cell致使CAS竞争的状况,那就真不关扩容的事了,彻底是hash值的锅了
                else if (!collide) // 映射到的Cell单元不是null,而且尝试对它进行累积时,CAS竞争失败了,这时候把扩容意向设置为true
                                   // 下一次循环若是仍是跟这一次同样,说明竞争很严重,那么就真正扩容
                    collide = true; // 把扩容意向设置为true,只有这里才会给collide赋值为true,也只有执行了这一句,才可能执行后面一个else if进行扩容
                else if (cellsBusy == 0 && casCellsBusy()) { // 最后再考虑扩容,能到这一步说明竞争很激烈,尝试加锁进行扩容 ------ 标记为分支B
                    try {
                        if (cells == as) {      // Expand table unless stale 检查下是否被别的线程扩容了(CAS更新锁标识,处理不了ABA问题,这里再检查一遍)
                            Cell[] rs = new Cell[n << 1]; // 执行2倍扩容
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0; // 释放锁
                    }
                    collide = false; // 扩容意向为false
                    continue; // Retry with expanded table 扩容后重头再来
                }
                h = advanceProbe(h); // 从新给线程生成一个hash值,下降hash冲突,减小映射到同一个Cell致使CAS竞争的状况
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells没有被加锁,而且它没有被初始化,那么就尝试对它进行加锁,加锁成功进入这个else if
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) { // CAS避免不了ABA问题,这里再检测一次,若是仍是null,或者空数组,那么就执行初始化
                        Cell[] rs = new Cell[2]; // 初始化时只建立两个单元
                        rs[h & 1] = new Cell(x); // 对其中一个单元进行累积操做,另外一个无论,继续为null
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0; // 清空自旋标识,释放锁
                }
                if (init) // 若是某个本来为null的Cell单元是由本身进行第一次累积操做,那么任务已经完成了,因此能够退出循环
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // cells正在进行初始化时,尝试直接在base上进行累加操做
                break;                          // Fall back on using base 直接在base上进行累积操做成功了,任务完成,能够退出循环了
        }
    }
 
    // double的不讲,更long的逻辑基本上是同样的
    final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended);
 
    // Unsafe mechanics Unsafe初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
}

 

看完这个在来看看第一点中我提的问题:既然Cell这么简单,为何不直接用long value?app

先看看我特别标明的两个分支:分支A是用CAS更新对应的cell.value,是个写操做,分支B是进行扩容。
ConcurrentHashMap中,扩容和写操做是会严格处理的,在一个分段锁管辖区内,不会出现扩容和写操做并发:1.6和1.7的扩容操做都是在put内部执行的,put自己就会加锁,所以扩容进行时会阻塞对同一个Segment的写操做;1.8中扩容时,put/remove等方法若是遇见正在其余线程正在执行扩容,会去帮助扩容,扩容完成了以后才会去尝试加锁执行真正的写操做。
虽然B分支会进行”加锁“,可是A操做跟cellsBusy无关,”加锁“并不由止A操做的执行。AB两个分支是不互斥的, 所以Striped64这里会出现A分支的写操做,和B分支扩容操做并发执行的状况。
那么问题是:为何这么并发执行没问题?
仔细看看A操做,就明白了。A操做使用CAS更新Cell对象中的某个属性,并不改变数组持有的Cell对象的引用,扩容操做进行的是数组持有的Cell对象引用的复制,复制后引用指向的仍是原来的那个Cell对象。
举个例子就是,旧的cell数组,叫做old,old[1] = cellA,cellA.value = 1,扩容后的新数组,叫做new,任然有new[1] = cellA。A分支实际上执行的是cellA.value = 2,不管分支A和B怎么并发执行,执行完成后新数组都能看到分支A对Cell的改变,扩容先后实际上数组持有的是同一群Cell对象。
这下就知道为何不直接用long变量代替Cell对象了吧。long[]进行复制时,两个数组完彻底全分离了,A分支直接做用在旧数组上,B分支扩容后,看不到串行复制执行后对旧数组同一位置的改变。举个例子就是,old[1]=10,A分支要把old[1]更新为11,这时候B分支已经复制到old[5]了,A分支执行完成后,B分支建立的新数组new[1]可能仍是10(无论是多少,反正没记录A分支的操做),这样A分支的操做就被遗失了,程序会有问题。
下面简单画了个示意图,能够看看。



3、LongAdder
看完了Striped64的讲解,这部分就很简单了,只是一些简单的封装。
public class LongAdder extends Striped64 implements Serializable {
 
    // 构造方法,什么也不作,直接使用默认值,base = 0, cells = null
    public LongAdder() {
    }
 
    // add方法,根据父类的longAccumulate方法的要求,这里要进行一次CAS操做
    // (虽然这里有两个CAS,可是第一个CAS成功了就不会执行第二个,要执行第二个,第一个就被“短路”了不会被执行)
    // 在线程竞争不激烈时,这样作更快
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
 
    public void increment() {
        add(1L);
    }
 
    public void decrement() {
        add(-1L);
    }
 
    // 返回累加的和,也就是“当前时刻”的计数值
    // 此返回值可能不是绝对准确的,由于调用这个方法时还有其余线程可能正在进行计数累加,
    //     方法的返回时刻和调用时刻不是同一个点,在有并发的状况下,这个值只是近似准确的计数值
    // 高并发时,除非全局加锁,不然得不到程序运行中某个时刻绝对准确的值,可是全局加锁在高并发状况下是下下策
    // 在不少的并发场景中,计数操做并非核心,这种状况下容许计数器的值出现一点误差,此时可使用LongAdder
    // 在必须依赖准确计数值的场景中,应该本身处理而不是使用通用的类
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
 
    // 重置计数器,只应该在明确没有并发的状况下调用,能够用来避免从新new一个LongAdder
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
 
    // 至关于sum()后再调用reset()
    public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }
 
    // 其余的不说了
}

 

 
简单总结下:
这个类是jdk1.8新增的类,目的是为了提供一个通用的,更高效的用于并发场景的计数器。能够网上搜下一些关于LongAdder的性能测试,有不少现成的,我本身就不写了。
jdk1.8的ConcurrentHashMap中,没有再使用Segment,使用了一个简单的仿造LongAdder实现的计数器,这样可以保证计数效率不低于使用Segment的效率。
相关文章
相关标签/搜索