ReentrantLock 源码分析以及 AQS (一)

前言

JDK1.5 以后发布了JUC(java.util.concurrent),用于解决多线程并发问题。AQS 是一个特别重要的同步框架,不少同步类都借助于 AQS 实现了对线程同步状态的管理。java

AQS 中最主要的就是独占锁和共享锁的获取和释放,以及提供了一些可中断的获取锁,超时等待锁等方法。node

ReentranLock 是基于 AQS 独占锁的一个实现。ReentrantReadWriteLock 是基于 AQS 共享锁的一个读写锁实现。原本打算一篇文章里面写完独占锁和共享锁,可是发现篇幅太长了,也不易于消化。安全

所以,本篇就先结合 ReentrantLock 源码,分析 AQS 的独占锁获取和释放。以及 ReentrantLock 的公平锁和非公平锁实现。数据结构

下一篇再写 ReentrantReadWriteLock 读写锁源码,以及 AQS 共享锁的获取和释放。多线程

在正式讲解源码以前,墙裂建议读者作一些准备工做,最好对如下知识有必定的了解,这样阅读起来源码会比较轻松(由于,我当初刚开始接触多线程时,直接看 AQS 简直是一脸懵逼,就像读天书同样。。)。并发

  1. 了解双向链表的数据结构,以及队列的入队出队等操做。
  2. LockSupport 的 park,unpark 方法,以及对线程的 interrupt 几个方法了解(可参考:LockSupport的 park 方法是怎么响应中断的?)。
  3. 对 CAS 和自旋机制有必定的了解。

AQS 同步队列

AQS 内部维护了一个 FIFO(先进先出)的双向队列。它的内部是用双向链表来实现的,每一个数据节点(Node)中都包含了当前节点的线程信息,还有它的先后两个指针,分别指向前驱节点和后继节点。下边看一下 Node 的属性和方法:框架

static final class Node {
    //能够认为是一种标记,代表了这个 node 是以共享模式在同步队列中等待
    static final Node SHARED = new Node();
    //也是一种标记,代表这个 node 是以独占模式在同步队列中等待
    static final Node EXCLUSIVE = null;

    /** waitStatus 常量值 */
    //说明当前节点被取消,缘由有多是超时,或者被中断。
    //节点被取消的状态是不可逆的,也就是说此节点会一直停留在取消状态,不会转变。
    static final int CANCELLED =  1;
    //说明后继节点的线程被 park 阻塞,所以当前线程须要在释放锁或者被取消时,唤醒后继节点
    static final int SIGNAL    = -1;
    //说明线程在 condition 条件队列等待
    static final int CONDITION = -2;
    //在共享模式中用,代表下一个共享线程应该无条件传播
    static final int PROPAGATE = -3;

    
    //当前线程的等待状态,除了以上四种值,还有一个值 0 为初始化状态(条件队列的节点除外)。
    //注意这个值修改时是经过 CAS ,以保证线程安全。
    volatile int waitStatus;

    //前驱节点
    volatile Node prev;

    //后继节点
    volatile Node next;

    //当前节点中的线程,经过构造函数初始化,出队时会置空(这个后续说,重点强调)
    volatile Thread thread;

    //有两种状况。1.在 condition 条件队列中的后一个节点 
    //2. 一个特殊值 SHARED 用于代表当前是共享模式(由于条件队列只存在于独占模式)
    Node nextWaiter;

    //是不是共享模式,理由同上
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //返回前驱节点,若是为空抛出空指针
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

另外,在 AQS 类中,还会记录同步队列的头结点和尾结点:函数

//同步队列的头结点,是懒加载的,即不会当即建立一个同步队列,
    //只有当某个线程获取不到锁,须要排队的时候,才会初始化头结点
    private transient volatile Node head;

    //同步队列的尾结点,一样是懒加载。
    private transient volatile Node tail;

独占锁

这部分就结合 ReentrantLock 源码分析 AQS 的独占锁是怎样得到和释放锁的。源码分析

非公平锁

首先,咱们从 ReentrantLock 开始分析,它有两个构造方法,一个构造,能够传入一个 boolean 类型的参数,代表是用公平锁仍是非公平锁模式。另外一个构造方法,不传入任何参数,则默认用非公平锁。学习

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

NonfairSync 和 FairSync 都继承自 Sync ,它们都是 ReentranLock 的内部类。 而Sync 类又继承自 AQS (AbstractQueuedSynchronizer)。

static final class NonfairSync extends Sync {
}

static final class FairSync extends Sync {
}

abstract static class Sync extends AbstractQueuedSynchronizer {
}

知道了它们之间的继承关系,咱们就从非公平锁的加锁方法做为入口,跟踪源码。由于非公平锁的流程讲明白以后,公平锁大体流程都同样,只是多了一个条件判断(这个,一下子后边细讲,会作对比)。

NonfairSync.lock

咱们看下公平锁的获取锁的方法:

final void lock() {
    //经过 CAS 操做把 state 设置为 1
    if (compareAndSetState(0, 1))
        //若是设值成功,说明加锁成功,保存当前得到锁的线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //若是加锁失败,则执行 AQS 的acquire 方法
        acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire

这个方法的逻辑是:

  1. 经过 tryAcquire 方法,尝试获取锁,若是成功,则返回 true,失败返回 false 。
  2. tryAcquire 失败以后,会先调用 addWaiter 方法,把当前线程封装成 node 节点,加入同步队列(独占模式)。
  3. acquireQueued 方法会把刚加入队列的 node 做为参数,经过自旋去得到锁。

tryAcquire

这是一个模板方法,具体的实现须要看它的子类,这里对应的就是 ReentrantLock.NonfairSync.tryAcquire 方法。咱们看一下:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    //当前线程
    final Thread current = Thread.currentThread();
    //获取当前的同步状态,若为 0 ,表示无锁状态。若大于 0,表示已经有线程抢到了锁。
    int c = getState();
    if (c == 0) {
        //而后经过 CAS 操做把 state 的值改成 1。
        if (compareAndSetState(0, acquires)) {
            // CAS 成功以后,保存当前得到锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是 state 大于0,则判断当前线程是不是得到锁的线程,是的话,可重入。
    else if (current == getExclusiveOwnerThread()) {
        //因为 ReentrantLock 是可重入的,因此每重入一次 state 就加 1 。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addWaiter

若是获取锁失败以后,就会调用 addWaiter 方法,把当前线程加入同步队列。

private Node addWaiter(Node mode) {
    //把当前线程封装成 Node ,而且是独占模式
    Node node = new Node(Thread.currentThread(), mode);
    //尝试快速入队,若是失败,则会调用 enq 入队方法。enq 会初始化队列。
    Node pred = tail;
    //若是 tail 不为空,说明当前队列中已经有节点
    if (pred != null) { 
        //把当前 node 的 prev 指针指向 tail
        node.prev = pred;
        //经过 CAS 把 node 设置为 tail,即添加到队尾
        if (compareAndSetTail(pred, node)) {
            //把旧的 tail 节点的 next 指针指向当前 node
            pred.next = node;
            return node;
        }
    }
    //当 tail 为空时,把 node 添加到队列,若是须要的话,先进行队列初始化
    enq(node);
    //入队成功以后,返回当前 node
    return node;
}

enq

经过自旋,把当前节点加入到队列中

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //若是 tail为空,说明队列未初始化
        if (t == null) { 
            //建立一个空节点,经过 CAS把它设置为头结点
            if (compareAndSetHead(new Node()))
                //此时只有一个 head头节点,所以把 tail也指向它
                tail = head;
        } else {
            //第二次自旋时,tail不为空,因而把当前节点的 prev指向 tail节点
            node.prev = t;
            //经过 CAS把 tail节点设置为当前 node节点
            if (compareAndSetTail(t, node)) {
                //把旧的 tail节点的 next指向当前 node
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

入队成功以后,就会调用 acquireQueued 方法自旋抢锁。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取当前节点的前驱节点
            final Node p = node.predecessor();
            //若是前驱节点就是 head 节点,就调用 tryAcquire 方法抢锁
            if (p == head && tryAcquire(arg)) {
                //若是抢锁成功,就把当前 node 设置为头结点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                //抢锁成功后,会把线程中断标志返回出去,终止for循环
                return interrupted;
            }
            //若是抢锁失败,就根据前驱节点的 waitStatus 状态判断是否须要把当前线程挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                //线程被挂起时,判断是否被中断过
                parkAndCheckInterrupt())
                //注意此处,若是被线程被中断过,须要把中断标志从新设置一下
                interrupted = true;
        }
    } finally {
        if (failed)
            //若是抛出异常,则取消锁的获取,进行出队操做
            cancelAcquire(node);
    }
}

setHead

经过代码,咱们能够看到,当前的同步队列中,只有第二个节点才有资格抢锁。若是抢锁成功,则会把它设置为头结点。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

须要注意的是,这个方法,会把头结点的线程设置为 null 。想一下,为何?

由于,此时头结点的线程已经抢锁成功,须要出队了。天然的,队列中也就不该该存在这个线程了。

PS:由 enq 方法,还有 setHead 方法,咱们能够发现,头结点的线程老是为 null。这是由于,头结点要么是刚初始化的空节点,要么是抢到锁的线程出队了。所以,咱们也经常把头结点叫作虚拟节点(不存储任何线程)。

shouldParkAfterFailedAcquire

以上是抢锁成功的状况,那么抢锁失败了呢?这时,咱们须要判断是否应该把当前线程挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取当前节点的前驱节点的 waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //若是 ws = -1 ,说明当前线程能够被前驱节点正常唤醒,因而就能够安全的 park了
        return true;
    if (ws > 0) {
        //若是 ws > 0,说明前驱节点被取消,则会从当前节点依次向前查找,
        //直到找到第一个没有被取消的节点,把那个节点的 next 指向当前 node
        //这一步,是为了找到一个能够把当前线程唤起的前驱节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //若是 ws 为 0,或者 -3(共享锁状态),则把它设置为 -1 
        //返回 false,下次自旋时,就会判断等于 -1,返回 true了
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

若是 shouldParkAfterFailedAcquire 返回 true,说明当前线程须要被挂起。所以,就执行此方法,同时检查线程是否被中断。

private final boolean parkAndCheckInterrupt() {
    //把当前线程挂起,则 acquireQueued 方法的自旋就会暂停,等待前驱节点 unpark
    LockSupport.park(this);
    //返回当前节点是否被中断的标志,注意此方法会把线程的中断标志清除。
    //所以,返回上一层方法时,须要设置 interrupted = true 把中断标志从新设置,以便上层代码能够处理中断
    return Thread.interrupted();
}

想一下,为何抢锁失败后,须要判断是否把线程挂起?

由于,若是抢不到锁,而且还不把线程挂起,acquireQueued 方法就会一直自旋下去,这样你的CPU能受得了吗。

cancelAcquire

当不停的自旋抢锁时,若发生了异常,就会调用此方法,取消正在尝试获取锁的线程。node 的位置分为三种状况,见下面注释,

private void cancelAcquire(Node node) {

    if (node == null)
        return;

    // node 再也不指向任何线程
    node.thread = null;

    Node pred = node.prev;
    //从当前节点不断的向前查找,直到找到一个有效的前驱节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    //把 node 的 ws 设置为 -1 
    node.waitStatus = Node.CANCELLED;

    // 1.若是 node 是 tail,则把 tail 更新为 node,并把 pred.next 指向 null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        //2.若是 node 既不是 tail,也不是 head 的后继节点,就把 node的前驱节点的 ws 设置为 -1
        //最后把 node 的前驱节点的 next 指向 node 的后继节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //3.若是 node是 head 的后继节点,则直接唤醒 node 的后继节点。
            //这个也很好理解,由于 node 是队列中惟一有资格尝试获取锁的节点,
            //它放弃了资格,固然有义务把后继节点唤醒,以让后继节点尝试抢锁。
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

unparkSuccessor

这个唤醒方法就比较简单了,

private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从尾结点向前依次遍历,直到找到距离当前 node 最近的一个有效节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //把这个有效节点的线程唤醒,
        //唤醒以后,当前线程就能够继续自旋抢锁了,(回到 park 的地方)
        LockSupport.unpark(s.thread);
}

下面画一个流程图更直观的查看整个获取锁的过程。

公平锁

公平锁和非公平锁的总体流程大体相同,只是在抢锁以前先判断一下是否已经有人排在前面,若是有的话,就不执行抢锁。咱们经过源码追踪到 FairSync.tryAcquire 方法。会发现,多了一个 hasQueuedPredecessors 方法。

hasQueuedPredecessors

这个方法判断逻辑稍微有点复杂,有多种状况。

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. 若是 h == t,说明 h 和 t 都为空(此时队列还未初始化)或者它们是同一个节点(说明队列已经初始化,而且只有一个节点,此时为 enq 方法第一次自旋成功后)。此时,返回false。
  2. 若是 h != t,则判断 head.next == null 是否成立,若是成立,则返回 true。这种状况发生在有其余线程第一次入队时。在 AQS 的 enq 入队方法,设置头结点成功以后 compareAndSetHead(new Node()) ,还未执行 tail = head 时(仔细想想为何?)。此时 tail = null , head = new Node(),head.next = null。
  3. 若是 h != t,而且 head.next != null,说明此时队列中至少已经有两个节点,则判断 head.next 是不是当前线程。若是是,返回 false(注意是 false哦,由于用了 !),不然返回 true 。

总结:以上几种状况,只有最终返回 false 时,才会继续往下执行。由于 false,说明没有线程排在当前线程前面,因而经过 CAS 尝试把 state 值设置为 1。若成功,则方法返回。若失败,一样须要去排队。

公平锁和非公平锁区别

举个例子来对比公平锁和非公平锁。好比,如今到饭点了,你们都到食堂打饭。把队列中的节点比做排队打饭的人,每一个打饭窗口都有一个管理员,只有排队的人从管理员手中抢到锁,才有资格打饭。打饭的过程就是线程执行的过程。

若是,你发现前面没有人在排队,那么就能够直接从管理员手中拿到锁,而后打饭。对于公平锁来讲,若是你前面有人在打饭,那么你就要排队到他后面(图中B),等他打完以后,把锁还给管理员。那么,你就能够从管理员手中拿到锁,而后打饭了。后面的人依次排队。这就是FIFO先进先出的队列模型。

对于非公平锁来讲,若是你是图中的 B,当 A 把锁还给管理员后,有可能有另一个 D 插队过来直接把锁抢走。那么,他就能够打饭,你只能继续等待了。

因此,能够看出来。公平锁是严格按照排队的顺序来的,先来后到嘛,你来的早,就能够早点获取锁。优势是,这样不会形成某个线程等待时间过长,由于你们都是中规中矩的在排队。而缺点呢,就是会频繁的唤起线程,增长 CPU的开销。

非公平锁的优势是吞吐量大,由于有可能正好锁可用,而后线程来了,直接抢到锁了,不用排队了,这样也减小了 CPU 唤醒排队线程的开销。 可是,缺点也很明显,你说我排队排了好长时间了,终于轮到我打饭了,凭什么其余人刚过来就插到我前面,比我还先打到饭,也太不公平了吧,后边一大堆排队的人更是怨声载道。这要是每一个人来了都插到我前面去,我岂不是要饿死了。

独占锁的释放

咱们从 ReentrantLock 的 unlock 方法看起:

public void unlock() {
    //调用 AQS 的 release 方法
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //若是头结点不为空,而且 ws 不为 0,则唤起后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

这段逻辑比较简单,当线程释放锁以后,就会唤醒后继节点。 unparkSuccessor 已讲,再也不赘述。而后看下 tryRelease 方法,公平锁和非公平锁走的是同一个方法。

protected final boolean tryRelease(int releases) {
    //每释放一次锁,state 值就会减 1,由于以前可能有锁的重入
    int c = getState() - releases;
    //若是当前线程不是抢到锁的线程,则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        //只有 state 的值减到 0 的时候,才会所有释放锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

由于,ReentrantLock 支持锁的重入,因此每次重入 state 值都会加 1,相应的每次释放锁, state 的值也会减 1 。因此,这也是为何每一个 lock 方法最后都要有一个 unlock 方法释放锁,它们的个数须要保证相同。

当 state 值为 0 的时候,说明锁彻底释放。其余线程才能够有机会抢到锁。

结语

以上已经讲解了独占锁主要的获取方法 acquire ,另外还有一些其余相关方法,再也不赘述,由于主要逻辑都是同样的,只有部分稍有不一样,只要理解了 acquire ,这些都是相通的。如 acquireInterruptibly 方法,它能够在获取锁的时候响应中断。还有超时获取锁的方法 doAcquireNanos 能够设定获取锁的超时时间,超时以后就返回失败。

下篇预告:分析 ReentrantReadWriteLock 读写锁源码,以及 AQS 共享锁的获取和释放,敬请期待。

若是本文对你有用,欢迎点赞,评论,转发。

学习是枯燥的,也是有趣的。我是「烟雨星空」,欢迎关注,可第一时间接收文章推送。

相关文章
相关标签/搜索