本文转自 https://blog.csdn.net/u011392897/article/details/60480108 java
高并发下计数,通常最早想到的应该是AtomicLong/AtomicInt,AtmoicXXX使用硬件级别的指令 CAS 来更新计数器的值,这样能够避免加锁,机器直接支持的指令,效率也很高。可是AtomicXXX中的 CAS 操做在出现线程竞争时,失败的线程会白白地循环一次,在并发很大的状况下,由于每次CAS都只有一个线程能成功,竞争失败的线程会很是多。失败次数越多,循环次数就越多,不少线程的CAS操做愈来愈接近 自旋锁(spin lock)。计数操做原本是一个很简单的操做,实际须要耗费的cpu时间应该是越少越好,AtomicXXX在高并发计数时,大量的cpu时间都浪费会在 自旋 上了,这很浪费,也下降了实际的计数效率。数组
// 很简单的一个类,这个类能够当作是一个简化的AtomicLong // 经过cas操做来更新value的值 // @sun.misc.Contended是一个高端的注解,表明使用缓存行填来避免伪共享,能够本身网上搜下,这个我就不细说了 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics Unsafe相关的初始化 private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
四个实现类的区别就上面这两句话,这里只讲LongAdder一个类。缓存
2、核心实现Striped64多线程
// 很简单的一个类,这个类能够当作是一个简化的AtomicLong // 经过cas操做来更新value的值 // @sun.misc.Contended是一个高端的注解,表明使用缓存行填来避免伪共享,能够本身网上搜下,这个我就不细说了 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics Unsafe相关的初始化 private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
二、Striped64主体代码并发
abstract class Striped64 extends Number { @sun.misc.Contended static final class Cell { ... } /** Number of CPUS, to place bound on table size */ static final int NCPU = Runtime.getRuntime().availableProcessors(); // cell数组,长度同样要是2^n,能够类比为jdk1.7的ConcurrentHashMap中的segments数组 transient volatile Cell[] cells; // 累积器的基本值,在两种状况下会使用: // 一、没有遇到并发的状况,直接使用base,速度更快; // 二、多线程并发初始化table数组时,必需要保证table数组只被初始化一次,所以只有一个线程可以竞争成功,这种状况下竞争失败的线程会尝试在base上进行一次累积操做 transient volatile long base; // 自旋标识,在对cells进行初始化,或者后续扩容时,须要经过CAS操做把此标识设置为1(busy,忙标识,至关于加锁),取消busy时能够直接使用cellsBusy = 0,至关于释放锁 transient volatile int cellsBusy; Striped64() { } // 使用CAS更新base的值 final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } // 使用CAS将cells自旋标识更新为1 // 更新为0时能够不用CAS,直接使用cellsBusy就行 final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } // 下面这两个方法是ThreadLocalRandom中的方法,不过由于包访问关系,这里又从新写一遍 // probe翻译过来是探测/探测器/探针这些,很差理解,它是ThreadLocalRandom里面的一个属性, // 不过并不影响对Striped64的理解,这里能够把它理解为线程自己的hash值 static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } // 至关于rehash,从新算一遍线程的hash值 static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; } /** * 核心方法的实现,此方法建议在外部进行一次CAS操做(cell != null时尝试CAS更新base值,cells != null时,CAS更新hash值取模后对应的cell.value) * @param x the value 前面我说的二元运算中的第二个操做数,也就是外部提供的那个操做数 * @param fn the update function, or null for add (this convention avoids the need for an extra field or function in LongAdder). * 外部提供的二元算术操做,实例持有而且只能有一个,生命周期内保持不变,null表明LongAdder这种特殊可是最经常使用的状况,能够减小一次方法调用 * @param wasUncontended false if CAS failed before call 若是为false,代表调用者预先调用的一次CAS操做都失败了 */ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 这个if至关于给线程生成一个非0的hash值 if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty 若是hash取模映射获得的Cell单元不是null,则为true,此值也能够看做是扩容意向,感受这个更好理解 for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // cells已经被初始化了 if ((a = as[(n - 1) & h]) == null) { // hash取模映射获得的Cell单元还为null(为null表示尚未被使用) if (cellsBusy == 0) { // Try to attach new Cell 若是没有线程正在执行扩容 Cell r = new Cell(x); // Optimistically create 先建立新的累积单元 if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁 boolean created = false; try { // Recheck under lock 在有锁的状况下再检测一遍以前的判断 Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 考虑别的线程可能执行了扩容,这里从新赋值从新判断 rs[j] = r; // 对没有使用的Cell单元进行累积操做(第一次赋值至关因而累积上一个操做数,求和时再和base执行一次运算就获得实际的结果) created = true; } } finally { cellsBusy = 0; 清空自旋标识,释放锁 } if (created) // 若是本来为null的Cell单元是由本身进行第一次累积操做,那么任务已经完成了,因此能够退出循环 break; continue; // Slot is now non-empty 不是本身进行第一次累积操做,重头再来 } } collide = false; // 执行这一句是由于cells被加锁了,不能往下继续执行第一次的赋值操做(第一次累积),因此还不能考虑扩容 } else if (!wasUncontended) // CAS already known to fail 前面一次CAS更新a.value(进行一次累积)的尝试已经失败了,说明已经发生了线程竞争 wasUncontended = true; // Continue after rehash 状况失败标识,后面去从新算一遍线程的hash值 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 尝试CAS更新a.value(进行一次累积) ------ 标记为分支A break; // 成功了就完成了累积任务,退出循环 else if (n >= NCPU || cells != as) // cell数组已是最大的了,或者中途发生了扩容操做。由于NCPU不必定是2^n,因此这里用 >= collide = false; // At max size or stale 长度n是递增的,执行到了这个分支,说明n >= NCPU会永远为true,下面两个else if就永远不会被执行了,也就永远不会再进行扩容 // CPU可以并行的CAS操做的最大数量是它的核心数(CAS在x86中对应的指令是cmpxchg,多核须要经过锁缓存来保证总体原子性),当n >= NCPU时,再出现几个线程映射到同一个Cell致使CAS竞争的状况,那就真不关扩容的事了,彻底是hash值的锅了 else if (!collide) // 映射到的Cell单元不是null,而且尝试对它进行累积时,CAS竞争失败了,这时候把扩容意向设置为true // 下一次循环若是仍是跟这一次同样,说明竞争很严重,那么就真正扩容 collide = true; // 把扩容意向设置为true,只有这里才会给collide赋值为true,也只有执行了这一句,才可能执行后面一个else if进行扩容 else if (cellsBusy == 0 && casCellsBusy()) { // 最后再考虑扩容,能到这一步说明竞争很激烈,尝试加锁进行扩容 ------ 标记为分支B try { if (cells == as) { // Expand table unless stale 检查下是否被别的线程扩容了(CAS更新锁标识,处理不了ABA问题,这里再检查一遍) Cell[] rs = new Cell[n << 1]; // 执行2倍扩容 for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; // 释放锁 } collide = false; // 扩容意向为false continue; // Retry with expanded table 扩容后重头再来 } h = advanceProbe(h); // 从新给线程生成一个hash值,下降hash冲突,减小映射到同一个Cell致使CAS竞争的状况 } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells没有被加锁,而且它没有被初始化,那么就尝试对它进行加锁,加锁成功进入这个else if boolean init = false; try { // Initialize table if (cells == as) { // CAS避免不了ABA问题,这里再检测一次,若是仍是null,或者空数组,那么就执行初始化 Cell[] rs = new Cell[2]; // 初始化时只建立两个单元 rs[h & 1] = new Cell(x); // 对其中一个单元进行累积操做,另外一个无论,继续为null cells = rs; init = true; } } finally { cellsBusy = 0; // 清空自旋标识,释放锁 } if (init) // 若是某个本来为null的Cell单元是由本身进行第一次累积操做,那么任务已经完成了,因此能够退出循环 break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // cells正在进行初始化时,尝试直接在base上进行累加操做 break; // Fall back on using base 直接在base上进行累积操做成功了,任务完成,能够退出循环了 } } // double的不讲,更long的逻辑基本上是同样的 final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended); // Unsafe mechanics Unsafe初始化 private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } }
看完这个在来看看第一点中我提的问题:既然Cell这么简单,为何不直接用long value?app
public class LongAdder extends Striped64 implements Serializable { // 构造方法,什么也不作,直接使用默认值,base = 0, cells = null public LongAdder() { } // add方法,根据父类的longAccumulate方法的要求,这里要进行一次CAS操做 // (虽然这里有两个CAS,可是第一个CAS成功了就不会执行第二个,要执行第二个,第一个就被“短路”了不会被执行) // 在线程竞争不激烈时,这样作更快 public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } public void increment() { add(1L); } public void decrement() { add(-1L); } // 返回累加的和,也就是“当前时刻”的计数值 // 此返回值可能不是绝对准确的,由于调用这个方法时还有其余线程可能正在进行计数累加, // 方法的返回时刻和调用时刻不是同一个点,在有并发的状况下,这个值只是近似准确的计数值 // 高并发时,除非全局加锁,不然得不到程序运行中某个时刻绝对准确的值,可是全局加锁在高并发状况下是下下策 // 在不少的并发场景中,计数操做并非核心,这种状况下容许计数器的值出现一点误差,此时可使用LongAdder // 在必须依赖准确计数值的场景中,应该本身处理而不是使用通用的类 public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } // 重置计数器,只应该在明确没有并发的状况下调用,能够用来避免从新new一个LongAdder public void reset() { Cell[] as = cells; Cell a; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) a.value = 0L; } } } // 至关于sum()后再调用reset() public long sumThenReset() { Cell[] as = cells; Cell a; long sum = base; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) { sum += a.value; a.value = 0L; } } } return sum; } // 其余的不说了 }