并发包学习(一)-Atomic包小记

此篇是J.U.C学习的第一篇Atomic包相关的内容,但愿此篇总结能对本身的基础有所提高。本文总结来源自《Java并发编程的艺术》第七章并配以本身的实践理解。若有错误还请指正。java

1、案例分析

首先看两段代码:算法

代码①:编程

/**
 * @author laoyeye
 * @Description: 5000个线程,200个并发
 * @date 2018/8/16 21:58
 */
public class IntTest {
    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(count);
    }

    private static void add() {
        count++;
    }

5000个线程200个并发的状况下,对一个共享变量进行++操做。数组

结果:4997安全

代码②:多线程

public class AtomicIntegerTest {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(count);
    }

    private static void add() {
        count.incrementAndGet();
    }
}

5000个线程200个并发的状况下,一样进行每次加一操做。并发

结果:5000。和预期的结果同样app

那么为何AtomicInteger能够获得预期的结果,而使用基本数据类型Int的值却不对呢?函数

主要是原子性的问题,Int的操做,在多线程的状况下并不保证原子性,而AtomicInteger则是一个JDK提供的一个原子操做类,具体AtomicInteger怎么实现的原子性能够看下文。高并发

2、Atomic相关概念

java从JDK1.5开始提供java.util.concurrent.atomic包,即本文所述的Atomic包。这个包的原子操做类提供了一个简单,高效,线程安全地更新一个变量的方式。

由于变量的类型不少,Atomic包基本上分为四种类型的更新方式,分别是原子更新基本类型,原子更新数组,原子更新引用和原子更新属性(字段)。Atomic包的类基本上都是使用Unsafe实现的包装类。 Unsafe 类提供了硬件级别的原子操做,能够安全的直接操做内存变量,其在 JUC 源码中被普遍的使用。

3、原子更新基本类型

一、AtomicBoolen:原子更新布尔类型。

二、AtomicInteger:原子更新整型。

三、AtomicLong:原子更新整型。

AtomicInteger详解

一样以一种的代码②为例,为何AtomicInteger的incrementAndGet()方法保证了原子性的操做呢,咱们来看一下源码的实现:

源码①:

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

首先咱们经过unsafe调用了它的objectFieldOffset(Field field)方法,这个方法返回指定的变量在所属类的内存偏移地址,偏移地址仅仅在该Unsafe函数中访问指定字段时使用。

源码②:

unsafe.getAndAddInt(this, valueOffset, 1) + 1;
   public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

getIntVolatile获取对象obj中偏移量offset的变量对应的volative内存语义的值,即预期的值var5。

compareAndSwapInt方法中,var1为须要改变的对象,var2为偏移量(即以前求出来的valueOffset的值),var5为expect的值,第四个为update后的值。

当value的值与expect这个值相等,那么则将value修改成update这个值,并返回true,不然返回false。

此操做极为常说的CAS原子操做,这里使用while循环是考虑到多个线程同时调用的状况CAS失败后须要自旋重试。

AtomicBoolen详解

代码③

public class AtomicBooleanTest {

        // 请求总数
        public static int clientTotal = 5000;

        // 同时并发执行的线程数
        public static int threadTotal = 200;

        public static AtomicBoolean isHappened = new AtomicBoolean(false);

        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        test();
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(isHappened.get());
        }

        private static void test() {
            if (isHappened.compareAndSet(false, true)) {
                System.out.println("execute");
            }
        }
    }

执行结果:

execute
true

经过结果可知System.out.println("execute");的代码只执行过一次,200的并发,为何只执行了一次呢,咱们再来看下源码的解决办法。

源码③

    public final boolean compareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }

咱们看到当调用compareAndSet方法时,先把Boolean型转换为整型,在使用compareAndSwapInt进行CAS。因此即便在200并发的状况下,AtomicBoolen依旧可以保持原子性。

经过上面两个类的讲解咱们看到都是使用的compareAndSwapInt的方法,unsafe类还提供了compareAndSwapLong,用于AtomicLong,以及compareAndSwapObject方法。而像char,float,double等数据类型没有对应的原子操做类,这时候咱们能够参考AtomicBoolen的思路作相似处理。

 4、原子更新数组

一、AtomicIntegerArray:原子更新整型数组里的元素

二、AtomicLongArray:原子更新长整型数组里的元素

三、AtomicReferenceArray:原子更新引用类型数组里的元素

这里咱们只介绍下AtomicIntegerArray,基本操做相似。

代码④

public class AtomicIntegerArrayTest {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    static int[] value = new int[]{1,2};

    public static AtomicIntegerArray ai = new AtomicIntegerArray(value);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(ai.get(0));
        System.out.println(value[0]);
    }

    private static void test() {
        ai.getAndSet(0,3);
    }
}

结果:3,1

为何是3和1呢,一样的咱们从源码中找答案。

源码④:

    public final int getAndSet(int i, int newValue) {
        return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
    }
    public final int getAndSetInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var4));

        return var5;
    }

一样的原理,当前位置的数组value的值和预期的值相等,而后将对应的元素更新为新的值。可是须要注意的是,AtomicIntegerArray会将当前数组复制一份,因此当AtomicIntegerArray对内部的数组元素进行修改后,不会影响到原先的数组。

5、原子更新引用类型

一、AtomicReference:原子更新引用类型

二、AtomicStampedReference:更新带有版本号的引用类型,可解决CAS的ABA问题

三、AtomicMarkableReference:原子更新带有标记位的引用类型

 原子更新基本类型每次只能更新一个变量,若是要原子更新更多变量,这时候就须要引用类型了。

代码⑤

public class AtomicReferenceTest {
    public static void main(String[] args) {

        User user1 = new User("张三",12);
        User user2 = new User("lisi",20);

        AtomicReference<User> ar = new AtomicReference<User>();
        ar.set(user1);
        ar.compareAndSet(user1, user2);

        System.out.println("user " + ar.get().getName());
    }


static class User {
    private String name;
    private int old;

    public String getName() {
        return name;
    }

    public int getOld() {
        return old;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setOld(int old) {
        this.old = old;
    }

    public User(String name, int old) {
        this.name = name;
        this.old = old;
    }
}
}

结果:user lisi

能够看到结果已经原子更新为lisi了,年龄也同步更新。

代码⑥

public class AtomicMarkableReferenceTest {
    public static void main(String[] args) {


        User user1 = new User("张三",12);
        User user2 = new User("lisi",20);

        AtomicStampedReference ar = new AtomicStampedReference(user1,0);

        final  Integer stamp = ar.getStamp();

        ar.compareAndSet(user1, user2,stamp,stamp+10);

        System.out.println("user " + ((User)ar.getReference()).getName());
        System.out.println("user " + ar.getStamp());

        System.out.println( ar.compareAndSet(user1, user2, stamp,stamp+10));
    }


static class User {
    private String name;
    private int old;

    public String getName() {
        return name;
    }

    public int getOld() {
        return old;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setOld(int old) {
        this.old = old;
    }

    public User(String name, int old) {
        this.name = name;
        this.old = old;
    }
}
}

结果:

user lisi
user 10
false

能够看到咱们在作了原子更新后,版本号也作了改变,这时候若是还用原来的版本号去更新,就会出现更新失败的状况。

AtomicMarkableReference跟AtomicStampedReference相似 
AtomicStampedReference是使用pair的int stamp做为计数器使用,AtomicMarkableReference的pair使用的是boolean mark。 
就像一杯水,AtomicStampedReference可能关心的是动过几回,AtomicMarkableReference关心的是有没有被人动过,方法都比较简单,不在演示了。

6、原子更新字段类

一、AtomicIntegerFieldUpdater:更新整型字段

二、AtomicLongFieldUpdater:更新长整型字段

三、AtomicReferenceFieldUpdater:原子更新引用类型里的字段

public class AtomicIntegerFieldUpdaterTest {

    private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterTest.class, "count");

    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicIntegerFieldUpdaterTest ai = new AtomicIntegerFieldUpdaterTest();

        if (updater.compareAndSet(ai, 100, 120)) {
            System.out.println("方法1,"+ai.getCount());
        }

        if (updater.compareAndSet(ai, 100, 120)) {
            System.out.println("方法2,"+ai.getCount());
        } else {
            System.out.println("方法3,"+ai.getCount());
        }
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}

结果:

方法1,120
方法3,120

 原子更新字段类须要两部,①必须使用静态方法newupdate()建立一个更新器,而且设置想要更新的类和属性。第二步,更新类的字段属性必须使用public volatile修饰

 7、1.8新增的LongAdder相关类

这个类是1.8新增的一个类,为何在已经有AtomicLong的状况下,仍是增长了这个类呢?

这主要是因为AtomicLong CAS算法的缺陷形成的,众所周知,CAS是比较当前值与预期的值是否相等,相等则更新为新的值,不然从新自旋取值。这就形成了CAS在高并发状况性下大量失败,性能较低的状况。

既然AtomicLong性能问题是因为过多线程同时去竞争同一个变量的更新而下降的,那么若是把一个变量分解为多个变量,让一样多的线程去竞争多个资源,那么性能问题不就迎刃而解了吗?

没错,所以,JDK8 提供的LongAdder就是这个思路。这个类我目前只在网上了解到原理,还未应用也不了解源码实现,等之后再更新吧。

下文来自简书:https://www.jianshu.com/p/22d38d5c8c2a

总结分析下LongAdder减小冲突的方法以及在求和场景下比AtomicLong更高效的缘由

  • 首先和AtomicLong同样,都会先采用cas方式更新值
  • 在初次cas方式失败的状况下(一般证实多个线程同时想更新这个值),尝试将这个值分隔成多个cell(sum的时候求和就好),让这些竞争的线程只管更新本身所属的cell(由于在rehash以前,每一个线程中存储的hashcode不会变,因此每次都应该会找到同一个cell),这样就将竞争压力分散了

AtomicLong能否能够被LongAdder替代

有了传说中更高效的LongAdder,那AtomicLong能否不使用了呢?固然不是!

答案就在LongAdder的java doc中,从咱们翻译的那段能够看出,LongAdder适合的场景是统计求和计数的场景,并且LongAdder基本只提供了add方法,而AtomicLong还具备cas方法(要使用cas,在不直接使用unsafe以外只能借助AtomicXXX了),,例如getAndIncrement、getAndDecrement等,使用起来很是的灵活,而LongAdder只有add和sum,使用起来比较受限。

相关文章
相关标签/搜索