java 锁实现——volatile、synchronized、ReentrantLock

欢迎访问我的个人博客休息的风
java的锁实现,有最轻量级volatile同步机制,有基于jvm语义的synchronized关键字,也有基于AQS实现的重入锁,读写锁。本篇博客主要分析volatile的语义,synchronized在jvm层面的实现以及ReentrantLock是如何基于AQS实现的。
  • 指令重排序
jvm在执行java程序语句时,会出于优化,对程序的指令进行重排序。举个例子,对于int a = 1;
long b = 2L; 
float c = 2.1f;
这三行代码,jvm底层并不一定是先执行第一条,再执行第二条,第三条。有可能会先执行long b = 2L;这句代码,再执行float c = 2.1f,最后执行int a = 1;这是由jvm指令重排决定的。

  • 内存屏障
所谓内存屏障,也拿前面讲到的例子说明。
int a = 1;
long b = 2L; 
float c = 2.1f;
这三行代码,如果在long b = 2L;这句代码后面加上内存屏障,那么指令重排时,只能对int a = 1; long b = 2L重排。float c = 2.1f这句代码的执行一定在long b = 2L之后。内在屏障的作用就是重排序时不能把后面的指令重排序到内存屏障之前的位置。

  • hapen-before
java虚拟机规定了一些不需要内存屏障就“天然”先行发生的规则。有如下这几个规则:

1、程序次序规则(Program Order Rule):在一个线程内,按照程序代码顺序,书写在前面的操作先行发生于书写在后面的操作。准确地说,应该是控制流顺序而不是程序代码顺序,因为要考虑分支、循环等结构

2、管程锁定规则(Monitor Lock Rule):一个unlock操作先行发生于后面对同一个锁的lock操作。这里必须强调的是同一个锁,而“后面”是指时间上的先后顺序。

3、volatile变量规则(Volatile Variable Rule):对一个volatile变量的写操作先行发生于后面对这个变量的读操作,这里的“后面”同样是指时间上的先后顺序。

4、线程启动规则(Thread Start Rule):Thread对象的start()方法先行发生于此线程的每一个动作。

5、线程终止规则(Thread Termination Rule):线程中的所有操作都先行发生于对此线程的终止检测

6、线程中断规则(Thread Interruption Rule):对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生

7、对象终结规则(Finalizer Rule):一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

8、传递性(Transitivity):如果操作A先行发生于操作B,操作B先行发生于操作C,那就可以得出操作A先行发生于操作C的结论。

  • volatile语义
要理解volatile语义,要先从java内存模型说起。java的内存模型如下图:


java内存分为工作内存和主内存。工作内存是每个线程私有的,主内存是公共的。使用内存变量时,要先从主内存拷贝到工作内存,线程再使用。这里有个问题,如果位于主内存的变量A,在被线程A拷贝到工作内存后,线程B就把自已工作内存里的变量A同步回了主内存,也就是改变了主内存里变量A的值,那么此时,之前线程A拷贝到工作内存里的变量A就不是最新值,之后做的操作也有可能是错误的。为了解决这个问题,引入了volatile关键字。
也就是说volatile关键字,确保了变量A在各个工作内存间是可见的,就是在其他线程对主内存里的变量A进行修改后,各个工作内存里的变量A要使用前,需再获取最新的值。这样确保变量A的值都是最新的。

  • synchronized jvm实现
synchronized关键字,在jvm字节码指令是monitorenter,和monitorexit。也就是说,如果一个线程获取了这个对象的monitor,那么其他线程也要获取这个对象时就必须等待上一个线程释放这个对象的monitor。jvm有专门对这个重量级锁的优化,下面分别分析轻量级锁和偏向锁。

  •  轻量级锁
所谓轻量级锁,就是jvm认为,对于绝大部分的锁,在整个同步周期内都是不存在竞争的。也就是说如果一线程获取了锁,其他线程再去获取这个锁的概率比较低。不使用互斥量这样的重量级锁,而使用CAS操作去设置。但是如果另一个线程同样使用CAS去获取这个锁,轻量级锁就会膨胀为重量级锁。轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用操作系统互斥量产生的性能消耗。
要理解轻量级锁,首先要介绍一下Hotspot的对象头这样一个数据结构。


对象头存储的信息就是存储内容一栏,标志位的值的意思就是状态一栏的解释。比如标志位如果是01,表示未锁定;
00表示轻量级锁定等。

使用轻量级锁加锁的过程如下:
1、代码进入同步块,在栈帧中创建锁记录(Lock record),存储对象头的拷贝,如图13-3
2、虚拟机CAS修改对象头指向锁记录的指针,指向栈帧中的锁记录地址。
3、如果CAS更新成功,则状态位改为00,表示地象处于轻量级锁的状态,如图13-4
4、如果CAS更新失败,虚拟机首先检查对象的对象头是否指向当前线程。是,则当前线程已经拥有这个对象锁,可以往下执行代码;否则,就是其他线程在争这个锁,那轻量级锁就会膨胀为重量级锁,标志位设置为10




轻量级锁解锁的过程也是用CAS操作的,如下:
1、把栈帧中的锁记录与对象头用CAS换回来,
2、替换成功,则同步周期完成
3、替换失败,则说明有其他线程在竞争这个锁,释放锁的时候要唤醒挂起的线程。
使用轻量级锁,在没有竞争的情况下,会比使用互斥量这样重量级锁开销要小;但是如果有竞争,会在使用CAS操作之后,还要再使用互斥量,比单纯使用互斥量开销更大。所有有资源竞争,轻量级锁开销比重时级锁更大。
  •  偏向锁
所谓偏向锁,就是在轻量级锁更进一步,这个锁偏向于第一个获取它的线程,这个线程之后获取偏向锁后,就不需要再进行同步了。但是一旦有其他线程竞争这个锁,偏向锁就会膨胀为轻量级锁或重量锁。如果说轻量级锁是在无竞争的情况下使用CAS操作去消除同步使用的互斥量,那偏向锁就是在无竞争的情况下把整个同步都消除掉
偏向锁,轻量级锁及对象头的转换过程如下图:


如果对象设置为可偏向,则首先去获取偏向锁。如果有另一个线程去再去获取偏向锁,则偏向锁就结束。接下来,如果轻量级锁可用,就去获取轻量级锁,再有资源竞争,就膨胀为重量级锁。

  • ReentrantLock实现
ReentrantLock的实现基于AQS。所谓的AQS就是AbstractQueuedSynchronizer这个抽象类。我们从这个类入手,结合源码分析ReentrantLock非公平锁的加锁和解锁过程。
AbstractQueuedSynchronizer基本的原理是:使用CAS去获取锁(改变state的值),如果已被获取(state的值非0),则将线程封装为node节点。并阻塞线程加入到等待队列中。待拿到锁的线程释放锁后,根据是否公平,唤醒等待队列中的线程,并把相应的node移出队列。
Node节点的数据结构意义:
属    性 定    义
Node SHARED = new Node() 表示Node处于共享模式
Node EXCLUSIVE = null 表示Node处于独占模式
int CANCELLED = 1 因为超时或者中断,Node被设置为取消状态,被取消的Node不应该去竞争锁,
只能保持取消状态不变,不能转换为其他状态,处于这种状态的Node会被踢出队列,
被GC回收
int SIGNAL = -1 表示这个Node的继任Node被阻塞了,到时需要通知它
 int CONDITION = -2 表示这个Node在条件队列中,因为等待某个条件而被阻塞 
int PROPAGATE = -3 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播
 int waitStatus 0,新Node会处于这种状态 
 Node prev 队列中某个Node的前驱Node 
 Node next 队列中某个Node的后继Node 
Thread thread 这个Node持有的线程,表示等待锁的线程
Node nextWaiter 表示下一个等待condition的Node


具体我们来分析ReentrantLock的源码:
NonfairSync.lock方法
final void lock() {
    //cas操作,设置state的值为1,表示锁被获取
    if (compareAndSetState(0, 1))
        //state设置成功后,更新独享线程为当前线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //state没有更新成功,说明锁已被获取,加入等待队列中
        acquire(1);
}

如果有锁的竞争,会进入acquire方法中,这个方法位于AbstractQueuedSynchronizer类中
public final void acquire(int arg) {
    //这里,先tryAcquire在子类中尝试获取锁
    //如果获取失败,则调用addWaiter将当前线程封装为等待的Node节点,调用acquireQueued加入到队列中
    //加入队列失败就中断自身线程
    if (!tryAcquire(arg) &&
            acquireQueued(封装为等待Node节点addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
        selfInterrupt();
}

一一对尝试获取锁tryAcquire、封装为等待Node节点addWaiter和加入队列acquireQueued这三个方法进行分析。
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

在NonfairSync子类里,会去调用nonfairTryAcquire尝试获取
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //当前锁没被获取,就以CAS设置state值,并设置独占线程为当前线程
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //这里实现可重入的功能,当前独占线程的值就是当前线程,则state状态值加一,成功获取锁
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

接下来,分析位于AbstractQueuedSynchronizer的addWaiter
private Node addWaiter(Node mode) {
    //封装为node节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //初始化head为空节点
    enq(node);
    return node;
}

addWaiter生成的队列如下图:


在acquireQueued加入队列中,也会再次尝试获取锁,没拿到锁就设置state为-1并加入等待队列返回。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //这个循环重点关注
        for (;;) {
            //获取前驱节点
            final Node p = node.predecessor();
            //放入等待队列前会再次尝试获取锁,因为如果这个时间锁释放了,就直接获取,不必加入等待队列中
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //第一次走shouldParkAfterFailedAcquire时,会把state设置为-1,表示线程被阻塞,返回false
            //第二次时,才会去走parkAndCheckInterrupt,调用LockSupport.park阻塞当前线程,
            // 并Thread.interrupted()判断是否中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

至此,ReentrantLock加锁的过程基本完成,接下来我们看下解锁:
public void unlock() {
    sync.release(1);
}

AbstractQueuedSynchronizer中的release
public final boolean release(int arg) {
    //子类尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //这里会去唤醒等待队列中的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在唤醒线程时,会把head的state设置为0。
private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        //设置状态为0,表示未获取锁
        compareAndSetWaitStatus(node, ws, 0);
    
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //唤醒head的下一个node的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}
查看刚刚构建的队列,唤醒的就是head指向节点(空节点)的下一个节点的线程。

到这里,ReentrantLock释放锁的过程就基本结束了。
总结一下,本篇博客主要介绍了java的锁,从jvm的指令重排优化造成的问题引出,到monitorenter、monitorexit指令,内存屏障,happen-before原则的解决方案——synchronized和volatile关键字。在jvm层面通过轻量级锁和偏向锁进一步优化,尽量不马上使用重量级锁。再分析通过AQS代码实现ReentrantLock的加锁解锁过程。这样,我们就对java的锁机制有个整体的认识。