并发 - CAS 的操做、实现、原理及优化

简介

在 Java 中不少工具类都在使用 CAS(Compare And Set)用以提高并发的效率以及数据的准确性质。java

  • concurrent 和 concurrent.atomic 下面的不少 AtomicInteger 等类
  • concurrent.locks 包下面的 ReentrantLock 、WriteLock 等
  • 其它

对于大部分人来讲,最多见的应该就是使用 AtomicXXX、以及在使用 Lock 相关的子类 的时候咱们知道他们的底层运用了 CAS,也知道 CAS 就是传入一个更新前得期待值(expect)和一个须要更新的值(update),若是知足要求那么执行更新,不然的话就算执行失败,来达到数据的原子性。linux

咱们知道 CAS 确定用某一种方式在底层保证了数据的原子性,它的好处是c++

  • 没必要作同步阻塞的挂起以及唤醒线程这样大量的开销
  • 将保证数据原子性的这个操做交给了底层硬件性能远远高于作同步阻塞挂起、唤醒等操做,因此它的并发性更好
  • 能够根据 CAS 返回的状态决定后续操做来达到数据的一致性,好比 increment 失败那就一值循环直到成功为止(下文会讲)等等

首先来看一个错误的 increment()

private int value = 0;

    public static void main(String[] args) {
        Test test = new Test();
        test.increment();
        System.out.println("期待值:" + 100 * 100 + ",最终结果值:" + test.value);
    }

    private void increment() {
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value++;
                }
            }).start();
        }
    }
复制代码

输出:期待值:10000,最终结果值:9900windows

能够发现输出的结果值错误,这是由于 value++ 不是一个原子操做,它将 value++ 拆分红了 3 个步骤 load、add、store,多线程并发有可能上一个线程 add 事后尚未 store 下一个线程又执行了 load 了这种重复形成获得的结果可能比最终值要小。数组

固然在这里加 volatile int value 也是没有用的由于 32 位的 int 操做自己就是原子的,并且 volatile 也没有办法让这 3 个操做原子性执行,它只能禁止某个指令重排序来保证其对应的内存可见,若是是 long 等 64 位操做类型的能够加上 volatile,由于在 32 位的机器上写操做可能会被分配到不一样的总线事务上去操做(能够想象成分红了 2 步操做,第一步操做前 32 位后一步操做后 32 位),而总线事务的执行是由总线仲裁决定的不能保证它的执行顺序(至关于前者加了 32 位可能就切换到其它的地方执行了,好比直接就读取了,那么数据的读取就只读取到了写入一半的值)安全

使用 CAS 来保证 increment() 正确

咱们知道关于 CAS 的操做基本上都封装在 Unsafe 这个包里面,可是因为 Unsafe 不容许咱们外部使用,它认为这是一个不安全的操做,好比若是直接使用 Unsafe unsafe = Unsafe.getUnsafe(); 就会抛出 Exception in thread "main" java.lang.SecurityException: Unsafe服务器

咱们查看下源代码,原来是由于它作了校验多线程

public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
            throw new SecurityException("Unsafe");
        } else {
            return theUnsafe;
        }
    }
复制代码

因此咱们能够经过反射来调用它(固然实际操做中不建议这么使用,此处为了演示方便)并发

public class Test {

    // value 的内存地址,便于直接找到 value
    private static long valueOffset = 0;

    {
        try {
            // 这个内存地址是和 value 这个成员变量的值绑定在一块儿的
            valueOffset = getUnsafe().objectFieldOffset
                (Test.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
    private int value;

    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        Test test = new Test();
        test.increment();
    }

    private void increment() throws NoSuchFieldException, IllegalAccessException {
        Unsafe unsafe = getUnsafe();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    unsafe.getAndAddInt(this, valueOffset, 1);
                }
            }).start();
        }
        System.out.println("须要获得的结果为: " + 100 * 1000);
        System.out.println("实际获得的结果为: " + value);
    }

    // 反射获取 Unsafe
    private Unsafe getUnsafe() throws NoSuchFieldException, IllegalAccessException {
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        return (Unsafe) field.get(null);
    }
}
复制代码

这下咱们就能从输出中看到结果是正确的了app

CAS 底层的实现原理

咱们继续探讨, getAndAddInt 调用了 unsafe.compareAndSwapInt(Object obj, long valueOffset, int expect, int update) 这个方法在 Hotspot 究竟是如何实现的,咱们发现调用的是 native 的 unsafe.compareAndSwapInt(Object obj, long valueOffset, int expect, int update),咱们翻看 Hotspot 源码发如今 unsafe.cpp 中定义了这样一段代码

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, 
jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
复制代码

从中咱们能够看到它是使用了 Atomic::cmpxchg(x, addr, e) 这个操做来完成的,在不一样的底层硬件会有不同的代码 Hotspot 向上帮咱们屏蔽了细节。这个实现方法在 solaris,windows,linux_x86 等都有不同的实现方法,咱们用咱们最多见的服务器 linux_x86 来讲,它的实现代码以下

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint*  dest, 
jint compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
复制代码

从以上代码能够看出几点

  • Hotspot 直接调用底层汇编来实现对应的功能
  • __asm__ 表示的是后续是一段汇编代码
  • volatile 此处的 volatile 和 Java 中的有些区别,这里使用用以告诉编译器再也不对这段代码进行汇编优化
  • LOCK_IF_MP 表示的是若是操做系统是多核的那么就须要加锁来保证其原子性
  • cmpxchgl 就是汇编中的比较而且交换

从这里就能看出来,CAS 底层也是在用锁来保证其原子性的。在 Intel 早期的实现中是直接将总线锁住,这样致使其它没有得到总线事务访问权的处理器没法执行后续的操做,性能会极大的下降。

后续 Intel 对其进行了优化升级,在 x86 处理器中能够只须要锁定 特定的内存地址,那么其它处理器也就能够继续使用总线来访问内存数据了,只不过是若是其它总线也要访问被锁住的内存地址数据时会阻塞而已,这样来大幅度的提高了性能。

可是思考一下如下几点问题的

  1. 并发量很是高,可能致使都在不停的争抢该值,可能致使不少线程一致处于循环状态而没法更新数据,从而致使 CPU 资源的消耗太高
  2. ABA 问题,好比说上一个线程增长了某个值,又改变了某个值,而后后面的线程觉得数据没有发生过变化,其实已经被改动了

JAVA8 对于 CAS 的优化

固然 ABA 的问题可使用增长版本号来控制,每次操做版本号 + 1,版本号变动了说明值就被改过一次了,在 Java 中 AtomicStampedReference 这个类提供了这种问题的解决方案。

而对于说第一个问题来讲在 Java8 中也有了对应的优化,Java 8 中提供了一些新的工具类用以解决这种问题,以下

咱们挑一个来看,其它都是相似的

能够看到他是能够序列化的,而且必须是 Number 类型的,继承 Striped64 可以支持动态的分段

它的原理主要采用CAS分段机制与自动分段迁移机制,最开始是在 base 上面进行 CAS 操做,后续并发线程过多,那么就将这大量的线程分配到 cells 数组中去,每一个数组的线程单独去执行累加操做,最终再合并结果

图来自【基础巩固篇】Java 8中对CAS的优化

总结

能够看到跟作直接作同步挂起或者唤醒线程相好比果可以合理的使用 CAS 进行操做的话或者是将其两者合并使用,那么在并发性能上可以提高一个量级

  • 对于像 ReentrantLock 之类的都是使用的将同步阻塞 + CAS 这种方式来实现高性能的锁,好比 ReentrantLock 中 tryAcuqire() 若是使用 CAS 未能获取到对应的锁,那么就将其放入阻塞队列,等待后续的唤醒
  • 好比自旋锁在指定的次数经过 CAS 都未能获取到锁的话就挂起进入阻塞队列等待被唤醒
  • 好比使用 AtomicInteger 进行自增的时候就会一值不停的轮询判断更新,直到操做成功为止
  • 使用轮询 CAS 处理而不嵌入阻塞挂起和唤醒的话,它的优点就是在于可以快速响应用户请求减小资源消耗,由于线程的挂起和唤醒涉及到用户态内核态的调用又涉及到线程“快照”数据的相关保存,对于响应和资源消耗是又慢又高,不过咱们也须要考虑在 CPU 轮询上的开销,因此能够将两者必定程度上的融合在一块儿使用。
  • 因此理解 CAS 仍是很是重要的

参考: JAVA 中的 CAS

相关文章
相关标签/搜索