并发编程——多线程计数的更优解:LongAdder原理分析

前言

最近在学习ConcurrentHashMap的源码,发现它采用了一种比较独特的方式对map中的元素数量进行统计,天然是要好好研究一下其原理思想,同时也能更好地理解ConcurrentHashMap自己。java

本文主要思路分为如下4个部分算法

1.计数的使用效果编程

2.原理的直观图解数组

3.源码的细节分析安全

4.与AtomicInteger的比较多线程

5.思想的抽象并发

学习的入口天然是map的put方法app

public V put(K key, V value) {
    return putVal(key, value, false);
}

查看putVal方法dom

这里并不对ConcurrentHashMap自己的原理做过多讨论,所以咱们直接跳到计数部分ide

final V putVal(K key, V value, boolean onlyIfAbsent) {
    ...
    addCount(1L, binCount);
    return null;
}

每当成功添加一个元素以后,都会调用addCount方法进行数量的累加1的操做,这就是咱们研究的目标

由于ConcurrentHashMap的设计初衷就是为了解决多线程并发场景下的map操做,所以在做数值累加的时候天然也要考虑线程安全

固然,多线程数值累加通常是学习并发编程的第一课,自己并不是很复杂,能够采用AtomicInteger或者锁等等方式来解决该问题

然而若是咱们查看该方法,就会发现,一个想来应该比较简单的累加方法,其逻辑看上去却至关复杂

这里我只贴出了累加算法的核心部分

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    ...
}

咱们就来研究一下该逻辑的实现思路。而这个思路实际上是照搬了LongAdder类的逻辑,所以咱们直接查看该算法的原始类

1.LongAdder类的使用

咱们先看下LongAdder的使用效果

LongAdder adder = new LongAdder();
int num = 0;

@Test
public void test5() throws InterruptedException {
    Thread[] threads = new Thread[10];
    for (int i = 0; i < 10; i++) {
        threads[i] = new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                adder.add(1);
                num += 1;
            }
        });
        threads[i].start();
    }
    for (int i = 0; i < 10; i++) {
        threads[i].join();
    }
    System.out.println("adder:" + adder);
    System.out.println("num:" + num);
}

输出结果

adder:100000
num:40982

能够看到adder在使用效果上是能够保证累加的线程安全的

2.LongAdder原理的直观理解

为了更好地对源码进行分析,咱们须要先从直觉上理解它的原理,不然直接看代码的话会一脸懵逼

LongAdder的计数主要分为2个对象

一个long类型的字段:base

一个Cell对象数组,Cell对象中就维护了一个long类型的字段value,用来计数

/**
 * Table of cells. When non-null, size is a power of 2.
 */
transient volatile Cell[] cells;

/**
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;
1

当没有发生线程竞争的时候,累加都会发生在base字段上,这就至关因而一个单线程累加2次,只不过base的累加是一个cas操做

1

当发生线程竞争的时候,必然有一个线程对base的cas累加操做失败,因而它先去判断Cell是否已经被初始化了,若是没有则初始化一个长度为2的数组,并根据线程的hash值找到对应的数组索引,并对该索引的Cell对象中的value值进行累加(这个累加也是cas的操做)

1

若是一共有3个线程发生了竞争,那么其中第一个线程对base的cas累加成功,剩下2个线程都须要去对Cell数组中的元素进行累加。由于对Cell中value值的累加也是一个cas操做,若是第二个线程和第三个线程的hash值对应的数组下标是同一个,那么一样会发生竞争,若是第二个线程成功了,第三个线程就会去rehash本身的hash值,若是获得的新的hash值对应的是另外一个元素为null的数组下标,那么就new一个Cell对象并对value值进行累加

1

若是此时有线程4同时参与竞争,那么对于线程4来讲,即便rehash后仍是可能在和线程3的竞争过程当中cas失败,此时若是当前数组的容量小于系统可用的cpu的数量,那么它就会对数组进行扩容,以后再次rehash,重复尝试对Cell数组中某个下标对象的累加

1

以上就是总体直觉上的理解,然而代码中还有不少细节的设计很是值得学习,因此咱们就开始进入源码分析的环节

3.源码分析

入口方法是add

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    /**
     * 这里优先判断了cell数组是否为空,以后才判断base字段的cas累加
     * 意味着若是线程不发生竞争,cell数组一直为空,那么全部的累加操做都会累加到base上
     * 而一旦发生过一次竞争致使cell数组不为空,那么全部的累加操做都会优先做用于数组中的对象上
     */
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        /**
         * 这个字段是用来标识在对cell数组中的对象进行累加操做时是否发生了竞争
         * 若是发生了竞争,那么在longAccumulate方法中会多进行一次rehash的自旋
         * 这个在后面的方法中详细说明,这里先有个印象
         * true表示未发生竞争
         */
        boolean uncontended = true;
        /**
         * 若是cell数组为空或者长度为0则直接进入主逻辑方法
         */
        if (as == null || (m = as.length - 1) < 0 ||
                /**
                 * 这里的getProbe()方法能够认为就是获取线程的hash值
                 * hash值与(数组长度-1)进行位与操做后获得对应的数组下标
                 * 判断该元素是否为空,若是不为空那么就会尝试累加
                 * 不然进入主逻辑方法
                 */
                (a = as[getProbe() & m]) == null ||
                /**
                 * 对数组下标的元素进行cas累加,若是成功了,那么就能够直接返回
                 * 不然进入主逻辑方法
                 */
                !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

当不发生线程竞争的时候,那累加操做就会由第一个if中的casBase负责,对应以前图解的状况一

当发生线程竞争以后,累加操做就会由cell数组负责,对应以前图解的状况二(数组的初始化在longAccumulate方法中)

接着咱们查看主逻辑方法,由于方法比较长,因此我会一段一段拿出来解析

longAccumulate方法

签名中的参数

x表示须要累加的值

fn表示须要如何累加,通常传null就行,不重要

wasUncontended表示是否在外层方法遇到了竞争失败的状况,由于外层的判断逻辑是多个“或”(as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null),因此若是数组为空或者相应的下标元素还未初始化,这个字段就会保持false

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
  ...
}

首先判断线程的hash值是否为0,若是为0则须要作一个初始化,即rehash

以后会将wasUncontended置为true,由于即便以前是冲突过的,通过rehash后就会先假设它能找到一个元素不冲突的数组下标

int h;//线程的hash值,在后面的逻辑中会用到
if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();
    wasUncontended = true;
}

以后是一个死循环,死循环中有3个大的if分支,这3个分支的逻辑做用于数组未初始化的时候,一旦数组初始化完成,那么就都会进入主逻辑了,所以我这里把主逻辑抽取出来放到后面单独说,也能够避免外层分支对思路的影响

/**
 * 用来标记某个线程在上一次循环中找到的数组下标是否已经有Cell对象了
 * 若是为true,则表示数组下标为空
 * 在主逻辑的循环中会用到
 */
boolean collide = false;
/**
 * 死循环,提供自旋操做
 */
for (; ; ) {
    Cell[] as;
    Cell a;
    int n;//cell数组长度
    long v;//须要被累积的值
    /**
     * 若是cells数组不为空,且已经被某个线程初始化成功,那么就会进入主逻辑,这个后面详细解释
     */
    if ((as = cells) != null && (n = as.length) > 0) {
        ...
        /**
         * 若是数组为空,那么就须要初始化一个Cell数组
         * cellsBusy用来标记cells数组是否能被操做,做用至关于一个锁
         * cells == as 判断是否有其余线程在当前线程进入这个判断以前已经初始化了一个数组
         * casCellsBusy 用一个cas操做给cellsBusy字段赋值为1,若是成功能够认为拿到了操做cells数组的锁
         */
    } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
        /**
         * 这里就是初始化一个数组,不解释了
         */
        boolean init = false;
        try {                           
            if (cells == as) {
                Cell[] rs = new Cell[2];
                rs[h & 1] = new Cell(x);
                cells = rs;
                init = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (init)
            break;
        /**
         * 若是当前数组是空的,又没有竞争过其余线程
         * 那么就再次尝试去给base赋值
         * 若是又没竞争过(感受有点可怜),那么就自旋
         * 另外提一下方法签名中的LongBinaryOperator对象就是用在这里的,不影响逻辑
         */
    } else if (casBase(v = base, ((fn == null) ? v + x :
            fn.applyAsLong(v, x))))
        break;                          // Fall back on using base
}

接着就看对cell数组元素进行累加的主逻辑

/**
 * 若是cells数组不为空,且已经被某个线程初始化成功,进入主逻辑
 */
if ((as = cells) != null && (n = as.length) > 0) {
    /**
     * 若是当前线程的hash值对应的数组元素为空
     */
    if ((a = as[(n - 1) & h]) == null) {
        /**
         * Cell数组并未被其余线程操做
         */
        if (cellsBusy == 0) {
            /**
             * 这里没有理解做者为何会在这里初始化单个Cell
             * 做者这里的注释是Optimistically create,若是有理解的同窗能够说一下
             */
            Cell r = new Cell(x);
            /**
             * 在此判断cell锁的状态,并尝试加锁
             */
            if (cellsBusy == 0 && casCellsBusy()) {
                boolean created = false;
                try {
                    /**
                     * 这里对数组是否为空等状态再次进行校验
                     * 若是校验经过,那么就将以前new的Cell对象放到Cell数组的该下标处
                     */
                    Cell[] rs;
                    int m, j;
                    if ((rs = cells) != null &&
                            (m = rs.length) > 0 &&
                            rs[j = (m - 1) & h] == null) {
                        rs[j] = r;
                        created = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                /**
                 * 若是建立成功,就说明累加成功,直接退出循环
                 */
                if (created)
                    break;
                /**
                 * 走到这里说明在判空和拿到锁之间正好有其余线程在该下标处建立了一个Cell
                 * 所以直接continue,不rehash,下次就不会进入到该分支了
                 */
                continue;
            }
        }
        /**
         * 当执行到这里的时候,由于是在 if ((a = as[(n - 1) & h]) == null) 这个判断逻辑中
         * 就说明在第一个if判断的时候该下标处没有元素,因此赋值为false
         * collide的意义是:上一次循环中找到的数组下标是否已经有Cell对象了
         * True if last slot nonempty
         */
        collide = false;
    /**
     * 这个字段若是为false,说明以前已经和其余线程发过了竞争
     * 即便此时能够直接取尝试cas操做,可是在高并发场景下
     * 这2个线程以后依然可能发生竞争,而每次竞争都须要自旋的话会很浪费cpu资源
     * 所以在这里先直接增长自旋一次,在for的最后会作一次rehash
     * 使得线程尽快地找到本身独占的数组下标
     */
    } else if (!wasUncontended) 
        wasUncontended = true;
    /**
     * 尝试给hash对应的Cell累加,若是这一步成功了,那么就返回
     * 若是这一步依然失败了,说明此时总体的并发竞争很是激烈
     * 那就可能须要考虑扩容数组了
     * (由于数组初始化容量为2,若是此时有10个线程在并发运行,那就很难避免竞争的发生了)
     */
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
            fn.applyAsLong(v, x))))
        break;
    /**
     * 这里判断下cpu的核数,由于即便有100个线程
     * 能同时并行运行的线程数等于cpu数
     * 所以若是数组的长度已经大于cpu数目了,那就不该当再扩容了
     */
    else if (n >= NCPU || cells != as)
        collide = false;
    /**
     * 走到这里,说明当前循环中根据线程hash值找到的数组下标已经有元素了
     * 若是此时collide为false,说明上一次循环中找到的下边是没有元素的
     * 那么就自旋一次并rehash
     * 若是再次运行到这里,而且collide为true,就说明明竞争很是激烈,应当扩容了
     */
    else if (!collide)
        collide = true;
    /**
     * 能运行到这里,说明须要扩容数组了
     * 判断锁状态并尝试获取锁
     */
    else if (cellsBusy == 0 && casCellsBusy()) {
        /**
         * 扩容数组的逻辑,这个扩容比较简单,就不解释了
         * 扩容大小为2倍
         */
        try {
            if (cells == as) { 
                Cell[] rs = new Cell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                cells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        /**
        * 这里直接continue,由于扩容过了,就先不rehash了
        */
        continue;               
    }
    /**
     * 作一个rehash,使得线程在下一个循环中可能找到独占的数组下标
     */
    h = advanceProbe(h);
}

到这里LongAdder的源码其实就分析结束了,其实代码并很少,可是他的思想很是值得咱们去学习。

4.与AtomicInteger的比较

光分析源码其实还差一些感受,咱们尚未搞懂为什么做者要在已经有AtomicInteger的状况下,再设计这么一个看上去很是复杂的类。

那么首先咱们先分析下AtomicInteger保证线程安全的原理

查看最基本的getAndIncrement方法

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

调用了Unsafe类的getAndAddInt方法,继续往下看

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

这里咱们再也不深究getIntVolatile和compareAndSwapInt方法具体实现,由于其已是native的方法了

能够看到,AtomicInteger底层是使用了cas+自旋的方式解决原子性问题的,即若是一次赋值不成功,那么就自旋,直到赋值成功为止

那么由此能够推断,当出现大量线程并发,竞争很是激烈的时候,AtomicInteger就有可能致使有些线程不断地竞争失败,不断自旋从而影响任务的吞吐量

为了解决高并发下的自旋问题,LongAdder的做者在设计的时候就经过增长一个数组的方式,使得竞争的对象从一个值变成多个值,从而使得发生竞争的频率下降,从而缓解了自旋的问题,固然付出的代价就是额外的存储空间。

最后我简单作了个测试,比较2种计数方法的耗时

经过原理可知,只有当线程竞争很是激烈的时候,LongAdder的优点才会比较明显,所以这里我用了100个线程,每个线程对同一个数累加1000000次,获得结果以下,差距很是巨大,达到15倍!

LongAdder耗时:104292242nanos

AtomicInteger耗时:1583294474nanos

固然这只是一个简单测试,包含了不少随机性,有兴趣的同窗能够尝试不一样的竞争程度屡次测试

5.思想的抽象

最后咱们须要将做者的具体代码和实现逻辑抽象一下,理清思考的过程

1)AtomicInteger遇到的问题:单个资源的竞争致使自旋的发生

2)解决的思路:将单个对象的竞争扩展为多个对象的竞争(有那么一些分治的思想)

3)扩展的可控性:多个竞争对象须要付出额外的存储空间,所以不能无脑地扩展(极端状况是一个线程一个计数的对象,这明显不合理)

4)问题的分层:由于使用类的时候的场景是不可控的,所以须要根据并发的激烈程度动态地扩展额外的存储空间(相似于synchronized的膨胀)

5)3个分层策略:当不发生竞争时,那么用一个值累加便可;当发生必定程度的竞争时,建立一个容量为2的数组,使得竞争的资源扩展为3个;当竞争更加激烈时,则继续扩展数组(对应图解中的1个线程到4个线程的过程)

6)策略细节:在自旋的时候增长rehash,此时虽然付出了必定的运算时间计算hash、比较数组对象等,可是这会使得并发的线程尽快地找到专属于本身的对象,在以后就不会再发生任何竞争(磨刀不误砍柴工,特别注意wasUncontended字段的相关注解)

相关文章
相关标签/搜索