【并发编程】对线程安全性的理解(Atomic、sync、volatile、Lock)

线程安全性

阅读前须要对JVM内存模型有必定了解。 java

定义算法

当多个线程访问某个类时,无论运行时环境采用何种调度方式或者这些进程将如何交替执行,而且在主调代码中不须要任何额外的同步或协同,这个类都能表现出正确的行为,这个类就是线程安全的。数组

原子性-Atomic包

定义安全

提供互斥访问,同一时刻只能有一个线程对它进行操做。bash

AtomicXXX:CAS、Unsafe.compareAndSwapInt

计数测试多线程

@Slf4j
public class CountExample2 {

    /** * 请求总数 */
    public static int clientTotal = 5000;
    /** * 同时并发执行线程数 */
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add(){
        count.incrementAndGet();
    }
}
复制代码

执行结果:并发

屡次执行结果始终是5000,由此咱们能够认为这个类是线程安全的。app

由线程不安全到线程安全咱们只是把countint改为了AtomicInteger,为了找到具体缘由咱们来看AtomicInteger的源码。高并发

找到incrementAndGet方法性能

/** * Atomically increments by one the current value. * * @return the updated value */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
复制代码

incrementAndGet方法实现中使用了一个unsafe的类并调用了其getAndAddInt方法,咱们点进这个方法看一下它的实现

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
复制代码

这个方法里使用了一个do-while语句,while的判断条件调用了compareAndSwapInt方法,咱们进入这个方法看一下

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
复制代码

能够看到这个方法是native标识的方法,表示是Java底层的方法,不是用Java实现的。

如今回来看getAndAddInt方法,首先传入的第一个值var1是一个对象,如计数测试中的count;第二个值var2是当前的值,如执行2 + 1这个操做,那么当前var2就是2, 第三个参数var4就是1.

接下来看方法内部,var5是提供调用底层方法获得的底层当前的值,若是没有其余线程过来处理var1这个变量时,var5的正常返回值应该是2(在上述例子的背景下),所以传到compareAndSwapInt方法中的参数分别是:count对象,当前值2,当前从底层传过来的2,从底层取出的值+增长量(这里是1)。这个方法但愿达到的目标是对于count这个对象,若是当前的值与底层的值相同的话就把它更新成var5 + var4的值。因为传入的var2var4可能会被其余线程更改,所以这里要判断当前的var2和当前底层var5是否相等。经过这样不停地循环判断来实现指望的值与底层值彻底相同的时候才执行+1的操做覆盖底层值。

compareAndSwapInt方法的核心思想就是CAS的核心。

AtomicLong、LongAdder

public static AtomicLong count = new AtomicLong(0);
复制代码

count的类型改为AtomicLong,执行几回发现结果跟上面同样。

LongAdderJDK8中新增的一个类,下面来使用一下

@Slf4j
@ThreadSafe
public class AtomicExample3 {

    /** * 请求总数 */
    public static int clientTotal = 5000;
    /** * 同时并发执行线程数 */
    public static int threadTotal = 200;

    public static LongAdder count = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add(){
        count.increment();
    }
}
复制代码

屡次运行测试后,咱们能够发现这个类是线程安全的。

AtomicLong与LongAdder对比

CAS的底层实现咱们知道AtomicLong是在一个死循环中不断尝试修改目标值,直到修改为功,在竞争不激烈时修改为功的几率很高,可是竞争激烈时修改失败的几率也会很高,在大量修改失败时就会进行屡次的循环尝试,所以性能会收到影响。对于普通类型的longdouble变量JVM容许将64位的读写操做拆分红两个32位的操做。

LongAdder的核心是将热点数据分离,如将AtomicLong的内部核心数据value分离成一个数组,每一个线程访问时经过哈希等算法映射到其中一个数字进行计数,最终的计数结果为这个数组的求和累加。其中热点数据value会被分离成多个单元的cell,每一个cell独自维护内部的值,当前对象的实际值由全部cell累计合成。

这样热点就实现了有效分离并提升了并行度,LongAdder就至关于在AtomicLong的基础上把单点的更新压力分散到各个节点上,在低并发时经过对base的直接更新能够很好地保证和Atomic的性能基本一致;在高并发时经过分散提升了性能。

可是LongAdder也有缺点,在统计时若是有并发更新可能会致使统计的数据有些偏差。

在线程竞争很低的时候使用LongAdder仍是更简单,效率稍高一点。

在须要准确的数值如序列号生成的时候就须要AtomicLong来保证准确性。

AtomicReference、AtomicReferenceFieldUpdater

AtomicReference

AtomicReferenceAtomicInteger很是相似,不一样之处就在于AtomicInteger是对整数的封装,底层采用的是compareAndSwapInt实现CAS,比较的是数值是否相等,而AtomicReference则对应普通的对象引用,底层使用的是compareAndSwapObject实现CAS,比较的是两个对象的地址是否相等。也就是它能够保证你在修改对象引用时的线程安全性。

引用类型的赋值是原子的。虽然虚拟机规范中说64位操做能够不是原子性的,能够分为两个32位的原子操做,可是目前商用的虚拟机几乎都实现了64位原子操做。

首先咱们来看一下AtomicReference源码中的compareAndSet方法

/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */
public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
复制代码

能够看到这个方法的底层也是使用CAS实现,这个方法的做用是当当前值与第一个参数值相等时将其更新为第二个参数的值。

@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2);// 2
        count.compareAndSet(0, 1);// no
        count.compareAndSet(1, 3);// no
        count.compareAndSet(2, 4);// 4
        count.compareAndSet(3, 5);// no
        log.info("count:{}", count);
    }
}
复制代码

执行结果:

执行过程当中count的值已在注释中标出。

AtomicReferenceFieldUpdater

这里以AtomicIntegerFieldUpdater为例

@Slf4j
@ThreadSafe
public class AtomicExample5 {

    @Getter
    public volatile int count = 100;

    private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater
            .newUpdater(AtomicExample5.class, "count");

    public static void main(String[] args) {
        AtomicExample5 example5 = new AtomicExample5();

        if (updater.compareAndSet(example5, 100, 120)){
            log.info("update success, {}", example5.getCount());
        }
    }
}
复制代码

AtomicStampReference: CAS 的 ABA 问题

ABA问题:指在CAS操做的时候其余线程将变量值A改为了B可是又改回了A,当线程使用指望值A与当前变量比较的时候发现当前变量没有变,因而CAS就将A值进行了交换操做。

解决思路:每次变量更新时把变量版本号加1.

看一下AtomicStampReference源码中是怎么实现的

/** * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference * and the current stamp is equal to the expected stamp. * * @param expectedReference the expected value of the reference * @param newReference the new value for the reference * @param expectedStamp the expected value of the stamp * @param newStamp the new value for the stamp * @return {@code true} if successful */
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}
复制代码

这里的compareAndSet方法与以前的区别是加入了stamp值的比较,用法与以前相同。

下面来看AtomicBoolean类中的compareAndSet方法

/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */
public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
复制代码

根据这个方法写一个例子

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    /** * 请求总数 */
    public static int clientTotal = 5000;
    /** * 同时并发执行线程数 */
    public static int threadTotal = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)){
            log.info("execute");
        }
    }

}
复制代码

输出结果

由结果能够知道尽管循环执行了5000次可是日志只输出了1次,缘由是compareAndSet是原子性操做,它能保证从false变成true只会执行一次。

这个方法可让一段代码只执行一次,不会重复执行。

能保证同一时间只有一个线程进行操做的除了Atomic包以外还有锁。

Java中的锁主要有如下两种:

1.synchronized:依赖JVM

2.Lock:依赖特殊的CPU指令,代码实现。
复制代码

synchronized

修饰对象:

1.代码块:做用范围大括号括起来的代码,做用于调用的对象。

2.方法:做用范围整个方法,做用于调用的对象,称为同步方法。

3.静态方法:做用范围整个静态方法,做用于全部对象。

4.类:做用范围括号括起来的部分,做用于全部对象。
复制代码

测试修饰代码块

@Slf4j
public class SynchronizedExample1 {

    /** * 修饰一个代码块 */
    public void test1(int j){
        synchronized (this){
            for (int i = 0; i < 10; i++){
                log.info("test1 {} - {}", j, i);
            }
        }
    }

    /** * 修饰一个方法 */
    public synchronized void test2(){
        for (int i = 0; i < 10; i++){
            log.info("test2 - {}", i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            example1.test1(1);
        });
        executorService.execute(() -> {
            example2.test1(2);
        });
    }
}
复制代码

运行结果

这个结果就验证了同步代码块做用于当前对象,不一样对象间是互不影响的。

测试修饰方法

public static void main(String[] args) {
    SynchronizedExample1 example1 = new SynchronizedExample1();
    SynchronizedExample1 example2 = new SynchronizedExample1();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        example1.test2(1);
    });
    executorService.execute(() -> {
        example2.test2(2);
    });
}
复制代码

运行结果于上面类似,代表修饰方法时也是做用于调用对象的,不一样对象间互不影响。

注意:当子类继承父类时,父类中带synchronized的方法在子类中不能带synchronized。若是子类也想使用synchronized,则须要在方法上显式声明synchronized

测试修饰静态方法

/** * 修饰一个静态方法 */
public static synchronized void test2(int j){
    for (int i = 0; i < 10; i++){
        log.info("test2 {} - {}", j, i);
    }
}

public static void main(String[] args) {
    SynchronizedExample2 example1 = new SynchronizedExample2();
    SynchronizedExample2 example2 = new SynchronizedExample2();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        example1.test2(1);
    });
    executorService.execute(() -> {
        example2.test2(2);
    });
}
复制代码

运行结果

从这个结果能够知道修饰静态方法时是做用于所有对象的,即一个对象执行完后才能执行第二个,不能同步进行。

测试修饰类

/** * 修饰一个类 */
public static void test1(int j){
    synchronized (SynchronizedExample2.class){
        for (int i = 0; i < 10; i++){
            log.info("test1 {} - {}", j, i);
        }
    }
}

public static void main(String[] args) {
    SynchronizedExample2 example1 = new SynchronizedExample2();
    SynchronizedExample2 example2 = new SynchronizedExample2();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        example1.test1(1);
    });
    executorService.execute(() -> {
        example2.test1(2);
    });
}
复制代码

运行结果于上面相同,于预期一致。

使用synchronized保证计数安全

只需在以前的add方法前加上synchronized修饰便可。

private synchronized static void add(){
    count++;
}
复制代码

执行结果始终是5000

原子性-对比

  • synchronized:不可中断锁,适合竞争不激烈,可读性好。

  • Lock:可中断锁,多样化同步,竞争激烈时能维持常态。

  • Atomic:竞争激烈时能维持常态,比Lock性能好,但只能同步一个值。

可见性

定义:一个线程对主内存的修改能够及时地被其余线程观察到。

致使共享变量在线程间不可见的缘由:

  • 1.线程交叉执行

  • 2.重排序结合线程交叉执行

  • 3.共享变量更新后的值没有在工做内存与主内存间及时更新

可见性 - synchronized

JMM关于synchronized的两条规定:

  • 1.线程解锁前,必须把共享变量的最新值刷新到主内存。

  • 2.线程加锁时,将清空工做内存中共享变量的值,从而使用共享变量时须要从主内存中从新读取最新值。

可见性 - volatile

经过加入内存屏障和禁止重排序优化来实现。

实现方法:

对volatile变量写操做时,会在写操做后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存。

对volatile变量读操做时,会在读操做前加入一条load屏障指令,从主内存中读取共享变量。

示意图:

这些过程都是在CPU指令级别进行操做,咱们在使用时直接使用volatile修饰须要的地方便可。

使用条件:

  • 1.对变量的写操做不依赖于当前值。

  • 2.该变量没有包含在具备其余变量的不变的式子中。

有序性

定义

Java内存模型中容许编译器和处理器对指令进行重排序,可是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

happens-before原则

来自《深刻理解Java虚拟机》

  • 1.程序次序原则:一个线程内,按照代码顺序,书写在前面的操做先行发生于书写在后面的操做。可能会发生重排序,可是重排序不会影响到最终结果,所以看起来仍是顺序执行的。

  • 2.锁定规则:一个unLock操做先行发生于后面对同一个锁的lock操做。

  • 3.volatile变量规则:对一个变量的写操做先行发生于后面对这个变量的读操做。

  • 4.传递规则:若是操做A先行发生于B,而操做B又先行发生于C,则能够得出操做A先行发生于C.

  • 5.线程启动规则:Thread对象的start()方法先行发生于此线程的每个动做。

  • 6.线程中断规则:对线程的interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生。

  • 7.线程终结规则:线程中全部的操做都先行发生于线程的终止检测,咱们能够经过Thread.join()方法结束、Thread.isAlive()的返回值手段检测线程是否已终止运行。

  • 8.对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始。

若是两个操做的执行次序没法从happens-before原则中推导出来就不能保证它们的有序性,虚拟机就能够随意地对它们进行重排序。

Written by Autu.

2019.7.11

相关文章
相关标签/搜索