AQS之ReentLock源码解析

前言:

Java中的同步类 ReentrantLock是基于AbstractQueuedSynchronizer(简称为 AQS)实现的。

今天从源码来了解下ReentrantLock中非公平锁的加锁和释放锁(ReentrantLock中支持公平锁和非公平锁,默认是非公平锁的,但能够经过建立ReentrantLock对象时传入参数指定使用公平锁)。html

在了解ReentrantLock前,须要对AQS有必定的了解,不然在学习时会比较困难的,而且在经过源码学习ReentrantLock时也会穿插着讲解AQS内容。java

AQS扫荡:

1.0、AQS中state变量

​AQS中提供了一个int类型的state变量,而且state变量被volatile修饰,表示state变量的读写操做能够保证原子性;而且AQS还提供了针对state变量的读写方法,以及使用CAS算法更新state变量的方法。 AQS使用state变量这个状态变量来实现同步状态。node

①、源码展现算法

/**
 * The synchronization state. 
 */
private volatile int state;

/**
 * get 获取state变量值 
 */
protected final int getState() {
    return state;
}

/**
 * set 更新state变量值 
 * @param newState  新的状态变量值
 */
protected final void setState(int newState) {
    state = newState;
}


/**
 * 使用CAS算法更新state变量值; 当从共享内存中读取出的state变量值与expect指望值一致的话,
 * 就将其更新为update值。使用CAS算法保证其操做的原子性
 *
 * @param expect  指望值
 * @param update  更新值
 */
protected final boolean compareAndSetState(int expect, int update) {
    // 使用Unsafe类的本地方法来实现CAS
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

1.一、state同步状态的竞争

​ 多个线程同时竞争AQS的state同步状态,在同一时刻只能有一个线程获取到同步状态(获取到锁),那其它没获取到锁的线程该怎么办呢app

它们会进去到一个同步队列中,在队列中等待同步锁的释放;

这个同步队列是一个基于链表的双向队列, 基于链表的话,就会存在Node节点,那么AQS中节点是怎么实现的呢源码分析

①、Node节点:学习

AQS中本身实现了一个内部Node节点类,Node节点类中定义了一些属性,下面来简单说说属性的意思:ui

static final class Node {
        // 标志在同步队列中Node节点的模式,共享模式 
        static final Node SHARED = new Node();
        // 标志在同步队列中Node节点的模式,独占(排他)模式 
        static final Node EXCLUSIVE = null;

        // waitStatus值为1时表示该线程节点已释放(超时等),已取消的节点不会再阻塞。 
        static final int CANCELLED =  1;
    
        // waitStatus值为-1时表示当此节点的前驱结点释放锁时,而后当前节点中的线程就能够去获取锁运行 
        static final int SIGNAL    = -1;
    
        /**
         * waitStatus为-2时,表示该线程在condition队列中阻塞(Condition有使用),
         * 当其余线程调用了Condition的signal()方法后,CONDITION状态的结点将从
         * 等待队列转移到同步队列中,等待获取同步锁。
         */ 
        static final int CONDITION = -2;
    
        /**
         * waitStatus为-3时,与共享模式有关,在共享模式下,该状态表示可运行
         * (CountDownLatch中有使用)。
         */
        static final int PROPAGATE = -3;

        /**
         * waitStatus:等待状态,指的是当前Node节点中存放的线程的等待状态,
         * 等待状态值就是上面的四个状态值:CANCELLED、SIGNAL、CONDITION、PROPAGATE
         */
        volatile int waitStatus;

        /**
         * 由于同步队列是双向队列,那么每一个节点都会有指向前一个节点的 prev 指针
         */
        volatile Node prev;

        /**
         * 由于同步队列是双向队列,那么每一个节点也都会有指向后一个节点的 next 指针
         */
        volatile Node next;

        /**
         * Node节点中存放的阻塞的线程引用
         */
        volatile Thread thread;

        /**
         * 当前节点与其next后继结点的所属模式,是SHARED共享模式,仍是EXCLUSIVE独占模式,
         *
         * 注:好比说当前节点A是共享的,那么它的这个字段是shared,也就是说在这个等待队列中,
         * A节点的后继节点也是shared。
         */
        Node nextWaiter;

        /**
         * 获取当前节点是否为共享模式
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 获取当前节点的 prev前驱结点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { }
    
        // 在后面的addWaiter方法会使用到,线程竞争state同步锁失败时,会建立Node节点存放thread
        Node(Thread thread, Node mode) {     
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

②、同步队列结构图(双向队列):this

1.二、图解AQS原理

​ 经过前面两点,能够了解到AQS的原理究竟是什么了,总结为一句话:AQS使用一个Volatile的int类型的成员变量来表示同步状态,经过内置的FIFO队列来完成资源获取的排队工做,经过CAS完成对State值的修改spa

而后再来一张图,使得理解更加深入:

图片来源:Java技术之AQS详解

好了, AQS暂时能够先了解到这里了,知道这些后,在后面了解 ReentrantLock时就会变的容易些,而且后面经过源码学习 ReentrantLock时,因为会使用到AQS的模版方法,因此也会讲解到AQS的内容。

剑指ReentrantLock源码:

2.0、ReentrantLock   vs   Synchronized

​ 在了解ReentrantLock以前,先将ReentrantLockSynchronized进行比较下,这样能够更加了解ReentrantLock的特性,也有助于下面源码的阅读;

2.一、ReentrantLock的公平锁与非公平锁

建立一个ReentrantLock对象,在建立对象时,若是不指定公平锁的话,默认是非公平锁;

①、简单了解下什么是公平锁,什么是非公平锁?

公平锁:按照申请同步锁的顺序来获取锁;

非公平锁:不会按照申请锁的顺序获取锁,存在锁的抢占;

注:后面会经过源码了解下非公平锁和公平锁是怎样获取锁的。

②、源码以下:

// 默认是非公平的锁
ReentrantLock lock = new ReentrantLock();
// 构造方法默认建立了一个 NonfairSync 非公平锁对象
public ReentrantLock() {
    // NonfairSync继承了Sync类,Sync类又继承了AQS类
    sync = new NonfairSync();
}


// 传入参数 true,指定为公平锁
ReentrantLock lock = new ReentrantLock(true);
// 传入参数的构造方法,当fair为true时,建立一个公平锁对象,不然建立一个非公平锁对象
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

2.二、经过源码看下非公平锁的加锁机制:(独占模式)

①、开始先经过一个简单流程图来看下独占模式下加锁的流程:

​ 图片来源:美团技术团队

②、源码分析:加锁时首先使用CAS算法尝试将state状态变量设置为1,设置成功后,表示当前线程获取到了锁,而后将独占锁的拥有者设置为当前线程;若是CAS设置不成功,则进入Acquire方法进行后续处理。

final void lock() {
    // 使用CAS算法尝试将state状态变量设置为1
    if (compareAndSetState(0, 1))
        // 设置成功后,表示当前线程获取到了锁,而后将独占锁的拥有者设置为当前线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 进行后续处理,会涉及到重入性、建立Node节点加入到队列尾等
        acquire(1);
}

③、探究下acquire(1) 方法里面是什么呢 acquire(1) 方法是AQS提供的方法:

public final void acquire(int arg) {
    /**
     * 使用tryAcquire()方法,让当前线程尝试获取同步锁,获取成的话,就不会执行后面的acquireQueued()
     * 方法了,这是因为 && 逻辑运算符的特性决定的。
     *
     * 若是使用tryAcquire()方法获取同步锁失败的话,就会继续执行acquireQueued()方法,它的做用是
     * 一直死循环遍历同步队列,直到使addWaiter()方法建立的节点中线程获取到锁。
     *
     * 若是acquireQueued()返回的true,这个true不是表明成功的获取到锁,而是表明当前线程是否存在
     * 中断标志,若是存在的话,在获取到同步锁后,须要使用selfInterrupt()对当前线程进行中断。
     */
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1)tryAcquire(arg) 方法源码解读:NonfairSync 非公平锁中重写了AQS的tryAcquire()方法

final boolean nonfairTryAcquire(int acquires) {
    // 当前线程
    final Thread current = Thread.currentThread();
    // 获取当前state同步状态变量值,因为使用volatile修饰,单独的读写操做具备原子性
    int c = getState();
    // 若是状态值为0
    if (c == 0) {
        // 使用compareAndSetState方法这个CAS算法尝试将state同步状态变量设置为1 获取同步锁
        if (compareAndSetState(0, acquires)) {
            // 而后将独占锁的拥有者设置为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是拥有独占锁的的线程是当前线程的话,表示当前线程须要重复获取锁(重入锁)
    else if (current == getExclusiveOwnerThread()) {
        // 当前同步状态state变量值加1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 写入state同步状态变量值,因为使用volatile修饰,单独的读写操做具备原子性
        setState(nextc);
        return true;
    }
    return false;
}

2)addWaiter( Node.EXCLUSIVE ) :建立一个同步队列Node节点,同时绑定节点的模式为独占模式,而且将建立的节点插入到同步队列尾部;addWaiter( ) 方法是AQS提供方法。

private Node addWaiter(Node mode) {
    // model参数是独占模式,默认为null;
    Node node = new Node(Thread.currentThread(), mode);
    // 将当前同步队列的tail尾节点的地址引用赋值给pre变量
    Node pred = tail;
    // 若是pre不为null,说明同步队列中存在节点
    if (pred != null) {
        // 当前节点的前驱结点指向pre尾节点
        node.prev = pred;
        // 使用CAS算法将当前节点设置为尾节点,使用CAS保证其原子性
        if (compareAndSetTail(pred, node)) {
            // 尾节点设置成功,将pre旧尾节点的后继结点指向新尾节点node
            pred.next = node;
            return node;
        }
    }
    // 若是尾节点为null,表示同步队列中尚未节点,enq()方法将当前node节点插入到队列中
    enq(node);
    return node;
}

3)、说完addWaiter( Node.EXCLUSIVE )方法,接下来讲下acquireQueued()方法,它是怎样使addWaiter()建立的节点中的线程获取到state同步锁的。(这个方法也是AQS提供的)

源码走起:

final boolean acquireQueued(final Node node, int arg) {
    // 标志cancelAcquire()方法是否执行
    boolean failed = true;
    try {
        // 标志是否中断,默认为false不中断
        boolean interrupted = false;
        for (;;) {
            // 获取当前节点的前驱结点
            final Node p = node.predecessor();
            /**
             * 若是当前节点的前驱结点已是同步队列的头结点了,说明了两点内容:
             * 一、其前驱结点已经获取到了同步锁了,而且锁还没释放
             * 二、其前驱结点已经获取到了同步锁了,可是锁已经释放了
             *
             * 而后使用tryAcquire()方法去尝试获取同步锁,若是前驱结点已经释放了锁,那么就会获取成功,
             * 不然同步锁获取失败,继续循环
             */
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为同步队列的head头结点
                setHead(node);
                // 而后将当前节点的前驱结点的后继结点置为null,帮助进行垃圾回收
                p.next = null; // help GC
                failed = false;
                // 返回中断的标志
                return interrupted;
            }
            /**
             * shouldParkAfterFailedAcquire()是对当前节点的前驱结点的状态进行判断,以及去针对各类
             * 状态作出相应处理,因为文章篇幅问题,具体源码本文不作讲解;只需知道若是前驱结点p的状态为
             * SIGNAL的话,就返回true。
             *
             * parkAndCheckInterrupt()方法会使当前线程进去waiting状态,而且查看当前线程是否被中断,
             * interrupted() 同时会将中断标志清除。
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 中断标志置为true
                interrupted = true;
        }
    } finally {
        if (failed)
            /**
             * 若是for(;;)循环中出现异常,而且failed=false没有执行的话,cancelAcquire方法
             * 就会将当前线程的状态置为 node.CANCELLED 已取消状态,而且将当前节点node移出
             * 同步队列。
             */
            cancelAcquire(node);
    }
}

4)、最后说下 selfInterrupt() 方法, 这个方法就是将当前线程进行中断:

static void selfInterrupt() {
    // 中断当前线程
    Thread.currentThread().interrupt();
}

2.三、公平锁与非公平锁在加锁时的区别:

①、公平锁 FairSync 的加锁 lock() 加锁方法:

final void lock() {
    acquire(1);
}

②、非公平锁 NonfairSync 的加锁 lock() 加锁方法:上面讲解源码的时候有提到哟,还有印象吗,没印象的话也不要紧,不要哭 , 嘿嘿,我都准备好了。 源码奉上:

final void lock() {
    /** 
     * 看到这,是否是发现了什么,非公平锁在此处直观看的话,发现比公平锁多了这几行代码; 
     * 这里就是使得线程存在了一个抢占,若是当前同步队列中的head头结点中 线程A 恰好释放了同步锁,
     * 而后此时 线程B 正好来了,那么此时线程B就会获取到锁,而此时同步队列中head头结点的后继结点中的
     * 线程C 就没法获取到同步锁,只能等待线程B释放锁后,尝试获取锁了。
     */
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

③、除了上面那处不一样以外,还有别的地方吗;别急,再看看 acquire(1) 方法是否同样呢?

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

​ 诶呀,方法点进去都是同样的呀,可不嘛,都是调用的AQS提供的 acquire(1) 方法;可是别着急,上面在讲解非公平锁加锁时,有提到的 tryAcquire(arg) 方法在AQS的不一样子孙类中都有各自的实现的。如今打开公平锁的 tryAcquire(arg) 方法看看其源码与非公平锁有什么区别:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        /**
         * 经过对比源码发现,公平锁比非公平锁多了这块代码: !hasQueuedPredecessors() 
         * hasQueuedPredecessors() 是作什么呢?就是判断当前同步队列中是否存在节点,若是存在节点呢,
         * 就返回true,因为前面有个 !,那么就是false,再根据 && 逻辑运算符的特性,不会继续执行了;
         * 
         * tryAcquire()方法直接返回false,后面的逻辑就和非公平锁的一致了,就是建立Node节点,并将
         * 节点加入到同步队列尾; 公平锁:发现当前同步队列中存在节点,有线程在本身前面已经申请可锁,那
         * 本身就得乖乖的向后面排队去。
         *
         * 友情提示:在生活中,咱们也须要按照先来后到去排队,保证素质; 还有就是怕大家不排队被别人打了。
         */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
松口气,从中午一直写到下午快四点了,先让我歇口气,快累成狗了;本文还剩下释放锁部分没写呢,歇口气,喝口水继续

注意:ReentrantLock在释放锁的时候,并不区分公平锁和非公平锁

2.四、经过源码看下释放锁机制:(独占模式)

①、unlock() 释放锁的方法:

public void unlock() {
    // 释放锁时,须要将state同步状态变量值进行减 1,传入参数 1
    sync.release(1);
}

②、release( int arg ) 方法解析:(此方法是AQS提供的)

public final boolean release(int arg) {
    // tryRelease方法:尝试释放锁,成功true,失败false
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 头结点不为空而且头结点的waitStatus不是初始化节点状况,而后唤醒此阻塞的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

注意:这里的判断条件为何是 h != null && h.waitStatus != 0 ?

h == null Head还没初始化。初始状况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。因此说,这里若是还没来得及入队,就会出现head == null 的状况。

h != null && waitStatus == 0 代表后继节点对应的线程仍在运行中,不须要唤醒。

h != null && waitStatus < 0 代表后继节点可能被阻塞了,须要唤醒。

③、而后再来看看tryRelease(arg) 方法:

protected final boolean tryRelease(int releases) {
    // 当前state状态值进行减一
    int c = getState() - releases;
    // 若是当前独占锁的拥有者不是当前线程,则抛出 非法监视器状态 异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 更新state同步状态值
    setState(c);
    return free;
}

④、最后看看unparkSuccessor(Node node) 方法:

private void unparkSuccessor(Node node) {
    // 获取头结点waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 获取当前节点的下一个节点
    Node s = node.next;
    // 若是下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled状态的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 若是当前节点的后继结点不为null,则将其节点中处于阻塞状态的线程unpark唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

注意:为何要从后往前找第一个非Cancelled的节点呢?缘由以下:

因为以前加锁时的addWaiter( )方法的缘由;

private Node addWaiter(Node mode) {
    // model参数是独占模式,默认为null;
    Node node = new Node(Thread.currentThread(), mode);
    // 将当前同步队列的tail尾节点的地址引用赋值给pre变量
    Node pred = tail;
    // 若是pre不为null,说明同步队列中存在节点
    if (pred != null) {
        // 当前节点的前驱结点指向pre尾节点
        node.prev = pred;
        // 使用CAS算法将当前节点设置为尾节点,使用CAS保证其原子性
        if (compareAndSetTail(pred, node)) {
            // 尾节点设置成功,将pre旧尾节点的后继结点指向新尾节点node
            pred.next = node;
            return node;
        }
    }
    // 若是尾节点为null,表示同步队列中尚未节点,enq()方法将当前node节点插入到队列中
    enq(node);
    return node;
}

从这里能够看到,节点入队并非原子操做,也就是说,node.prev = pred ; compareAndSetTail( pred, node ) 这两个地方能够看做Tail入队的原子操做,可是此时 pred.next = node; 还没执行,若是这个时候执行了unparkSuccessor方法,就没办法从前日后找了,因此须要从后往前找。还有一点缘由,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,所以也是必需要从后往前遍历才可以遍历彻底部的Node。

end! 长吸一口气,终于本文算是写完了,最后再看看有没有错别字,以及排排版。

后续还会出一篇结合CountDownLatch源码学习共享锁(共享模式)的文章。

参考资料:

一、从ReentrantLock的实现看AQS的原理及应用

二、Java技术之AQS详解

相关文章
相关标签/搜索