逐行分析AQS源码(1)——独占锁的获取

前言

AQS(AbstractQueuedSynchronizer)是JAVA中众多锁以及并发工具的基础,其底层采用乐观锁,大量使用了CAS操做, 而且在冲突时,采用自旋方式重试,以实现轻量级和高效地获取锁。java

AQS虽然被定义为抽象类,但事实上它并不包含任何抽象方法。这是由于AQS是被设计来支持多种用途的,若是定义抽象方法,则子类在继承时必需要覆写全部的抽象方法,这显然是不合理的。因此AQS将一些须要子类覆写的方法都设计成protect方法,将其默认实现为抛出UnsupportedOperationException异常。若是子类使用到这些方法,可是没有覆写,则会抛出异常;若是子类没有使用到这些方法,则不须要作任何操做。node

AQS中实现了锁的获取框架,锁的实际获取逻辑交由子类去实现,就锁的获取操做而言,子类必须重写 tryAcquire方法。编程

本篇咱们将以ReentrantLock的公平锁为例来详细看看使用AQS获取独占锁的流程。segmentfault

本文中的源码基于JDK1.8 。安全

系列文章目录数据结构

Java并发工具类的三板斧

在开始看AQS源码以前,咱们先来了解如下java并发工具的设计套路,我把它总结成三板斧:多线程

状态,队列,CAS

每当咱们学习一个java并发编程工具的时候,咱们首先要抓住这三点。并发

  • 状态:通常是一个state属性,它基本是整个工具的核心,一般整个工具都是在设置和修改状态,不少方法的操做都依赖于当前状态是什么。因为状态是全局共享的,通常会被设置成volatile类型,以保证其修改的可见性;
  • 队列:队列一般是一个等待的集合,大多数以链表的形式实现。队列采用的是悲观锁的思想,表示当前所等待的资源,状态或者条件短期内可能没法知足。所以,它会将当前线程包装成某种类型的数据结构,扔到一个等待队列中,当必定条件知足后,再从等待队列中取出。
  • CAS: CAS操做是最轻量的并发处理,一般咱们对于状态的修改都会用到CAS操做,由于状态可能被多个线程同时修改,CAS操做保证了同一个时刻,只有一个线程能修改为功,从而保证了线程安全,CAS操做基本是由Unsafe工具类的compareAndSwapXXX来实现的;CAS采用的是乐观锁的思想,所以经常伴随着自旋,若是发现当前没法成功地执行CAS,则不断重试,直到成功为止,自旋的的表现形式一般是一个死循环for(;;)

AQS核心实现

上面咱们已经总结了java并发编程的套路,下面咱们就以这个套路为切入点来分析AQS的实现。框架

状态

首先是找状态。函数

在AQS中,状态是由state属性来表示的,不出所料,它是volatile类型的:

private volatile int state;

该属性的值即表示了锁的状态,state为0表示锁没有被占用,state大于0表示当前已经有线程持有该锁,这里之因此说大于0而不说等于1是由于可能存在可重入的状况。你能够把state变量当作是当前持有该锁的线程数量。

因为本篇咱们分析的是独占锁,同一时刻,锁只能被一个线程所持有。经过state变量是否为0,咱们能够分辨当前锁是否被占用,但光知道锁是否是被占用是不够的,咱们并不知道占用锁的线程是哪个。在监视器锁中,咱们用ObjectMonitor对象的_owner属性记录了当前拥有监视器锁的线程,而在AQS中,咱们将经过exclusiveOwnerThread属性:

private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

exclusiveOwnerThread属性的值即为当前持有锁的线程,它就是咱们在分析监视器锁的原理的时候所说的“铁王座”。

队列

接着咱们来看队列,AQS中,队列的实现是一个双向链表,被称为sync queue,它表示全部等待锁的线程的集合,有点相似于咱们前面介绍synchronized原理的时候说的wait set

咱们前面说过,在并发编程中使用队列一般是将当前线程包装成某种类型的数据结构扔到等待队列中,咱们先来看看队列中的每个节点是怎么个结构:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

这个结构看起来很复杂,其实属性只有4类:

// 节点所表明的线程
volatile Thread thread;

// 双向链表,每一个节点须要保存本身的前驱节点和后继节点的引用
volatile Node prev;
volatile Node next;

// 线程所处的等待锁的状态,初始化时,该值为0
volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 该属性用于条件队列或者共享锁
Node nextWaiter;

注意,在这个Node类中也有一个状态变量waitStatus,它表示了当前Node所表明的线程的等待锁的状态,在独占锁模式下,咱们只须要关注CANCELLED SIGNAL两种状态便可。这里还有一个nextWaiter属性,它在独占锁模式下永远为null,仅仅起到一个标记做用,没有实际意义。这么一分析,这个Node类是否是就简单了好多?<( ̄︶ ̄)>

说完队列中的节点,咱们接着说回这个sync queue,AQS是怎么使用这个队列的呢,既然是双向链表,操纵它天然只须要一个头结点和一个尾节点:

// 头结点,不表明任何线程,是一个哑结点
private transient volatile Node head;

// 尾节点,每个请求锁的线程会加到队尾
private transient volatile Node tail;

到这里,咱们就了解到了这个sync queue的全貌:
wait queue

不过这里有一点咱们提早说一下,在AQS中的队列是一个CLH队列,它的head节点永远是一个哑结点(dummy node), 它不表明任何线程(某些状况下能够看作是表明了当前持有锁的线程),所以head所指向的Node的thread属性永远是null。只有从次头节点日后的全部节点才表明了全部等待锁的线程。也就是说,在当前线程没有抢到锁被包装成Node扔到队列中时,即便队列是空的,它也会排在第二个,咱们会在它的前面新建一个dummy节点(具体的代码咱们在后面分析源码时再详细讲)。为了便于描述,下文中咱们把除去head节点的队列称做是等待队列,在这个队列中的节点才表明了全部等待锁的线程:
dummy head

在继续往下以前咱们再对着上图总结一下Node节点各个参数的含义:

  • thread:表示当前Node所表明的线程
  • waitStatus:表示节点所处的等待状态,共享锁模式下只需关注三种状态:SIGNAL CANCELLED 初始态(0)
  • prev next:节点的前驱和后继
  • nextWaiter:进做为标记,值永远为null,表示当前处于独占锁模式

CAS操做

前面咱们提到过,CAS操做大对数是用来改变状态的,在AQS中也不例外。咱们通常在静态代码块中初始化须要CAS操做的属性的偏移量:

private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

从这个静态代码块中咱们也能够看出,CAS操做主要针对5个属性,包括AQS的3个属性state,headtail, 以及Node对象的两个属性waitStatus,next。说明这5个属性基本是会被多个线程同时访问的。

定义完属性的偏移量以后,接下来就是CAS操做自己了:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

如前面所说,最终CAS操做调用的仍是Unsafe类的compareAndSwapXXX方法。

最后就是自旋了,这一点就没有什么好说的了,咱们在后面源码分析的时候再详细讲。

AQS核心属性

前面咱们以java并发编程工具类的“三板斧”为切入点分析了AQS的状态,队列和CAS操做,对这个工具类有了初步的认识。接下来,咱们就要开始进入源码分析了。在进入正式的分析以前,咱们先来总结下AQS核心属性:

(1)锁相关的属性有两个:

private volatile int state; //锁的状态
private transient Thread exclusiveOwnerThread; // 当前持有锁的线程,注意这个属性是从AbstractOwnableSynchronizer继承而来

(2)sync queue相关的属性有两个:

private transient volatile Node head; // 队头,为dummy node
private transient volatile Node tail; // 队尾,新入队的节点

(3)队列中的Node中须要关注的属性有三组:

// 节点所表明的线程
volatile Thread thread;

// 双向链表,每一个节点须要保存本身的前驱节点和后继节点的引用
volatile Node prev;
volatile Node next;

// 线程所处的等待锁的状态,初始化时,该值为0
volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;

拎了这些属性后,咱们下面分析源码就容易不少了。

Example: FairSync in ReentrantLock

前面已经提到, AQS大多数状况下都是经过继承来使用的, 子类经过覆写 tryAcquire 来实现本身的获取锁的逻辑,咱们这里以ReentrantLock为例来讲明锁的获取流程。

值得注意的是, ReentrantLock有 公平锁非公平锁 两种实现, 默认实现为非公平锁, 这体如今它的构造函数中:

public class ReentrantLock implements Lock, java.io.Serializable {
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    
    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    
    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync{
        ...
    }
    
    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        ...
    }
    
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    // 获取锁
    public void lock() {
        sync.lock();
    }
    
    ...
}

能够看出, FairSync 继承自 Sync, 而Sync继承自 AQS, ReentrantLock获取锁的逻辑是直接调用了 FairSync 或者 NonfairSync的逻辑.

好了, ReentrantLock 就简单说到这里, 之后咱们有机会再详细讲, 这里直接以 FairLock 为例, 来逐行分析锁的获取:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    //获取锁
    final void lock() {
        acquire(1);
    }
    ...
}

lock 方法调用的 acquire方法来自父类AQS。

这里首先给出完整的获取锁的流程图, 再逐行分析代码, 由于看源码的时候, 代码会在函数或者循环中来回跳转,读者能够对照如下流程图, 就不容易被绕晕了.

ReentrantLock 公平锁获取流程图

acquire

acquire 定义在AQS类中,描述了获取锁的流程

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

能够看出, 该方法中涉及了四个方法的调用:

(1)tryAcquire(arg)

该方法由继承AQS的子类实现, 为获取锁的具体逻辑。

(2)addWaiter(Node mode)

该方法由AQS实现, 负责在获取锁失败后调用, 将当前请求锁的线程包装成Node扔到sync queue中去,并返回这个Node。

(3)acquireQueued(final Node node, int arg)

该方法由AQS实现,这个方法比较复杂, 主要对上面刚加入队列的Node不断尝试如下两种操做之一:

  • 在前驱节点就是head节点的时候,继续尝试获取锁
  • 将当前线程挂起,使CPU再也不调度它

(4)selfInterrupt

该方法由AQS实现, 用于中断当前线程。因为在整个抢锁过程当中,咱们都是不响应中断的。那若是在抢锁的过程当中发生了中断怎么办呢,总不能伪装没看见呀。AQS的作法简单的记录有没有有发生过中断,若是返回的时候发现曾经发生过中断,则在退出acquire方法以前,就调用selfInterrupt自我中断一下,就好像将这个发生在抢锁过程当中的中断“推迟”到抢锁结束之后再发生同样。

从上面的简单介绍中能够看出,除了获取锁的逻辑 tryAcquire(arg)由子类实现外, 其他方法均由AQS实现。

接下来咱们重点来看 FairSync 所实现的获取锁的逻辑:

tryAcquire

tryAcquire 获取锁的逻辑其实很简单——判断当前锁有没有被占用:

  1. 若是锁没有被占用, 尝试以公平的方式获取锁
  2. 若是锁已经被占用, 检查是否是锁重入

获取锁成功返回true, 失败则返回false

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 首先获取当前锁的状态
    int c = getState(); 
    
    // c=0 说明当前锁是avaiable的, 没有被任何线程占用, 能够尝试获取
    // 由于是实现公平锁, 因此在抢占以前首先看看队列中有没有排在本身前面的Node
    // 若是没有人在排队, 则经过CAS方式获取锁, 就能够直接退出了
    if (c == 0) {
        if (!hasQueuedPredecessors() 
        /* 为了阅读方便, hasQueuedPredecessors源码就直接贴在这里了, 这个方法的本质其实是检测本身是否是head节点的后继节点,即处在阻塞队列第一位的节点
            public final boolean hasQueuedPredecessors() {
                Node t = tail; 
                Node h = head;
                Node s;
                return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
            }
        */
        && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current); // 将当前线程设置为占用锁的线程
            return true;
        }
    }
    
    // 若是 c>0 说明锁已经被占用了
    // 对于可重入锁, 这个时候检查占用锁的线程是否是就是当前线程,是的话,说明已经拿到了锁, 直接重入就行
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        /* setState方法以下:
        protected final void setState(int newState) {
            state = newState;
        }
        */
        return true;
    }
    
    // 到这里说明有人占用了锁, 而且占用锁的不是当前线程, 则获取锁失败
    return false;
}

从这里能够看出,获取锁其实主要就是干一件事:

将state的状态经过CAS操做由0改写成1

因为是CAS操做,必然是只有一个线程能执行成功。则执行成功的线程即获取了锁,在这以后,才有权利将exclusiveOwnerThread的值设成本身,从而“坐上铁王座”。
另外对于可重入锁,若是当前线程已是获取了锁的线程了,它还要注意增长锁的重入次数。

值得一提的是,这里修改state状态的操做,一个用了CAS方法compareAndSetState,一个用了普通的setState方法。这是由于用CAS操做时,当前线程尚未得到锁,因此可能存在多线程同时在竞争锁的状况;而调用setState方法时,是在当前线程已是持有锁的状况下,所以对state的修改是安全的,只须要普通的方法就能够了。

所以,在多线程条件下看源码时,咱们必定要时刻在心中问本身:

这段代码是不是线程安全的?同一时刻是否可能有多个线程在执行这行代码?

addWaiter

若是执行到此方法, 说明前面尝试获取锁的tryAcquire已经失败了, 既然获取锁已经失败了, 就要将当前线程包装成Node,加到等待锁的队列中去, 由于是FIFO队列, 因此天然是直接加在队尾。
方法调用为:

addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //将当前线程包装成Node
    // 这里咱们用注释的形式把Node的构造函数贴出来
    // 由于传入的mode值为Node.EXCLUSIVE,因此节点的nextWaiter属性被设为null
    /*
        static final Node EXCLUSIVE = null;
        
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    */
    Node pred = tail;
    // 若是队列不为空, 则用CAS方式将当前节点设为尾节点
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    // 代码会执行到这里, 只有两种状况:
    //    1. 队列为空
    //    2. CAS失败
    // 注意, 这里是并发条件下, 因此什么都有可能发生, 尤为注意CAS失败后也会来到这里
    enq(node); //将节点插入队列
    return node;
}

可见,每个处于独占锁模式下的节点,它的nextWaiter必定是null。
在这个方法中,咱们首先会尝试直接入队,可是由于目前是在并发条件下,因此有可能同一时刻,有多个线程都在尝试入队,致使compareAndSetTail(pred, node)操做失败——由于有可能其余线程已经成为了新的尾节点,致使尾节点再也不是咱们以前看到的那个pred了。

若是入队失败了,接下来咱们就须要调用enq(node)方法,在该方法中咱们将经过自旋+CAS的方式,确保当前节点入队。

enq

能执行到这个方法,说明当前线程获取锁已经失败了,咱们已经把它包装成一个Node,准备把它扔到等待队列中去,可是在这一步又失败了。这个失败的缘由多是如下两种之一:

  1. 等待队列如今是空的,没有线程在等待。
  2. 其余线程在当前线程入队的过程当中率先完成了入队,致使尾节点的值已经改变了,CAS操做失败。

在该方法中, 咱们使用了死循环, 即以自旋方式将节点插入队列,若是失败则不停的尝试, 直到成功为止, 另外, 该方法也负责在队列为空时, 初始化队列,这也说明,队列是延时初始化的(lazily initialized):

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 若是是空队列, 首先进行初始化
        // 这里也能够看出, 队列不是在构造的时候初始化的, 而是延迟到须要用的时候再初始化, 以提高性能
        if (t == null) { 
            // 注意,初始化时使用new Node()方法新建了一个dummy节点
            if (compareAndSetHead(new Node()))
                tail = head; // 这里仅仅是将尾节点指向dummy节点,并无返回
        } else {
        // 到这里说明队列已经不是空的了, 这个时候再继续尝试将节点加到队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

这里尤为要注意的是,当队列为空时,咱们初始化队列并无使用当前传进来的节点,而是:
新建了一个空节点!!!
新建了一个空节点!!!
新建了一个空节点!!!

在新建完空的头节点以后,咱们并无当即返回,而是将尾节点指向当前的头节点,而后进入下一轮循环。
在下一轮循环中,尾节点已经不为null了,此时再将咱们包装了当前线程的Node加到这个空节点后面。

这就意味着,在这个等待队列中,头结点是一个“哑节点”,它不表明任何等待的线程。
head节点不表明任何线程,它就是一个空节点!!!
head节点不表明任何线程,它就是一个空节点!!!
head节点不表明任何线程,它就是一个空节点!!!

尾分叉

在继续往下以前,咱们先分析enq方法中一个比较有趣的现象,我把它叫作尾分叉。咱们着重看将当前节点设置成尾节点的操做:

} else {
// 到这里说明队列已经不是空的了, 这个时候再继续尝试将节点加到队尾
    node.prev = t;
    if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
    }
}

与将大象放到冰箱里须要三步同样,将一个节点node添加到sync queue的末尾也须要三步:

  1. 设置node的前驱节点为当前的尾节点:node.prev = t
  2. 修改tail属性,使它指向当前节点
  3. 修改原来的尾节点,使它的next指向当前节点

set tail

可是须要注意的,这里的三步并非一个原子操做,第一步很容易成功;而第二步因为是一个CAS操做,在并发条件下有可能失败,第三步只有在第二步成功的条件下才执行。这里的CAS保证了同一时刻只有一个节点能成为尾节点,其余节点将失败,失败后将回到for循环中继续重试。

因此,当有大量的线程在同时入队的时候,同一时刻,只有一个线程能完整地完成这三步,而其余线程只能完成第一步,因而就出现了尾分叉:
尾分叉

注意,这里第三步是在第二步执行成功后才执行的,这就意味着,有可能即便咱们已经完成了第二步,将新的节点设置成了尾节点,此时原来旧的尾节点的next值可能仍是null(由于尚未来的及执行第三步),因此若是此时有线程恰巧从头节点开始向后遍历整个链表,则它是遍历不到新加进来的尾节点的,可是这显然是不合理的,由于如今的tail已经指向了新的尾节点。
另外一方面,当咱们完成了第二步以后,第一步必定是完成了的,因此若是咱们从尾节点开始向前遍历,已经能够遍历到全部的节点。这也就是为何咱们在AQS相关的源码中,有时候经常会出现从尾节点开始逆向遍历链表——由于一个节点要能入队,则它的prev属性必定是有值的,可是它的next属性可能暂时尚未值。

至于那些“分叉”的入队失败的其余节点,在下一轮的循环中,它们的prev属性会从新指向新的尾节点,继续尝试新的CAS操做,最终,全部节点都会经过自旋不断的尝试入队,直到成功为止。

addWaiter总结

至此,咱们就完成了addWaiter(Node.EXCLUSIVE)方法的完整的分析,该方法并不设计到任何关于锁的操做,它就是解决了并发条件下的节点入队问题。具体来讲就是该方法保证了将当前线程包装成Node节点加入到等待队列的队尾,若是队列为空,则会新建一个哑节点做为头节点,再将当前节点接在头节点的后面。

addWaiter(Node.EXCLUSIVE)方法最终返回了表明了当前线程的Node节点,在返回的那一刻,这个节点必然是当时的sync queue的尾节点。

不过值得注意的是,enq方法也是有返回值(虽然这里咱们并无使用它的返回值),可是它返回的是node节点的前驱节点,这个返回值虽然在addWaiter方法中并无使用,可是在其余地方会被用到。

咱们再回到获取锁的逻辑中:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

当addWaiter(Node.EXCLUSIVE)执行完毕后,节点如今已经被成功添加到sync queue中了,接下来将执行acquireQueued方法。

acquireQueued

该方法是最复杂的一个方法, 也是最难啃的骨头, 看代码以前首先简单的说明几点:

(1) 能执行到该方法, 说明addWaiter 方法已经成功将包装了当前Thread的节点添加到了等待队列的队尾
(2) 该方法中将再次尝试去获取锁
(3) 在再次尝试获取锁失败后, 判断是否须要把当前线程挂起

为何前面获取锁失败了, 这里还要再次尝试获取锁呢?
首先, 这里再次尝试获取锁是基于必定的条件的,即:

当前节点的前驱节点就是HEAD节点

由于咱们知道,head节点就是个哑节点,它不表明任何线程,或者表明了持有锁的线程,若是当前节点的前驱节点就是head节点,那就说明当前节点已是排在整个等待队列最前面的了。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 在当前节点的前驱就是HEAD节点时, 再次尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //在获取锁失败后, 判断是否须要把当前线程挂起
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意,这里又来了个自旋操做,咱们一段段来看:

final Node p = node.predecessor();
// 在当前节点的前驱就是HEAD节点时, 再次尝试获取锁
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

首先咱们获取尾节点的前驱节点(由于上一步中返回的就是尾节点,而且这个节点就是表明了当前线程的Node)。
若是前驱节点就是head节点,那说明当前线程已经排在了队列的最前面,因此这里咱们再试着去获取锁。若是这一次获取成功了,即tryAcquire方法返回了true, 则咱们将进入if代码块,调用setHead方法:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

这个方法将head指向传进来的node,而且将node的thread和prev属性置为null, 以下图所示:
setHead

能够看出,这个方法的本质是丢弃原来的head,将head指向已经得到了锁的node。可是接着又将该node的thread属性置为null了,这某种意义上致使了这个新的head节点又成为了一个哑节点,它不表明任何线程。为何要这样作呢,由于在tryAcquire调用成功后,exclusiveOwnerThread属性就已经记录了当前获取锁的线程了,此处没有必要再记录。这某种程度上就是将当前线程从等待队列里面拿出来了,是一个变相的出队操做。

还有另一个特色是,这个setHead方法只是个普通方法,并无像以前enq方法中那样采用compareAndSetHead方法,这是为何呢? 同咱们以前分析setState方法同样:

由于这里不会产生竞争!

在enq方法中,当咱们设置头节点的时候,是新建一个哑节点并将它做为头节点,这个时候,可能多个线程都在执行这一步,所以咱们须要经过CAS操做保证只有一个线程能成功。
在acquireQueued方法里,因为咱们在调用到setHead的时,已经经过tryAcquire方法得到了锁,这意味着:

  1. 此时没有其余线程在建立新的头节点——由于很明显此时队列并非空的,不会执行到建立头节点的代码
  2. 此时能执行setHead的只有一个线程——由于要执行到setHead, 必然是tryAcquire已经返回了true, 而同一时刻,只有一个线程能获取到锁

综上,在整个if语句内的代码即便不加锁,也是线程安全的,不须要采用CAS操做。

接下来咱们再来看看另外一种状况,即p == head && tryAcquire(arg)返回了false,此时咱们须要判断是否须要将当前线程挂起:

shouldParkAfterFailedAcquire

从函数名也能够看出, 该方法用于决定在获取锁失败后, 是否将线程挂起.

决定的依据就是前驱节点的waitStatus值。

(有没发现一直到如今,前面的分析中咱们都没有用到waitStatus的值,终于在这里要用到了)

咱们先来回顾一下waitStatus有哪些状态值:

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

一共有四种状态,可是咱们在开篇的时候就说过,在独占锁锁的获取操做中,咱们只用到了其中的两个——CANCELLEDSIGNAL
固然,前面咱们在建立节点的时候并无给waitStatus赋值,所以每个节点最开始的时候waitStatus的值都被初始化为0,即不属于上面任何一种状态。

那么CANCELLEDSIGNAL表明什么意思呢?

CANCELLED状态很好理解,它表示Node所表明的当前线程已经取消了排队,即放弃获取锁了。

SIGNAL这个状态就有点意思了,它不是表征当前节点的状态,而是当前节点的下一个节点的状态。
当一个节点的waitStatus被置为SIGNAL,就说明它的下一个节点(即它的后继节点)已经被挂起了(或者立刻就要被挂起了),所以在当前节点释放了锁或者放弃获取锁时,若是它的waitStatus属性为SIGNAL,它还要完成一个额外的操做——唤醒它的后继节点。

有意思的是,SIGNAL这个状态的设置经常不是节点本身给本身设的,而是后继节点设置的,这里给你们打个比方:

好比说出去吃饭,在人多的时候常常要排队取号,你取到了8号,前面还有7我的在等着进去,你就和排在你前面的7号讲“哥们,我如今排在你后面,队伍这么长,估计一时半会儿也轮不到我,我去那边打个盹,一会轮到你进去了(release)或者你不想等了(cancel), 麻烦你都叫醒我”,说完,你就把他的waitStatus值设成了SIGNAL

换个角度讲,当咱们决定要将一个线程挂起以前,首先要确保本身的前驱节点的waitStatus为SIGNAL,这就至关于给本身设一个闹钟再去睡,这个闹钟会在恰当的时候叫醒本身,不然,若是一直没有人来叫醒本身,本身可能就一直睡到天荒地老了。

理解了CANCELLEDSIGNAL这两个状态的含义后,咱们再来看看shouldParkAfterFailedAcquire是怎么用的:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 得到前驱节点的ws
    if (ws == Node.SIGNAL)
        // 前驱节点的状态已是SIGNAL了,说明闹钟已经设了,能够直接睡了
        return true;
    if (ws > 0) {
        // 当前节点的 ws > 0, 则为 Node.CANCELLED 说明前驱节点已经取消了等待锁(因为超时或者中断等缘由)
        // 既然前驱节点不等了, 那就继续往前找, 直到找到一个还在等待锁的节点
        // 而后咱们跨过这些不等待锁的节点, 直接排在等待锁的节点的后面 (是否是很开心!!!)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 前驱节点的状态既不是SIGNAL,也不是CANCELLED
        // 用CAS设置前驱节点的ws为 Node.SIGNAL,给本身定一个闹钟
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

能够看出,shouldParkAfterFailedAcquire所作的事情无外乎:

  • 若是为前驱节点的waitStatus值为 Node.SIGNAL 则直接返回 true
  • 若是为前驱节点的waitStatus值为 Node.CANCELLED (ws > 0), 则跳过那些节点, 从新寻找正常等待中的前驱节点,而后排在它后面,返回false
  • 其余状况, 将前驱节点的状态改成 Node.SIGNAL, 返回false

注意了,这个函数只有在当前节点的前驱节点的waitStatus状态自己就是SIGNAL的时候才会返回true, 其余时候都会返回false, 咱们再回到这个方法的调用处:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 咱们在这里!在这里!!在这里!!!
            // 咱们在这里!在这里!!在这里!!!
            // 咱们在这里!在这里!!在这里!!!
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

能够看出,当shouldParkAfterFailedAcquire返回false后,会继续回到循环中再次尝试获取锁——这是由于此时咱们的前驱节点可能已经变了(搞很差前驱节点就变成head节点了呢)。

当shouldParkAfterFailedAcquire返回true,即当前节点的前驱节点的waitStatus状态已经设为SIGNAL后,咱们就能够安心的将当前线程挂起了,此时咱们将调用parkAndCheckInterrupt:

parkAndCheckInterrupt

到这个函数已是最后一步了, 就是将线程挂起, 等待被唤醒

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 线程被挂起,停在这里再也不往下执行了
    return Thread.interrupted();
}

注意!LockSupport.park(this)执行完成后线程就被挂起了,除非其余线程unpark了当前线程,或者当前线程被中断了,不然代码是不会再往下执行的,后面的Thread.interrupted()也不会被执行,那后面这个Thread.interrupted()是干什么用的呢? 咱们下一篇再讲。

总结

  1. AQS中用state属性表示锁,若是能成功将state属性经过CAS操做从0设置成1即获取了锁
  2. 获取了锁的线程才能将exclusiveOwnerThread设置成本身
  3. addWaiter负责将当前等待锁的线程包装成Node,并成功地添加到队列的末尾,这一点是由它调用的enq方法保证的,enq方法同时还负责在队列为空时初始化队列。
  4. acquireQueued方法用于在Node成功入队后,继续尝试获取锁(取决于Node的前驱节点是否是head),或者将线程挂起
  5. shouldParkAfterFailedAcquire方法用于保证当前线程的前驱节点的waitStatus属性值为SIGNAL,从而保证了本身挂起后,前驱节点会负责在合适的时候唤醒本身。
  6. parkAndCheckInterrupt方法用于挂起当前线程,并检查中断状态。
  7. 若是最终成功获取了锁,线程会从lock()方法返回,继续往下执行;不然,线程会阻塞等待。

(完)

下一篇: 逐行分析AQS源码(2)——独占锁的释放

查看更多系列文章:系列文章目录

相关文章
相关标签/搜索