AtomicInteger源码分析—基于CAS的乐观锁实现html
参考:java
http://www.importnew.com/22078.htmlgit
https://www.cnblogs.com/mantu/p/5796450.htmlgithub
http://hustpawpaw.blog.163.com/blog/static/184228324201210811243127/算法
咱们都知道,cpu是时分复用的,也就是把cpu的时间片,分配给不一样的thread/process轮流执行,时间片与时间片之间,须要进行cpu切换,也就是会发生进程的切换。切换涉及到清空寄存器,缓存数据。而后从新加载新的thread所需数据。当一个线程被挂起时,加入到阻塞队列,在必定的时间或条件下,在经过notify(),notifyAll()唤醒回来。在某个资源不可用的时候,就将cpu让出,把当前等待线程切换为阻塞状态。等到资源(好比一个共享数据)可用了,那么就将线程唤醒,让他进入runnable状态等待cpu调度。这就是典型的悲观锁的实现。独占锁是一种悲观锁,synchronized就是一种独占锁,它假设最坏的状况,而且只有在确保其它线程不会形成干扰的状况下执行,会致使其它全部须要锁的线程挂起,等待持有锁的线程释放锁。windows
可是,因为在进程挂起和恢复执行过程当中存在着很大的开销。当一个线程正在等待锁时,它不能作任何事,因此悲观锁有很大的缺点。举个例子,若是一个线程须要某个资源,可是这个资源的占用时间很短,当线程第一次抢占这个资源时,可能这个资源被占用,若是此时挂起这个线程,可能马上就发现资源可用,而后又须要花费很长的时间从新抢占锁,时间代价就会很是的高。缓存
因此就有了乐观锁的概念,他的核心思路就是,每次不加锁而是假设没有冲突而去完成某项操做,若是由于冲突失败就重试,直到成功为止。在上面的例子中,某个线程能够不让出cpu,而是一直while循环,若是失败就重试,直到成功为止。因此,当数据争用不严重时,乐观锁效果更好。好比CAS就是一种乐观锁思想的应用。安全
CAS就是Compare and Swap的意思,比较并操做。不少的cpu直接支持CAS指令。CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知此次竞争中失败,并能够再次尝试。CAS有3个操做数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改成B,不然什么都不作。性能优化
JDK1.5中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS的操做,而且JVM把它们编译为底层硬件提供的最有效的方法,在运行CAS的平台上,运行时把它们编译为相应的机器指令。在Java.util.concurrent.atomic包下面的全部的原子变量类型中,好比AtomicInteger,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操做。多线程
在CAS操做中,会出现ABA问题。就是若是V的值先由A变成B,再由B变成A,那么仍然认为是发生了变化,并须要从新执行算法中的步骤。有简单的解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号,即便这个值由A变为B,而后为变为A,版本号也是不一样的。AtomicStampedReference和AtomicMarkableReference支持在两个变量上执行原子的条件更新。AtomicStampedReference更新一个“对象-引用”二元组,经过在引用上加上“版本号”,从而避免ABA问题,AtomicMarkableReference将更新一个“对象引用-布尔值”的二元组。
AtomicInteger 是一个支持原子操做的 Integer 类,就是保证对AtomicInteger类型变量的增长和减小操做是原子性的,不会出现多个线程下的数据不一致问题。若是不使用 AtomicInteger,要实现一个按顺序获取的 ID,就必须在每次获取时进行加锁操做,以免出现并发时获取到一样的 ID 的现象。
接下来经过源代码来看AtomicInteger具体是如何实现的原子操做。
首先看incrementAndGet() 方法,下面是具体的代码。
1
2
3
4
5
6
7
8
|
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1 ;
if (compareAndSet(current, next))
return next;
}
}
|
经过源码,能够知道,这个方法的作法为先获取到当前的 value 属性值,而后将 value 加 1,赋值给一个局部的 next 变量,然而,这两步都是非线程安全的,可是内部有一个死循环,不断去作compareAndSet操做,直到成功为止,也就是修改的根本在compareAndSet方法里面,compareAndSet()方法的代码以下:
1
2
3
|
public final boolean compareAndSet( int expect, int update) {
return unsafe.compareAndSwapInt( this , valueOffset, expect, update);
}
|
compareAndSet()方法调用的compareAndSwapInt()方法的声明以下,是一个native方法。
publicfinal native boolean compareAndSwapInt(Object var1, long var2, int var4, intvar5);
compareAndSet 传入的为执行方法时获取到的 value 属性值,next 为加 1 后的值, compareAndSet所作的为调用 Sun 的 UnSafe 的 compareAndSwapInt 方法来完成,此方法为 native 方法,compareAndSwapInt 基于的是CPU 的 CAS指令来实现的。因此基于 CAS 的操做可认为是无阻塞的,一个线程的失败或挂起不会引发其它线程也失败或挂起。而且因为 CAS 操做是 CPU 原语,因此性能比较好。
相似的,还有decrementAndGet()方法。它和incrementAndGet()的区别是将 value 减 1,赋值给next 变量。
AtomicInteger中还有getAndIncrement() 和getAndDecrement() 方法,他们的实现原理和上面的两个方法彻底相同,区别是返回值不一样,前两个方法返回的是改变以后的值,即next。而这两个方法返回的是改变以前的值,即current。还有不少的其余方法,就不列举了。
下面以具体的例子分析下AtomicInteger的实现:
计数器(Counter),采用Java里比较方便的锁机制synchronized关键字,初步的代码以下:
1 public class Counter {
2 private int value;
3
4 public synchronized int getValue() {
5 return value;
6 }
7
8 public synchronized int increment() {
9 return ++value;
10 }
11
12 public synchronized int decrement() {
13 return --value;
14 }
15 }
synchronized关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程须要等待,直到该线程释放锁,这也就咱们前面所说的悲观锁。这里会有些问题:首先,若是被阻塞的线程优先级很高很重要怎么办?其次,若是得到锁的线程一直不释放锁怎么办?(这种状况是很是糟糕的)。还有一种状况,若是有大量的线程来竞争资源,那CPU将会花费大量的时间和资源来处理这些竞争(事实上CPU的主要工做并不是这些),同时,还有可能出现一些例如死锁之类的状况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重,所以,对于这种需求咱们期待一种更合适、更高效的线程安全机制。
package TestAtomicInteger;
import java.util.concurrent.atomic.AtomicInteger;
class MyThread implements Runnable {
// static int i = 0;
static AtomicInteger ai= new AtomicInteger( 0 );
public void run() {
for ( int m = 0 ; m < 1000000 ; m++) {
ai.getAndIncrement();
}
}
};
public class TestAtomicInteger {
public static void main(String[] args) throws InterruptedException {
MyThread mt = new MyThread();
Thread t1 = new Thread(mt);
Thread t2 = new Thread(mt);
t1.start();
t2.start();
Thread.sleep( 500 );
System.out.println(MyThread.ai.get());
}
}
|
能够发现结果都是2000000,也就是说AtomicInteger是线程安全的。
下面咱们就以模拟CAS机制来实现Counter的例子:
CAS类:
1 public class SimpleCAS {
2 private volatile int value;
3 public synchronized int getValue(){
4 return value;
5 }
6 public synchronized boolean comperaAndSwap(int expectedValue,int newValue){
7 int oldValue = value;
8 if(oldValue == expectedValue){
9 value = newValue;
10 return true;
11 }else{
12 return false;
13 }
14 }
15 }
CASCounter类:
1 public class CASCounter {
2 private SimpleCAS cas;
3 public int getValue(){
4 return cas.getValue();
5 }
6 public int increment(){
7 int olevalue = cas.getValue();
8 for (; ;) {
9 if(cas.comperaAndSwap(olevalue, olevalue+1)){
10 return cas.getValue();
11 }
12 }
13
14 }
15 }
上面的模拟不是CSA的真正实现,其实咱们在语言层面是没有作任何同步的操做的,你们也能够看到源码没有任何锁加在上面,可它为何是线程安全的呢?这就是Atomic包下这些类的奥秘:语言层面不作处理,咱们将其交给硬件—CPU和内存,利用CPU的多处理能力,实现硬件层面的阻塞,再加上volatile变量的特性便可实现基于原子操做的线程安全。因此说,CAS并非无阻塞,只是阻塞并不是在语言、线程方面,而是在硬件层面,因此无疑这样的操做会更快更高效!
总结一下,AtomicInteger基于冲突检测的乐观并发策略。 能够想象,这种乐观在线程数目很是多的状况下,失败的几率会指数型增长。
AtomicInteger等对象出现的目的主要是为了解决在多线程环境下变量计数的问题,例如经常使用的i++,i--操做,它们不是线程安全的,AtomicInteger引入后,就没必要在进行i++和i--操做时,进行加锁操做,在咱们平常工做中,有不少业务场景须要在多线程环境下进行变量的计数:订单数统计、访问量统计、累计相应时长统计等。
demo 源码:https://github.com/mantuliu/javaAdvance
下面咱们先分析一下AtomicInteger的源代码。经过源码分析咱们知道,AtomicInteger的核心就是一个CAS算法(CompareAndSwap),比较并交换算法,此算法是由unsafe的底层代码实现,它是一个原子的操做,原理就是:若是内存中的实际值与update值相同,则将实际值更新为expect值,反之则返回失败,由上层系统循环获取实际值后,再次调用此CAS算法:
/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
/*
*
*
*
*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.atomic;
import sun.misc.Unsafe;
/**
* An {@code int} value that may be updated atomically. See the
* {@link java.util.concurrent.atomic} package specification for
* description of the properties of atomic variables. An
* {@code AtomicInteger} is used in applications such as atomically
* incremented counters, and cannot be used as a replacement for an
* {@link java.lang.Integer}. However, this class does extend
* {@code Number} to allow uniform access by tools and utilities that
* deal with numerically-based classes.
*
* @since 1.5
* @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;//value值的偏移地址
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;//实际的值
/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;//初始化
}
/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}
/**
* Gets the current value.
*
* @return the current value
*/
public final int get() {
return value;//返回value值
}
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(int newValue) {
value = newValue;//设置新值,由于没有判断oldvalue,因此此操做是非线程安全的
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);//与set操做效果同样,只是采用的是unsafe对象中经过偏移地址来设置值的方式
}
/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final int getAndSet(int newValue) {//原子操做,设置新值,返回老值
for (;;) {
int current = get();
if (compareAndSet(current, newValue))//经过CAS算法,比较current的值和实际值是否一致,若是一致则设置为newValue
return current;
}
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
* and does not provide ordering guarantees, so is only rarely an
* appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return true if successful.
*/
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {//i++操做
for (;;) {
int current = get();//获取当前值
int next = current + 1;//当前值+1
if (compareAndSet(current, next))//比较current值和实际的值是否一致,如不一致,则继续循环
return current;
}
}
/**
* Atomically decrements by one the current value.
*
* @return the previous value
*/
public final int getAndDecrement() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return current;
}
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int delta) {//例如:当咱们统计接口的响应时间时,能够利用此方法
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* Atomically decrements by one the current value.
*
* @return the updated value
*/
public final int decrementAndGet() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final int addAndGet(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;
}
}
/**
* Returns the String representation of the current value.
* @return the String representation of the current value.
*/
public String toString() {
return Integer.toString(get());
}
public int intValue() {
return get();
}
public long longValue() {
return (long)get();
}
public float floatValue() {
return (float)get();
}
public double doubleValue() {
return (double)get();
}
}
下面,咱们为四种状况(同步关键字、ReentrantLock公平锁和非公平锁、AtomicInteger)作一下性能对比分析,当咱们看到上面的代码分析后,咱们判断AtomicInteger应该比加锁的方式快,可是实验的结果代表,AtomicInteger只比ReentrantLock加公平锁的状况快几十倍,比其它两种方式略慢一些。
四个demo都用100个线程来循环模拟下单60秒钟:
demo Lesson8SyncIntPerform:在使用同步关键字加锁的状况下100个线程循环下单数为:677337556
demo Lesson8SyncIntPerform:在使用同步关键字加锁的状况下100个线程循环下单数为:755994691
demo Lesson8AtomicIntPerform:在使用AtomicInteger的状况下100个线程循环下单数为:562359607
demo Lesson8AtomicIntPerform:在使用AtomicInteger的状况下100个线程循环下单数为:575367967
demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的状况下100个线程循环下单数为:857239882
demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的状况下100个线程循环下单数为:860364303
demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的状况下100个线程循环下单数为:19153640
demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的状况下100个线程循环下单数为:19076567
上面的实验结果代表,在jdk1.6及后续的版本中(本实验的jdk版本是1.7,操做系统为windows操做系统),已经对于synchronized关键字的性能优化了不少,已经和ReentrantLock的性能差很少,加锁的效果比不加锁时使用AtomicInteger性能效果还要略好一些,可是公平锁的性能明显下降,其它三种状况下的性能是公平锁性能的几十倍以上,这和公平锁每次试图占有锁时,都必须先要进等待队列,按照FIFO的顺序去获取锁,所以在咱们的实验情景下,使用公平锁的线程进行了频繁切换,而频繁切换线程,性能必然会降低的厉害,这也告诫了咱们在实际的开发过程当中,在须要使用公平锁的情景下,务必要考虑线程的切换频率。
在运用CAS作Lock-Free操做中有一个经典的ABA问题:
线程1准备用CAS将变量的值由A替换为B,在此以前,线程2将变量的值由A替换为C,又由C替换为A,而后线程1执行CAS时发现变量的值仍然为A,因此CAS成功。但实际上这时的现场已经和最初不一样了,尽管CAS成功,但可能存在潜藏的问题,例以下面的例子:
现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1已经知道A.next为B,而后但愿用CAS将栈顶替换为B:
head.compareAndSet(A,B);
在T1执行上面这条指令以前,线程T2介入,将A、B出栈,再pushD、C、A,此时堆栈结构以下图,而对象B此时处于游离状态:
此时轮到线程T1执行CAS操做,检测发现栈顶仍为A,因此CAS成功,栈顶变为B,但实际上B.next为null,因此此时的状况变为:
其中堆栈中只有B一个元素,C和D组成的链表再也不存在于堆栈中,无缘无故就把C、D丢掉了。
以上就是因为ABA问题带来的隐患,各类乐观锁的实现中一般都会用版本戳version来对记录或对象标记,避免并发操做带来的问题,在Java中,AtomicStampedReference<E>也实现了这个做用,它经过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题,例以下面的代码分别用AtomicInteger和AtomicStampedReference来对初始值为100的原子整型变量进行更新,AtomicInteger会成功执行CAS操做,而加上版本戳的AtomicStampedReference对于ABA问题会执行CAS失败:
public class Test {
private static AtomicInteger atomicInt = new AtomicInteger(100);
private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);
public static void main(String[] args) throws InterruptedException {
Thread intT1 = new Thread(new Runnable() {
@Override
public void run() {
atomicInt.compareAndSet(100, 101);
atomicInt.compareAndSet(101, 100);
}
});
Thread intT2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
boolean c3 = atomicInt.compareAndSet(100, 101);
System.out.println(c3); // true
}
});
intT1.start();
intT2.start();
intT1.join();
intT2.join();
Thread refT1 = new Thread(new Runnable() {
@Override
public void run()
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
}
});
Thread refT2 = new Thread(new Runnable() {
@Override
public void run() {
int stamp = atomicStampedRef.getStamp();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println(c3); // false
}
});
refT1.start();
refT2.start();
}
}