比AtomicLong还高效的LongAdder 源码解析

接触到AtomicLong的缘由是在看guava的LoadingCache相关代码时,关于LoadingCache,其实思路也很是简单清晰:用模板模式解决了缓存不命中时获取数据的逻辑,这个思路我早前也正好在项目中使用到。java

言归正传,为何说LongAdder引发了个人注意,缘由有二:数组

  1. 做者是Doug lea ,地位实在举足轻重。
  2. 他说这个比AtomicLong高效。

咱们知道,AtomicLong已是很是好的解决方案了,涉及并发的地方都是使用CAS操做,在硬件层次上去作 compare and set操做。效率很是高。缓存

所以,我决定研究下,为何LongAdder比AtomicLong高效。多线程

首先,看LongAdder的继承树:并发

la1

继承自Striped64,这个类包装了一些很重要的内部类和操做。稍候会看到。less

 

正式开始前,强调下,咱们知道,AtomicLong的实现方式是内部有个value 变量,当多线程并发自增,自减时,均经过CAS 指令从机器指令级别操做保证并发的原子性。ide

再看看LongAdder的方法:高并发

la2
怪不得能够和AtomicLong做比较,连API都这么像。咱们随便挑一个API入手分析,这个API通了,其余API都大同小异,所以,我选择了add这个方法。事实上,其余API也都依赖这个方法。性能

la3
LongAdder中包含了一个Cell 数组,Cell是Striped64的一个内部类,顾名思义,Cell 表明了一个最小单元,这个单元有什么用,稍候会说道。先看定义:学习

la4
Cell内部有一个很是重要的value变量,而且提供了一个CAS更新其值的方法。

回到add方法:

la3

这里,我有个疑问,AtomicLong已经使用CAS指令,很是高效了(比起各类锁),LongAdder若是仍是用CAS指令更新值,怎么可能比AtomicLong高效了? 况且内部还这么多判断!!!

这是我开始时最大的疑问,因此,我猜测,难道有比CAS指令更高效的方式出现了? 带着这个疑问,继续。

第一if 判断,第一次调用的时候cells数组确定为null,所以,进入casBase方法:

la5
原子更新base没啥好说的,若是更新成功,本地调用开始返回,不然进入分支内部。

何时会更新失败? 没错,并发的时候,好戏开始了,AtomicLong的处理方式是死循环尝试更新,直到成功才返回,而LongAdder则是进入这个分支。

分支内部,经过一个Threadlocal变量threadHashCode 获取一个HashCode对象,该HashCode对象依然是Striped64类的内部类,看定义:

la6
有个code变量,保存了一个非0的随机数随机值。

回到add方法:

la3

拿到该线程相关的HashCode对象后,获取它的code变量,as[(n-1)&h] 这句话至关于对h取模,只不过比起取模,由于是 与 的运算因此效率更高。

计算出一个在Cells 数组中当先线程的HashCode对应的 索引位置,并将该位置的Cell 对象拿出来用CAS更新它的value值。

固然,若是as 为null 而且更新失败,才会进入retryUpdate方法。

看到这里我想应该有不少人明白为何LongAdder会比AtomicLong更高效了,没错,惟一会制约AtomicLong高效的缘由是高并发,高并发意味着CAS的失败概率更高, 重试次数更多,越多线程重试,CAS失败概率又越高,变成恶性循环,AtomicLong效率下降。 那怎么解决? LongAdder给了咱们一个很是容易想到的解决方案:减小并发,将单一value的更新压力分担到多个value中去,下降单个value的 “热度”,分段更新!!!

这样,线程数再多也会分担到多个value上去更新,只须要增长value就能够下降 value的 “热度”  AtomicLong中的 恶性循环不就解决了吗? cells 就是这个 “段” cell中的value 就是存放更新值的, 这样,当我须要总数时,把cells 中的value都累加一下不就能够了么!!

固然,聪明之处远远不只仅这里,在看看add方法中的代码,casBase方法可不能够不要,直接分段更新,上来就计算 索引位置,而后更新value?

答案是很差,不是不行,由于,casBase操做等价于AtomicLong中的CAS操做,要知道,LongAdder这样的处理方式是有坏处的,分段操做必然带来空间上的浪费,能够空间换时间,可是,能不换就不换,看空间时间都节约~! 因此,casBase操做保证了在低并发时,不会当即进入分支作分段更新操做,由于低并发时,casBase操做基本都会成功,只有并发高到必定程度了,才会进入分支,因此,Doug Lea对该类的说明是: 低并发时LongAdder和AtomicLong性能差很少,高并发时LongAdder更高效!

la7

可是,Doung Lea 仍是没这么简单,聪明之处尚未结束……

如此,retryUpdate中作了什么事,也基本略知一二了,由于cell中的value都更新失败(说明该索引到这个cell的线程也不少,并发也很高时) 或者cells数组为空时才会调用retryUpdate,

所以,retryUpdate里面应该会作两件事:

  1. 扩容,将cells数组扩大,下降每一个cell的并发量,一样,这也意味着cells数组的rehash动做。
  2.  给空的cells变量赋一个新的Cell数组

是否是这样呢? 继续看代码:

代码比较长,变成文本看看,为了方便你们看if else 分支,对应的  { } 我用相同的颜色标注出来。能够看到,这个时候Doug Lea才愿意使用死循环保证更新成功~!

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

final void retryUpdate(long x, HashCode hc, boolean wasUncontended) {

      int h = hc.code;

      boolean collide = false;                // True if last slot nonempty

      for (;;) {

          Cell[] as; Cell a; int n; long v;

          if ((as = cells) != null && (n = as.length) > 0) {// 分支1

              if ((a = as[(n - 1) & h]) == null) {

                  if (busy == 0) {            // Try to attach new Cell

                      Cell r = new Cell(x);   // Optimistically create

                      if (busy == 0 && casBusy()) {

                          boolean created = false;

                          try {               // Recheck under lock

                              Cell[] rs; int m, j;

                              if ((rs = cells) != null &&

                                      (m = rs.length) > 0 &&

                                      rs[j = (m - 1) & h] == null) {

                                  rs[j] = r;

                                  created = true;

                              }

                          } finally {

                              busy = 0;

                          }

                          if (created)

                              break;

                          continue;           // Slot is now non-empty

                      }

                  }

                  collide = false;

              }

              else if (!wasUncontended)       // CAS already known to fail

                  wasUncontended = true;      // Continue after rehash

              else if (a.cas(v = a.value, fn(v, x)))

                  break;

              else if (n >= NCPU || cells != as)

                  collide = false;            // At max size or stale

              else if (!collide)

                  collide = true;

              else if (busy == 0 && casBusy()) {

                  try {

                      if (cells == as) {      // Expand table unless stale

                          Cell[] rs = new Cell[n << 1];

                          for (int i = 0; i < n; ++i)

                              rs[i] = as[i];

                          cells = rs;

                      }

                  } finally {

                      busy = 0;

                  }

                  collide = false;

                  continue;                   // Retry with expanded table

              }

              h ^= h << 13;                   // Rehash  h ^= h >>> 17;

              h ^= h << 5;

          }

          else if (busy == 0 && cells == as && casBusy()) {//分支2

              boolean init = false;

              try {                           // Initialize table

                  if (cells == as) {

                      Cell[] rs = new Cell[2];

                      rs[h & 1] = new Cell(x);

                      cells = rs;

                      init = true;

                  }

              } finally {

                  busy = 0;

              }

              if (init)

                  break;

          }

          else if (casBase(v = base, fn(v, x)))

              break;                          // Fall back on using base

      }

      hc.code = h;                            // Record index for next time

  }

分支2中,为cells为空的状况,须要new 一个Cell数组。

分支1分支中,略复杂一点点:

注意,几个分支中都提到了busy这个方法,这个能够理解为一个CAS实现的锁,只有在须要更新cells数组的时候才会更新该值为1,若是更新失败,则说明当前有线程在更新cells数组,当前线程须要等待。重试。

回到分支1中,这里首先判断当前cells数组中的索引位置的cell元素是否为空,若是为空,则添加一个cell到数组中。

不然更新 标示冲突的标志位wasUncontended 为 true ,重试。

不然,再次更新cell中的value,若是失败,重试。

。。。。。。。一系列的判断后,若是仍是失败,下下下策,reHash,直接将cells数组扩容一倍,并更新当前线程的hash值,保证下次更新能尽量成功。

能够看到,LongAdder确实用了不少心思减小并发量,而且,每一步都是在”没有更好的办法“的时候才会选择更大开销的操做,从而尽量的用最最简单的办法去完成操做。追求简单,可是绝对不粗暴。

———————陈皓注————————

最后留给你们思考的两个问题:

1)是否是AtomicLong能够被废了?

2)若是cell被建立后,原来的casBase就不走了,会不会性能更差?

———————liuinsect注————————

昨天和左耳朵耗子简单讨论了下,发现左耳朵耗子,耗哥对读者思惟的引导仍是很是不错的,在第一次发现这个类后,对里面的实现又提出了更多的问题,引导你们思考,值得学习。

咱们 发现的问题有这么几个(包括以上的问题),本身简单总结下,欢迎你们讨论:

1. jdk 1.7中是否是有这个类?
我确认后,结果以下:    jdk-7u51 版本上尚未  可是jdk-8u20版本上已经有了。代码基本同样 ,增长了对double类型的支持和删除了一些冗余的代码。有兴趣的同窗能够去下载下JDK 1.8看看

2. base有没有参与汇总?
base在调用intValue等方法的时候是会汇总的:

LA10

3. 若是cell被建立后,原来的casBase就不走了,会不会性能更差? base的顺序可不能够调换?
    刚开始我想可不能够调换add方法中的判断顺序,好比,先作casBase的判断? 仔细思考后认为仍是 不调换可能更好,调换后每次都要CAS一下,在高并发时,失败概率很是高,而且是恶性循环,比起一次判断,后者的开销明显小不少,尚未反作用(上一个问题,base变量在sum时base是会被统计的,并不会丢掉base的值)。所以,不调换可能会更好。

4. AtomicLong可不能够废掉?
个人想法是能够废掉了,由于,虽然LongAdder在空间上占用略大,可是,它的性能已经足以说明一切了,不管是从节约空的角度仍是执行效率上,AtomicLong基本没有优点了,具体看这个测试(感谢Lemon的回复):http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/

 

(全文完)

相关文章
相关标签/搜索