一个简单的Java单例示例谈谈并发

一个简单的单例示例

单例模式多是你们常常接触和使用的一个设计模式,你可能会这么写java

public class UnsafeLazyInitiallization {

    private static UnsafeLazyInitiallization instance;

    private UnsafeLazyInitiallization() {
    }

    public static UnsafeLazyInitiallization getInstance(){

        if(instance==null){   //1:A线程执行

            instance=new UnsafeLazyInitiallization();  //2:B线程执行

        }

        return instance;

    }

}

上面代码你们应该都知道,所谓的线程不安全的懒汉单例写法。在UnsafeLazyInitiallization类中,假设A线程执行代码1的同时,B线程执行代码2,此时,线程A可能看到instance引用的对象尚未初始化。node

你可能会说,线程不安全,我能够对getInstance()方法作同步处理保证安全啊,好比下面这样的写法linux

public class SafeLazyInitiallization {

        private static SafeLazyInitiallization instance;

        private SafeLazyInitiallization() {
        }

        public synchronized static SafeLazyInitiallization getInstance(){

            if(instance==null){

                instance=new SafeLazyInitiallization();

            }

            return instance;

        }
    }

这样的写法是保证了线程安全,可是因为getInstance()方法作了同步处理,synchronized将致使性能开销。如getInstance()方法被多个线程频繁调用,将会致使程序执行性能的降低。反之,若是getInstance()方法不会被多个线程频繁的调用,那么这个方案将可以提供使人满意的性能。c++

那么,有没有更优雅的方案呢?前人的智慧是伟大的,在早期的JVM中,synchronized存在巨大的性能开销,所以,人们想出了一个“聪明”的技巧——双重检查锁定。人们经过双重检查锁定来下降同步的开销。下面来让咱们看看程序员

public class DoubleCheckedLocking { //1
    private static DoubleCheckedLocking instance; //2

    private DoubleCheckedLocking() {

    }

    public static DoubleCheckedLocking getInstance() { //3

        if (instance == null) { //4:第一次检查

            synchronized (DoubleCheckedLocking.class) { //5:加锁

                if (instance == null) //6:第二次检查

                    instance = new DoubleCheckedLocking(); //7:问题的根源出在这里

            } //8

        } //9

        return instance; //10

    } //11

}

如上面代码所示,若是第一次检查instance不为null,那么就不须要执行下面的加锁和初始化操做。所以,能够大幅下降synchronized带来的性能开销。双重检查锁定看起来彷佛很完美,但这是一个错误的优化!为何呢?在线程执行到第4行,代码读取到instance不为null时,instance引用的对象有可能尚未完成初始化。在第7行建立了一个对象,这行代码能够分解为以下的3行伪代码算法

memory=allocate(); //1:分配对象的内存空间
ctorInstance(memory); //2:初始化对象
instance=memory; //3:设置instance指向刚分配的内存地址

上面3行代码中的2和3之间,可能会被重排序(在一些JIT编译器上,这种重排序是真实发生的,若是不了解重排序,后文JMM会详细解释)。2和3之间重排序以后的执行时序以下编程

memory=allocate(); //1:分配对象的内存空间
instance=memory; //3:设置instance指向刚分配的内存地址,注意此时对象尚未被初始化
ctorInstance(memory); //2:初始化对象

回到示例代码第7行,若是发生重排序,另外一个并发执行的线程B就有可能在第4行判断instance不为null。线程B接下来将访问instance所引用的对象,但此时这个对象可能尚未被A线程初始化。在知晓问题发生的根源以后,咱们能够想出两个办法解决windows

  • 不容许2和3重排序
  • 容许2和3重排序,但不容许其余线程“看到”这个重排序

下面就介绍这两个解决方案的具体实现设计模式

基于volatile的解决方案数组

对于前面的基于双重检查锁定的方案,只须要作一点小的修改,就能够实现线程安全的延迟初始化。请看下面的示例代码

public class SafeDoubleCheckedLocking {
    private volatile static SafeDoubleCheckedLocking instance;

    private SafeDoubleCheckedLocking() {
    }

    public static SafeDoubleCheckedLocking getInstance() {

        if (instance == null) {

            synchronized (SafeDoubleCheckedLocking.class) {

                if (instance == null)

                    instance = new SafeDoubleCheckedLocking();//instance为volatile,如今没问题了

            }

        }

        return instance;

    }

}

当声明对象的引用为volatile后,前面伪代码谈到的2和3之间的重排序,在多线程环境中将会被禁止。

基于类初始化的解决方案

JVM在类的初始化阶段(即在Class被加载后,且被线程使用以前),会执行类的初始化。在执行类的初始化期间,JVM会去获取多个线程对同一个类的初始化。基于这个特性,实现的示例代码以下

public class InstanceFactory {

    private InstanceFactory() {
    }

    private static class InstanceHolder {

        public static InstanceFactory instance = new InstanceFactory();

    }

    public static InstanceFactory getInstance() {

        return InstanceHolder.instance; //这里将致使InstanceHolder类被初始化

    }

}

这个方案的本质是容许前面伪代码谈到的2和3重排序,但不容许其余线程“看到”这个重排序。在InstanceFactory示例代码中,首次执行getInstance()方法的线程将致使InstanceHolder类被初始化。因为Java语言是多线程的,多个线程可能在同一时间尝试去初始化同一个类或接口(好比这里多个线程可能会在同一时刻调用getInstance()方法来初始化IInstanceHolder类)。Java语言规定,对于每个类和接口C,都有一个惟一的初始化锁LC与之对应。从C到LC的映射,由JVM的具体实现去自由实现。JVM在类初始化期间会获取这个初始化锁,而且每一个线程至少获取一次锁来确保这个类已经被初始化过了。

JMM

也许你还存在疑问,前面谈的重排序是什么鬼?为何volatile在某方面就能禁止重排序?如今引出本文的另外一个话题JMM(Java Memory Model——Java内存模型)。什么是JMM呢?JMM是一个抽象概念,它并不存在。Java虚拟机规范中试图定义一种Java内存模型(JMM)来屏蔽掉各类硬件和操做系统的内存访问差别,以实现让Java程序在各类平台下都能达到一致的内存访问效果。在此以前,主流程序语言(如C/C++等)直接使用物理硬件和操做系统的内存模型,所以,会因为不一样平台的内存模型的差别,有可能致使程序在一套平台上并发彻底正常,而在另外一套平台上并发访问却常常出错,所以在某些场景就必须针对不一样的平台来编写程序。

Java线程之间的通讯由JMM来控制,JMM决定一个线程共享变量的写入什么时候对另外一个线程可见。JMM保证若是程序是正确同步的,那么程序的执行将具备顺序一致性。从抽象的角度看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量(实例域、静态域和数据元素)存储在主内存(Main Memory)中,每一个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本(局部变量、方法定义参数和异常处理参数是不会在线程之间共享,它们存储在线程的本地内存中)。从物理角度上看,主内存仅仅是虚拟机内存的一部分,与物理硬件的主内存名字同样,二者能够互相类比;而本地内存,可与处理器高速缓存类比。Java内存模型的抽象示意图如图所示

这里先介绍几个基础概念:8种操做指令、内存屏障、顺序一致性模型、as-if-serial、happens-before 、数据依赖性、 重排序。

8种操做指令

关于主内存与本地内存之间具体的交互协议,即一个变量如何从主内存拷贝到本地内存、如何从本地内存同步回主内存之类的实现细节,JMM中定义了如下8种操做来完成,虚拟机实现时必须保证下面说起的每种操做都是原子的、不可再分的(对于double和long类型的遍从来说,load、store、read和write操做在某些平台上容许有例外):

  • lock(锁定):做用于主内存的变量,它把一个变量标识为一条线程独立的状态。
  • unlock(解锁):做用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才能够被其余线程锁定。
  • read(读取):做用于主内存的变量,它把一个变量的值从主内存传输到线程的本地内存中,以便随后的load动做使用。
  • load(载入):做用于本地内存的变量,它把read操做从主内存中获得变量值放入本地内存的变量副本中。
  • use(使用):做用于本地内存的变量,它把本地内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个须要使用到变量的值的字节码指令时将会执行这个操做。
  • assign(赋值):做用于本地内存的变量,它把一个从执行引擎接收到的值赋给本地内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操做。
  • store(存储):做用于本地内存的变量,它把本地内存中的一个变量的值传送到主内存中,以便随后的write操做使用。
  • write(写入):做用于主内存的变量,它把store操做从本地内存中提到的变量的值放入到主内存的变量中。

若是要把一个变量从主内存模型复制到本地内存,那就要顺序的执行read和load操做,若是要把变量从本地内存同步回主内存,就要顺序的执行store和write操做。注意,Java内存模型只要求上述两个操做必须按顺序执行,而没有保证是连续执行。也就是说read与load之间、store与write之间是可插入其余指令的,如对主内存中的变量a、b进行访问时,一种可能出现的顺序是read a read b、load b、load a。

内存屏障

内存屏障是一组处理器指令(前面的8个操做指令),用于实现对内存操做的顺序限制。包括LoadLoad, LoadStore, StoreLoad, StoreStore共4种内存屏障。内存屏障存在的意义是什么呢?它是在Java编译器生成指令序列的适当位置插入内存屏障指令来禁止特定类型的处理器重排序,从而让程序按咱们预想的流程去执行,内存屏障是与相应的内存重排序相对应的。JMM把内存屏障指令分为4类

StoreLoad Barriers是一个“全能型 ”的屏障,它同时具备其余3个屏障的效果。如今的多数处理器大多支持该屏障(其余类型的屏障不必定被全部处理器支持)。执行该屏障开销会很昂贵,由于当前处理器一般要把写缓冲区中的数据所有刷新到内存中。

数据依赖性

若是两个操做访问同一个变量,且这两个操做中有一个为写操做,此时这两个操做之间就存在数据依赖性。数据依赖性分3种类型:写后读、写后写、读后写。这3种状况,只要重排序两个操做的执行顺序,程序的执行结果就会被改变。编译器和处理器可能对操做进行重排序。而它们进行重排序时,会遵照数据依赖性,不会改变数据依赖关系的两个操做的执行顺序。

这里所说的数据依赖性仅针对单个处理器中执行的指令序列和单个线程中执行的操做,不一样处理器之间和不一样线程之间的数据依赖性不被编译器和处理器考虑。

顺序一致性内存模型

顺序一致性内存模型是一个理论参考模型,在设计的时候,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存模型做为参照。它有两个特性:

  • 一个线程中的全部操做必须按照程序的顺序来执行
  • (无论程序是否同步)全部线程都只能看到一个单一的操做执行顺序。在顺序一致性的内存模型中,每一个操做必须原子执行而且马上对全部线程可见。

从顺序一致性模型中,咱们能够知道程序全部操做彻底按照程序的顺序串行执行。而在JMM中,临界区内的代码能够重排序(但JMM不容许临界区内的代码“逸出”到临界区外,那样就破坏监视器的语义)。JMM会在退出临界区和进入临界区这两个关键时间点作一些特别处理,使得线程在这两个时间点具备与顺序一致性模型相同的内存视图。虽然线程A在临界区内作了重排序,但因为监视器互斥执行的特性,这里的线程B根本没法“观察”到线程A在临界区内的重排序。这种重排序既提升了执行效率,又没有改变程序的执行结果。像前面单例示例的类初始化解决方案就是采用了这个思想。

as-if-serial

as-if-serial的意思是无论怎么重排序,(单线程)程序的执行结果不能改变。编译器、runtime和处理器都必须遵照as-if-serial语义。为了遵照as-if-serial语义,编译器和处理器不会对存在数据依赖关系的操做作重排序。

as-if-serial语义把单线程程序保护了起来,遵照as-if-serial语义的编译器、runtime和处理器共同为编写单线程程序的程序员建立了一个幻觉:单线程程序是按程序的顺序来执行的。as-if-serial语义使单线程程序员无需担忧重排序会干扰他们,也无需担忧内存可见性问题。

happens-before

happens-before是JMM最核心的概念。从JDK5开始,Java使用新的JSR-133内存模型,JSR-133 使用happens-before的概念阐述操做之间的内存可见性,若是一个操做执行的结果须要对另外一个操做可见,那么这两个操做之间必须存在happens-before关系。

happens-before规则以下:

  • 程序次序法则:线程中的每一个动做 A 都 happens-before 于该线程中的每个动做 B,其中,在程序中,全部的动做 B 都出如今动做 A 以后。(注:此法则只是要求遵循 as-if-serial语义)
  • 监视器锁法则:对一个监视器锁的解锁 happens-before 于每个后续对同一监视器锁的加锁。(显式锁的加锁和解锁有着与内置锁,即监视器锁相同的存储语意。)
  • volatile变量法则:对 volatile 域的写入操做 happens-before 于每个后续对同一域的读操做。(原子变量的读写操做有着与 volatile 变量相同的语意。)(volatile变量具备可见性和读写原子性。)
  • 线程启动法则:在一个线程里,对 Thread.start 的调用会 happens-before 于每个启动线程中的动做。 线程终止法则:线程中的任何动做都 happens-before 于其余线程检测到这个线程已终结,或者从 Thread.join 方法调用中成功返回,或者 Thread.isAlive 方法返回false。
  • 中断法则法则:一个线程调用另外一个线程的 interrupt 方法 happens-before 于被中断线程发现中断(经过抛出InterruptedException, 或者调用 isInterrupted 方法和 interrupted 方法)。
  • 终结法则:一个对象的构造函数的结束 happens-before 于这个对象 finalizer 开始。
  • 传递性:若是 A happens-before 于 B,且 B happens-before 于 C,则 A happens-before 于 C。

happens-before与JMM的关系以下图所示

as-if-serial语义和happens-before本质上同样,参考顺序一致性内存模型的理论,在不改变程序执行结果的前提下,给编译器和处理器以最大的自由度,提升并行度。

重排序

终于谈到咱们反复说起的重排序了,重排序是指编译器和处理器为了优化程序性能而对指令序列进行从新排序的一种手段。重排序分3种类型。

  • 编译器优化的重排序。编译器在不改变单线程程序语义(as-if-serial )的前提下,能够从新安排语句的执行顺序。
  • 指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction Level Parallelism,ILP)来将多条指令重叠执行。若是不存在数据依赖性,处理器能够改变语句对机器指令的执行顺序。
  • 内存系统的重排序。因为处理器使用缓存和读/写缓冲区,这使得加载和存储操做看上去多是在乱序执行。

从Java源代码到最终实际执行的指令序列,会分别经历下面3种重排序

上述的1属于编译器重排序,2和3属于处理器重排序。这些重排序可能会致使多线程程序出现内存可见性问题。对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排序(不是全部的编译器重排序都要禁止)。对于处理器重排序,JMM的处理器重排序规则会要求Java编译器在生成指令序列时,插入特定类型的内存屏障指令,经过内存屏障指令来禁止特定类型的处理器重排序。

JMM属于语言级的内存模型,它确保在不一样的编译器和不一样的处理器平台之上,经过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证。

从JMM设计者的角度来讲,在设计JMM时,须要考虑两个关键因素:

  • 程序员对内存模型的使用。程序员但愿内存模型易于理解,易于编程。程序员但愿基于一个强内存模型(程序尽量的顺序执行)来编写代码。
  • 编译器和处理器对内存模型的实现。编译器和处理器但愿内存模型对它们的束缚越少越好,这样它们就能够作尽量多的优化(对程序重排序,作尽量多的并发)来提升性能。编译器和处理器但愿实现一个弱内存模型。

JMM设计就须要在这二者之间做出协调。JMM对程序采起了不一样的策略:

  • 对于会改变程序执行结果的重排序,JMM要求编译器和处理器必须禁止这种重排序。
  • 对于不会改变程序执行结果的重排序,JMM对编译器和处理器不做要求(JMM容许这种重排序)。

介绍完了这几个基本概念,咱们不难推断出JMM是围绕着在并发过程当中如何处理原子性、可见性和有序性这三个特征来创建的:

  • 原子性:由Java内存模型来直接保证的原子性操做就是咱们前面介绍的8个原子操做指令,其中lock(lock指令实际在处理器上原子操做体现对总线加锁或对缓存加锁)和unlock指令操做JVM并未直接开放给用户使用,可是却提供了更高层次的字节码指令monitorenter和monitorexit来隐式使用这两个操做,这两个字节码指令反映到Java代码中就是同步块——synchronize关键字,所以在synchronized块之间的操做也具有原子性。除了synchronize,在Java中另外一个实现原子操做的重要方式是自旋CAS,它是利用处理器提供的cmpxchg指令实现的。至于自旋CAS后面J.U.C中会详细介绍,它和volatile是整个J.U.C底层实现的核心。
  • 可见性:可见性是指一个线程修改了共享变量的值,其余线程可以当即得知这个修改。而咱们上文谈的happens-before原则禁止某些处理器和编译器的重排序,来保证了JMM的可见性。而体如今程序上,实现可见性的关键字包含了volatile、synchronize和final。
  • 有序性:谈到有序性就涉及到前面说的重排序和顺序一致性内存模型。咱们也都知道了as-if-serial是针对单线程程序有序的,即便存在重排序,可是最终程序结果仍是不变的,而多线程程序的有序性则体如今JMM经过插入内存屏障指令,禁止了特定类型处理器的重排序。经过前面8个操做指令和happens-before原则介绍,也不难推断出,volatile和synchronized两个关键字来保证线程之间的有序性,volatile自己就包含了禁止指令重排序的语义,而synchronized则是由监视器法则得到。

J.U.C

谈完了JMM,那么Java相关类库是如何实现的呢?这里就谈谈J.U.C( java.util.concurrent),先来张J.U.C的思惟导图

不难看出,J.U.C由atomic、locks、tools、collections、executor这五部分组成。它们的实现基于volatile的读写和CAS所具备的volatile读和写。AQS(AbstractQueuedSynchronizer,队列同步器)、非阻塞数据结构和原子变量类,这些J.U.C中的基础类都是使用了这种模式实现的,而J.U.C中的高层类又依赖于这些基础类来实现的。从总体上看,J.U.C的实现示意图以下

也许你对volatile和CAS的底层实现原理不是很了解,这里先这里先简单介绍下它们的底层实现

volatile

Java语言规范第三版对volatile的定义为:Java编程语言容许线程访问共享变量,为了确保共享变量能被准确和一致性的更新,线程应该确保经过排他锁单独得到这个变量。若是一个字段被声明为volatile,Java内存模型确保这个全部线程看到这个值的变量是一致的。而volatile是如何来保证可见性的呢?若是对声明了volatile的变量进行写操做,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存(Lock指令会在声言该信号期间锁总线/缓存,这样就独占了系统内存)。可是,就算是写回到内存,若是其余处理器缓存的值仍是旧的,再执行计算操做就会有问题。因此,在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每一个处理器经过嗅探在总线(注意处理器不直接跟系统内存交互,而是经过总线)上传播的数据来检查本身缓存的值是否是过时了,当处理器发现直接缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操做的时候,会从新从系统内存中把数据读处处理器缓存里。

CAS

CAS其实应用挺普遍的,咱们经常听到的悲观锁乐观锁的概念,乐观锁(无锁)指的就是CAS。这里只是简单说下在并发的应用,所谓的乐观并发策略,通俗的说,就是先进性操做,若是没有其余线程争用共享数据,那操做就成功了,若是共享数据有争用,产生了冲突,那就采起其余的补偿措施(最多见的补偿措施就是不断重试,治到成功为止,这里其实也就是自旋CAS的概念),这种乐观的并发策略的许多实现都不须要把线程挂起,所以这种操做也被称为非阻塞同步。而CAS这种乐观并发策略操做和冲突检测这两个步骤具有的原子性,是靠什么保证的呢?硬件,硬件保证了一个从语义上看起来须要屡次操做的行为只经过一条处理器指令就能完成。

也许你会存在疑问,为何这种无锁的方案通常会比直接加锁效率更高呢?这里其实涉及到线程的实现和线程的状态转换。实现线程主要有三种方式:使用内核线程实现、使用用户线程实现和使用用户线程加轻量级进程混合实现。而Java的线程实现则依赖于平台使用的线程模型。至于状态转换,Java定义了6种线程状态,在任意一个时间点,一个线程只能有且只有其中的一种状态,这6种状态分别是:新建、运行、无限期等待、限期等待、阻塞、结束。 Java的线程是映射到操做系统的原生线程之上的,若是要阻塞或唤醒一个线程,都须要操做系统来帮忙完成,这就须要从用户态转换到核心态中,所以状态转换须要耗费不少的处理器时间。对于简单的同步块(被synchronized修饰的方法),状态转换消耗的时间可能比用户代码执行的时间还要长。因此出现了这种优化方案,在操做系统阻塞线程之间引入一段自旋过程或一直自旋直到成功为止。避免频繁的切入到核心态之中。

可是这种方案其实也并不完美,在这里就说下CAS实现原子操做的三大问题

  • ABA问题。由于CAS须要在操做值的时候,检查值有没有变化,若是没有发生变化则更新,可是若是一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有变化,可是实际上发生变化了。ABA解决的思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1。JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。不过目前来讲这个类比较“鸡肋”,大部分状况下ABA问题不会影响程序并发的正确性,若是须要解决ABA问题,改用原来的互斥同步可能会比原子类更高效。
  • 循环时间长开销大。自旋CAS若是长时间不成功,会给CPU带来很是大的执行开销。因此说若是是长时间占用锁执行的程序,这种方案并不适用于此。
  • 只能保证一个共享变量的原子操做。当对一个共享变量执行操做时,咱们可使用自旋CAS来保证原子性,可是对多个共享变量的操做时,自旋CAS就没法保证操做的原子性,这个时候能够用锁。

谈完了这两个概念,下面咱们就来逐个分析这五部分的具体源码实现

atomic

atomic包的原子操做类提供了一种简单、性能高效、线程安全操做一个变量的方式。atomic包里一共13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用、原子更新属性。atomic包里的类基本使用Unsafe实现的包装类。

下面经过一个简单的CAS方式实现计数器(一个线程安全的计数器方法safeCount和一个非线程安全的计数器方法count)的示例来讲下

public class CASTest {

    public static void main(String[] args){
        final Counter cas=new Counter();
        List ts=new ArrayList(600);
        long start=System.currentTimeMillis();
        for(int j=0;j<100;j++){
            Thread t=new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i=0;i<10000;i++){
                        cas.count();
                        cas.safeCount();
                    }
                }
            });
            ts.add(t);
        }
        for(Thread t:ts){
            t.start();
        }

        for(Thread t:ts){
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(cas.i);
        System.out.println(cas.atomicI.get());
        System.out.println(System.currentTimeMillis()-start);
    }

}

public class Counter {
    public AtomicInteger atomicI=new AtomicInteger(0);
    public int i=0;

    /** * 使用CAS实现线程安全计数器 */
    public void safeCount(){
        for(;;){
            int i=atomicI.get();
            boolean suc=atomicI.compareAndSet(i,++i);
            if(suc){
                break;
            }
        }
    }

    /** * 非线程安全计数器 */
    public void count(){
        i++;
    }

}

safeCount()方法的代码块实际上是getandIncrement()方法的实现,源码for循环体第一步优先取得atomicI里存储的数值,第二步对atomicI的当前数值进行加1操做,关键的第三步调用compareAndSet()方法来进行原子更新操做,该方法先检查当前数值是否等于current,等于意味着atomicI的值没有被其余线程修改过,则将atomicI的当前数值更新成next的值,若是不等compareAndSet()方法会返回false,程序则进入for循环从新进行compareAndSet()方法操做进行不断尝试直到成功为止。在这里咱们跟踪下compareAndSet()方法以下

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

从上面源码咱们发现是使用Unsafe实现的,其实atomic里的类基本都是使用Unsafe实现的。咱们再回到这个本地方法调用,这个本地方法在openjdk中依次调用c++代码为unsafe.cpp、atomic.app和atomic_windows_x86.inline.hpp。关于本地方法实现的源代码这里就不贴出来了,其实大致上是程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。若是程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(Lock Cmpxchg)。反之,若是程序是在单处理器上运行,就省略lock前缀(单处理器自身就会维护单处理器内的顺序一致性,不须要lock前缀提供的内存屏障效果)。

locks

锁是用来控制多个线程访问共享资源的形式,Java SE 5以后,J.U.C中新增了locks来实现锁功能,它提供了与synchronized关键字相似的同步功能。只是在使用时须要显示的获取和释放锁。虽然它缺乏了隐式获取和释放锁的便捷性,可是却拥有了锁获取和释放的可操做性、可中断的获取锁及超时获取锁等多种synchronized关键字不具有的同步特性。

locks在这咱们只介绍下核心的AQS(AbstractQueuedSynchronizer,队列同步器),AQS是用来构建锁或者其余同步组件的基础框架,它使用一个用volatile修饰的int成员变量表示同步状态。经过内置的FIFO队列来完成资源获取线程的排队工做。同步器的主要使用方式是继承,子类经过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程免不了要对同步状态进行更改,这时候就会使用到AQS提供的3个方法:getState()、setState()和compareAndSetState()来进行操做,这是由于它们可以保证状态的改变是原子性的。为何这么设计呢?由于锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节,而AQS面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操做。锁和AQS很好的隔离了使用者和实现者锁关注的领域。

如今咱们就自定义一个独占锁来详细解释下AQS的实现机制

public class Mutex implements Lock {
    private static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -4387327721959839431L;

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

实现自定义组件的时候,咱们能够看到,AQS可重写的方法是tryAcquire()——独占式获取同步状态、tryRelease()——独占式释放同步状态、tryAcquireShared()——共享式获取同步状态、tryReleaseShared ()——共享式释放同步状态、isHeldExclusively()——是否被当前线程所独占。这个示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只容许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire()中,若是通过CAS设置成功(同步状态设置为1),则表示获取了同步状态,而在tryRelease()中,只是将同步状态重置为0。接着咱们对比一下重入锁(ReentrantLock)的源码实现

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */
        abstract void lock();

        /** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /** * Reconstitutes this lock instance from a stream. * @param s the stream */
        private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    /** * Sync object for non-fair locks */
    static final class NonfairSync extends Sync {
    //todo sth...
    }
    //todo sth...
}

重入锁分公平锁和不公平锁,默认使用的是不公平锁,在这咱们看到实现重入锁大致上跟咱们刚才自定义的独占锁差很少,可是有什么区别呢?咱们看看重入锁nonfairTryAcquire()方法实现:首先获取同步状态(默认是0),若是是0的话,CAS设置同步状态,非0的话则判断当前线程是否已占有锁,若是是的话,则偏向更新同步状态。从这里咱们不难推断出重入锁的概念,同一个线程能够屡次得到同一把锁,在释放的时候也必须释放相同次数的锁。经过对比相信你们对自定义一个锁有了一个初步的概念,也许你存在疑问咱们重写的这几个方法在AQS哪地方用呢?如今咱们来继续往下跟踪,咱们深刻跟踪下刚才自定义独占锁lock()方法里面acquire()的实现

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这个方法在AQS类里面,看到里面的tryAcquire(arg)你们也就明白了,tryAcquire(arg)方法获取同步状态,后面acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法就是说的节点构造、加入同步队列及在同步队列中自旋等待的AQS没暴露给咱们的相关操做。大致的流程就是首先调用自定义同步器实现的tryAcquire()方法,该方法保证线程安全的获取同步状态,若是获取同步状态失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并经过addWaiter()方法将该节点加入到同步队列的尾部,最后调用acquireQueued()方法,使得该节点以“死循环”的方式获取同步状态。若是获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要靠前驱节点的出队或阻塞线程被中断来实现。也许你仍是不明白刚才所说的,那么咱们继续跟踪下addWaiter()方法的实现

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上面的代码经过使用compareAndSetTail()方法来确保节点可以被线程安全添加。在enq()方法中,同步器经过“死循环”来确保节点的正确添加,在”死循环“中只有经过CAS将节点设置成为尾节点以后,当前线程才可以从该方法返回,不然,当前线程不断地尝试重试设置。

在节点进入同步队列以后,发生了什么呢?如今咱们继续跟踪下acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

从上面的代码咱们不难看出,节点进入同步队列以后,就进入了一个自旋的过程,每一个节点(或者说每一个线程)都在自省的观察,当条件知足时(本身的前驱节点是头节点就进行CAS设置同步状态)就得到同步状态,而后就能够从自旋的过程当中退出,不然依旧在这个自旋的过程当中。

collections

从前面的思惟导图咱们能够看到并发容器包括链表、队列、HashMap等.它们都是线程安全的。

  • ConcurrentHashMap : 一个高效的线程安全的HashMap。
  • CopyOnWriteArrayList : 在读多写少的场景中,性能很是好,远远高于vector。
  • ConcurrentLinkedQueue : 高效并发队列,使用链表实现,能够当作线程安全的LinkedList。
  • BlockingQueue : 一个接口,JDK内部经过链表,数组等方式实现了这个接口,表示阻塞队列,很是适合用做数据共享 。
  • ConcurrentSkipListMap : 跳表的实现,这是一个Map,使用跳表数据结构进行快速查找 。

另外Collections工具类能够帮助咱们将任意集合包装成线程安全的集合。在这里重点说下ConcurrentHashMap和BlockingQueue这两个并发容器。

咱们都知道HashMap线程不安全的,而咱们能够经过Collections.synchronizedMap(new HashMap<>())来包装一个线程安全的HashMap或者使用线程安全的HashTable,可是它们的效率都不是很好,这时候咱们就有了ConcurrentHashMap。为何ConcurrentHashMap高效且线程安全呢?其实它使用了锁分段技术来提升了并发的访问率。假如容器里有多把锁,每一把锁用于锁容器的一部分数据,那么当多线程访问容器里不一样数据段的数据时,线程间就不会存在锁竞争,从而能够有效地提升并发访问效率,这就是锁分段技术。首先将数据分红一段段的存储,而后给每段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。而既然数据被分红了多个段,线程如何定位要访问的段的数据呢?这里实际上是经过散列算法来定位的。

如今来谈谈阻塞队列,阻塞队列其实跟后面要谈的线程池息息相关的,JDK7提供了7个阻塞队列,分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

若是队列是空的,消费者会一直等待,当生产者添加元素时候,消费者是如何知道当前队列有元素的呢?若是让你来设计阻塞队列你会如何设计,让生产者和消费者可以高效率的进行通信呢?让咱们先来看看JDK是如何实现的。

使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。经过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现,代码以下:

private final Condition notFull;
private final Condition notEmpty;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        //省略其余代码
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

当咱们往队列里插入一个元素时,若是队列不可用,阻塞生产者主要经过LockSupport.park(this)来实现

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,而后调用unsafe.park阻塞当前线程。

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

unsafe.park是个native方法,代码以下:

public native void park(boolean isAbsolute, long time);

park这个方法会阻塞当前线程,只有如下四种状况中的一种发生时,该方法才会返回。

  • 与park对应的unpark执行或已经执行时。注意:已经执行是指unpark先执行,而后再执行的park。
  • 线程被中断时。
  • 若是参数中的time不是零,等待了指定的毫秒数时。
  • 发生异常现象时。这些异常事先没法肯定。

咱们继续看一下JVM是如何实现park方法的,park在不一样的操做系统使用不一样的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代码以下:

void os::PlatformEvent::park() {
            int v ;
            for (;;) {
                v = _Event ;
            if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
            }
            guarantee (v >= 0, "invariant") ;
            if (v == 0) {
            // Do this the hard way by blocking ...
            int status = pthread_mutex_lock(_mutex);
            assert_status(status == 0, status, "mutex_lock");
            guarantee (_nParked == 0, "invariant") ;
            ++ _nParked ;
           while (_Event < 0) {          
           status = pthread_cond_wait(_cond, _mutex);        
           // for some reason, under 2.7 lwp_cond_wait() may return ETIME ... 
           // Treat this the same as if the wait was interrupted 
            if (status == ETIME) { status = EINTR; }        
            assert_status(status == 0 || status == EINTR, status, "cond_wait");        
           }    
           -- _nParked ;          
          // In theory we could move the ST of 0 into _Event past the unlock(), 
          // but then we'd need a MEMBAR after the ST. 
          _Event = 0 ;     
         status = pthread_mutex_unlock(_mutex);    
         assert_status(status == 0, status, "mutex_unlock");       
         }        
          guarantee (_Event >= 0, "invariant") ;
        }
    }

pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思能够理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。

当队列满时,生产者往阻塞队列里插入一个元素,生产者线程会进入WAITING (parking)状态。

executor

Executor框架提供了各类类型的线程池,不一样的线程池应用了前面介绍的不一样的堵塞队列

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类。 对于核心的几个线程池,不管是newFixedThreadPool()、newSingleThreadExecutor()仍是newCacheThreadPool()方法,虽然看起来建立的线程具备彻底不一样的功能特色,但其内部均使用了ThreadPoolExecutor实现

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue());
}
  • newFixedThreadPool()方法的实现,它返回一个corePoolSize和maximumPoolSize同样的,并使用了LinkedBlockingQueue任务队列(无界队列)的线程池。当任务提交很是频繁时,该队列可能迅速膨胀,从而系统资源耗尽。
  • newSingleThreadExecutor()返回单线程线程池,是newFixedThreadPool()方法的退化,只是简单的将线程池数量设置为1。
  • newCachedThreadPool()方法返回corePoolSize为0而maximumPoolSize无穷大的线程池,这意味着没有任务的时候线程池内没有现场,而当任务提交时,该线程池使用空闲线程执行任务,若无空闲则将任务加入SynchronousQueue队列,而SynchronousQueue队列是直接提交队列,它老是破事线程池增长新的线程来执行任务。当任务执行完后因为corePoolSize为0,所以空闲线程在指定时间内(60s)被回收。对于newCachedThreadPool(),若是有大量任务提交,而任务又不那么快执行时,那么系统变回开启等量的线程处理,这样作法可能会很快耗尽系统的资源,由于它会增长无穷大数量的线程。

由以上线程池的实现能够看到,它们都只是ThreadPoolExecutor类的封装。咱们看下ThreadPoolExecutor最重要的构造函数:

public ThreadPoolExecutor( //核心线程池,指定了线程池中的线程数量 int corePoolSize, //基本线程池,指定了线程池中的最大线程数量 int maximumPoolSize, //当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即屡次时间内会被销毁。 long keepAliveTime, //keepAliveTime的单位 TimeUnit unit, //任务队列,被提交但还没有被执行的任务。 BlockingQueue workQueue, //线程工厂,用于建立线程,通常用默认的便可 ThreadFactory threadFactory, //拒绝策略,当任务太多来不及处理,如何拒绝任务。 RejectedExecutionHandler handler) 

ThreadPoolExecutor的任务调度逻辑以下

从上图咱们能够看出,当提交一个新任务到线程池时,线程池的处理流程以下:

  • 首先线程池判断基本线程池是否已满,若是没满,建立一个工做线程来执行任务。满了,则进入下个流程。
  • 其次线程池判断工做队列是否已满,若是没满,则将新提交的任务存储在工做队列里。满了,则进入下个流程。
  • 最后线程池判断整个线程池是否已满,若是没满,则建立一个新的工做线程来执行任务,满了,则交给饱和策略来处理这个任务。

下面咱们来看看ThreadPoolExecutor核心调度代码

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        /** * workerCountOf(c)获取当前线程池线程总数 * 当前线程数小于corePoolSize核心线程数时,会将任务经过addWorker(command, true)方法直接调度执行。 * 不然进入下个if,将任务加入等待队列 **/
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /** * workQueue.offer(command) 将任务加入等待队列。 * 若是加入失败(好比有界队列达到上限或者使用了synchronousQueue)则会执行else。 * **/
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /** * addWorker(command, false)直接交给线程池, * 若是当前线程已达到maximumPoolSize,则提交失败执行reject()拒绝策略。 **/
        else if (!addWorker(command, false))
            reject(command);
    }

从上面的源码咱们能够知道execute的执行步骤:

  • 若是当前运行的线程少于corePoolSize,则建立新线程来执行任务(注意,执行这一步骤须要获取全局锁)。
  • 若是运行的线程等于或多于corePoolSize,则将任务加入到BlockingQueue。
  • 若是没法将任务假如BlockingQueue(队列已满),则建立新的线程来处理任务(注意,执行这一步骤须要获取全局锁)。
  • 若是建立新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采起上述步骤的整体设计思路,是为了在执行execute()方法时,尽量的避免获取全局锁(那将会是一个严重的 可伸缩瓶颈)。在ThreadPoolExecutor完成预热以后(当前运行的线程数大于等于corePoolSize),几乎全部的execute()方法调用都是执行步骤2,而步骤2不须要获取全局锁。

相关文章
相关标签/搜索