什么是CAS
全称:Compare and Set ,也就是先比较再设置的意思
也称为:Compare and swap,先比较并交换
流程
并发包(j.u.c)下的应用示例
以原子包下的AtomicInteger类进行源码分析CAS
//建立一个原子类的对象,并进行++ 操做,从0开始
AtomicInteger atomicInteger = new AtomicInteger();
atomicInteger.incrementAndGet();
复制代码
看看内部进行了哪些操做
AtomicInteger 内部的结构
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// CAS的核心类,因为java方法没法直接访问底层系统,须要经过本地(native)方法来访问
//Unsafe至关于一个后门,基于该类能够直接操做特定内存的数据
private static final Unsafe unsafe = Unsafe.getUnsafe();
//表示该变量值在内存中的偏移地址,由于Unsafe就是根据内存偏移地址获取数据的
//偏移量的理解:内存中存数数据的方式:一个存储数据的 实际地址=段首地址+偏移量
//对应的现实中 家庭地址= 小区地址+门牌号
private static final long valueOffset;
//使用volatile保证了多线程之间的内存可见性
private volatile int value;
// 建立对象的时候 就会将valueOffset的值获取到
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value" ));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* 有参构造
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* 无参构造
*/
public AtomicInteger () {
}
复制代码
/**
*
*/
public final int incrementAndGet () {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
/**
*
* @param var1 : AtomicInteger对象
* @param var2 : valueOffset
* @param var4 : 固定值 1
* @Description :将value进行自增,而且返回自增的值
* @Author Licy
* @Date 2019/6/14 20:03
* @return int
*
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
//能够当作compareAndSwapInt(obj, offset, expect, update)
} while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
复制代码
具体流程
实际存储的值是放在value中的
类加载的时候,还获取了unsafe实例
定义了valueOffset,而且在静态代码块中初始化了改值,当静态代码块执行的时候,获取的就是value的偏移量
getAndAddInt函数中 var5获取的就是valueOffset所表明的具体的值,也就是value的值
compareAndSwapInt函数的意思:若是obj内的value和expect相等,就证实没有其余线程改变过这个变量,那么久更新他为update,若是这一步CAS没有成功,那就采用自旋的方式继续进行CAS操做
比较和设置乍一看是两个步骤,其实在JNI里面借助了cpu的指令完成,保证其原子问题
底层原理
CAS底层使用JNI调用C代码实现的,若是你有Hotspot源码,那么在Unsafe.cpp里能够找到它的实现:
所产生的问题
一、ABA的问题
CAS须要在操做值的时候检查下值有没有发生变化,若是没有发生变化则更新,可是若是一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,可是实际上却变化了。这就是CAS的ABA问题
常见的解决思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。
目前在JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法做用是首先检查当前引用是否等于预期引用,而且当前标志是否等于预期标志,若是所有相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
二、循环时间开销大
若是CAS不成功,则会原地自旋,若是长时间自旋会给CPU带来很是大的执行开销。
不过 在jdk8中已经解决此问题
Java 8对CAS机制的优化
java.util.concurrency.atomic.LongAdder
大致的流程
public class LongAdder extends Striped64 implements Serializable {
//.....
}
abstract class Striped64 extends Number {
/**
*
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
//
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value" ));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
复制代码
LongAdder继承了Striped64类
Striped64类中维护了一个懒加载数据Cell[]和一个额外的base实例域
数据的大小是2的N次方,使用每一个线程Thread内部的哈希值访问
使用注解Contended修饰是解决false sharing(伪共享的问题),解决不一样操做系统位数缓存行大小不同,防止cell数据发生伪共享的状况
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
public LongAdder () {
}
public void add(long x) {
// as是Striped64中的cells属性
// b是Striped64中的base属性
// v是当前线程hash 到的Cell中存储的值
// m是cells的长度减1,hash 时做为掩码使用
// a是当前线程hash 到的Cell
Cell[] as; long b, v; int m; Cell a;
// 条件1:cells不为空,说明出现过竞争,cells已经建立
// 条件2:cas操做base失败,说明其它线程先一步修改了base,正在出现竞争
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true 表示当前竞争还不激烈
// false 表示竞争激烈,多个线程hash 到同一个Cell,可能要扩容
boolean uncontended = true ;
// 条件1:cells为空,说明正在出现竞争,上面是从条件2过来的
// 条件2:应该不会出现
// 条件3:当前线程所在的Cell为空,说明当前线程尚未更新过Cell,应初始化一个Cell
// 条件4:更新当前线程所在的Cell失败,说明如今竞争很激烈,多个线程hash 到了同一个Cell,应扩容
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法返回的是线程中的threadLocalRandomProbe字段
// 它是经过随机数生成的一个值,对于一个肯定的线程这个值是固定的
// 除非刻意修改它
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 调用Striped64中的方法处理
longAccumulate(x, null, uncontended);
}
}
/**
* Equivalent to {@code add(1)}.
*/
public void increment () {
add(1L);
}
/**
* Equivalent to {@code add(-1)}.
*/
public void decrement () {
add(-1L);
}
复制代码
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 存储线程的probe值
int h;
// 若是getProbe()方法返回0,说明随机数未初始化
if ((h = getProbe()) == 0) {
// 强制初始化
ThreadLocalRandom.current(); // force initialization
// 从新获取probe值
h = getProbe();
// 都未初始化,确定还不存在竞争激烈
wasUncontended = true ;
}
// 是否发生碰撞
boolean collide = false ; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells已经初始化过
if ((as = cells) != null && (n = as.length) > 0) {
// 当前线程所在的Cell未初始化
if ((a = as[(n - 1) & h]) == null) {
// 当前无其它线程在建立或扩容cells,也没有线程在建立Cell
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个Cell,值为当前须要增长的值
Cell r = new Cell(x); // Optimistically create
// 再次检测cellsBusy,并尝试更新它为1
// 至关于当前线程加锁
if (cellsBusy == 0 && casCellsBusy()) {
// 是否建立成功
boolean created = false ;
try { // Recheck under lock
Cell[] rs; int m, j;
// 从新获取cells,并找到当前线程hash 到cells数组中的位置
// 这里必定要从新获取cells,由于as并不在锁定范围内
// 有可能已经扩容了,这里要从新获取
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 把上面新建的Cell放在cells的j位置处
rs[j] = r;
// 建立成功
created = true ;
}
} finally {
// 至关于释放锁
cellsBusy = 0;
}
// 建立成功了就返回
// 值已经放在新建的Cell里面了
if (created)
break ;
continue ; // Slot is now non-empty
}
}
// 标记当前未出现冲突
collide = false ;
}
// 当前线程所在的Cell不为空,且更新失败了
// 这里简单地设为true ,至关于简单地自旋一次
// 经过下面的语句修改线程的probe再从新尝试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true ; // Continue after rehash
// 再次尝试CAS更新当前线程所在Cell的值,若是成功了就返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break ;
// 若是cells数组的长度达到了CPU核心数,或者cells扩容了
// 设置collide为false 并经过下面的语句修改线程的probe再从新尝试
else if (n >= NCPU || cells != as)
collide = false ; // At max size or stale
// 上上个elseif都更新失败了,且上个条件不成立,说明出现冲突了
else if (!collide)
collide = true ;
// 明确出现冲突了,尝试占有锁,并扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 检查是否有其它线程已经扩容过了
if (cells == as) { // Expand table unless stale
// 新数组为原数组的两倍
Cell[] rs = new Cell[n << 1];
// 把旧数组元素拷贝到新数组中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 从新赋值cells为新数组
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 已解决冲突
collide = false ;
// 使用扩容后的新数组从新尝试
continue ; // Retry with expanded table
}
// 更新失败或者达到了CPU核心数,从新生成probe,并重试
h = advanceProbe(h);
}
// 未初始化过cells数组,尝试占有锁并初始化cells数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 是否初始化成功
boolean init = false ;
try { // Initialize table
// 检测是否有其它线程初始化过
if (cells == as) {
// 新建一个大小为2的Cell数组
Cell[] rs = new Cell[2];
// 找到当前线程hash 到数组中的位置并建立其对应的Cell
rs[h & 1] = new Cell(x);
// 赋值给cells数组
cells = rs;
// 初始化成功
init = true ;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 初始化成功直接返回
// 由于增长的值已经同时建立到Cell中了
if (init)
break ;
}
// 若是有其它线程在初始化cells数组中,就尝试更新base
// 若是成功了就返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break ; // Fall back on using base
}
}
复制代码
首先有一个base的值,刚开始多线程来不停的累加数值,都是对base进行累加的
好比刚开始累加成了5,接着发现并发更新的线程数量过多,就会开始实行分段CAS机制
分段CSA机制就是在内部有一个Cell数组,每一个数组是一个数据的分段,这是让大量的线程分别去对不一样Cell内部的value值进行CAS的累加操做,这样就把CAS计算压力分散到了不一样的Cell分段数值中,奖励了多线程并发更新同一个数值时出现的无限循环的问题
并且该类内部也实现了自动分段迁移的机制,就是若是某个Cell的value执行CAS失败了,那么就会自动去找另一个Cell分段内的value值进行CAS操做
若是你要从LongAdder中获取当前累加的总值,就会把base值和全部Cell分段数值加起来返回给你。