深刻浅出AQS源码解析

最近一直在研究AQS的源码,但愿能够更深入的理解AQS的实现原理。虽然网上有不少关于AQS的源码分析,可是看完之后感受仍是只知其一;不知其二。因而,我将本身的整个理解过程记录下来了,但愿对你们有所帮助。java

基本原理

AQS是Java中锁的基础,主要由两个队列组成。一个队列是同步队列,另外一个是条件队列node

同步队列的原理

  • 同步队列的队列头部是head,队列尾部是tail节点,head节点是一个空节点,同步队列是一个双向链表,经过nextprev链接全部节点
  • 全部的线程在竞争锁的时候都会建立一个Node节点,线程与节点绑定在一块儿,(若是是同步锁和排他锁不一样之处是经过nextWaiter来区分的)而且添加到同步队列的尾部
  • head的第一个节点获取锁,其他节点都须要等待被唤醒
  • 同步队列中的节点会存在取消和null的状况(如:线程超时中断、线程更新节点的中间态),被取消和null的节点不能被唤醒,将会被视为无效节点
  • 一个线程只能被有效的前驱节点(取消和null的节点除外)唤醒
  • 持有锁的线程只能是有一个,其余有效节点对应的线程都会被挂起

条件队列的原理

  • 一个同步队列能够对应多个条件队列
  • 条件队列是一个单向链表,经过nextWaiter来链接起来,条件队列的头节点是firstWaiter,尾节点是lastWaiter
  • 某个条件队列中知足条件的节点(被signalsignalAll方法唤醒的节点)才会被转移到同步队列
  • 条件队列中的被转移到同步队列的节点是从头节点开始,条件队列中被阻塞的线程会添加到队列的尾部

同步队列的实现

首先,了解如下同步队列中队列的节点Node的数据结构安全

static final class Node {
        /** 共享锁的标识 */
        static final Node SHARED = new Node();
        /** 排他锁的标识 */
        static final Node EXCLUSIVE = null;

        /** 线程取消 */
        static final int CANCELLED =  1;
        /** 持有锁的线程的后继线程被挂起 */
        static final int SIGNAL    = -1;
        /** 条件队列标识 */
        static final int CONDITION = -2;
        /**
         * 共享锁状况下,通知全部其余节点
         */
        static final int PROPAGATE = -3;

        /**
         * waitStatus的取值以下:
         *   SIGNAL(-1): 当前节点的后继节点应该被挂起
         *   CANCELLED(1): 当前节点被取消
         *   CONDITION(-2): 当前节点在条件队列
         *   PROPAGATE(-3): 释放共享锁时须要通知全部节点
         *   0: 初始值
         *
         */
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * 节点对应的线程
         */
        volatile Thread thread;

        /**
         * 在共享锁的状况下,该节点的值为SHARED
         * 在排他锁的状况下,该节点的值为EXCLUSIVE
         * 在条件队列的状况下,连接的是下一个等待条件的线程
         */
        Node nextWaiter;
}

其次,咱们来看一下同步队列的链表结构
同步队列链表数据结构

接着,咱们根据同步队列的原理来分析如下acquirerelease须要作哪些事情:源码分析

实现acquire功能须要作的事情

  1. 建立一个Node节点node(该节点多是排他锁,也能够能是共享锁)
  2. node添加到同步队列尾部,若是同步队列为空(初始状况下),须要先建立一个空的头节点,而后再添加到队列的尾部
  3. 若是node的前驱节点是head,说明node是第一个节点,可以获取锁,须要将head修改为node,释放前驱节点的资源
  4. 若是node的前驱节点不是head,说明获取锁失败,须要检测是否须要将node绑定的线程挂起,分如下几种状况:
    • 若是nodewaitStatus已经被设置为SIGNAL 表示须要被挂起
    • 若是nodewaitStatus设置为CANCEL表示该节点已经被取消,须要被去掉,并修改 nodeprev,直到连接上一个有效的节点为止
    • 不然将nodewaitStatus设置为SIGNAL,表示即将要被挂起
  5. 若是须要将node绑定的线程挂起,则让出CPU,直到当前驱节点来唤起node才会开始继续从步骤3开始执行

与acquire功能相关的代码

  • acquire方法:获取排他锁
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. tryAcquire(arg):对外提供的一个扩展方法,经常使用的锁都要实现这个方法,具体实现与锁相关ui

  2. addWaiter(Node.EXCLUSIVE): 建立一个排他锁节点,并将该节点添加到同步队列尾部,代码以下:this

private Node addWaiter(Node mode) {
        // 建立一个node,EXCLUSIVE类型
        Node node = new Node(mode);

        for (;;) {
            // 获取尾节点
            Node oldTail = tail;
            if (oldTail != null) {
                // 设置即将成为尾节点的前驱
                node.setPrevRelaxed(oldTail);
                // CAS操做设置尾节点
                if (compareAndSetTail(oldTail, node)) {
                    // 将新尾节点的前驱节点与新的尾节点关联起来
                    oldTail.next = node;
                    // 返回添加的节点
                    // 这个节点如今不必定是尾节点,由于若是有多个线程调用这个方法时,
                    // 可能还有节点添加在这个节点后面
                    return node;
                }
            } else {
                // 若是队列为空,初始化头节点
                initializeSyncQueue();
            }
        }
    }
  1. acquireQueued同步队列中的节点获取排他锁
final boolean acquireQueued(final Node node, int arg) {
        try {
            // 线程是否中断
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 若是前驱节点是头节点,获取锁
                if (p == head && tryAcquire(arg)) {
                    // 修改头节点
                    setHead(node);
                    // 释放头节点的资源
                    p.next = null; // help GC
                    // 返回线程中断的状态
                    // 这也是该方法惟一的返回值
                    // 没有获取锁的线程会一直执行该方法直到获取锁之后再返回
                    return interrupted;
                }
                // 获取锁失败后是否须要将线程挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 线程挂起并返回是否被中断
                    interrupted = true;
            }
        } catch (Throwable t) {
            // 取消该节点
            cancelAcquire(node);
            throw t;
        }
    }
  1. shouldParkAfterFailedAcquire:检测线程获取锁失败之后是否须要被挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 状态已经设置成SIGNAL,能够直接挂起该节点
             */
            return true;
        // 节点被取消
        if (ws > 0) {
            /*
             * 找到pred第一个有效的前驱节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // pred多是一个新的节点,须要将pred的next重写设置为node
            pred.next = node;
        } else {
            /*
             * CAS操做将pred节点的状态设置为SIGNAL
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        // 只有当pred节点的waitStatus已是SIGNAL状态时,才能够安全的挂起线程
        // 不然须要不能被挂起
        return false;
    }
  1. parkAndCheckInterrupt:将当前线程挂起,并检测当前线程是否中断
private final boolean parkAndCheckInterrupt() {
        // 线程挂起
        LockSupport.park(this);
        // 检测线程是否中断
        return Thread.interrupted();
    }
  1. cancelAcquire:取消节点
private void cancelAcquire(Node node) {
        // 若是节点为空,什么都不作
        if (node == null)
            return;
        // 释放线程
        node.thread = null;

        // 从后往前过滤掉全部的被取消的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 有效前驱节点的nex节点
        Node predNext = pred.next;

        // 将node设置为CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 若是是尾节点,设置新的尾节点
        if (node == tail && compareAndSetTail(node, pred)) {
            // 将新的尾节点的后续设置为null
            pred.compareAndSetNext(predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            // 若是前驱节点的线程不为null而且waitStatus为SIGNAL
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                // 将node设置成pred的后继节点
                if (next != null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                // 唤起node节点的后继节点
                // 由于node节点已经释放锁了
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
  1. unparkSuccessor:唤醒后继节点
private void unparkSuccessor(Node node) {
        /*
         * 获取node节点的waitStatus
         */
        int ws = node.waitStatus;
       // 用CSA操做将waitStatus设置成初始状态
       // 无论设置是否成功,都无所谓,由于该节点即将被销毁
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        /*
         * 获取node的后继节点
         */
        Node s = node.next;
        // 若是后继节点为null或者被取消,
        // 经过从同步队列的尾节点开始一直往前找到一个有效的后继节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        // 若是后继节点不为空
        if (s != null)
            LockSupport.unpark(s.thread);// 唤醒后继节点的线程
    }

acquire方法相似的还有acquireInterruptiblytryAcquireNanosacquireSharedacquireSharedInterruptiblytryAcquireSharedNanos,咱们都一一分析如下线程

  • acquireInterruptibly方法:获取可中断的排他锁
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 若是线程中断,直接返回
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg); // 中断式的获取锁
    }
  1. doAcquireInterruptibly:可中断式的获取锁
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
       // 建立一个排他节点加入同步队列
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 若是前驱节点是头节点,说明已经获取的锁
                if (p == head && tryAcquire(arg)) {
                    // 修改头节点
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                // 若是没有获取锁,检测是否须要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); // 若是发现线程已经被中断,须要抛出异常
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  • tryAcquireNanos方法:超时中断获取排他锁
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 线程中断直接返回
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout); // 超时获取排他锁
    }
  1. doAcquireNanos:超时获取排他锁
private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 若是超时直接返回
        if (nanosTimeout <= 0L)
            return false;
        // 获取超时时长
        final long deadline = System.nanoTime() + nanosTimeout;
        // 添加一个排他节点到同步队列尾部
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                 // 获取前驱节点
                final Node p = node.predecessor();
                // 已经获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                // 若是超时了就取消
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                // 检测节点是否须要被挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    // 若是须要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD
                    // 线程挂起nanosTimeout时间
                    LockSupport.parkNanos(this, nanosTimeout); 
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  • acquireShared方法:获取共享锁
public final void acquireShared(int arg) {
        // 对外提供的一个扩展方法,经常使用的锁都要实现这个方法,
        // 该方法的实现与锁的用途有关
        if (tryAcquireShared(arg) < 0) 
            doAcquireShared(arg); // 获取共享锁
    }
  1. doAcquireShared:获取共享锁
private void doAcquireShared(int arg) {
        // 添加一个共享节点到同步队列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 返回结果大于等于0表示获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 设置头节点并广播通知其余获取共享锁的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        // 若是线程被中断,将该线程中断
                        // 共享锁会被多个线程获取,若是须要中断
                        // 全部获取共享锁的线程都要被中断
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                // 检测是否须要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 挂起并中断
                    interrupted = true;
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  1. setHeadAndPropagate:设置头节点并广播其余节点来获取锁
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 记录旧的头节点
        setHead(node);// 设置新的头节点
        /*
         * 若是头节点为null或者是否是取消状态,尝试唤醒后继节点
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // node节点的next是SHARED,即共享锁
            if (s == null || s.isShared())
                // 唤起获取共享锁的线程
                doReleaseShared();
        }
    }
  1. doReleaseShared:唤醒等待共享锁的节点
private void doReleaseShared() {
        /*
         * 唤醒时是从头节点开始先唤醒第一个共享节点,
         * 第一个共享节点被唤醒后会在doAcquireShared方法里继续执行(以前就是在这个方法里被挂起的)
         * 第一个共享节点若是获取锁会调用setHeadAndPropagate方法修改头节点,而后再调用doReleaseShared方法
         * 唤醒第二个共享节点,以此类推,最后把全部的共享节点都唤醒
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                // 获取头节点的状态
                int ws = h.waitStatus;
                // 若是头节点是SIGNAL,须要将状态设置为0,表示已经即将被唤醒
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // 若是失败了说明有其余线程在修改头节点,须要继续重试
                    unparkSuccessor(h); // 唤醒头节点的后继节点
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // 将头节点状态从0设置成PROPAGATE,若是失败了继续,由于也有其余获取共享锁的线程在更改头节点
            }
            // 若是头节点未改变(由于没有后继节点须要等待共享锁),跳出循环
            if (h == head)
                break;
        }
    }
  1. selfInterrupt:中断当前线程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
  • acquireSharedInterruptibly方法:可中断的获取共享锁
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 若是线程被中断抛出异常
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg); // 可中断的方式获取共享锁
    }
  1. doAcquireSharedInterruptibly:可中断的方式后去共享锁
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加共享锁节点到同步队列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取共享锁之后修改头节点,通知其余等待共享锁的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 线程获取共享锁失败后须要挂起,而且发现线程被中断,因此抛出异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  • tryAcquireSharedNanos方法:超时中断获取共享锁
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted()) // 线程若是中断了,直接抛出异常
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout); // 超时获取共享锁
    }
  1. doAcquireSharedNanos:超时的方式获取中断锁
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 超时直接返回
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        // 添加共享节点到同步队列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取锁,修改头节点,通知全部其余等待共享锁的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) {
                    // 超时取消节点
                    cancelAcquire(node);
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    // 若是须要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD
                    // 线程挂起nanosTimeout时间
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException(); // 中断了抛出异常
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }

实现release功能须要作的事情

  1. 释放当前获取锁的线程持有的资源
  2. 唤醒有效的一个后继节点

与release功能相关的代码

  • release方法:释放排他锁
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 头节点不能是一个中间态
            if (h != null && h.waitStatus != 0)
                // 唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • release方法:释放共享锁
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 释放共享锁,从头节点开始一个一个的释放
            // 若是存在多个共享节点在同步队列时,doReleaseShared方式实际上是递归调用
            doReleaseShared();
            return true;
        }
        return false;
    }

至此,将全部获取锁和释放锁的方法相关的源码所有分析完3d

条件队列的实现

咱们来看一下条件队列的链表结构
条件队列的链表结构code

实现await功能须要作的事情

  1. 建立一个CONDITION类型的节点,将该节点添加到条件队列
  2. 释放已经获取的锁(由于只有当前线程先获取了锁才可能再调用Condition.await()方法)
  3. 若是没法获取锁,线程挂起

与await功能相关的代码

  • await方法:等待条件
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException(); // 若是线程中断,直接抛出异常
            // 建立一个CONDITION类型的节点,将该节点添加到条件队列尾部
            Node node = addConditionWaiter();
            // 释放锁
            // 在调用await方法以前都会调用lock方法,这个时候已经获取锁了
            // 有时候锁仍是可重入的,因此须要将全部的资源都释放掉
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 若是节点再也不同步队列,所有都要挂起
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 若是在等待期间发生过中断(不论是调用signal以前仍是以后),直接退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 让线程尝试去获取锁,若是没法获取锁就挂起
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 清除全部在条件队列中是取消状态的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 发生中断,上报中断状况
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  1. addConditionWaiter:在条件队列中添加一个节点
private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 清除条件队列中无效的节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 建立一个节点
            Node node = new Node(Node.CONDITION);
            // 添加到条件队列尾部
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
  1. unlinkCancelledWaiters:清除在条件队列中被取消的节点
private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            // 遍历条件队列将全部不是CONDITION状态的节点所有清除掉
            // 这些节点都是取消状态的节点
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
  1. fullyRelease:释放线程持有的全部锁资源
final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            // 释放全部的资源
            // 若是是可重入锁,savedState就是重入的次数
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            // 发生异常就取消该节点
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }
  1. isOnSyncQueue:判断节点是否在同步队列
final boolean isOnSyncQueue(Node node) {
        // waitStatus是CONDITION或者node没有前驱节点,说明node不在同步队列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // 有后继节点必定在同步队列
            return true;
        /*
         * 在同步队列中查找node,看是否在同步队列中
         */
        return findNodeFromTail(node);
    }
  1. findNodeFromTail:在同步队列中查找节点
private boolean findNodeFromTail(Node node) {
        // 从尾节点开始查找
        for (Node p = tail;;) {
            if (p == node) // 找到了
                return true;
            if (p == null) // 找到头了还没找到
                return false;
            p = p.prev;
        }
    }
  1. checkInterruptWhileWaiting:检测中断的状况
private int checkInterruptWhileWaiting(Node node) {
            // 没有发生中断返回0
            // 调用signal以前发生中断返回THROW_IE
            // 调用signal以后发生中断返回REINTERRUPT
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
  1. transferAfterCancelledWait:清除在条件队列中被取消的节点
// 只有线程处于中断状态,才会调用此方法
// 若是须要的话,将这个已经取消等待的节点转移到阻塞队列
// 返回 true,若是此线程在 signal 以前被取消,不然返回false
final boolean transferAfterCancelledWait(Node node) {
  
        // 用 CAS 将节点状态设置为 0 
        // 若是这步 CAS 成功,说明是 signal 方法以前发生的中断,
       // 由于若是 signal 先发生的话,signal 中会将 waitStatus 设置为 0
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node); // 将节点放入阻塞队列
            return true;
        }
        // 到这里是由于 CAS 失败,确定是由于 signal 方法已经将 waitStatus 设置为了 0
        // signal 方法会将节点转移到阻塞队列,可是可能还没完成,这边自旋等待其完成
        // 固然,这种事情仍是比较少的吧:signal 调用以后,没完成转移以前,发生了中断
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
  1. enq:把节点添加到同步队列
private Node enq(Node node) {
        // 无限循环,将节点添加到同步队列尾部
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                // 若是同步队列为空,初始化
                initializeSyncQueue();
            }
        }
    }
  1. reportInterruptAfterWait:中断处理
private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            // 若是是THROW_IE状态,抛异常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT) // 再次中断,由于中断状态被使用过一次
                selfInterrupt();
        }

awaitNanosawaitUntilawait(long time, TimeUnit unit)这几个方法的总体逻辑是同样的,就再也不分析了

实现signal功能须要作的事情

  1. 将条件队列中的节点加入同步队列
  2. 唤醒线程

与signal功能相关的代码

  • signal方法:唤醒等待条件的节点
public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 获取条件队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                // 唤醒等待条件的节点
                doSignal(first); 
        }
  1. doSignal:唤醒等待条件的节点
private void doSignal(Node first) {
            do {
                // 去掉无效的节点
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&  // 将节点转移到同步队列
                     (first = firstWaiter) != null);
        }
  1. transferForSignal:将节点转移到同步队列
final boolean transferForSignal(Node node) {
        /*
         * 取消的节点不须要转移
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * 将节点加入同步队列尾部
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程
        // 若是 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用
        // 节点入队后,须要把前驱节点的状态设为SIGNAL
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            // 若是前驱节点取消或者 CAS 失败,会进到这里唤醒线程
            LockSupport.unpark(node.thread);
        return true;
    }
  • signalAlll方法:唤醒全部等待条件的节点
public final void signalAll() {
            // 若是是当前线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                // 唤醒全部等待条件的节点
                doSignalAll(first);
        }
  1. doSignalAll:唤醒全部等待条件的节点
// 将全部的节点都转移到同步队列
private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

如今将与AQS相关的核心代码都整理了一遍,里面若是有描述不清晰或者不许确的地方但愿你们能够帮忙指出!

相关文章
相关标签/搜索