并发编程之 AQS 源码剖析

前言

JDK 1.5 的 java.util.concurrent.locks 包中都是锁,其中有一个抽象类 AbstractQueuedSynchronizer (抽象队列同步器),也就是 AQS, 咱们今天就来看看该类。java

1.结构

类结构

咱们看看该类的结构,该类被 CountDown,ThreadPoolExecutor,ReentrantLock,ReentrantReadWriteLock,Semaphore 的内部类所继承,而这些内部类都是这些锁的真正实现,不管是公平锁仍是非公平锁。node

也就是说,这些锁的真正实现都是该类来实现的。那么,咱们就从这些锁开始看看是如何实现从锁到解锁的。less

2. 重入锁的 lock 方法

咱们先看看重入锁 ReentranLock 的 lock 方法。工具

public void lock() {
        sync.lock();
    }

复制代码

该方法调用了内部类的 sync 抽象类的 lock 方法,该方法的实现有公平锁和非公平锁。咱们看看公平锁是如何实现的:ui

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
复制代码

调用了 acquire 方法,该方法就是 AQS 的的方法,由于 sync 继承了 AQS,而公平锁继承了 Sync,等于间接继承了 AQS,咱们看看该方法。this

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

该方法JDK注释 :spa

以独占模式获取对象,若是被中断则停止。经过先检查中断状态,而后至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。不然在成功以前,或者线程被中断以前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。可使用此方法来实现 Lock.lockInterruptibly() 方法。操作系统

楼主来简单说一下该方法的做用:该方法会试图获取锁,若是获取不到,就会被加入等待队列等待被唤醒,这个其实和咱们以前分析的 synchronized 是差很少的。线程

咱们仔细看看该方法,首先是 tryAcquire 方法,也就是尝试获取锁,该方法是须要被写的,父类默认的方法是抛出异常。如何重写呢?抽象类定义一个标准:若是返回 true,表示获取锁成功,反之失败。设计

tryAcquire

咱们回到 acquire 方法,若是获取锁成功,就直接返回了,若是失败了,则继续后面的操做,也就是将线程放入等待队列中:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

咱们先看看 addWaiter(Node.EXCLUSIVE) 方法:

/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

复制代码

该方法注释:将当前线程放入到队列节点。参数呢?参数有2种,Node.EXCLUSIVE 是独占锁,Node.SHARED 是分享锁。

在 Node 类种定义了这两个常量:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
复制代码

独占锁是null,共享锁是空对象。

咱们看看该方法的步骤:

  1. 建立一个当前线程的 Node 对象(nextWaiter 属性为 null, thread 属性为 当前线程)。
  2. 获取到末端节点,若是末端节点不为 null,则将末端节点设置为刚刚建立的节点的 prev 属性。 2.1. 经过 CAS 设置末端节点为新的节点。若是成功,将刚刚建立的节点设置为老末端节点的next节点。最后返回。
  3. 若是 tail 末端节点是null,则调用enq 方法。建立一个末端节点,而后,将刚刚建立的末端节点设置为新节点的 prev 属性(此时的末端节点就是 head 头节点)。最后返回刚刚建立的 node 节点。

咱们看看 enq 方法的实现:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
复制代码

该方法步骤以下:

  1. 死循环,获取到末端节点,若是是null,则使用CAS建立一个头节点(头节点此时也是null),并将头节点赋值末端节点。
  2. 因为刚刚CAS 成功,走else 逻辑,将末端节点赋值给新节点的 prev 属性,使用CAS设置新的末端节点为刚刚建立的 node对象。而后返回node 对象。

该方法主要就是初始化头节点和末端节点,并将新的节点追加到末端节点并更新末端节点。

咱们会到 addWaiter 方法中,该方法主要做用就是根据当前线程建立一个 node 对象,并追加到队列的末端。

咱们再回到 acquire 方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

复制代码

addWaiter 方法会返回刚刚建立的node 对象,而后调用 acquireQueued 方法,咱们进入该方法查看:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

该方法步骤以下:

  1. 死循环。先获取 node 对象 prev 节点,若是该节点和 head 相等,说明是他是第二个节点,那么此时就能够尝试获取锁了。 1.1 若是获取锁成功,就设置当前节点为 head 节点(同时设置当前node的线程为null,prev为null),并设置他的 prev 节点的 next 节点为 null(帮助GC回收)。最后,返回等待过程当中是否中断的布尔值。
  2. 若是上面的两个条件不成立,则调用 shouldParkAfterFailedAcquire 方法和 parkAndCheckInterrupt 方法。这两个方法的目的就是将当前线程挂起。而后等待被唤醒或者被中断。稍后,咱们仔细查看这两个方法。
  3. 若是挂起后被当前线程唤醒,则再度循环,判断是该节点的 prev 节点是不是 head,通常来说,当你被唤醒,说明你别准许去拿锁了,也就是 head 节点完成了任务释放了锁。而后重复步骤 1。最后返回。

咱们看看 shouldParkAfterFailedAcquire 方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)

            return true;
        if (ws > 0) {

            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
     
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
复制代码

该方法步骤以下:

  1. 获取去上一个节点的等待状态,若是状态是 SIGNAL -1,就直接返回 true,表示能够挂起并休息。
  2. 若是 waitStatus 大于 0, 则循环检查 prev 节点的 prev 的waitStatus,知道遇到一个状态不大于0。该字段有4个状态,分别是 CANCELLED = 1,SIGNAL = -1, CONDITION = -2, PROPAGATE = -3,也就是说,若是大于 0,就是取消状态。那么,往上找到那个不大于0的节点后怎么办?将当前节点指向 那个节点的 next 节点,也就是说,那些大于0 状态的节点都失效这里,随时会被GC回收。
  3. 若是不大于0 也不是 -1,则将上一个节点的状态设置为有效, 也就是 -1.最后返回 false。注意,在acquireQueued 方法中,返回 false 后会继续循环,此时 pred 节点已是 -1 了,所以最终会返回 true。

再看 parkAndCheckInterrupt 方法(挂起并检查是否中断):

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码

该方法很是的简单,就是将当前线程挂起,等到有别的线程唤醒(一般是 head 节点中线程),而后返回当前线程是不是被中断了,注意,该方法会清除中断状态。

回到 acquireQueued 方法,总结一下该方法,该方法就是将刚刚建立的线程节点挂起,而后等待唤醒,若是被唤醒了,则将本身设置为 head 节点。最后,返回是否被中断。

再回到 acquire 方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

复制代码

在该方法中,若是获取锁失败并被唤醒,且被中断了,那么就执行 selfInterrupt 方法:

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
复制代码

将当前线程设置中断状态位。

好了,到这里,整个lock 方法,咱们基本就分析完了,能够说,整个方法就是将线程放入到等待队列并挂起而后等待 head 节点唤醒。其中,tryAcquire 方法高频出现,该方法具体实现由子类实现,好比 重入锁,读写锁,线程池的 worker,其中 CountDown 和 Semaphore 实现的是共享模式的 tryAcquire 方法,但原理相同。AQS 如何定义的?就是返回 true 表示拿到锁了,返回 false 表示拿锁失败,具体如何实现AQS管不了。但他们都依赖一个极其重要的字段 ------- state。

楼主有必要说说这个字段,该字段定义了当前同步器的状态,若是你们知道 pv 原语的话,应该很好理解这个字段,该字段在 AQS 中是如何定义的:

/** * The synchronization state. */
    private volatile int state;
复制代码

volatile。该字段可能会被多个线程修改,所以,须要设置为 volatile ,保证变量的可见性。

咱们能够看看 重入锁中的公平锁是如何使用该字段的。

/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
复制代码

该方法重写了 tryAcquire 方法,步骤以下:

  1. 获取当前线程,获取锁(同步器)的状态。
  2. 若是同步器等于0,就 CAS 设置 state 为 1,表示同步器被占用了,而且设置同步器的持有线程为当前线程(为了判断重入)。最后返回拿锁成功 true。
  3. 若是不是0,而且当前线程就是同步器的持有线程,说明是重入。那么就将 state 加1,最后返回 true。因此说,当你重入一次,就须要解锁一次,不然下个线程永远拿不到锁。
  4. 若是都不是,返回 false ,表示拿锁失败。

从这里,咱们能够看到, statei 字段很是的重要,判断锁是否被持有彻底根据这个字段来的。这点必定要注意,而这个设计和操做系统的 pv 由殊途同归之妙。

那么看完了拿锁,再看看解锁,咱们能够先猜测一下如何设计,首先确定是要将 state 字段设置为 0,才能让下个线程拿锁,而后呢?唤醒等待队列中的下个线程。让他尝试拿锁。那到底 doug lea 是否是这么设计的呢?咱们来看看。

3. 重入锁的 unlock 方法

该方法调用了AQS 的 release 方法:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

复制代码

首先尝试释放,若是成功,则唤醒下一个线程。

咱们先看看 tryRelease 方法 (须要重写):

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

复制代码

该方法步骤以下:

  1. 计算同步器状态减去1后的值。
  2. 判断同步器线程和当前线程是否相同,若是不一样,抛出监视器状态异常。
  3. 判断状态是不是 0,也就是说,若是是0,表示没有线程持有锁了,那么就是设置 free 为 true,而且设置同步器的 thread 属性为null,
  4. 最后设置 state 为 计算的值,这里须要考虑重入。最后返回。

能够看到,若是 state 不是 0 的话,就会返回 false ,后面的步骤就没有了,也就是说,重入锁解锁的时候不会唤醒下一个线程。

若是解锁成功,执行下面的步骤,若是 head 头节点不是 null 而且他的状态不是0,说明有线程能够唤醒,执行 unparkSuccessor 方法。

private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

复制代码

该方法步骤以下:

  1. 获取到头节点的状态。
  2. 若是小于0,CAS 设置状态为0。
  3. 获取到头节点的next 节点,判断是否为null,或者 next 节点是否大于0,若是是null 或者大于0,则从末端节点开始向上查找,直到找到状态小于等于0 的节点。
  4. 最后唤醒该节点的线程。

这个时候,等待在 acquireQueued 方法中,准确的说是 parkAndCheckInterrupt 方法中的 线程被唤醒,开始继续循环,尝试拿锁(须要修改 state 变量),并设置本身为 head。

这里还有一个漏掉的地方,就是 waitStatus 变量,何时会大于等于0? 该变量默认是 0,大于 0 的状态是被取消的状态。何时会被取消呢? 在acquireQueued 方法中,若是方法没有正常结束,则会执行 finally 中的 cancelAcquire 方法,该方法会将状态变成 1,也就是取消状态。

4 总结

此次咱们分析 AQS,也就是锁的的真正实现,只分析了 lock 方法和 unlock 方法,这两个方法是重入锁的基础。CountDown 和 Semaphore 是共享锁,可是基本原理相同,只是将 state 的数字加大即可以实现。而和重入锁等锁相关联的 Condition 则是经过 LockSupport 工具类直接挂起当前线程,并将当前线程添加到等待队列中,当调用 Condition 的 signal 方法时,则唤醒队列中的第一个线程。具体源码咱们有机会再分析。

总之,java 重入锁的实现基于 AQS,而 AQS 主要基于 state 变量和队列来实现。实现原理和 pv原语 相似。

good luck!!!!!

相关文章
相关标签/搜索