并发编程第六天----LongAdder源码深度解析

简介

AtomicLong 经过 CAS 提供了非阻塞的原子性操做,性能比使用同步锁好多了。可是在高并发状况下,大量线程争夺同一个原子变量,只有一个线程的 CAS 能操做成功,其余线程会不停地 CAS 自旋,极度浪费 CPU 资源。java

为了解决这个问题,JDK8 提供了一个类 LongAdder把一个变量分红多个变量,让一样多的线程去竞争多个资源,就解决了性能问题。数组

LongAdder 在内部维护了多个 Cell 原子变量,另外,多个线程在争夺同一个 Cell 原子变量时若是失败了,并非在当前 Cell 变量上一直尝试,而是尝试对其余 Cell 变量进行 CAS 操做。最后,在获取 LongAdder 当前值时,是把全部 Cell 变量的 value 值累加后再加上 base 返回的。下面咱们来看看 LongAdder 的使用及源码。安全


使用

public class LongAdderTest {

    static LongAdder longAdder = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                for (int j = 0; j < 10; j++) {
                    longAdder.add(10);
                }
            }).start();
        }

        Thread.sleep(2000); // 保证前面的线程执行完

        System.out.println(longAdder.sum());
    }
}
复制代码

毫无疑问,输出的结果确定是 1000,LongAdder 是线程安全的。markdown


源码分析

Striped64

LongAdder 是继承自 Striped64,咱们来看看 Striped64 的主要成员变量。多线程

abstract class Striped64 extends Number {
    
    // 大小是 2 的 n 次方
    transient volatile Cell[] cells;
    
    // 基础值
    transient volatile long base;
    
    // 一个标识,状态只有 0 和 1,为 1 表示 cells 数组在初始化或者扩容
    transient volatile int cellsBusy;
    
    @sun.misc.Contended static final class Cell {
        
        volatile long value;
        
        Cell(long x) { value = x; }
        
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe 机制
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                // 获取 Cell 实例中变量 value 的内存偏移量
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    // Unsafe 机制 
    private static final sun.misc.Unsafe UNSAFE;
    // 记录 Striped64 实例中变量 base 的偏移量
    private static final long BASE;
    
    // 记录 Striped64 实例中变量 cellsBusy 的偏移量
    private static final long CELLSBUSY;
    
     // 记录 Thread 实例中变量 threadLocalRandomProbe 的偏移量,用于计算访问下标
    private static final long PROBE;
    
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
   
}
复制代码

Unsafe 机制如今应该很是熟悉了,就是用于获取变量在实例中的偏移量,用于 CAS 操做。并发

@sun.misc.Contended 能够解决伪共享问题, Cell 内部有一个声明为 volatile 的变量,并经过 CAS 更新该值,保证了更新操做的原子性。CellAtomicLong 的优化。保证了线程操做 Cell 元素的原子性。app

Cell 数组的大小必定为 2 的 n 次方。dom

threadLocalRandomProbe 用来计算当前线程访问 Cell 数组的下标。ide

cellsBusy 做为标识锁变量,状态只有 0 和 1 ,为 1 表示 Cell 数组正在初始化或者扩容,其余线程则不能进行初始化或者扩容高并发


LongAdder 源码分析

咱们应该关注下面几个问题:

  1. 当前线程应该访问 Cell 数组里的哪一个元素。
  2. 如何初始化 Cell 数组。
  3. Cell 数组什么时候扩容
  4. 线程访问分配的 Cell 元素有冲突时怎么办

sum()

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) { // 若是 Cell 数组为空,则直接返回 base
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
复制代码

累加全部 Cell 内部的 value 值,而后再累加上 base。因为没有对 Cell 数组进行加锁,因此在累加的过程当中,可能有其余线程对 Cell 数组的内容进行了修改,返回的值可能不是很精确。


add()

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {   // ( 1 )
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||    // ( 2 )
            (a = as[getProbe() & m]) == null ||     // ( 3 )
            !(uncontended = a.cas(v = a.value, v + x)))  // ( 4 )
            longAccumulate(x, null, uncontended); // ( 5 )
    }
}
复制代码

( 1 ) 判断 cells 是否为 null,若是为 null,则直接在 base 上进行累加,此时相似 AtomicLong,若是 cells 不为 null 或者 CAS 操做失败,则进行下面的操做。刚开始并发线程较少时,全部的累加操做都是对 base 变量进行,当某个线程第一次 CAS 操做失败时,则进行初始化 Cell 数组

( 2 ) 、 ( 3 ) 决定当前线程应该访问 Cell 数组里的哪一个元素 (解决了问题 1 ) ,经过 getProbe() & m 计算的,getProbe 用于获取当前线程的 ThreadLocalRandomProbe 的值, m 是 Cell 数组的长度。若是访问的 Cell 数组元素为 null,则执行代码 ( 5 )。

若是该 Cell 元素存在则执行代码 ( 4 ),经过 CAS 更新 Cell 元素的 value 值,若是更新失败则执行代码 ( 5 ),而且 uncontended 的值为 false。


longAccumulate()

longAccumulate 是涉及处理初始化扩容解决冲突的方法,在 Striped64 类中。咱们能在里面找到问题 2 、 3 、 4 的答案。

int h;  // 记录当前线程的 threadLocalRandomProbe 的值
    if ((h = getProbe()) == 0) { 
        ThreadLocalRandom.current(); // 初始化当前线程的 threadLocalRandomProbe 的值
        h = getProbe();
        wasUncontended = true;
    }
复制代码

最开始是判断当前线程的 threadLocalRandomProbe 值是否为 0 ,为 0 则初始化 threadLocalRandomProbe。这个变量用于计算当前线程应该被分配到 Cell 数组的哪一个下标。


boolean collide = false;            
        for (;;) {  // 无限循环
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
               // cells 不为 null 时
               ...
            }
            // cells 为 null 则进行初始化
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           
                    if (cells == as) {
                        Cell[] rs = new Cell[2]; // 初始化大小为2
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            
            // 为了能找到一个空闲的 Cell,从新计算 hash 值
            h = advanceProbe(h); 
        }
复制代码

Cell 数组为 null 时,则进行初始化。 初始化时经过 casCellsBusy()cellsBusy 的值设为 1,这时其余线程就不能进行扩容或者初始化了

初始化 Cell 数组的大小为 2,并经过 h & 1 计算出当前线程访问的 Cell 元素,并进行赋值操做。(回答了问题 2 )


boolean collide = false;  // 为 true 表示冲突
        
        for (;;) { // 无限循环
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) { // 当前线程访问的 cell 元素为 null
                
                    if (cellsBusy == 0) { 
                        Cell r = new Cell(x);   // 初始化 Cell 元素 (采用了延迟加载,用到时才初始化 Cell)
                        if (cellsBusy == 0 && casCellsBusy()) { // 获取锁变量
                            boolean created = false;
                            try {               
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           
                        }
                    }
                   
                    collide = false;
                }
                
                else if (!wasUncontended)       // 以前 CAS 更新失败
                    wasUncontended = true;      
                
                // 当前线程访问的 Cell 元素存在了,则进行 CAS 加操做 
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                    
                // NCPU 表明当前机器的 CPU 个数,Cell 数组的元素不能超过 NCPU
                else if (n >= NCPU || cells != as)   ( 7 )
                    collide = false;           
                
                
                else if (!collide)    ( 8 )
                    collide = true;
                    
                // 扩容操做
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   
                }
                
                // 为了能找到一个空闲的 Cell,从新计算 hash 值
                h = advanceProbe(h);   ( 9 )
            }
        }
复制代码

( 7 ) 、 ( 8 ) 都不符合条件时,才会执行扩容操做。即:当前 Cell 数组的个数小于 CPU 的个数 而且 多个线程间发生了冲突。

( 7 ) 处表示只有当前 Cell 数组的长度小于 CPU 的个数,才能扩容。为何要作这样的限制呢 :只有当每一个 CPU 运行一个线程时才会使多线程的效果最佳,也就是当 Cell 数组元素的个数与 CPU 个数一致时,每一个 Cell 元素都用一个 CPU 处理,效果最佳。

( 8 ) 表示多个线程访问了 Cell 数组中的同一个元素,或者多个线程尝试获取 cellsBusy 锁变量 致使了冲突。

扩容时先获取 cellsBusy 锁变量,而后大小扩充为原来的两倍再复制原来的变量到新数组。(解决了问题 3)。

( 9 ) 处对 CAS 失败的线程从新计算 threadLocalRandomProb , 减小冲突机会。(解决了问题 4


总结

LongAccumulate() 源码表面复杂,可是只要围绕文章开头题的四个问题去看,也不是很难。

相关文章
相关标签/搜索