昨天简单的看了看Unsafe的使用,今天咱们看看JUC中的原子类是怎么使用Unsafe的,以及分析一下其中的原理!java
一.简单使用AtomicLong编程
还记的上一篇博客中咱们使用了volatile关键字修饰了一个int类型的变量,而后两个线程,分别对这个变量进行10000次+1操做,最后结果不是20000,如今咱们改为AtomicLong以后,你会发现结果始终都是20000了!有兴趣的能够试试,代码以下数组
package com.example.demo.study; import java.util.concurrent.atomic.AtomicLong; public class Study0127 { //这是一个全局变量,注意,这里使用了一个原子类AtomicLong public AtomicLong num = new AtomicLong(); //每次调用这个方法,都会对全局变量加一操做,执行10000次 public void sum() { for (int i = 0; i < 10000; i++) { //使用了原子类的incrementAndGet方法,其实就是把num++封装成原子操做 num.incrementAndGet(); System.out.println("当前num的值为num= "+ num); } } public static void main(String[] args) throws InterruptedException { Study0127 demo = new Study0127(); //下面就是新建两个线程,分别调用一次sum方法 new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); } }
二.走近AtomicLong类缓存
在java中JDK 1.5以后,就出现了一个包,简称JUC并发包,全称就是java.util .concurrent,其中咱们应该据说过一个类ConcurrentHashMap,这个map挺有意思的,有兴趣能够看看源码!还有不少并发时候须要使用的类好比AtomicInteger,AtomicLong,AtomicBoolean等等,其实都差很少,此次咱们就简单看看AtomicLong,其余的几个类也差很少多线程
public class AtomicLong extends Number implements java.io.Serializable { //获取Unsafe对象,上篇博客说了咱们本身的类中不能使用这种方式的缘由,可是官方的这个类为何能够这样获取呢?由于本类AtomicLong //就是在rt.jar包下面,本类就是用Bootstrap类加载的,因此就能够用这种方式 private static final Unsafe unsafe = Unsafe.getUnsafe(); //value这个字段的偏移量 private static final long valueOffset; //判断jvm是否支持long类型的CAS操做 static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8(); private static native boolean VMSupportsCS8(); static { try { valueOffset = unsafe.objectFieldOffset (AtomicLong.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //这里用了volatile使的多线程下可见性,必定要分清楚原子性和可见性啊 private volatile long value; //两个构造器很少说 public AtomicLong() { } public AtomicLong(long initialValue) { value = initialValue; }
而后咱们看看AtomicLong的+1操做,能够看到使用的仍是unsafe这个类,只须要看看getAndAddLong方法就能够了并发
方法getAndAddLong里面就是进行了CAS操做,能够当作若是同时有多个线程都调用incrementAndGet方法进行+1,那么同一时间只有一个线程会去进行操做,而其余的会不断的使用CAS去尝试+1,每次尝试的时候都会去主内存中获取最新的值;app
public final long getAndAddLong(Object o, long offset, long delta) { long v; do {
//这个方法就是从新获取主内存的值,由于使用了volatile修饰了那个变量,因此缓存就没用了 v = getLongVolatile(o, offset); //这里就是一个dowhile无限循环,多个线程不断的调用compareAndSwapLong方法去设置值,其实就是CAS,没什么特别好说的吧,
//当某个线程CAS成功就跳出这个循环,不然就一直在循环不断的尝试,这也是CAS和线程阻塞的区别 } while (!compareAndSwapLong(o, offset, v, v + delta)); return v; }
//这个CAS方法看不到,c实现的 public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
有兴趣的能够看看AtomicLong的其余方法,不少都同样,CAS是核心less
三.CAS的不足以及认识LongAdderdom
从上面的例子中,咱们能够知道在多线程下使用AtomicLong类的时候,同一个时刻使用那个共享变量的只能是一个线程,其余的线程都是在无限循环,这种循环也是须要消耗性能的,若是线程比较多,不少的线程都在各自的无限循环中,或者叫作多个线程都在自旋;每一个线程都在自旋无数次真的是比较坑,比较消耗性能,咱们能够想办法自旋必定的次数,线程就结束运行了,有兴趣的能够了解一下自旋锁,其实就是这么一个原理,很容易,哈哈哈!jvm
在JDK8以后,提供了一个更好的类取代AtomicLong,那就是LongAdder,上面说过同一时间只有一个线程在使用那个共享变量,其余的线程都在自旋,那么若是能够把这个共享变量拆开成多个部分,那么是否是能够多个线程同时能够去操做呢?而后操做完以后再综合起来,有点分治法的思想,分而治之,最后综合起来。
那么咱们怎么把那个共享变量拆成多个部分呢?
在LongAdder中是这样处理的,把那个变量拆成一个base(这个是long类型的,初始值为0)和一个Cell(这个里面封装了一个long类型的值,初始值为0),每一个线程只会去竞争不少Cell就好了,最后把多个Cell中的值和base累加起来就是最终结果;并且一个线程若是没有竞争到Cell以后不会傻傻的自旋,直接想办法去竞争下一个Cell;
下图所示
四.简单使用LongAdder
用法其实和AtomicLong差很少,有兴趣的能够试试,最后的结果始终都是20000
package com.example.demo.study; import java.util.concurrent.atomic.LongAdder; public class Study0127 { //这里使用LongAdder类 public LongAdder num = new LongAdder(); //每次调用这个方法,都会对全局变量加一操做,执行10000次 public void sum() { for (int i = 0; i < 10000; i++) { //LongAdder类的自增操做,至关于i++ num.increment(); System.out.println("当前num的值为num= "+ num); } } public static void main(String[] args) throws InterruptedException { Study0127 demo = new Study0127(); //下面就是新建两个线程,分别调用一次sum方法 new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); } }
五.走进LongAdder
从上面能够看到base只能是一个,而Cell可能有多个,并且Cell太多了也是很占内存的,因此一开始的时候不会建立Cell,只有在须要时才建立,也叫作惰性加载。
咱们能够知道LongAdder是继承自Striped64这个类的
而Striped64类中有三个字段,cells数组用于存放多个Cell,一个是base很少说,还有一个cellsBusy用来实现自旋锁,状态只能是0或1(0表示Cell数组没有被初始化和扩容,也没有正在建立Cell元素,反之则为1),在建立Cell,初始化Cell数组或者扩容Cell数组的时候,就会用到这个字段,保证同一时刻只有一个线程能够进行其中之一的操做。
1.咱们简单看看Cell的结构
从下面代码中能够很清楚的看到所谓的Cell就是对一个long类型变量的CAS操做
@sun.misc.Contended //这个注解的做用是为了不伪共享,至于什么伪共享,后面有机会再说说 static final class Cell { //每一个Cell类中就是这个声明的变量后期要进行累加的 volatile long value; //构造函数 Cell(long x) { value = x; } //Unsafe对象 private static final sun.misc.Unsafe UNSAFE; //value的偏移量 private static final long valueOffset; //这个静态代码块中就是获取Unsafe对象和偏移量的 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } //CAS操做,没什么好说的 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } }
2.LongAdder类自增方法increment()
咱们能够看到increment()方法其实就是调用了add方法,咱们须要关注add方法干了一些什么;
public void add(long x) { Cell[] as; long b, v; int m; Cell a; //这里的cells是父类Striped64中的,不为空的话就保存到as中,而后调用casBase方法,就是CAS给base更新为base+x,也就是每次都新增x, //在这里因为add(1L)传入的参数是1,也就是每次就是加一 //若是CAS成功以后就不说了,就完成操做了,若是CAS失败,则进入到里面去 if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; //这个if判断条件贼长,咱们把这几个条件分为1,2,3,4部分,前三部分都是用于决定线程应该访问Cell数组中哪个Cell元素,最后一个部分用于更新Cell的值 //若是第1,2,3部分都不知足,也就是说Cell数组存在并且已经找到了肯定的Cell元素,那就到第四部分,更新对应的Cell中的值(在Cell类中的cas方法已经看过了) //若是第1,2,3部分知足其中一个,那也就是说Cell数组根本就不存在或者线程找不到对应的Cell,就执行longAccumulate方法 if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) //后面仔细看看这个方法,这是对Cell数组的初始化和扩容,颇有意思 longAccumulate(x, null, uncontended); } } //一个简单的CAS操做 final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); }
对于上面的,有兴趣的能够看看是怎么找到指定的Cell的,在上面的a = as[getProbe() & m]中,其中m=数组的长度-1,其实这里也是一个取余的运算,而getProbe()这个方法是用于获取当前线程的threadLocalRandomProb(当前本地线程探测值,初始值为0),其实也就是一个随机数啊,而后对数组的长度取余获得的就是对应的数组的索引,首次调用这个方法是数组的第一个元素,若是数组的第一个元素为null,那么就说明没有找到对应的Cell;
对于取余运算,举个简单的例子吧,我也有点忘记了,好比随机数9要对4进行取余,咱们能够9&(4-1)=9&3=1001&0011=1,利用位运算取余了解一下;
如今咱们重点看看longAccumulate方法,代码比较长,单独提取出来看看
3.longAccumulate方法
//此方法是对Cell数组的初始化和扩容,注意有个形参LongBinaryOperator,这是JDK8新增的函数式编程的接口,函数签名为(T,T)->T,这里传进来的是null final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; //初始化当前线程的threadLocalRandomProbd的值,也就是生成一个随机数 if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; //这里表示初始化完毕了 if ((as = cells) != null && (n = as.length) > 0) { //这里表示随机数和数组大小取余,获得的结果就是当前线程要匹配到的Cell元素的索引,若是索引对应在Cell数组中的元素为null,就新增一个Cell对象扔进去 if ((a = as[(n - 1) & h]) == null) { //cellsBusy为0,表示当前Cell没有进行扩容、初始化操做或者正在建立Cell等操做,那么当前线程能够对这个Cell数组随心所欲 if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create //看下面的Cell数组初始化,说的很清楚,主要是设置cellsBusy为1,而后将当前线程匹配到的Cell设置为新建立的Cell对象 if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { //将cellsBusy重置为0,表示此时其余线程又能够对Cell数组随心所欲了 cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //Cell元素存在就执行CAS更新Cell中的值,这里fn是形参为null else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //当Cell数组元素个数大于CPU的个数 else if (n >= NCPU || cells != as) collide = false; // At max size or stale //是否有冲突 else if (!collide) collide = true; //扩容Cell数组,和上面两个else if一块儿看 //若是当前Cell数组元素没有达到CPU个数并且有冲突就新型扩容,扩容的数量是原来的两倍Cell[] rs = new Cell[n << 1];,为何要和CPU个数比较呢? //由于当Cell数组元素和CPU个数相同的时候,效率是最高的,由于每个线程都是一个CPU来执行,再来修改其中其中一个Cell中的值 //这里仍是利用cellsBusy这个字段,在下面初始化Cell数组中的用法同样,就很少说了 else if (cellsBusy == 0 && casCellsBusy()) { try { //这里就是新建一个数组是原来的两倍,而后将原来数组的元素复制到新的数组,再改变原来的cells的引用指向新的数组 if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //使用完就重置为0 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //这里的做用是当线程找了很久,发现全部Cell个数已经和CPU个数相同了,而后匹配到的Cell正在被其余线程使用 //因而为了找到一个空闲的Cell,因而要从新计算hash值 h = advanceProbe(h); } //初始化Cell数组 //记得上面好像说过cellsBusy这个字段是能是0或者是1,当时0的时候,说明Cell数组没有初始化和扩容,也没有正在建立Cell元素, //反之则为1,而casCellsBusy()方法就是用CAS将cellsBusy的值从0修改成1,表示当前线程正在初始化Cell数组,其余线程就不能进行扩容操做了 //若是一个线程在初始化这个Cell数组,其余线程在扩容的时候,看上面扩容,也会执行casCellsBusy()方法进行CAS操做,会失败,由于指望的值是1,而不是0 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { //这里首先新建一个容量为2的数组,而后用随机数h&1,也就是随机数对数组的容量取余的方式获得索引,而后初始化数组中每一个Cell元素 Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //初始化完成以后要把这个字段重置为0,表示此时其余线程就又能够对这个Cell进行扩容了 cellsBusy = 0; } if (init) break; }
//将base更新为base+x,表示base会逐渐累加Cell数组中每个Cell中的值 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
其实longAccumulate方法就是表示多线程的时候对Cell数组的初始化,添加Cell元素还有扩容操做,还有就是当一个线程匹配到了Cell元素,发现其余线程正在使用就会从新计算随机数,而后继续匹配其余的Cell元素去了,没什么特别难的吧!别看这个方法很长,就是作这几个操做
六.总结
这一篇核心就是CAS,咱们简单的说了一下原子操做类AtomicLong的自增,可是当线程不少的状况下,使用CAS有很大的缺点,就是同一时间是会有一个线程在执行,其余全部线程都在自旋,自旋会消耗性能,因而可使用JDK提供的一个LongAdder类代替,这个类的做用就是将AtomicLong中的值优化为了一个base和一个Cell数组,多线程去竞争的时候,假设线程个数个CPU个数相同,那么此时每个线程都有单独的一个CPU去运行,而后单独的匹配到Cell数组中的某个元素,若是没有匹配到那么会对这个Cell数组进行初始化操做;若是匹配到的Cell数组中的元素正在使用,那么久判断是否能够新建一个Cell丢数组里面去,若是数组已经满了,并且数组数量小于CPU个数,那么久进行扩容;扩容结束后,仍是匹配到的Cell数组中的位置正在使用,那么就是冲突,就会从新计算,经过一个新的随机数和数组的取余,获得一个新的索引,再去访问该对应的Cell数组的位置。。。。
仔细看看仍是挺有意思的啊!