死磕 java并发包之LongAdder源码分析

问题

(1)java8中为何要新增LongAdder?java

(2)LongAdder的实现方式?数组

(3)LongAdder与AtomicLong的对比?多线程

简介

LongAdder是java8中新增的原子类,在多线程环境中,它比AtomicLong性能要高出很多,特别是写多的场景。app

它是怎么实现的呢?让咱们一块儿来学习吧。less

原理

LongAdder的原理是,在最初无竞争时,只更新base的值,当有多线程竞争时经过分段的思想,让不一样的线程更新不一样的段,最后把这些段相加就获得了完整的LongAdder存储的值。dom

LongAdder

源码分析

LongAdder继承自Striped64抽象类,Striped64中定义了Cell内部类和各重要属性。ide

主要内部类

// Striped64中的内部类,使用@sun.misc.Contended注解,说明里面的值消除伪共享
@sun.misc.Contended static final class Cell {
    // 存储元素的值,使用volatile修饰保证可见性
    volatile long value;
    Cell(long x) { value = x; }
    // CAS更新value的值
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe实例
    private static final sun.misc.Unsafe UNSAFE;
    // value字段的偏移量
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell类使用@sun.misc.Contended注解,说明是要避免伪共享的。源码分析

使用Unsafe的CAS更新value的值,其中value的值使用volatile修饰,保证可见性。性能

关于Unsafe的介绍请查看【死磕 java魔法类之Unsafe解析】。学习

关于伪共享的介绍请查看【杂谈 什么是伪共享(false sharing)?】。

主要属性

// 这三个属性都在Striped64中
// cells数组,存储各个段的值
transient volatile Cell[] cells;
// 最初无竞争时使用的,也算一个特殊的段
transient volatile long base;
// 标记当前是否有线程在建立或扩容cells,或者在建立Cell
// 经过CAS更新该值,至关因而一个锁
transient volatile int cellsBusy;

最初无竞争或有其它线程在建立cells数组时使用base更新值,有过竞争时使用cells更新值。

最初无竞争是指一开始没有线程之间的竞争,但也有多是多线程在操做,只是这些线程没有同时去更新base的值。

有过竞争是指只要出现过竞争无论后面有没有竞争都使用cells更新值,规则是不一样的线程hash到不一样的cell上去更新,减小竞争。

add(x)方法

add(x)方法是LongAdder的主要方法,使用它可使LongAdder中存储的值增长x,x可为正可为负。

public void add(long x) {
    // as是Striped64中的cells属性
    // b是Striped64中的base属性
    // v是当前线程hash到的Cell中存储的值
    // m是cells的长度减1,hash时做为掩码使用
    // a是当前线程hash到的Cell
    Cell[] as; long b, v; int m; Cell a;
    // 条件1:cells不为空,说明出现过竞争,cells已经建立
    // 条件2:cas操做base失败,说明其它线程先一步修改了base,正在出现竞争
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // true表示当前竞争还不激烈
        // false表示竞争激烈,多个线程hash到同一个Cell,可能要扩容
        boolean uncontended = true;
        // 条件1:cells为空,说明正在出现竞争,上面是从条件2过来的
        // 条件2:应该不会出现
        // 条件3:当前线程所在的Cell为空,说明当前线程尚未更新过Cell,应初始化一个Cell
        // 条件4:更新当前线程所在的Cell失败,说明如今竞争很激烈,多个线程hash到了同一个Cell,应扩容
        if (as == null || (m = as.length - 1) < 0 ||
            // getProbe()方法返回的是线程中的threadLocalRandomProbe字段
            // 它是经过随机数生成的一个值,对于一个肯定的线程这个值是固定的
            // 除非刻意修改它
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 调用Striped64中的方法处理
            longAccumulate(x, null, uncontended);
    }
}

(1)最初无竞争时只更新base;

(2)直到更新base失败时,建立cells数组;

(3)当多个线程竞争同一个Cell比较激烈时,可能要扩容;

longAccumulate()方法

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    // 存储线程的probe值
    int h;
    // 若是getProbe()方法返回0,说明随机数未初始化
    if ((h = getProbe()) == 0) {
        // 强制初始化
        ThreadLocalRandom.current(); // force initialization
        // 从新获取probe值
        h = getProbe();
        // 都未初始化,确定还不存在竞争激烈
        wasUncontended = true;
    }
    // 是否发生碰撞
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // cells已经初始化过
        if ((as = cells) != null && (n = as.length) > 0) {
            // 当前线程所在的Cell未初始化
            if ((a = as[(n - 1) & h]) == null) {
                // 当前无其它线程在建立或扩容cells,也没有线程在建立Cell
                if (cellsBusy == 0) {       // Try to attach new Cell
                    // 新建一个Cell,值为当前须要增长的值
                    Cell r = new Cell(x);   // Optimistically create
                    // 再次检测cellsBusy,并尝试更新它为1
                    // 至关于当前线程加锁
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 是否建立成功
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 从新获取cells,并找到当前线程hash到cells数组中的位置
                            // 这里必定要从新获取cells,由于as并不在锁定范围内
                            // 有可能已经扩容了,这里要从新获取
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 把上面新建的Cell放在cells的j位置处
                                rs[j] = r;
                                // 建立成功
                                created = true;
                            }
                        } finally {
                            // 至关于释放锁
                            cellsBusy = 0;
                        }
                        // 建立成功了就返回
                        // 值已经放在新建的Cell里面了
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                // 标记当前未出现冲突
                collide = false;
            }
            // 当前线程所在的Cell不为空,且更新失败了
            // 这里简单地设为true,至关于简单地自旋一次
            // 经过下面的语句修改线程的probe再从新尝试
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 再次尝试CAS更新当前线程所在Cell的值,若是成功了就返回
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 若是cells数组的长度达到了CPU核心数,或者cells扩容了
            // 设置collide为false并经过下面的语句修改线程的probe再从新尝试
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 上上个elseif都更新失败了,且上个条件不成立,说明出现冲突了
            else if (!collide)
                collide = true;
            // 明确出现冲突了,尝试占有锁,并扩容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 检查是否有其它线程已经扩容过了
                    if (cells == as) {      // Expand table unless stale
                        // 新数组为原数组的两倍
                        Cell[] rs = new Cell[n << 1];
                        // 把旧数组元素拷贝到新数组中
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 从新赋值cells为新数组
                        cells = rs;
                    }
                } finally {
                    // 释放锁
                    cellsBusy = 0;
                }
                // 已解决冲突
                collide = false;
                // 使用扩容后的新数组从新尝试
                continue;                   // Retry with expanded table
            }
            // 更新失败或者达到了CPU核心数,从新生成probe,并重试
            h = advanceProbe(h);
        }
        // 未初始化过cells数组,尝试占有锁并初始化cells数组
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 是否初始化成功
            boolean init = false;
            try {                           // Initialize table
                // 检测是否有其它线程初始化过
                if (cells == as) {
                    // 新建一个大小为2的Cell数组
                    Cell[] rs = new Cell[2];
                    // 找到当前线程hash到数组中的位置并建立其对应的Cell
                    rs[h & 1] = new Cell(x);
                    // 赋值给cells数组
                    cells = rs;
                    // 初始化成功
                    init = true;
                }
            } finally {
                // 释放锁
                cellsBusy = 0;
            }
            // 初始化成功直接返回
            // 由于增长的值已经同时建立到Cell中了
            if (init)
                break;
        }
        // 若是有其它线程在初始化cells数组中,就尝试更新base
        // 若是成功了就返回
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

(1)若是cells数组未初始化,当前线程会尝试占有cellsBusy锁并建立cells数组;

(2)若是当前线程尝试建立cells数组时,发现有其它线程已经在建立了,就尝试更新base,若是成功就返回;

(3)经过线程的probe值找到当前线程应该更新cells数组中的哪一个Cell;

(4)若是当前线程所在的Cell未初始化,就占有占有cellsBusy锁并在相应的位置建立一个Cell;

(5)尝试CAS更新当前线程所在的Cell,若是成功就返回,若是失败说明出现冲突;

(5)当前线程更新Cell失败后并非当即扩容,而是尝试更新probe值后再重试一次;

(6)若是在重试的时候仍是更新失败,就扩容;

(7)扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中;

(8)cellsBusy在建立cells数组、建立Cell、扩容cells数组三个地方用到;

sum()方法

sum()方法是获取LongAdder中真正存储的值的大小,经过把base和全部段相加获得。

public long sum() {
    Cell[] as = cells; Cell a;
    // sum初始等于base
    long sum = base;
    // 若是cells不为空
    if (as != null) {
        // 遍历全部的Cell
        for (int i = 0; i < as.length; ++i) {
            // 若是所在的Cell不为空,就把它的value累加到sum中
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    // 返回sum
    return sum;
}

能够看到sum()方法是把base和全部段的值相加获得,那么,这里有一个问题,若是前面已经累加到sum上的Cell的value有修改,不是就无法计算到了么?

答案确实如此,因此LongAdder能够说不是强一致性的,它是最终一致性的。

LongAdder VS AtomicLong

直接上代码:

public class LongAdderVSAtomicLongTest {
    public static void main(String[] args){
        testAtomicLongVSLongAdder(1, 10000000);
        testAtomicLongVSLongAdder(10, 10000000);
        testAtomicLongVSLongAdder(20, 10000000);
        testAtomicLongVSLongAdder(40, 10000000);
        testAtomicLongVSLongAdder(80, 10000000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times){
        try {
            System.out.println("threadCount:" + threadCount + ", times:" + times);
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms");

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        AtomicLong atomicLong = new AtomicLong();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    atomicLong.incrementAndGet();
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        LongAdder longAdder = new LongAdder();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    longAdder.add(1);
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }
}

运行结果以下:

threadCount:1, times:10000000
LongAdder elapse:158ms
AtomicLong elapse:64ms
threadCount:10, times:10000000
LongAdder elapse:206ms
AtomicLong elapse:2449ms
threadCount:20, times:10000000
LongAdder elapse:429ms
AtomicLong elapse:5142ms
threadCount:40, times:10000000
LongAdder elapse:840ms
AtomicLong elapse:10506ms
threadCount:80, times:10000000
LongAdder elapse:1369ms
AtomicLong elapse:20482ms

能够看到当只有一个线程的时候,AtomicLong反而性能更高,随着线程愈来愈多,AtomicLong的性能急剧降低,而LongAdder的性能影响很小。

总结

(1)LongAdder经过base和cells数组来存储值;

(2)不一样的线程会hash到不一样的cell上去更新,减小了竞争;

(3)LongAdder的性能很是高,最终会达到一种无竞争的状态;

彩蛋

在longAccumulate()方法中有个条件是n >= NCPU就不会走到扩容逻辑了,而n是2的倍数,那是否是表明cells数组最大只能达到大于等于NCPU的最小2次方?

答案是明确的。由于同一个CPU核心同时只会运行一个线程,而更新失败了说明有两个不一样的核心更新了同一个Cell,这时会从新设置更新失败的那个线程的probe值,这样下一次它所在的Cell很大几率会发生改变,若是运行的时间足够长,最终会出现同一个核心的全部线程都会hash到同一个Cell(大几率,但不必定全在一个Cell上)上去更新,因此,这里cells数组中长度并不须要太长,达到CPU核心数足够了。

好比,笔者的电脑是8核的,因此这里cells的数组最大只会到8,达到8就不会扩容了。

LongAdder


欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。

qrcode

相关文章
相关标签/搜索