从同步原语看非阻塞同步以及Java中的应用

  非阻塞同步:基于冲突检测的乐观并发策略,通俗讲就是先进行操做,若是没有其余线程争用共享数据,那操做就成功了,若是争用数据有冲突那就采用其余的补偿措施(最多见的就是不断重试直到成功),这种乐观的并发策略使得不少线程不须要由于竞争失败直接挂起,这种同步措施称为非阻塞同步。下面咱们就从硬件原语开始了解非阻塞同步,并看一看在Java中非阻塞同步的一些应用。html

1、从硬件原语上理解同步(非特指Java)

  同步机制是多处理机系统的重要组成部分,其实现方式除了关系到计算的正确性以外还有效率的问题。同步机制的实现一般是在硬件提供的同步指令的基础上,在经过用户级别软件例程实现的。上面说到的乐观策略实际上就是创建在硬件指令集的基础上的(咱们须要实际操做和冲突检测是原子性的),通常有下面的经常使用指令:测试并设置(test_and_set)、获取并增长(fetch_and_increment)、原子交换(Atomic_Exchange)、比较并交换(CAS)、加载链接条件存储(LL/SC),下面咱们会讲到这些以及经过这些硬件同步原语实现的旋转锁和栅栏同步。java

一、基本硬件原语

  在多处理机中实现同步,所需的主要功能是一组能以原子操做读出并修改存储单元的硬件原语。若是没有这种操做,创建基本的同步原语的代价会很是大。基本硬件原语有几种形式提供选择,他们都能以原子操做的方式读改存储单元,并指出进行的操做是否能以原子形式进行,这些原语做为基本构建提供构造各类各样的用户及同步操做。算法

  一个典型的例子就是原子交换(Atomic Exchange),他的功能是将一个存储单元中的值和一个寄存器的值进行交换。咱们看看这个原语怎样构造一个咱们一般意义上说的简单的锁。编程

  假设如今咱们构造这样一个简单的锁:其值为0表示锁是开的(锁可用),为1表示上锁(不可用)。当处理器要给该锁上锁的时候,将对应于该锁的存储单元的值与存放在某个寄存器中的1进行交换。若是别的处理器已经上了锁,那么交换指令返回的值为1不然为0。返回0的时候,由于是原子交换,锁的值就会从0变为1表示上锁成功;返回1,原子交换锁的值仍是1,可是返回1表示已经被上了锁。 咱们考虑使用这个锁:假设两个处理器同时进行交换操做(原子交换),竞争的结果就是,只有一个处理器会先执行成功而获得返回值0,而另外一个获得的返回值为1表示已经被上锁。从这些咱们能够看出,采用原子交换指令是实现同步的关键:这个原子交换操做的不可再分的,两个交换操做将由写顺序机制肯定前后顺序,这也保证了两个线程不能同时获取同步变量锁。数组

  除此以外,还有别的原语能够实现同步(关键都在于能以原子的方式读-改-写存储单元的值)。例如:测试并置定(test_and_set)(先测试一个存储单元的值,若是符合条件就修改其值),另外一个同步原语是读取并加1(fetch_and_increment))(返回存储单元的值并自动增长该值)。(看到这里能够回忆一下Java中的CAS操做的实现以及在Java中的实现原理)安全

  那么,上面的基本原语操做又是怎样实现的呢,这在一条指令中完成上述操做显然是困难的(在一条不可中断的指令中完成一次存储器的读改写,并且要求不容许其余的访存操做还要避免死锁)。如今的计算机上采用一对指令来实现上述的同步原语。该指令对由两条特殊的指令组成,一条是特殊的load指令(LL指令),另外一条是特殊的store指令(SC)。指令的执行顺序是:若是LL指令指明的存储单元的值在SC对其进行写以前被其余的指令改写过,则第二条指令执行失败,若是在两条指令之间进行切换也会致使执行SC失败,而SC指令将经过返回一个值来指出该指令操做是否成功(若是返回的1表示执行成功,返回0表示失败)。为何说这对指令至关于原子操做呢,这指的是是全部其余处理器进行的操做或者在这对指令以前执行或者在其后执行,不存在两条指令之间进行,因此在这一对指令之间不存在任何其余处理器改变相应存储单元的值。多线程

  下面是一段实现对R1指出的存储单元进行的原子交换操做并发

1 try:OR    R3,R4,R0 //R4中为交换值,将该值送入R3
2     LL    R2,0(R1) //将0(R1)中的值取到R2
3     SC    R3,0(R1) //若0(R1)中的值与R3中的值相同,则置R3的值为1,不然为0
4     BEQZ R3,try //R3的值为0表示存失败,转移从新尝试
5     MOV R4,R2 //成功,将取出的值送往R4     

  最终R4和由R1指向的存储单元值进行了原子交换,在LL和SC之间若是有别的处理器插入而且修改了存储单元的值则SC都会返回0并存入R3中从而从新执行交换操做。下面是实现各个讲到的读取并加1(fetch_and_increment)原语的实现app

1 try:LL    R2,0(R1) //将0(R1)中的值送入R2
2     DADDIU    R2,R2,#1 //加1操做(R2+1->R2)
3     SC    R2,0(R1) //若是0(R1)中的值和R2中的值相同就置R2的值为1,不然为0
4     BEQZ    R2,try //R2的值为0表示存失败,转移到开始出从新执行

  上面的指令的执行须要跟踪地址,一般LL指令指定一个寄存器,该寄存器中存放着目的存储单元的地址,这个寄存器称为链接寄存器,若是发生中断切换或者与链接寄存器中的地址匹配的cache块被做废(被别的SC指令访问),则将链接寄存器清零,SC指令则检查它的存储地址和链接寄存器汇中的内容是够匹配,若是匹配则SC指令继续执行,不然执行失败。ide

二、用一致性实现锁

  咱们如今用上面的原子交换的同步原语实现自旋锁(spin lock)(处理器不停请求得到锁的试用权,围绕该锁反复执行循环程序,直到得到锁)。自旋锁适用于这样的场景:锁被占用时间少,在得到锁以后加锁的过程延迟小。

  下面咱们考虑使用一种简单的方法实现:将锁变量保存在存储器中,处理器能够不断经过原子交换操做来请求其使用权,好比使用原子交换操做得到其返回值从而直达锁变量的使用状况。释放锁的时候,处理器只须要将说置为0。以下面的程序:使用原子交换操做堆自旋锁进行加锁,其中R1中存放的是自旋锁变量的地址

1         DADDIU R2,R0,#1
2 lockit: EXCH R2,0(R1) //原子交换,得到自旋锁的值并在下面比较自旋锁的值为1仍是0,为1表示已经上锁
3         BNEZ R2,lockit //若R2的内容不为0,则表示已经有其余程序得到了锁变量,就继续旋转等待

  下面咱们对这个简单的自旋锁实现进行一些改进(下面说到的可类比JMM内存模型理解)若是计算机支持Cache一致性,就能够将锁调入Cache中(类比本地内存),并经过一致性保证使得锁的值保持和存储器中的值一致(类比内存可见性和本地内存主内存的值一致同步)。这样作有下面的好处:使得环绕自旋锁的线程(自旋请求锁变量)只对本地Cache中的锁(主存中的副本)进行操做,而不用再每次请求占用锁时候进行一次全局的访存操做(访问主内存存储器中存放的锁的值) 利用访问锁的程序局部性原理(处理器最近使用的锁可能不久后还会使用),这种状况就可使得锁驻留在对应的Cache中,大大减小了得到锁所须要的时间(处于性能考虑,须要减小全局访存操做)。

  在改进以前,咱们应该知道,在上面的简单实现的基础上(上面的每次循环交换均须要一次写操做,由于有多个处理器会同时请求加锁,这就会致使一个处理器请求成功后,其余处理器都会写不命中),须要对这个程序进行改进,使得它只对本地副本中的锁变量进行读取和检测,直到发现锁已经被释放。发现释放以后,马上去进行交换操做跟别的处理器竞争锁变量。全部这些进程仍是以原子交换的方式得到锁,也只有一个进程能够得到成功(得到锁变量成功的进程交换后看到的锁变量值为0,交换以后的锁变量值为1表示上锁成功;而得到失败的进程虽然也交换了锁变量的值,可是由于交换后本身看到的锁变量的值已是1,就表示本身进程失败了),其余的须要继续旋转等待。当得到锁的进程使用完以后,将锁变量置为0表示释放锁由其余须要获取的进程去竞争它(其余进程会在本身的Cache中发现锁变量的值发生变化,这是上面所说的Cache一致性)。下面是修改后的旋转锁程序

1 lockit: LD    R2,0(R1) //取得锁的值
2         BNEZ R2,lockit //若是锁尚未释放(R2的值仍是1)
3         DADDIU    R2,R0,#1 //将R2值置为1(这里面能够这样想:上面BNEZ执行失败表示R2值为0,那么这个时候就+1)
4         EXCH R2,0(R1) //将R2中的值和0(R1)中的锁变量进行原子交换
5         BNEZ R2,lockit //上面第一次判断是当前进程首先发现主存中的锁变量值发生变化;
6                        //进行原子交换结果判断和上面同样,若是狡猾后返回值为0表示成功,为1表示失败就继续旋转等待获取

三、使用上面的旋转锁实现咱们一个同步原语——栅栏同步

  首先解释一下什么叫栅栏同步(barrier)。假设有一个相似于栅栏的东西,它会强制全部到达栅栏的进程进行等待,直到所有的进程都到达以后释放全部到达的进程继续往下执行,从而造成同步。下面咱们就经过上面说的旋转锁来简单模拟实现这样的一个同步原语

  使用两个旋转锁,一个表示计数器,记录已经到达该栅栏的进程数;另外一个用来封锁进程知道最后一个进程到达该栅栏。为了实现栅栏,咱们须要一个变量,到达并阻塞住的进程须要在这个变量上自旋等待知道知足它须要的条件(都到达栅栏而后才能往下执行)。咱们使用spin表示这个条件condition。以下的程序所示,其中lock和unlock提供基本的旋转锁,变量count记录已经到达栅栏的进程数,total表示已经到达栅栏的进程总数,对counterlock加锁保证了增量操做的原子性,release用来封锁最后一个到达栅栏的进程。spin(release==1)表示须要所有进程都到达栅栏。

 1 lock(counterlock); //确保更新的原子性
 2 if(count == 0) release = 0; //第一个进程到达,这时候重置release为0表示在其值变为1以前后续到达的进程都须要等待
 3 count = count + 1; //记录到达的进程数
 4 unlock(counterlock); //释放锁
 5 if(count == total) { //进程所有到达
 6     count = 0; //重置计数器count
 7     release = 1; //将release置为1表示释放所欲到达的进程
 8 } else { //进程尚未所有到达
 9     spin(release == 1); //已经到达的进程旋转等待知道全部的进程到达(言外之意就是release=1)
10 }

  可是上面的这种简单实现仍是存在问题的,咱们考虑下面这种可能发生的状况:当栅栏的使用在循环当中时候,这时候全部释放的进程在运行一段时间以后还会到达栅栏,假设其中一个进程在上次释放的时候尚未来得及离开栅栏,而是依旧停留在旋转操做上(可能操做系统从新进行进程调度致使那个进程没有来得及离开栅栏)。若是第二次栅栏使用的时候,一个执行较快的进程到达栅栏(这个快的意思是,当他到达栅栏以后上次那个尚未离开栅栏的进程还在旋转操做上),这个快的进程会发现count=0,那么他就会将release置为0,这时候就会致使那个还在旋转等待的进程发现release值为0,而后那就更不会再退出这个旋转操做了,就至关于被捆绑在栅栏上出不去(这个问题会致使后续的count计数少了一个进程到达,而老是小于total),那这样的话,因为count老是小于total那不是全部到达栅栏的进程都在spin上一直自旋了吗。那怎么解决这个问题呢,一种方法就是在进程离开栅栏的时候也进行计数,在上次使用栅栏的进程所有离开栅栏以前不容许执行快的进程再次使用并初始化栅栏的一些变量值。还有一种方法是使用sense_reversing栅栏,即每一个进程只用一个本地私有变量local_sense并初始化为1,用它和release判断进程是否须要自旋等待。

2、Java中的原子性操做概述

  所谓原子操做,就是指执行一系列操做的时候,要么所有执行要么所有不执行,不存在只执行一部分的状况。在设置计数器的时候通常是读取当前的值,而后+1在更新(读-改-写的过程),若是不能保证这这几个操做的过程的原子性就可能出现线程安全问题,好比下面的代码示例,++value在没有任何额外保证的前提下不是原子操做。

1 public class ThreadUnSafe{
2     private Long value;
3     
4     public Long getValue() {return value;}
5     
6     public void increment() {++value;}
7 }

  使用Javap -c XX.class查看汇编代码以下

  这是个复合操做,是不具有原子性的。而保证这个操做原子性的方法最简单的就是加上synchronized关键字(固然也能够是其余的加锁操做,参考Java中的锁——Lock和synchronized实现原理),使用synchronized能够实现线程安全性,可是这是个独占锁,没有获取内部锁的线程会被阻塞住(即使是这里的getValue操做,多线程访问也会阻塞住),这对于并发性能的提升是很差的(而这里也不能简单的去掉getValue上的synchronized,由于读操做须要保证value的读一致性,即须要得到主内存中的值而不是线程工做内存中的多是旧的副本值)。那么除了加锁以外其余安全的方法?后面讲到的原子类(使用CAS实现)就能够做为一个选择。

3、Java中的CAS操做概述

  Java中提供非阻塞的volatile关键字解决保证共享变量的可见性问题,可是不能解决部分符合操做不具有原子性的问题(好比自增运算)。CAS即CompareAndSwap是JDK提供的非阻塞原子操做,经过硬件保证比较更新的原子性。咱们经过compareAndSwapLong来简单介绍CAS:

  compareAndSwapLong(Object obj, long valueOffset, long expect, long update),该方法中compareAndSwap表示比较并交换,方法中有四个操做数,其中obj表示对象内存的位置,valueOffset表示对象中存储变量的偏移量,expect表示变量的预期值,update表示更新值。操做含义就是,若果对象obj中内存偏移量为valueOffset的变量值为expect则使用心得update值替换旧的值expect,这是处理器提供的一个原子指令。这些方法有sun.misc.Unsafe类提供。后面咱们会说到Unsafe类

  在此以前咱们先说一下CAS操做的一个经典的ABA问题:假如线程1 使用CAS修改初始值为A的变量X,那么线程1会首先回去当前变量X的值(A),而后使用CAS操做尝试修改X的值为B,若是使用CAS修改为功了,那么程序必定执行正确了吗?在往下的假设看,若是线程I在获取变量X的值A后,在执行CAS以前线程II使用CAS修改变量X的值为B而后由修改回了A。这时候虽然线程I执行CAS时候X的值依旧是A可是这个A已经不是线程I获取时候的A了,这就是ABA问题。ABA产生的缘由是变量的状态值产生了环形转换,即变量值从A->B,而后又从B->A。jdk中提供了带有标记的原子类AtomicStampedReference(时间戳原子引用)经过控制变量的版本保证CAS的正确性。以下所作的测试ABA问题以及使用AtomicStampedReference来解决这个问题

  (1)模拟ABA问题,下面的程序输出结果会是这样的

 1 package test;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 import java.util.concurrent.atomic.AtomicReference;
 5 
 6 public class TestAtomicStampedReference {
 7 
 8     static AtomicReference<Integer> atomicReference = new AtomicReference<>(1);
 9 
10     public static void main(String[] args) {
11         Thread t1 = new Thread(new Runnable() {
12             @Override
13             public void run() {
14                 atomicReference.compareAndSet(1,2);
15                 atomicReference.compareAndSet(2,1);
16                 System.out.println(Thread.currentThread() + "线程修改后的变量值" + atomicReference.get());
17             }
18         });
19 
20         Thread t2 = new Thread(new Runnable() {
21             @Override
22             public void run() {
23                 //sleep 1秒,保证线程t1完成1->2->1的模拟ABA操做
24                 try {
25                     TimeUnit.SECONDS.sleep(1);
26                 } catch (InterruptedException e) {
27                     e.printStackTrace();
28                 }
29                 atomicReference.compareAndSet(1,3);
30                 System.out.println(Thread.currentThread() + "线程修改后的变量值" + atomicReference.get());
31             }
32         });
33 
34         t1.start();
35         t2.start();
36     }
37 }

   (2)使用AtomicStampedReference从新实现,下面是运行结果

 1 package test;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 import java.util.concurrent.atomic.AtomicStampedReference;
 5 
 6 public class TestAtomicStampedReference {
 7 
 8     static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(10,1); //定义初始值和初始版本号
 9     public static void main(String[] args) {
10         Thread t1 = new Thread(new Runnable() {
11             @Override
12             public void run() {
13                 //线程1得到初始版本号并sleep1秒
14                 int version = atomicStampedReference.getStamp();
15                 System.out.println(Thread.currentThread() + "当前线程得到的版本号" + version);
16                 try {
17                     TimeUnit.SECONDS.sleep(1);
18                 } catch (InterruptedException e) {
19                     e.printStackTrace();
20                 }
21                 System.out.println(Thread.currentThread() + "修改变量结果true/false?:" +
22                         atomicStampedReference.compareAndSet(10,11,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1)
23                         + "修改后的结果:" + atomicStampedReference.getReference());
24                 System.out.println(Thread.currentThread() + "修改变量结果true/false?:" +
25                         atomicStampedReference.compareAndSet(11,10,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1)
26                         + "修改后的结果:" + atomicStampedReference.getReference());
27             }
28         });
29 
30         Thread t2 = new Thread(new Runnable() {
31             @Override
32             public void run() {
33                 //首先得到初始版本号,sleep2秒让线程1完成10->11->10的模拟ABA操做
34                 int version = atomicStampedReference.getStamp();
35                 System.out.println(Thread.currentThread() + "当前线程得到的版本号" + version);
36                 try {
37                     TimeUnit.SECONDS.sleep(2);
38                 } catch (InterruptedException e) {
39                     e.printStackTrace();
40                 }
41                 System.out.println(Thread.currentThread() + "修改变量结果true/false?:" +
42                         atomicStampedReference.compareAndSet(10,20,version,atomicStampedReference.getStamp()+1)
43                         + "修改后的结果:" + atomicStampedReference.getReference());            
44             }
45         });
46 
47         t1.start();
48         t2.start();
49     }
50 }

4、Java中的Unsafe类

  JDK中的rt.jar包中的Unsafe类提供了硬件级别的原子性操做,Unsafe类中许多方法都是native方法,他们使用JNI的方式访问本地C++中的实现库。下面咱们了解一下Unsafe类提供的几个主要的方法以及如何使用unsafe类进行一些编程操做。

一、Unsafe类中的重要方法介绍

(1)public native long objectFieldOffset(Field var1):返回指定的变量在所属类中的内存偏移地址,该偏移地址仅仅在该Unsafe函数中访问指定字段时候使用。以下使用Unsafe类获取变量value在Atomic对象中的内存偏移量

 

(2)public native int arrayBaseOffset(Class<?> var1):获取数组中第一个元素的地址

(3)public native int arrayIndexScale(Class<?> var1):获取数组中一个元素占用的字节

(4)public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5):比较对象var1中的偏移量为var2的变量的值是否与var4相同,相同则使用var6的值更新,并返回true,不然返回false。

(5)public native long getLongVolatile(Object var1, long var2):获取对象var1中偏移量为offset的变量对应volatile语义的值。

(6)public native void putLongVolatile(Object var1, long var2, long var4):设置var1对象中offset偏移类型为long的值为var4,支持volatile语义

(7)public native void putOrderedLong(Object var1, long var2, long var4):设置对象obj中offset偏移地址对应的long型的field的值为value。这是一个有延迟的putLongVolatile方法,而且不保证对应的值类型的修改对其余线程可见,只有变量在只用volatile修饰而且预计会被意外修改的时候才会使用该方法、

(8)public native void park(boolean var1, long var2):阻塞当前线程,其中参数var1等于false且var2等于0表示一直阻塞,var2大于0表示等待指定的时间后阻塞线程会被唤醒。这个var的值是相对的,为一个增量值,也就是至关当前时间累加事假后当前线程就会被唤醒。若是var1位true,而且var2大于0,则表示阻塞的线程到指定的时间点后就会被唤醒,这里的时间var2是个绝对时间,是某个时间点换算为ms后的值。

(9)public native void unpark(Object var1):唤醒调用park方法以后的线程。

下面是jdk8以后新增长的,咱们列出Long类型的方法

(10)getAndSetLong()方法:获取当前对象var1中偏移量为var2的变量volatile语义的当前值,并设置变量volatile语义的值为var4。

 首先使用getLongVolatile获取当前变量的值,而后使用CAS原子操做设置新的值。这里使用while是当CAS失败时候进行重试。

 

(11)getAndAddLong()方法:获取对象var1中偏移量为var2变量的volatile语义的值,设置变量值为原始值+var4

 

二、Unsafe类的使用

  考虑编写出下面的程序,并在本身的IDE中运行下面的程序,观察结果。

 1 package test;
 2 
 3 import sun.misc.Unsafe;
 4 
 5 public class TestUnsafe {
 6 
 7     //获取Unsafe的实例
 8     static Unsafe unsafe = Unsafe.getUnsafe();
 9 
10     //记录变量value在TestUnsafe中的偏移量
11     static long valueState;
12 
13     //变量
14     private volatile long value;
15 
16     static {
17         try {
18             //获取value变量在TestUnsafe类中的偏移量
19             valueState = unsafe.objectFieldOffset(TestUnsafe.class.getDeclaredField("value"));
20         } catch (Exception e) {
21             System.out.println(e.getMessage());
22         }
23     }
24 
25     public static void main(String[] args) {
26         TestUnsafe testUnsafe = new TestUnsafe();
27         System.out.println(unsafe.compareAndSwapInt(testUnsafe,valueState,0,1));
28     }
29 }

  上面的程序中首先获取Unsafe的一个实例,而后使用unsafe的objectFieldOffset方法获取TestUnsafe类中value变量,计算在TestUnsafe类中value变量的内存偏移地址并保存到valueState中。main中调用unsafe的compareAndSwapInt方法设置testUnsafe对象的value变量的值为1(若是是0的话)。value初始默认是0,咱们但愿代码能输出true(即compareAndSwapInt可以执行成功),可是最终运行时下面的结果

   咱们看到上面的异常报错在getUnsafe方法位置,下来咱们看一看getUnsafe方法

 1 public static Unsafe getUnsafe() {
 2     //(1)获取调用getUnsafe类的这个Class类,按照上面的程序中的TestUnsafa类
 3     Class var0 = Reflection.getCallerClass();
 4     //(2)看下面的那个方法
 5     if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
 6         throw new SecurityException("Unsafe");
 7     } else {
 8         return theUnsafe;
 9     }
10 }
11 /**
12  * (3)判断是否是启动类加载器加载的类,即看看是否是由BootStrapClassLoader加载的TestUnsafe.class,
13  *    因为咱们这是一个简单测试类,是由应用程序类加载器AppClassLoader加载的,因此直接报出SecurityException异常
14  */
15 public static boolean isSystemDomainLoader(ClassLoader var0) {
16     return var0 == null;
17 }

  因为Unsafe类rt.jar包提供的,该包下面的类都是经过Bootstrap类加载器加载的,而咱们使用的main方法所在的类是由AppClassLoader加载的,因此在main方法中加载Unsafe类的时候根据双亲委派机制会委托给Bootstrap加载。那么若是想要使用Unsafe类应该怎样使用呢,《深刻理解java虚拟机》中这一块告诉咱们可使用反射来使用,下面咱们来试一下

 1 package test;
 2 
 3 import sun.misc.Unsafe;
 4 
 5 import java.lang.reflect.Field;
 6 
 7 public class TestUnsafe2 {
 8 
 9     static Unsafe unsafe;
10 
11     static long valueOffset;
12 
13     private volatile long value = 0;
14 
15     static {
16         try {
17             //使用反射获取Unsafe的成员变量theUnsafe
18             Field field = Unsafe.class.getDeclaredField("theUnsafe");
19             //设置为课存取
20             field.setAccessible(true);
21             //设置该变量的值
22             unsafe = (Unsafe) field.get(null);
23             //获取value偏移量
24             valueOffset = unsafe.objectFieldOffset(TestUnsafe2.class.getDeclaredField("value"));
25         } catch (NoSuchFieldException e) {
26             e.printStackTrace();
27         } catch (IllegalAccessException e) {
28             e.printStackTrace();
29         }
30     }
31 
32     public static void main(String[] args) {
33         TestUnsafe2 test = new TestUnsafe2();
34         System.out.println("修改变量结果true/false?:" +
35                 unsafe.compareAndSwapInt(test,valueOffset,0,1)
36                 + "修改后的结果:" + test.value);
37     }
38 }

  获得下面的结果:

5、JUC中原子操做类AtomicLong的原理探究

一、原操做类概述

  JUC包中提供了不少原子操做类,这些类都是经过上面说到的非阻塞CAS算法来实现的,相比较使用锁来实现原子性操做CAS在性能上有很大提升。因为原子操做类的原理都大体相同,因此下面分析AtomicLong类的实现原理来进一步了解原子操做类。

二、AtomicLong的源码

  下面是AtomicLong原子类的部分源码,其中主要包含其成员变量以及一些静态代码块和构造方法

 1 public class AtomicLong extends Number implements java.io.Serializable {
 2 
 3     //(1)获取Unsafe实例
 4     private static final Unsafe unsafe = Unsafe.getUnsafe();
 5     
 6     //(2)保存value值的偏移量
 7     private static final long valueOffset;
 8     
 9     //(3)判断当前JVM是否支持Long类型的无锁CAS
10     static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
11     private static native boolean VMSupportsCS8();
12     
13     static {
14         try {
15             //(4)获取value值在AtomicLong中的偏移量
16             valueOffset = unsafe.objectFieldOffset
17                 (AtomicLong.class.getDeclaredField("value"));
18         } catch (Exception ex) { throw new Error(ex); }
19     }
20     
21     //(5)实际存的变量值value
22     private volatile long value;
23 
24     //构造方法
25     public AtomicLong(long initialValue) {
26         value = initialValue;
27     }
28 }

  在上面的部分代码中,代码(1)经过Unsafe.getUnsafe()方法获取到Unsafe类的实例(AtomicLong类也是rt.jar包下面的,因此AtomicLong也是经过启动类加载器进行类加载的)。(2)(4)两处是计算并保存AtomicLong类中存储的变量value的偏移量。(5)中的value被声明为volatile的(关于volatile的内存语义以及实现原理参考前面写到的Java并发编程基础之volatile),这是为了在多线程下保证内存的可见性,而value就是具体存放计数的变量。下面咱们看看AtomicLong中的主要几个函数

  (1)递增和递减的源码

 1 //使用unsafe的方法,原子性的设置value值为原始值+1,返回值为递增以后的值
 2 public final long getAndIncrement() {
 3     return unsafe.getAndAddLong(this, valueOffset, 1L);
 4 }    
 5 public final long incrementAndGet() {
 6     return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
 7 }
 8 //使用unsafe的方法,原子性的设置value值为原始值-1,返回值为递减以后的值
 9 public final long getAndDecrement() {
10     return unsafe.getAndAddLong(this, valueOffset, -1L);
11 }
12 public final long decrementAndGet() {
13     return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
14 }

  在上面的代码中都是经过调用Unsafe类的getAndAddLong方法来实现操做的,咱们来看看这个方法,这个方法是个原子性操做:其中的第一个参数是AtomicLong实例的引用,第二个参数是value变量在AtomicLong中的偏移量,第三个参数是要设置为第二个变量的值。下面就是getAndAddLong方法的实现,以及一些分析

 1 public final long getAndAddLong(Object var1, long var2, long var4) {
 2     long var6;
 3     do {
 4         //public native long getLongVolatile(Object var1, long var2);
 5         //该方法就是获取var1引用指向的内存地址中偏移量为var2位置的值,而后赋给var6
 6         var6 = this.getLongVolatile(var1, var2);
 7     /**public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
 8      * var1:AtomicXXX类型的一个引用,指向堆内存中的一块地址
 9      * var2:AtomicXXX源码中的valueOffset,表示AtomicXXX源码中实际存储的值value在原子类型内存中的地址偏移量
10      * var4:要比较的目标值expectValue,若是从内存指定地址处(var1和var2决定的那块地址)的值和该值相等,则CAS成功
11      * var6:CAS成功后向该内存中写进的新值
12      */
13     //该方法就是使用CAS的方式,比较指定内存地址处(var1指向的内存地址块中偏移量为var2处)的值和上面同一块地址处取出的var6是否相等,
14     //相等就将var6+var4(这里能够当作var6+1)和指定内存地址处(var2引用指向的地址块中偏移量为var2处)的值交换,并返回true,而后就会结束循环
15     //CAS失败返回false,而后继续执行循环体内部的代码,直到成功(也就是自增运算成功就会跳出循环并返回自增后的值)
16     } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
17 
18     return var6;
19 }

  (2)CompareAndSet方法

  下面是compaerAndSet方法的实现,主要仍是调用unsafe类的compareAndSwapLong方法,其原理和上面分析的差很少,都是经过CAS的方式进行比较交换值。

1 public final boolean compareAndSet(long expect, long update) {
2     //public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
3     return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
4 }

  (3)扩展,下面是compareAndSwapInt的底层实现,其实是经过硬件同步原语来实现的CAS,下面的cmpxchg就是基于硬件原语实现的

1 UNSAFE_ENTRY(jboolean,Usafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
2 UsafeWrapper("Usafe_CompareAndSwapInt");
3 oop p = JNIHasdles::resolve(obj);
4 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
5 return (jint)(Atomic::cmpxchg(x,addr,e)) == e;
6 UNSAFE_END

  (4)下面是一个例子,使用AtomicLong来进行技术运算

 1 package test;
 2 
 3 import java.util.concurrent.atomic.AtomicLong;
 4 
 5 public class TestAtomic1 {
 6 
 7     //建立AtomicLong类型的计数器
 8     private static AtomicLong atomicLong = new AtomicLong();
 9 //    private static Long atomicLong = 0L;
10     //建立两个数组,计算数组中的0的个数
11     private static Integer[] arr1 = {0,1,2,3,0,5,6,0,56,0};
12     private static Integer[] arr2 = {10,1,2,3,0,5,6,0,56,0};
13 
14     public static void main(String[] args) throws InterruptedException {
15 
16         //线程1统计arr1中0的个数
17         Thread t1  = new Thread(new Runnable() {
18             @Override
19             public void run() {
20                 int size = arr1.length;
21                 for (int i = 0; i < size; i++) {
22                     if(arr1[i].intValue() == 0) {
23 //                        atomicLong.getAndIncrement();
24                         atomicLong++;
25                     }
26                 }
27             }
28         });
29 
30         Thread t2  = new Thread(new Runnable() {
31             @Override
32             public void run() {
33                 int size = arr2.length;
34                 for (int i = 0; i < size; i++) {
35                     if(arr2[i].intValue() == 0) {
36 //                        atomicLong.getAndIncrement();
37                         atomicLong++;
38                     }
39                 }
40             }
41         });
42 
43         t1.start();
44         t2.start();
45 
46         t1.join();
47         t2.join();
48 
49         System.out.println("两个数组中0出现的次数为: " + atomicLong);//两个数组中0出现的次数为: 7
50     }
51 }

  若是没有使用原子类型进行计数运算,那么可能就是下面的结果

   

相关文章
相关标签/搜索