Hystrix须要根据过去一段时间内失败的请求次数来判断是否打开熔断开关,因此它会维护一个时间窗口,并不断向该窗口中累加失败请求次数,在多线程环境下通常会使用AtomicLong,可是Hystrix中使用的是LongAdder。查了一下,发如今Hystrix,Guava,JDK8中都有这个类,应该是Java8中才加到标准库中,其余库要兼容老版本只能本身复制一份了。Hystrix和Java8中的LongAdder具体实现有细微差异,不过总体思路是同样的,下面的分析都是以jdk为准的。html
为何Hystrix使用LongAdder而不是AtomicLong呢?在LongAdder的Java doc中有java
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
在存在高度竞争的条件下,LongAdder的性能会远远好于AtomicLong,不过会消耗更多空间。高度竞争固然是指在多线程条件下。程序员
咱们知道AtomicLong是经过cas来更新值的,按理说是很快的,LongAdder为何会比它更快,是还有其余什么更快的手段吗?先无论这些,直接实验一下,看是否是真的更快。编程
public class TestAtomic { private static final int TASK_NUM = 1000; private static final int INCREMENT_PER_TASK = 10000; private static final int REPEAT = 10; private static long l = 0; public static void main(String[] args) throws Exception { repeatWithStatics(REPEAT, () -> testAtomicLong()); repeatWithStatics(REPEAT, () -> testLongAdder()); repeatWithStatics(REPEAT, () -> testLong()); } public static void testAtomicLong() { AtomicLong al = new AtomicLong(0); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> al.incrementAndGet())); } public static void testLong() { l = 0; execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> l++)); } public static void testLongAdder() { LongAdder adder = new LongAdder(); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> adder.add(1))); } public static void repeatWithStatics(int n, Runnable runnable) { long[] elapseds = new long[n]; ntimes(n).forEach(x -> { long start = System.currentTimeMillis(); runnable.run(); long end = System.currentTimeMillis(); elapseds[x] = end - start; }); System.out.printf("total: %d, %s\n", Arrays.stream(elapseds).sum(), Arrays.toString(elapseds)); } private static void execute(int n, Runnable task) { try { CountDownLatch latch = new CountDownLatch(n); ExecutorService service = Executors.newFixedThreadPool(100); Runnable taskWrapper = () -> { task.run(); latch.countDown(); }; service.invokeAll(cloneTask(n, taskWrapper)); latch.await(); service.shutdown(); } catch (Exception e) {} } private static Collection<Callable<Void>> cloneTask(int n, Runnable task) { return ntimes(n).mapToObj(x -> new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }).collect(Collectors.toList()); } private static void repeat(int n, Runnable runnable) { ntimes(n).forEach(x -> runnable.run()); } private static IntStream ntimes(int n) { return IntStream.range(0, n); } }
上面是用1000个并发任务,每一个任务对数据累加10000次,每一个实验测试10次。数组
输出:缓存
total: 1939, [258, 196, 200, 174, 186, 178, 204, 189, 185, 169]
多线程
total: 613, [57, 45, 47, 53, 69, 61, 80, 67, 64, 70]
并发
total: 1131, [85, 67, 77, 81, 280, 174, 108, 67, 99, 93]
app
从上往下依次是AtomicLong, LongAdder, long。less
从结果能看到LongAdder确实性能高于AtomicLong,不过还有一个让我很是吃惊的结果,就是LongAdder居然比直接累加long还快(固然直接累加long最终获得的结果是错误的,由于没有同步),这个有些反常识了,其实这里涉及到了一些隐藏的问题,就是cache的false sharing,由于平时编程时不太会关注cache这些,因此碰到这个结果会出乎预料,详细的解释在后面的第三节。
先来分析一下LongAdder为何会比AtomicLong快,是否是用到了什么比cas还快的东西。
LongAdder的父类Striped64的注释中已经将整个类的设计讲的很清楚的了,类中主要维护两个值,一个long型的base属性,一个Cell数组,它们值的和才是真正的结果。Cell是对long的一个包装,为何将long包装起来,猜想有两个缘由:1)能够在类中添加padding数据,避免false sharing,2)包装起来才好使用cas。
LongAdder.add的流程简单描述就是,先尝试经过cas修改base,成功则返回,失败则根据当前线程hash值从Cell数组中选择一个Cell,而后向Cell中add数据。Cell数组是动态增加的,而且是用时才初始化的,这是为了不占用过多空间。
看到注释大概能猜到为何快了,LongAdder仍然用的cas,快是由于在高度竞争的条件下,对一个值进行修改,冲突的几率很高,须要不断cas,致使时间浪费在循环上,若是将一个值拆分为多个值,分散压力,那么性能就会有所提升。
下面来看源码,进入LongAdder的add方法:
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
上面先对base进行cas操做,而后判断Cell数组是否为空,不为空则根据当前线程probe值(相似hash值)选择Cell并进行cas,都不成功进入longAccumulate方法。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // (1) if ((a = as[(n - 1) & h]) == null) { // (1.1) if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (1.2) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // (1.3) try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // (2) boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (3) break; // Fall back on using base } }
Cell数组不为空时进入分支(1),若是根据当前线程hash得到的Cell为null,则进入(1.1)开始实例化该Cell,不然进入(1.2)对Cell进行cas,不成功的话表示冲突比较多,开始进入(1.3)对Cell数组扩容了,cellsBusy是用cas实现的一个spinlock;
Cell数组为空且获取到cellsBusy时进入分支(2),开始初始化Cell数组;
分支(1)和(2)都进不去,没办法,只能再次对base进行cas。
上面只是对源码作了粗略的分析,详细的每一个分支的含义我也不知道,不过这些咱们都不须要较真去弄的很是清楚,毕竟世界上只有一个Doug Lea,咱们只须要知道LongAdder是怎么比AtomicLong快的就行,实际就是用多个long来分担压力,一群人到十个盘子里夹菜固然比到一个盘子里夹菜冲突小。
知道了原理,那咱们就本身来实现一个简陋的LongAdder。
public class MyLong { private static final int LEN = 2 << 5; private AtomicLong[] atomicLongs = new AtomicLong[LEN]; public MyLong() { for (int i = 0; i < LEN; ++i) { atomicLongs[i] = new AtomicLong(0); } } public void add(long l) { atomicLongs[hash(Thread.currentThread()) & (LEN - 1)].addAndGet(l); } public void increment() { add(1); } public long get() { return Arrays.stream(atomicLongs).mapToLong(al -> al.get()).sum(); } // 从HashMap里抄过来的 private static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); } }
在最上面的TestAtomic类中加上方法:
public static void main(String[] args) throws Exception { repeatWithStatics(REPEAT, () -> testAtomicLong()); repeatWithStatics(REPEAT, () -> testLongAdder()); repeatWithStatics(REPEAT, () -> testLong()); repeatWithStatics(REPEAT, () -> testMyLong()); } public static void testMyLong() { MyLong myLong = new MyLong(); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> myLong.increment())); }
输出:
total: 1907, [176, 211, 192, 182, 195, 173, 199, 229, 184, 166] total: 641, [67, 50, 45, 53, 73, 58, 80, 63, 69, 83] total: 947, [90, 82, 70, 72, 87, 78, 136, 107, 77, 148] total: 670, [81, 80, 73, 67, 57, 94, 62, 49, 57, 50]
能够看到性能比AtomicLong好多了。
前面解释了LongAdder比AtomicLong快,可是为何它还会比long快?解答这个问题以前要先介绍伪共享的概念。
在计算机中寻址是以字节为单位,可是cache从内存中复制数据是以行为单位的,一个行会包含多个字节,通常为64字节,每一个CPU有本身的L一、L2 cache,如今有两个变量x、y在同一行中,若是CPU1修改x,缓存一致性要求数据修改须要立刻反应到其余对应副本上,CPU2 cache对应行从新刷新,而后CPU2才能访问y,若是CPU1一直修改x,CPU2一直访问y,那么CPU2得一直等到cache刷新后才能访问y,带来性能降低,产生这个问题的缘由有两方面:1)x、y位于同一行,2)两个CPU会频繁的访问这两个数据,若是这两个条件其中一个不成立,那就不会产生问题。更多关于伪共享的概念参考伪共享(False Sharing)和(false sharing(wiki)。
既然这个问题出现了,那确定是有解决办法的。通常就是添加padding数据,来将x、y隔开,让它们不会位于同一行中。
Java中的话,在Java7以前须要手动添加padding数据,后来JEP 142提案提出应该为程序员提供某种方式来标明哪些字段是会存在缓存竞争的,而且虚拟机可以根据这些标识来避免这些字段位于同一行中,程序员不用再手动填充padding数据。
@Contended就是应JEP 142而生的,在字段或类上标准该注解,就表示编译器或虚拟机须要在这些数据周围添加padding数据。Java8的伪共享和缓存行填充--@Contended注释中详细解释了@Contended注解的使用方法,在百度或者谷歌上搜索 jep 142 site:mail.openjdk.java.net
能找到不少@Contended相关资料。
下面实验一下来观察false sharing:
public class TestContended { private static int NCPU = Runtime.getRuntime().availableProcessors(); private static ForkJoinPool POOL = new ForkJoinPool(NCPU); private static int INCREMENT_PER_TASK = 1000000; private static final int REPEAT = 10; private static long l = 0; private static long l1 = 0; private static long l2 = 0; private static long cl1 = 0; private static volatile long q0, q1, q2, q3, q4, q5, q6; private static long cl2 = 0; public static void main(String[] args) { repeatWithStatics(REPEAT, () -> testLongWithSingleThread()); repeatWithStatics(REPEAT, () -> testLong()); repeatWithStatics(REPEAT, () -> testTwoLong()); repeatWithStatics(REPEAT, () -> testTwoContendedLong()); } public static void testLongWithSingleThread() { repeat(2 * INCREMENT_PER_TASK, () -> l++); } public static void testLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l++), () -> repeat(INCREMENT_PER_TASK, () -> l++)); } public static void testTwoLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l1++), () -> repeat(INCREMENT_PER_TASK, () -> l2++)); } public static void testTwoContendedLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> cl1++), () -> repeat(INCREMENT_PER_TASK, () -> cl2++)); } public static void repeatWithStatics(int n, Runnable runnable) { long[] elapseds = new long[n]; ntimes(n).forEach(x -> { long start = System.currentTimeMillis(); runnable.run(); long end = System.currentTimeMillis(); elapseds[x] = end - start; }); System.out.printf("total: %d, %s\n", Arrays.stream(elapseds).sum(), Arrays.toString(elapseds)); } private static void asyncExecute2Task(Runnable task1, Runnable task2) { try { CompletableFuture.runAsync(task1, POOL) .thenCombine(CompletableFuture.runAsync(task2, POOL), (r1, r2) -> 0).get(); } catch (Exception e) {} } private static void repeat(int n, Runnable runnable) { ntimes(n).forEach(x -> runnable.run()); } private static IntStream ntimes(int n) { return IntStream.range(0, n); } }
不知道为何我用不了@Contended注解,即便启动参数加上-XX:-RestrictContended也不行,因此只能手工添加padding数据。目前缓存行大小通常为64字节(也能够经过CPUZ来查看),也就是填充7个long就能够将两个long型数据隔离在两个缓存行中了。
输出:
total: 16, [9, 5, 1, 0, 0, 0, 0, 1, 0, 0] total: 232, [35, 35, 33, 24, 25, 23, 13, 15, 15, 14] total: 148, [17, 15, 14, 16, 14, 15, 13, 17, 12, 15] total: 94, [8, 8, 8, 8, 15, 9, 10, 11, 8, 9]
从上往下依次为:1)单线程累加一个long;2)两个线程累加一个long;3)两个线程累加两个long,这两个long位于同一缓存行中;4)两个线程累加两个long,且它们位于不一样缓存行中。
从上面的结果看,padding仍是颇有效的。结果2相比于1,不只会有线程切换代价还会有false sharing问题,对于纯计算型任务线程个数不要超过CPU个数。不过有一点想不通的是,结果2和3为何差距这么大。
以上转自公司同事“yuanzhongcheng”的分享