并发-AtomicInteger源码分析—基于CAS的乐观锁实现

AtomicInteger源码分析—基于CAS的乐观锁实现html

 

参考:java

http://www.importnew.com/22078.htmlgit

https://www.cnblogs.com/mantu/p/5796450.htmlgithub

http://hustpawpaw.blog.163.com/blog/static/184228324201210811243127/算法

 

 

 

1. 悲观锁与乐观锁

咱们都知道,cpu是时分复用的,也就是把cpu的时间片,分配给不一样的thread/process轮流执行,时间片与时间片之间,须要进行cpu切换,也就是会发生进程的切换。切换涉及到清空寄存器,缓存数据。而后从新加载新的thread所需数据。当一个线程被挂起时,加入到阻塞队列,在必定的时间或条件下,在经过notify(),notifyAll()唤醒回来。在某个资源不可用的时候,就将cpu让出,把当前等待线程切换为阻塞状态。等到资源(好比一个共享数据)可用了,那么就将线程唤醒,让他进入runnable状态等待cpu调度。这就是典型的悲观锁的实现。独占锁是一种悲观锁,synchronized就是一种独占锁,它假设最坏的状况,而且只有在确保其它线程不会形成干扰的状况下执行,会致使其它全部须要锁的线程挂起,等待持有锁的线程释放锁。windows

可是,因为在进程挂起和恢复执行过程当中存在着很大的开销。当一个线程正在等待锁时,它不能作任何事,因此悲观锁有很大的缺点。举个例子,若是一个线程须要某个资源,可是这个资源的占用时间很短,当线程第一次抢占这个资源时,可能这个资源被占用,若是此时挂起这个线程,可能马上就发现资源可用,而后又须要花费很长的时间从新抢占锁,时间代价就会很是的高。缓存

因此就有了乐观锁的概念,他的核心思路就是,每次不加锁而是假设没有冲突而去完成某项操做,若是由于冲突失败就重试,直到成功为止。在上面的例子中,某个线程能够不让出cpu,而是一直while循环,若是失败就重试,直到成功为止。因此,当数据争用不严重时,乐观锁效果更好。好比CAS就是一种乐观锁思想的应用。安全

2.   java中CAS的实现

CAS就是Compare and Swap的意思,比较并操做。不少的cpu直接支持CAS指令。CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知此次竞争中失败,并能够再次尝试。CAS有3个操做数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改成B,不然什么都不作。性能优化

JDK1.5中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS的操做,而且JVM把它们编译为底层硬件提供的最有效的方法,在运行CAS的平台上,运行时把它们编译为相应的机器指令。在Java.util.concurrent.atomic包下面的全部的原子变量类型中,好比AtomicInteger,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操做。多线程

在CAS操做中,会出现ABA问题。就是若是V的值先由A变成B,再由B变成A,那么仍然认为是发生了变化,并须要从新执行算法中的步骤。有简单的解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号,即便这个值由A变为B,而后为变为A,版本号也是不一样的。AtomicStampedReference和AtomicMarkableReference支持在两个变量上执行原子的条件更新。AtomicStampedReference更新一个“对象-引用”二元组,经过在引用上加上“版本号”,从而避免ABA问题,AtomicMarkableReference将更新一个“对象引用-布尔值”的二元组。

3.  AtomicInteger的实现。

AtomicInteger 是一个支持原子操做的 Integer 类,就是保证对AtomicInteger类型变量的增长和减小操做是原子性的,不会出现多个线程下的数据不一致问题。若是不使用 AtomicInteger,要实现一个按顺序获取的 ID,就必须在每次获取时进行加锁操做,以免出现并发时获取到一样的 ID 的现象。

接下来经过源代码来看AtomicInteger具体是如何实现的原子操做。

首先看incrementAndGet() 方法,下面是具体的代码。

1
2
3
4
5
6
7
8
public final int incrementAndGet() { 
        for (;;) { 
            int current = get(); 
            int next = current + 1
            if (compareAndSet(current, next)) 
                return next; 
        
    }

经过源码,能够知道,这个方法的作法为先获取到当前的 value 属性值,而后将 value 加 1,赋值给一个局部的 next 变量,然而,这两步都是非线程安全的,可是内部有一个死循环,不断去作compareAndSet操做,直到成功为止,也就是修改的根本在compareAndSet方法里面,compareAndSet()方法的代码以下:

1
2
3
public final boolean compareAndSet(int expect, int update) { 
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 
    }

compareAndSet()方法调用的compareAndSwapInt()方法的声明以下,是一个native方法。

publicfinal native boolean compareAndSwapInt(Object var1, long var2, int var4, intvar5);

compareAndSet 传入的为执行方法时获取到的 value 属性值,next 为加 1 后的值, compareAndSet所作的为调用 Sun 的 UnSafe 的 compareAndSwapInt 方法来完成,此方法为 native 方法,compareAndSwapInt 基于的是CPU 的 CAS指令来实现的。因此基于 CAS 的操做可认为是无阻塞的,一个线程的失败或挂起不会引发其它线程也失败或挂起。而且因为 CAS 操做是 CPU 原语,因此性能比较好。

相似的,还有decrementAndGet()方法。它和incrementAndGet()的区别是将 value 减 1,赋值给next 变量。

AtomicInteger中还有getAndIncrement() 和getAndDecrement() 方法,他们的实现原理和上面的两个方法彻底相同,区别是返回值不一样,前两个方法返回的是改变以后的值,即next。而这两个方法返回的是改变以前的值,即current。还有不少的其余方法,就不列举了。

下面以具体的例子分析下AtomicInteger的实现:

  计数器(Counter),采用Java里比较方便的锁机制synchronized关键字,初步的代码以下:

复制代码
 1 public class Counter {
 2     private int value;  
 3       
 4     public synchronized int getValue() {  
 5         return value;  
 6     }  
 7   
 8     public synchronized int increment() {  
 9         return ++value;  
10     }  
11   
12     public synchronized int decrement() {  
13         return --value;  
14     }  
15 }
复制代码

  synchronized关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程须要等待,直到该线程释放锁,这也就咱们前面所说的悲观锁。这里会有些问题:首先,若是被阻塞的线程优先级很高很重要怎么办?其次,若是得到锁的线程一直不释放锁怎么办?(这种状况是很是糟糕的)。还有一种状况,若是有大量的线程来竞争资源,那CPU将会花费大量的时间和资源来处理这些竞争(事实上CPU的主要工做并不是这些),同时,还有可能出现一些例如死锁之类的状况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重,所以,对于这种需求咱们期待一种更合适、更高效的线程安全机制。

package TestAtomicInteger;
 
import java.util.concurrent.atomic.AtomicInteger;
 
class MyThread implements Runnable {
//    static  int i = 0;
     static AtomicInteger ai=new AtomicInteger(0);
      
 
    public void run() {
        for (int m = 0; m < 1000000; m++) {
            ai.getAndIncrement();
        }
    }
};
 
public class TestAtomicInteger {
    public static void main(String[] args) throws InterruptedException {
        MyThread mt = new MyThread();
 
        Thread t1 = new Thread(mt);
        Thread t2 = new Thread(mt);
        t1.start();
        t2.start();
        Thread.sleep(500);
        System.out.println(MyThread.ai.get());
    }
}

  能够发现结果都是2000000,也就是说AtomicInteger是线程安全的。

  下面咱们就以模拟CAS机制来实现Counter的例子:

   CAS类:

复制代码
 1 public class SimpleCAS {
 2     private volatile int value;
 3     public synchronized int getValue(){
 4         return value;  
 5     } 
 6     public synchronized boolean comperaAndSwap(int expectedValue,int newValue){
 7         int oldValue = value;
 8         if(oldValue == expectedValue){
 9             value = newValue;
10             return true;
11         }else{
12             return false;
13         }
14     }
15 }
复制代码

  CASCounter类:

复制代码
 1 public class CASCounter {
 2     private SimpleCAS cas;  
 3     public int getValue(){
 4         return cas.getValue();
 5     }
 6     public int increment(){
 7         int olevalue = cas.getValue();
 8         for (; ;) {
 9             if(cas.comperaAndSwap(olevalue, olevalue+1)){
10                 return cas.getValue();
11             }
12         }
13          
14     }
15 }
复制代码

  上面的模拟不是CSA的真正实现,其实咱们在语言层面是没有作任何同步的操做的,你们也能够看到源码没有任何锁加在上面,可它为何是线程安全的呢?这就是Atomic包下这些类的奥秘:语言层面不作处理,咱们将其交给硬件—CPU和内存,利用CPU的多处理能力,实现硬件层面的阻塞,再加上volatile变量的特性便可实现基于原子操做的线程安全。因此说,CAS并非无阻塞,只是阻塞并不是在语言、线程方面,而是在硬件层面,因此无疑这样的操做会更快更高效!

  总结一下,AtomicInteger基于冲突检测的乐观并发策略。 能够想象,这种乐观在线程数目很是多的状况下,失败的几率会指数型增长。

 

 

 

 

 

AtomicInteger等对象出现的目的主要是为了解决在多线程环境下变量计数的问题,例如经常使用的i++,i--操做,它们不是线程安全的,AtomicInteger引入后,就没必要在进行i++和i--操做时,进行加锁操做,在咱们平常工做中,有不少业务场景须要在多线程环境下进行变量的计数:订单数统计、访问量统计、累计相应时长统计等。

demo 源码:https://github.com/mantuliu/javaAdvance

    下面咱们先分析一下AtomicInteger的源代码。经过源码分析咱们知道,AtomicInteger的核心就是一个CAS算法(CompareAndSwap),比较并交换算法,此算法是由unsafe的底层代码实现,它是一个原子的操做,原理就是:若是内存中的实际值与update值相同,则将实际值更新为expect值,反之则返回失败,由上层系统循环获取实际值后,再次调用此CAS算法:

复制代码
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent.atomic;
import sun.misc.Unsafe;

/**
 * An {@code int} value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * {@code AtomicInteger} is used in applications such as atomically
 * incremented counters, and cannot be used as a replacement for an
 * {@link java.lang.Integer}. However, this class does extend
 * {@code Number} to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;//value值的偏移地址

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;//实际的值

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;//初始化
    }

    /**
     * Creates a new AtomicInteger with initial value {@code 0}.
     */
    public AtomicInteger() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final int get() {
        return value;//返回value值
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(int newValue) {
        value = newValue;//设置新值,由于没有判断oldvalue,因此此操做是非线程安全的
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);//与set操做效果同样,只是采用的是unsafe对象中经过偏移地址来设置值的方式
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final int getAndSet(int newValue) {//原子操做,设置新值,返回老值
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))//经过CAS算法,比较current的值和实际值是否一致,若是一致则设置为newValue
                return current;
        }
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
     * and does not provide ordering guarantees, so is only rarely an
     * appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {//i++操做
        for (;;) {
            int current = get();//获取当前值
            int next = current + 1;//当前值+1
            if (compareAndSet(current, next))//比较current值和实际的值是否一致,如不一致,则继续循环
                return current;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final int getAndDecrement() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int delta) {//例如:当咱们统计接口的响应时间时,能够利用此方法
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final int decrementAndGet() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int delta) {
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value.
     */
    public String toString() {
        return Integer.toString(get());
    }


    public int intValue() {
        return get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}
复制代码

      下面,咱们为四种状况(同步关键字、ReentrantLock公平锁和非公平锁、AtomicInteger)作一下性能对比分析,当咱们看到上面的代码分析后,咱们判断AtomicInteger应该比加锁的方式快,可是实验的结果代表,AtomicInteger只比ReentrantLock加公平锁的状况快几十倍,比其它两种方式略慢一些。

     四个demo都用100个线程来循环模拟下单60秒钟:

demo Lesson8SyncIntPerform:在使用同步关键字加锁的状况下100个线程循环下单数为:677337556

demo Lesson8SyncIntPerform:在使用同步关键字加锁的状况下100个线程循环下单数为:755994691

demo Lesson8AtomicIntPerform:在使用AtomicInteger的状况下100个线程循环下单数为:562359607

demo Lesson8AtomicIntPerform:在使用AtomicInteger的状况下100个线程循环下单数为:575367967

demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的状况下100个线程循环下单数为:857239882

demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的状况下100个线程循环下单数为:860364303

demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的状况下100个线程循环下单数为:19153640

demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的状况下100个线程循环下单数为:19076567

   上面的实验结果代表,在jdk1.6及后续的版本中(本实验的jdk版本是1.7,操做系统为windows操做系统),已经对于synchronized关键字的性能优化了不少,已经和ReentrantLock的性能差很少,加锁的效果比不加锁时使用AtomicInteger性能效果还要略好一些,可是公平锁的性能明显下降,其它三种状况下的性能是公平锁性能的几十倍以上,这和公平锁每次试图占有锁时,都必须先要进等待队列,按照FIFO的顺序去获取锁,所以在咱们的实验情景下,使用公平锁的线程进行了频繁切换,而频繁切换线程,性能必然会降低的厉害,这也告诫了咱们在实际的开发过程当中,在须要使用公平锁的情景下,务必要考虑线程的切换频率。

 

 

 

AtomicStampedReference解决ABA问题

在运用CAS作Lock-Free操做中有一个经典的ABA问题:

线程1准备用CAS将变量的值由A替换为B,在此以前,线程2将变量的值由A替换为C,又由C替换为A,而后线程1执行CAS时发现变量的值仍然为A,因此CAS成功。但实际上这时的现场已经和最初不一样了,尽管CAS成功,但可能存在潜藏的问题,例以下面的例子:

AtomicStampedReference解决ABA问题 - 木瓜仙人 - 木瓜仙人

现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1已经知道A.next为B,而后但愿用CAS将栈顶替换为B:

head.compareAndSet(A,B);

在T1执行上面这条指令以前,线程T2介入,将A、B出栈,再pushD、C、A,此时堆栈结构以下图,而对象B此时处于游离状态:

AtomicStampedReference解决ABA问题 - 木瓜仙人 - 木瓜仙人

此时轮到线程T1执行CAS操做,检测发现栈顶仍为A,因此CAS成功,栈顶变为B,但实际上B.next为null,因此此时的状况变为:

AtomicStampedReference解决ABA问题 - 木瓜仙人 - 木瓜仙人

其中堆栈中只有B一个元素,C和D组成的链表再也不存在于堆栈中,无缘无故就把C、D丢掉了。

以上就是因为ABA问题带来的隐患,各类乐观锁的实现中一般都会用版本戳version来对记录或对象标记,避免并发操做带来的问题,在Java中,AtomicStampedReference<E>也实现了这个做用,它经过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题,例以下面的代码分别用AtomicInteger和AtomicStampedReference来对初始值为100的原子整型变量进行更新,AtomicInteger会成功执行CAS操做,而加上版本戳的AtomicStampedReference对于ABA问题会执行CAS失败:

 

public class Test {

    private static AtomicInteger atomicInt = new AtomicInteger(100);

    private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);

 

    public static void main(String[] args) throws InterruptedException {

       Thread intT1 = new Thread(new Runnable() {

           @Override

           public void run() {

              atomicInt.compareAndSet(100, 101);

              atomicInt.compareAndSet(101, 100);

           }

       });

 

       Thread intT2 = new Thread(new Runnable() {

           @Override

           public void run() {

              try {

                  TimeUnit.SECONDS.sleep(1);

              } catch (InterruptedException e) {

              }

              boolean c3 = atomicInt.compareAndSet(100, 101);

              System.out.println(c3); // true

           }

       });

 

       intT1.start();

       intT2.start();

       intT1.join();

       intT2.join();

 

       Thread refT1 = new Thread(new Runnable() {

           @Override

           public void run()

              try {

                  TimeUnit.SECONDS.sleep(1);

              } catch (InterruptedException e) {

              }

              atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

              atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

           }

       });

 

       Thread refT2 = new Thread(new Runnable() {

           @Override

           public void run() {

              int stamp = atomicStampedRef.getStamp();

              try {

                  TimeUnit.SECONDS.sleep(2);

              } catch (InterruptedException e) {

              }

              boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);

              System.out.println(c3); // false

           }

       });

 

       refT1.start();

       refT2.start();

    }

}

相关文章
相关标签/搜索