逐行分析AQS源码(4)——Condition接口实现

前言

本篇文章是基于线程间的同步与通讯(4)——Lock 和 Condtion 这篇文章写的,在那篇文章中,咱们分析了Condition接口所定义的方法,本篇咱们就来看看AQS对于Condition接口的这些接口方法的具体实现。java

下文中笔者将假设读者已经阅读过那篇文章,或者已经了解了相关的背景知识。node

系列文章目录segmentfault

概述

咱们在前面介绍Conditon的时候说过,Condition接口的await/signal机制是设计用来代替监视器锁的wait/notify机制 的,所以,与监视器锁的wait/notify机制对照着学习有助于咱们更好的理解Conditon接口:数组

Object 方法 Condition 方法 区别
void wait() void await()
void wait(long timeout) long awaitNanos(long nanosTimeout) 时间单位,返回值
void wait(long timeout, int nanos) boolean await(long time, TimeUnit unit) 时间单位,参数类型,返回值
void notify() void signal()
void notifyAll() void signalAll()
- void awaitUninterruptibly() Condition独有
- boolean awaitUntil(Date deadline) Condition独有

这里先作一下说明,本文说wait方法时,是泛指wait()wait(long timeout)wait(long timeout, int nanos) 三个方法,当须要指明某个特定的方法时,会带上相应的参数。一样的,说notify方法时,也是泛指notify(),notifyAll()方法,await方法和signal方法以此类推。数据结构

首先,咱们经过wait/notify机制来类比await/signal机制:并发

  1. 调用wait方法的线程首先必须是已经进入了同步代码块,即已经获取了监视器锁;与之相似,调用await方法的线程首先必须得到lock锁
  2. 调用wait方法的线程会释放已经得到的监视器锁,进入当前监视器锁的等待队列(wait set)中;与之相似,调用await方法的线程会释放已经得到的lock锁,进入到当前Condtion对应的条件队列中。
  3. 调用监视器锁的notify方法会唤醒等待在该监视器锁上的线程,这些线程将开始参与锁竞争,并在得到锁后,从wait方法处恢复执行;与之相似,调用Condtion的signal方法会唤醒对应的条件队列中的线程,这些线程将开始参与锁竞争,并在得到锁后,从await方法处开始恢复执行。

实战

因为前面咱们已经学习过了监视器锁的wait/notify机制,await/signal的用法基本相似。在正式分析源码以前,咱们先来看一个使用condition的实例:less

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生产者方法,往数组里面写数据
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await(); //数组已满,没有空间时,挂起等待,直到数组“非满”(notFull)
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            // 由于放入了一个数据,数组确定不是空的了
            // 此时唤醒等待这notEmpty条件上的线程
            notEmpty.signal(); 
        } finally {
            lock.unlock();
        }
    }

    // 消费者方法,从数组里面拿数据
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 数组是空的,没有数据可拿时,挂起等待,直到数组非空(notEmpty)
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            // 由于拿出了一个数据,数组确定不是满的了
            // 此时唤醒等待这notFull条件上的线程
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

这是java官方文档提供的例子,是一个典型的生产者-消费者模型。这里在同一个lock锁上,建立了两个条件队列notFull, notEmpty。当数组已满,没有存储空间时,put方法在notFull条件上等待,直到数组“not full”;当数组空了,没有数据可读时,take方法在notEmpty条件上等待,直到数组“not empty”,而notEmpty.signal()notFull.signal()则用来唤醒等待在这个条件上的线程。函数

注意,上面所说的,在notFull notEmpty条件上等待事实上是指线程在条件队列(condition queue)上等待,当该线程被相应的signal方法唤醒后,将进入到咱们前面三篇介绍的sync queue中去争锁,争到锁后才能能await方法处返回。这里接牵涉到两种队列了——condition queuesync queue,它们都定义在AQS中。源码分析

为了防止你们被AQS中的队列弄晕,这里咱们先理理清:学习

同步队列 vs 条件队列

sync queue

首先,在逐行分析AQS源码(1)——独占锁的获取这篇中咱们说过,全部等待锁的线程都会被包装成Node扔到一个同步队列中。该同步队列以下:
dummy head

sync queue是一个双向链表,咱们使用prevnext属性来串联节点。可是在这个同步队列中,咱们一直没有用到nextWaiter属性,即便是在共享锁模式下,这一属性也只做为一个标记,指向了一个空节点,所以,在sync queue中,咱们不会用它来串联节点。

condtion queue

每建立一个Condtion对象就会对应一个Condtion队列,每个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中,就像这样:
Conditon queue
可见,每个Condition对象对应一个Conditon队列,每一个Condtion队列都是独立的,互相不影响的。在上图中,若是咱们对当前线程调用了notFull.await(), 则当前线程就会被包装成Node加到notFull队列的末尾。

值得注意的是,condition queue是一个单向链表,在该链表中咱们使用nextWaiter属性来串联链表。可是,就像在sync queue中不会使用nextWaiter属性来串联链表同样,在condition queue中,也并不会用到prev, next属性,它们的值都为null。也就是说,在条件队列中,Node节点真正用到的属性只有三个:

  • thread:表明当前正在等待某个条件的线程
  • waitStatus:条件的等待状态
  • nextWaiter:指向条件队列中的下一个节点

既然这里又提到了waitStatus,咱们这里再回顾一下它的取值范围:

volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

在条件队列中,咱们只须要关注一个值便可——CONDITION。它表示线程处于正常的等待状态,而只要waitStatus不是CONDITION,咱们就认为线程再也不等待了,此时就要从条件队列中出队。

sync queue 和 conditon queue的联系

通常状况下,等待锁的sync queue和条件队列condition queue是相互独立的,彼此之间并无任何关系。可是,当咱们调用某个条件队列的signal方法时,会将某个或全部等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程同样须要去争锁,若是没有抢到,则一样要被加到等待锁的sync queue中去,此时节点就从condition queue中被转移到sync queue中:
condtion queue to sync queue

可是,这里尤为要注意的是,node是被一个一个转移过去的,哪怕咱们调用的是signalAll()方法也是一个一个转移过去的,而不是将整个条件队列接在sync queue的末尾。

同时要注意的是,咱们在sync queue中只使用prevnext来串联链表,而不使用nextWaiter;咱们在condition queue中只使用nextWaiter来串联链表,而不使用prevnext.事实上,它们就是两个使用了一样的Node数据结构的彻底独立的两种链表。所以,将节点从condition queue中转移到sync queue中时,咱们须要断开原来的连接(nextWaiter),创建新的连接(prev, next),这某种程度上也是须要将节点一个一个地转移过去的缘由之一。

入队时和出队时的锁状态

sync queue是等待锁的队列,当一个线程被包装成Node加到该队列中时,必然是没有获取到锁;当处于该队列中的节点获取到了锁,它将从该队列中移除(事实上移除操做是将获取到锁的节点设为新的dummy head,并将thread属性置为null)。

condition队列是等待在特定条件下的队列,由于调用await方法时,必然是已经得到了lock锁,因此在进入condtion队列线程必然是已经获取了锁;在被包装成Node扔进条件队列中后,线程将释放锁,而后挂起;当处于该队列中的线程被signal方法唤醒后,因为队列中的节点在以前挂起的时候已经释放了锁,因此必须先去再次的竞争锁,所以,该节点会被添加到sync queue中。所以,条件队列在出队时,线程并不持有锁。

因此事实上,这两个队列的锁状态正好相反:

  • condition queue:入队时已经持有了锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到sync queue
  • sync queue:入队时没有锁 -> 在队列中争锁 -> 离开队列时得到了锁

经过上面的介绍,咱们对条件队列已经有了感性的认识,接下来就让咱们进入到本篇的重头戏——源码分析:

CondtionObject

AQS对Condition这个接口的实现主要是经过ConditionObject,上面已经说个,它的核心实现就是是一个条件队列,每个在某个condition上等待的线程都会被封装成Node对象扔进这个条件队列。

核心属性

它的核心属性只有两个:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

这两个属性分别表明了条件队列的队头和队尾,每当咱们新建一个conditionObject对象,都会对应一个条件队列。

构造函数

public ConditionObject() { }

构造函数啥也没干,可见,条件队列是延时初始化的,在真正用到的时候才会初始化。

Condition接口方法实现

await()第一部分分析

public final void await() throws InterruptedException {
    // 若是当前线程在调动await()方法前已经被中断了,则直接抛出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程封装成Node添加到条件队列
    Node node = addConditionWaiter();
    // 释放当前线程所占用的锁,保存当前的锁状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 若是当前队列不在同步队列中,说明刚刚被await, 尚未人调用signal方法,则直接将当前线程挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 线程将在这里被挂起,中止运行
        // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了
        // 因此检查下线程被唤醒的缘由,若是是由于中断被唤醒,则跳出while循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 第一部分就分析到这里,下面的部分咱们到第二部分再看, 先把它注释起来
    /*
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    */
}

咱们已经在上面的代码注释中描述了大致的流程,接下来咱们详细来看看await方法中所调用方法的具体实现。

首先是将当前线程封装成Node扔进条件队列中的addConditionWaiter方法:

addConditionWaiter

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 若是尾节点被cancel了,则先遍历整个链表,清除全部被cancel的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程包装成Node扔进条件队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    /*
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
    */
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

首先咱们要思考的是,存在两个不一样的线程同时入队的状况吗?不存在。为何呢?由于前面说过了,能调用await方法的线程必然是已经得到了锁,而得到了锁的线程只有一个,因此这里不存在并发,所以不须要CAS操做。

在这个方法中,咱们就是简单的将当前线程封装成Node加到条件队列的末尾。这和将一个线程封装成Node加入等待队列略有不一样:

  1. 节点加入sync queuewaitStatus的值为0,但节点加入condition queuewaitStatus的值为Node.CONDTION
  2. sync queue的头节点为dummy节点,若是队列为空,则会先建立一个dummy节点,再建立一个表明当前节点的Node添加在dummy节点的后面;而condtion queue 没有dummy节点,初始化时,直接将firstWaiterlastWaiter直接指向新建的节点就好了。
  3. sync queue是一个双向队列,在节点入队后,要同时修改当前节点的前驱前驱节点的后继;而在condtion queue中,咱们只修改了前驱节点的nextWaiter,也就是说,condtion queue是做为单向队列来使用的。

若是入队时发现尾节点已经取消等待了,那么咱们就不该该接在它的后面,此时须要调用unlinkCancelledWaiters来剔除那些已经取消等待的线程:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

该方法将从头节点开始遍历整个队列,剔除其中waitStatus不为Node.CONDTION的节点,这里使用了两个指针firstWaitertrail来分别记录第一个和最后一个waitStatus不为Node.CONDTION的节点,这些都是基础的链表操做,很容易理解,这里再也不赘述了。

fullyRelease

在节点被成功添加到队列的末尾后,咱们将调用fullyRelease来释放当前线程所占用的锁:

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

首先,当咱们调用这个方法时,说明当前线程已经被封装成Node扔进条件队列了。在该方法中,咱们经过release方法释放锁,还记得release方法吗,咱们在逐行分析AQS源码(2)——独占锁的释放中已经详细讲过了,这里再也不赘述了。

值得注意的是,这是一次性释放了全部的锁,即对于可重入锁而言,不管重入了几回,这里是一次性释放完的,这也就是为何该方法的名字叫fullyRelease。但这里尤为要注意的是release(savedState)方法是有可能抛出IllegalMonitorStateException的,这是由于当前线程可能并非持有锁的线程。可是咱前面不是说,只有持有锁的线程才能调用await方法吗?既然fullyRelease方法在await方法中,为啥当前线程还有可能并非持有锁的线程呢?

虽然话是这么说,可是在调用await方法时,咱们其实并无检测Thread.currentThread() == getExclusiveOwnerThread(),换句话说,也就是执行到fullyRelease这一步,咱们才会检测这一点,而这一点检测是由AQS子类实现tryRelease方法来保证的,例如,ReentrantLock对tryRelease方法的实现以下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

当发现当前线程不是持有锁的线程时,咱们就会进入finally块,将当前Node的状态设为Node.CANCELLED,这也就是为何上面的addConditionWaiter在添加新节点前每次都会检查尾节点是否已经被取消了。

在当前线程的锁被彻底释放了以后,咱们就能够调用LockSupport.park(this)把当前线程挂起,等待被signal了。可是,在挂起当前线程以前咱们先用isOnSyncQueue确保了它不在sync queue中,这是为何呢?当前线程不是在一个和sync queue无关的条件队列中吗?怎么可能会出如今sync queue中的状况?

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}
/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

为了解释这一问题,咱们先来看看signal方法

signalAll()

在看signalAll以前,咱们首先要区分调用signalAll方法的线程与signalAll方法要唤醒的线程(等待在对应的条件队列里的线程):

  • 调用signalAll方法的线程自己是已经持有了锁,如今准备释放锁了;
  • 在条件队列里的线程是已经在对应的条件上挂起了,等待着被signal唤醒,而后去争锁。

首先,与调用notify时线程必须是已经持有了监视器锁相似,在调用condition的signal方法时,线程也必须是已经持有了lock锁:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

该方法首先检查当前调用signal方法的线程是否是持有锁的线程,这是经过isHeldExclusively方法来实现的,该方法由继承AQS的子类来实现,例如,ReentrantLock对该方法的实现为:

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

由于exclusiveOwnerThread保存了当前持有锁的线程,这里只要检测它是否是等于当前线程就好了。
接下来先经过firstWaiter是否为空判断条件队列是否为空,若是条件队列不为空,则调用doSignalAll方法:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

首先咱们经过lastWaiter = firstWaiter = null;将整个条件队列清空,而后经过一个do-while循环,将原先的条件队列里面的节点一个一个拿出来(令nextWaiter = null),再经过transferForSignal方法一个一个添加到sync queue的末尾:

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    // 若是该节点在调用signal方法前已经被取消了,则直接跳过这个节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 若是该节点在条件队列中正常等待,则利用enq方法将该节点添加至sync queue队列的尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); 
    return true;
}

transferForSignal方法中,咱们先使用CAS操做将当前节点的waitStatus状态由CONDTION设为0,若是修改不成功,则说明该节点已经被CANCEL了,则咱们直接返回,操做下一个节点;若是修改为功,则说明咱们已经将该节点从等待的条件队列中成功“唤醒”了,但此时该节点对应的线程并无真正被唤醒,它还要和其余普通线程同样去争锁,所以它将被添加到sync queue的末尾等待获取锁。

咱们这里经过enq方法将该节点添加进sync queue的末尾。关于该方法,咱们在逐行分析AQS源码(1)——独占锁的获取中已经详细讲过了,这里再也不赘述。不过这里尤为注意的是,enq方法将node节点添加进队列时,返回的是node的前驱节点

在将节点成功添加进sync queue中后,咱们获得了该节点在sync queue中的前驱节点。咱们前面说过,在sync queque中的节点都要靠前驱节点去唤醒,因此,这里咱们要作的就是将前驱节点的waitStatus设为Node.SIGNAL, 这一点和shouldParkAfterFailedAcquire所作的工做相似:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

所不一样的是,shouldParkAfterFailedAcquire将会向前查找,跳过那些被cancel的节点,而后将找到的第一个没有被cancel的节点的waitStatus设成SIGNAL,最后再挂起。而在transferForSignal中,当前Node所表明的线程自己就已经被挂起了,因此这里作的更像是一个复合操做——只要前驱节点处于被取消的状态或者没法将前驱节点的状态修成Node.SIGNAL,那咱们就将Node所表明的线程唤醒,但这个条件并不意味着当前lock处于可获取的状态,有可能线程被唤醒了,可是锁仍是被占有的状态,不过这样作至少是无害的,由于咱们在线程被唤醒后还要去争锁,若是抢不到锁,则大不了再次被挂起。

值得注意的是,transferForSignal是有返回值的,可是咱们在这个方法中并无用到,它将在signal()方法中被使用。

在继续往下看signal()方法以前,这里咱们再总结一下signalAll()方法:

  1. 将条件队列清空(只是令lastWaiter = firstWaiter = null,队列中的节点和链接关系仍然还存在)
  2. 将条件队列中的头节点取出,使之成为孤立节点(nextWaiter,prev,next属性都为null)
  3. 若是该节点处于被Cancelled了的状态,则直接跳过该节点(因为是孤立节点,则会被GC回收)
  4. 若是该节点处于正常状态,则经过enq方法将它添加到sync queue的末尾
  5. 判断是否须要将该节点唤醒(包括设置该节点的前驱节点的状态为SIGNAL),若有必要,直接唤醒该节点
  6. 重复2-5,直到整个条件队列中的节点都被处理完

signal()

signalAll()方法不一样,signal()方法只会唤醒一个节点,对于AQS的实现来讲,就是唤醒条件队列中第一个没有被Cancel的节点,弄懂了signalAll()方法,signal()方法就很容易理解了,由于它们大同小异:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

首先依然是检查调用该方法的线程(即当前线程)是否是已经持有了锁,这一点和上面的signalAll()方法同样,所不同的是,接下来调用的是doSignal方法:

private void doSignal(Node first) {
    do {
        // 将firstWaiter指向条件队列队头的下一个节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 将条件队列原来的队头从条件队列中断开,则此时该节点成为一个孤立的节点
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

这个方法也是一个do-while循环,目的是遍历整个条件队列,找到第一个没有被cancelled的节点,并将它添加到条件队列的末尾。若是条件队列里面已经没有节点了,则将条件队列清空(firstWaiter=lasterWaiter=null)。

在这里,咱们用的依然用的是transferForSignal方法,可是用到了它的返回值,只要节点被成功添加到sync queue中,transferForSignal就返回true, 此时while循环的条件就不知足了,整个方法就结束了,即调用signal()方法,只会唤醒一个线程。

总结: 调用signal()方法会从当前条件队列中取出第一个没有被cancel的节点添加到sync队列的末尾。

await()第二部分分析

前面咱们已经分析了signal方法,它会将节点添加进sync queue队列中,并要么当即唤醒线程,要么等待前驱节点释放锁后将本身唤醒,不管怎样,被唤醒的线程要从哪里恢复执行呢?固然是被挂起的地方呀,咱们在哪里被挂起的呢?还记得吗?固然是调用了await方法的地方,以await()方法为例:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 咱们在这里被挂起了,被唤醒后,将从这里继续往下运行
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

这里值得注意的是,当咱们被唤醒时,其实并不知道是由于什么缘由被唤醒,有多是由于其余线程调用了signal方法,也有多是由于当前线程被中断了。

可是,不管是被中断唤醒仍是被signal唤醒,被唤醒的线程最后都将离开condition queue,进入到sync queue中,这一点咱们在下面分析源代码的时候详细说。

随后,线程将在sync queue中利用进行acquireQueued方法进行“阻塞式”争锁,抢到锁就返回,抢不到锁就继续被挂起。所以,当await()方法返回时,必然是保证了当前线程已经持有了lock锁。

另外有一点这里咱们提早说明一下,这一点对于咱们下面理解源码很重要,那就是:

若是从线程被唤醒,到线程获取到锁这段过程当中发生过中断,该怎么处理?

咱们前面分析中断的时候说过,中断对于当前线程只是个建议,由当前线程决定怎么对其作出处理。在acquireQueued方法中,咱们对中断是不响应的,只是简单的记录抢锁过程当中的中断状态,并在抢到锁后将这个中断状态返回,交于上层调用的函数处理,而这里“上层调用的函数”就是咱们的await()方法。

那么await()方法是怎么对待这个中断的呢?这取决于:

中断发生时,线程是否已经被signal过?

若是中断发生时,当前线程并无被signal过,则说明当前线程还处于条件队列中,属于正常在等待中的状态,此时中断将致使当前线程的正常等待行为被打断,进入到sync queue中抢锁,所以,在咱们从await方法返回后,须要抛出InterruptedException,表示当前线程由于中断而被唤醒。

若是中断发生时,当前线程已经被signal过了,则说明这个中断来的太晚了,既然当前线程已经被signal过了,那么就说明在中断发生前,它就已经正常地被从condition queue中唤醒了,因此随后即便发生了中断(注意,这个中断能够发生在抢锁以前,也能够发生在抢锁的过程当中),咱们都将忽略它,仅仅是在await()方法返回后,再自我中断一下,补一下这个中断。就好像这个中断是在await()方法调用结束以后才发生的同样。这里之因此要“补一下”这个中断,是由于咱们在用Thread.interrupted()方法检测是否发生中断的同时,会将中断状态清除,所以若是选择了忽略中断,则应该在await()方法退出后将它设成原来的样子。

关于“这个中断来的太晚了”这一点若是你们不太容易理解的话,这里打个比方:这就比如咱们去饭店吃饭,都快吃完了,有一个菜到如今尚未上,因而咱们经常会把服务员叫来问:这个菜有没有在作?要是还没作咱们就不要了。而后服务员会跑到厨房去问,以后跑回来讲:对不起,这个菜已经下锅在炒了,请再耐心等待一下。这里,这个“这个菜咱们不要了”(发起的中断)就来的太晚了,由于菜已经下锅了(已经被signal过了)。

理清了上面的概念,咱们再来看看await()方法是怎么作的,它用中断模式interruptMode这个变量记录中断事件,该变量有三个值:

  1. 0 : 表明整个过程当中一直没有中断发生。
  2. THROW_IE : 表示退出await()方法时须要抛出InterruptedException,这种模式对应于中断发生在signal以前
  3. REINTERRUPT : 表示退出await()方法时只须要再自我中断如下,这种模式对应于中断发生在signal以后,即中断来的太晚了。
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT =  1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE    = -1;

接下来咱们就从线程被唤醒的地方继续往下走,一步步分析源码:

状况1:中断发生时,线程尚未被signal过

线程被唤醒后,咱们将首先使用checkInterruptWhileWaiting方法检测中断的模式:

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

这里假设已经发生过中断,则Thread.interrupted()方法必然返回true,接下来就是用transferAfterCancelledWait进一步判断是否发生了signal:

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

上面已经说过,判断一个node是否被signal过,一个简单有效的方法就是判断它是否离开了condition queue, 进入到sync queue中。

换句话说,只要一个节点的waitStatus仍是Node.CONDITION,那就说明它尚未被signal过。
因为如今咱们分析状况1,则当前节点的waitStatus必然是Node.CONDITION,则会成功执行compareAndSetWaitStatus(node, Node.CONDITION, 0),将该节点的状态设置成0,而后调用enq(node)方法将当前节点添加进sync queue中,而后返回true
这里值得注意的是,咱们此时并无断开node的nextWaiter,因此最后必定不要忘记将这个连接断开。

再回到transferAfterCancelledWait调用处,可知,因为transferAfterCancelledWait将返回true,如今checkInterruptWhileWaiting将返回THROW_IE,这表示咱们在离开await方法时应当要抛出THROW_IE异常。

再回到checkInterruptWhileWaiting的调用处:

public final void await() throws InterruptedException {
    /*
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    */
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 咱们如今在这里!!!
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

interruptMode如今为THROW_IE,则咱们将执行break,跳出while循环。

接下来咱们将执行acquireQueued(node, savedState)进行争锁,注意,这里传入的须要获取锁的重入数量是savedState,即以前释放了多少,这里就须要再次获取多少:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 若是线程获取不到锁,则将在这里被阻塞住
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued咱们在前面的文章中已经详细分析过了,它是一个阻塞式的方法,获取到锁则退出,获取不到锁则会被挂起。该方法只有在最终获取到了锁后,才会退出,而且退出时会返回当前线程的中断状态,若是咱们在获取锁的过程当中又被中断了,则会返回true,不然会返回false。可是其实这里返回true仍是false已经不重要了,由于前面已经发生过中断了,咱们就是由于中断而被唤醒的不是吗?因此不管如何,咱们在退出await()方法时,必然会抛出InterruptedException

咱们这里假设它获取到了锁了,则它将回到上面的调用处,因为咱们这时的interruptMode = THROW_IE,则会跳过if语句。
接下来咱们将执行:

if (node.nextWaiter != null) 
    unlinkCancelledWaiters();

上面咱们说过,当前节点的nextWaiter是有值的,它并无和原来的condition队列断开,这里咱们已经获取到锁了,根据逐行分析AQS源码(1)——独占锁的获取中的分析,咱们经过setHead方法已经将它的thread属性置为null,从而将当前线程从sync queue"移除"了,接下来应当将它从condition队列里面移除。因为condition队列是一个单向队列,咱们没法获取到它的前驱节点,因此只能从头开始遍历整个条件队列,而后找到这个节点,再移除它。

然而,事实上呢,咱们并无这么作。由于既然已经必须从头开始遍历链表了,咱们就干脆一次性把链表中全部没有在等待的节点都拿出去,因此这里调用了unlinkCancelledWaiters方法,该方法咱们在前面await()第一部分的分析的时候已经讲过了,它就是简单的遍历链表,找到全部waitStatus不为CONDITION的节点,并把它们从队列中移除。

节点被移除后,接下来就是最后一步了——汇报中断状态:

if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

这里咱们的interruptMode=THROW_IE,说明发生了中断,则将调用reportInterruptAfterWait

/**
 * Throws InterruptedException, reinterrupts current thread, or
 * does nothing, depending on mode.
 */
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

能够看出,在interruptMode=THROW_IE时,咱们就是简单的抛出了一个InterruptedException

至此,状况1(中断发生于signal以前)咱们就分析完了,这里咱们简单总结一下:

  1. 线程由于中断,从挂起的地方被唤醒
  2. 随后,咱们经过transferAfterCancelledWait确认了线程的waitStatus值为Node.CONDITION,说明并无signal发生过
  3. 而后咱们修改线程的waitStatus为0,并经过enq(node)方法将其添加到sync queue
  4. 接下来线程将在sync queue中以阻塞的方式获取,若是获取不到锁,将会被再次挂起
  5. 线程在sync queue中获取到锁后,将调用unlinkCancelledWaiters方法将本身从条件队列中移除,该方法还会顺便移除其余取消等待的锁
  6. 最后咱们经过reportInterruptAfterWait抛出了InterruptedException

由此能够看出,一个调用了await方法挂起的线程在被中断后不会当即抛出InterruptedException,而是会被添加到sync queue中去争锁,若是争不到,仍是会被挂起;
只有争到了锁以后,该线程才得以从sync queuecondition queue中移除,最后抛出InterruptedException

因此说,一个调用了await方法的线程,即便被中断了,它依旧仍是会被阻塞住,直到它获取到锁以后才能返回,并在返回时抛出InterruptedException。中断对它意义更多的是体如今将它从condition queue中移除,加入到sync queue中去争锁,从这个层面上看,中断和signal的效果其实很像,所不一样的是,在await()方法返回后,若是是由于中断被唤醒,则await()方法须要抛出InterruptedException异常,表示是它是被非正常唤醒的(正常唤醒是指被signal唤醒)。

状况2:中断发生时,线程已经被signal过了

这种状况对应于“中断来的太晚了”,即REINTERRUPT模式,咱们在拿到锁退出await()方法后,只须要再自我中断一下,不须要抛出InterruptedException。

值得注意的是这种状况其实包含了两个子状况:

  1. 被唤醒时,已经发生了中断,但此时线程已经被signal过了
  2. 被唤醒时,并无发生中断,可是在抢锁的过程当中发生了中断

下面咱们分别来分析:

状况2.1:被唤醒时,已经发生了中断,但此时线程已经被signal过了

对于这种状况,与前面中断发生于signal以前的主要差异在于transferAfterCancelledWait方法:

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //线程A执行到这里,CAS操做将会失败
        enq(node);
        return true;
    }
    // 因为中断发生前,线程已经被signal了,则这里只须要等待线程成功进入sync queue便可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

在这里,因为signal已经发生过了,则由咱们以前分析的signal方法可知,此时当前节点的waitStatus一定不为Node.CONDITION,他将跳过if语句。此时当前线程可能已经在sync queue中,或者正在进入到sync queue的路上

为何这里会出现“正在进入到sync queue的路上”的状况呢? 这里咱们解释下:

假设当前线程为线程A, 它被唤醒以后检测到发生了中断,来到了transferAfterCancelledWait这里,而另外一个线程B在这以前已经调用了signal方法,该方法会调用transferForSignal将当前线程添加到sync queue的末尾:

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 线程B执行到这里,CAS操做将会成功
        return false; 
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

由于线程A和线程B是并发执行的,而这里咱们分析的是“中断发生在signal以后”,则此时,线程B的compareAndSetWaitStatus先于线程A执行。这时可能出现线程B已经成功修改了node的waitStatus状态,可是还没来得及调用enq(node)方法,线程A就执行到了transferAfterCancelledWait方法,此时它发现waitStatus已经不是Condition,可是其实当前节点尚未被添加到sync node队列中,所以,它接下来将经过自旋,等待线程B执行完transferForSignal方法。

线程A在自旋过程当中会不断地判断节点有没有被成功添加进sync queue,判断的方法就是isOnSyncQueue

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}

该方法很好理解,只要waitStatus的值还为Node.CONDITION,则它必定还在condtion队列中,天然不可能在sync里面;而每个调用了enq方法入队的线程:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { //即便这一步失败了next.prev必定是有值的
                t.next = node; // 若是t.next有值,说明上面的compareAndSetTail方法必定成功了,则当前节点成为了新的尾节点
                return t; // 返回了当前节点的前驱节点
            }
        }
    }
}

哪怕在设置compareAndSetTail这一步失败了,它的prev必然也是有值的,所以这两个条件只要有一个知足,就说明节点必然不在sync queue队列中。

另外一方面,若是node.next有值,则说明它不只在sync queue中,而且在它后面还有别的节点,则只要它有值,该节点必然在sync queue中。
若是以上都不知足,说明这里出现了尾部分叉(关于尾部分叉,参见这里)的状况,咱们就从尾节点向前寻找这个节点:

/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

这里固然仍是有可能出现从尾部反向遍历找不到的状况,可是不用担忧,咱们还在while循环中,不管如何,节点最后总会入队成功的。最终,transferAfterCancelledWait将返回false。

再回到transferAfterCancelledWait调用处:

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

则这里,因为transferAfterCancelledWait返回了false,则checkInterruptWhileWaiting方法将返回REINTERRUPT,这说明咱们在退出该方法时只须要再次中断。

再回到checkInterruptWhileWaiting方法的调用处:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //咱们在这里!!!
            break;
    }
    //当前interruptMode=REINTERRUPT,不管这里是否进入if体,该值不变
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

此时,interruptMode的值为REINTERRUPT,咱们将直接跳出while循环。

接下来就和上面的状况1同样了,咱们依然仍是去争锁,这一步依然是阻塞式的,获取到锁则退出,获取不到锁则会被挂起。

另外因为如今interruptMode的值已经为REINTERRUPT,所以不管在争锁的过程当中是否发生过中断interruptMode的值都仍是REINTERRUPT

接着就是将节点从condition queue中剔除,与状况1不一样的是,在signal方法成功将node加入到sync queue时,该节点的nextWaiter已是null了,因此这里这一步不须要执行。

再接下来就是报告中断状态了:

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

注意,这里并无抛出中断异常,而只是将当前线程再中断一次。

至此,状况2.1(被唤醒时,已经发生了中断,但此时线程已经被signal过了)咱们就分析完了,这里咱们简单总结一下:

  1. 线程从挂起的地方被唤醒,此时既发生过中断,又发生过signal
  2. 随后,咱们经过transferAfterCancelledWait确认了线程的waitStatus值已经不为Node.CONDITION,说明signal发生于中断以前
  3. 而后,咱们经过自旋的方式,等待signal方法执行完成,确保当前节点已经被成功添加到sync queue
  4. 接下来线程将在sync queue中以阻塞的方式获取锁,若是获取不到,将会被再次挂起
  5. 最后咱们经过reportInterruptAfterWait将当前线程再次中断,可是不会抛出InterruptedException
状况2.2:被唤醒时,并无发生中断,可是在抢锁的过程当中发生了中断

这种状况就比上面的状况简单一点了,既然被唤醒时没有发生中断,那基本能够确信线程是被signal唤醒的,可是不要忘记还存在“假唤醒”这种状况,所以咱们依然仍是要检测被唤醒的缘由。

那么怎么区分究竟是假唤醒仍是由于是被signal唤醒了呢?

若是线程是由于signal而被唤醒,则由前面分析的signal方法可知,线程最终都会离开condition queue 进入sync queue中,因此咱们只须要判断被唤醒时,线程是否已经在sync queue中便可:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);  // 咱们在这里,线程将在这里被唤醒
        // 因为如今没有发生中断,因此interruptMode目前为0
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

线程被唤醒时,暂时尚未发生中断,因此这里interruptMode = 0, 表示没有中断发生,因此咱们将继续while循环,这时咱们将经过isOnSyncQueue方法判断当前线程是否已经在sync queue中了。因为已经发生过signal了,则此时node必然已经在sync queue中了,因此isOnSyncQueue将返回true,咱们将退出while循环。

不过这里插一句,若是isOnSyncQueue检测到当前节点不在sync queue中,则说明既没有发生中断,也没有发生过signal,则当前线程是被“假唤醒”的,那么咱们将再次进入循环体,将线程挂起。

退出while循环后接下来仍是利用acquireQueued争锁,由于前面没有发生中断,则interruptMode=0,这时,若是在争锁的过程当中发生了中断,则acquireQueued将返回true,则此时interruptMode将变为REINTERRUPT

接下是判断node.nextWaiter != null,因为在调用signal方法时已经将节点移出了队列,全部这个条件也不成立。

最后就是汇报中断状态了,此时interruptMode的值为REINTERRUPT,说明线程在被signal后又发生了中断,这个中断发生在抢锁的过程当中,这个中断来的太晚了,所以咱们只是再次自我中断一下。

至此,状况2.2(被唤醒时,并无发生中断,可是在抢锁的过程当中发生了中断)咱们就分析完了,这种状况和2.1很像,区别就是一个是在唤醒后就被发现已经发生了中断,一个个唤醒后没有发生中断,可是在抢锁的过成中发生了中断,但不管如何,这两种状况都会被归结为“中断来的太晚了”,中断模式为REINTERRUPT,状况2.2的总结以下:

  1. 线程被signal方法唤醒,此时并无发生过中断
  2. 由于没有发生过中断,咱们将从checkInterruptWhileWaiting处返回,此时interruptMode=0
  3. 接下来咱们回到while循环中,由于signal方法保证了将节点添加到sync queue中,此时while循环条件不成立,循环退出
  4. 接下来线程将在sync queue中以阻塞的方式获取,若是获取不到锁,将会被再次挂起
  5. 线程获取到锁返回后,咱们检测到在获取锁的过程当中发生过中断,而且此时interruptMode=0,这时,咱们将interruptMode修改成REINTERRUPT
  6. 最后咱们经过reportInterruptAfterWait将当前线程再次中断,可是不会抛出InterruptedException

这里咱们再总结如下状况2(中断发生时,线程已经被signal过了),这种状况对应于中断发生signal以后,咱们无论这个中断是在抢锁以前就已经发生了仍是抢锁的过程当中发生了,只要它是在signal以后发生的,咱们就认为它来的太晚了,咱们将忽略这个中断。所以,从await()方法返回的时候,咱们只会将当前线程从新中断一下,而不会抛出中断异常。

状况3: 一直没有中断发生

这种状况就更简单了,它的大致流程和上面的状况2.2差很少,只是在抢锁的过程当中也没有发生异常,则interruptMode为0,没有发生过中断,所以不须要汇报中断。则线程就从await()方法处正常返回。

await()总结

至此,咱们总算把await()方法完整的分析完了,这里咱们对整个方法作出总结:

  1. 进入await()时必须是已经持有了锁
  2. 离开await()时一样必须是已经持有了锁
  3. 调用await()会使得当前线程被封装成Node扔进条件队列,而后释放所持有的锁
  4. 释放锁后,当前线程将在condition queue中被挂起,等待signal或者中断
  5. 线程被唤醒后会将会离开condition queue进入sync queue中进行抢锁
  6. 若在线程抢到锁以前发生过中断,则根据中断发生在signal以前仍是以后记录中断模式
  7. 线程在抢到锁后进行善后工做(离开condition queue, 处理中断异常)
  8. 线程已经持有了锁,从await()方法返回

await() 方法流程

在这一过程当中咱们尤为要关注中断,如前面所说,中断和signal所起到的做用都是将线程从condition queue中移除,加入到sync queue中去争锁,所不一样的是,signal方法被认为是正常唤醒线程,中断方法被认为是非正常唤醒线程,若是中断发生在signal以前,则咱们在最终返回时,应当抛出InterruptedException;若是中断发生在signal以后,咱们就认为线程自己已经被正常唤醒了,这个中断来的太晚了,咱们直接忽略它,并在await()返回时再自我中断一下,这种作法至关于将中断推迟至await()返回时再发生。

awaitUninterruptibly()

在前面咱们分析的await()方法中,中断起到了和signal一样的效果,可是中断属于将一个等待中的线程非正常唤醒,可能即便线程被唤醒后,也抢到了锁,可是却发现当前的等待条件并无知足,则仍是得把线程挂起。所以咱们有时候并不但愿await方法被中断,awaitUninterruptibly()方法即实现了这个功能:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true; // 发生了中断后线程依旧留在了condition queue中,将会再次被挂起
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

首先,从方法签名上就能够看出,这个方法不会抛出中断异常,咱们拿它和await()方法对比一下:

public final void await() throws InterruptedException {
    if (Thread.interrupted())  // 不一样之处
        throw new InterruptedException(); // 不一样之处
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;  // 不一样之处
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  // 不一样之处
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  // 不一样之处
        interruptMode = REINTERRUPT;  // 不一样之处
    if (node.nextWaiter != null)  // 不一样之处
        unlinkCancelledWaiters(); // 不一样之处
    if (interruptMode != 0) // 不一样之处
        reportInterruptAfterWait(interruptMode); // 不一样之处
}

因而可知,awaitUninterruptibly()全程忽略中断,即便是当前线程由于中断被唤醒,该方法也只是简单的记录中断状态,而后再次被挂起(由于并无并无任何操做将它添加到sync queue中)

要使当前线程离开condition queue去争锁,则必须是发生了signal事件。

最后,当线程在获取锁的过程当中发生了中断,该方法也是不响应,只是在最终获取到锁返回时,再自我中断一下。能够看出,该方法和“中断发生于signal以后的”REINTERRUPT模式的await()方法很像。

至此,该方法咱们就分析完了,若是你以前await()方法已经弄懂了,这个awaitUninterruptibly()方法就很容易理解了。它的核心思想是:

  1. 中断虽然会唤醒线程,可是不会致使线程离开condition queue,若是线程只是由于中断而被唤醒,则他将再次被挂起
  2. 只有signal方法会使得线程离开condition queue
  3. 调用该方法时或者调用过程当中若是发生了中断,仅仅会在该方法结束时再自我中断如下,不会抛出InterruptedException

awaitNanos(long nanosTimeout)

前面咱们看的方法,不管是await()仍是awaitUninterruptibly(),它们在抢锁的过程当中都是阻塞式的,即一直到抢到了锁才能返回,不然线程仍是会被挂起,这样带来一个问题就是线程若是长时间抢不到锁,就会一直被阻塞,所以咱们有时候更须要带超时机制的抢锁,这一点和带超时机制的wait(long timeout)是很像的,咱们直接来看源码:

public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    /*if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);*/
    final long deadline = System.nanoTime() + nanosTimeout;
    /*int interruptMode = 0;
    while (!isOnSyncQueue(node)) */{
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        /*if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;*/
        nanosTimeout = deadline - System.nanoTime();
    }
    /*if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);*/
    return deadline - System.nanoTime();
}

该方法几乎和await()方法同样,只是多了超时时间的处理,咱们上面已经把和await()方法相同的部分注释起来了,只留下了不一样的部分,这样它们的区别就变得更明显了。

该方法的主要设计思想是,若是设定的超时时间还没到,咱们就将线程挂起;超过等待的时间了,咱们就将线程从condtion queue转移到sync queue中。注意这里对于超时时间有一个小小的优化——当设定的超时时间很短时(小于spinForTimeoutThreshold的值),咱们就是简单的自旋,而不是将线程挂起,以减小挂起线程和唤醒线程所带来的时间消耗。

不过这里还有一处值得注意,就是awaitNanos(0)的意义,咱们在线程间的同步与通讯(2)——wait, notify, notifyAll曾经提到过,wait(0)的含义是无限期等待,而咱们在awaitNanos(long nanosTimeout)方法中是怎么处理awaitNanos(0)的呢?

if (nanosTimeout <= 0L) {
    transferAfterCancelledWait(node);
    break;
}

从这里能够看出,若是设置的等待时间自己就小于等于0,当前线程是会直接从condition queue中转移到sync queue中的,并不会被挂起,也不须要等待signal,这一点确实是更复合逻辑。若是须要线程只有在signal发生的条件下才会被唤醒,则应该用上面的awaitUninterruptibly()方法。

await(long time, TimeUnit unit)

看完awaitNanos(long nanosTimeout)再看await(long time, TimeUnit unit)方法就更简单了,它就是在awaitNanos(long nanosTimeout)的基础上多了对于超时时间的时间单位的设置,可是在内部实现上仍是会把时间转成纳秒去执行,这里咱们直接拿它和上面的awaitNanos(long nanosTimeout)方法进行对比,只给出不一样的部分:

public final boolean await(long time, TimeUnit unit) throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    /*if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;*/
    /*boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {*/
            timedout = transferAfterCancelledWait(node);
            /*break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);*/
    return !timedout;
}

能够看出,这两个方法主要的差异就体如今返回值上面,awaitNanos(long nanosTimeout)的返回值是剩余的超时时间,若是该值大于0,说明超时时间还没到,则说明该返回是由signal行为致使的,而await(long time, TimeUnit unit)的返回值就是transferAfterCancelledWait(node)的值,咱们知道,若是调用该方法时,node尚未被signal过则返回true,node已经被signal过了,则返回false。所以当await(long time, TimeUnit unit)方法返回true,则说明在超时时间到以前就已经发生过signal了,该方法的返回是由signal方法致使的而不是超时时间。

综上,调用await(long time, TimeUnit unit)其实就等价于调用awaitNanos(unit.toNanos(time)) > 0 方法,关于这一点,咱们在介绍condition接口的时候也已经提过了。

awaitUntil(Date deadline)

awaitUntil(Date deadline)方法与上面的几种带超时的方法也基本相似,所不一样的是它的超时时间是一个绝对的时间,咱们直接拿它来和上面的await(long time, TimeUnit unit)方法对比:

public final boolean awaitUntil(Date deadline) throws InterruptedException {
    long abstime = deadline.getTime();
    /*if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) */{
        if (System.currentTimeMillis() > abstime) {
            /*timedout = transferAfterCancelledWait(node);
            break;
        }*/
        LockSupport.parkUntil(this, abstime);
        /*if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;*/
    }
    /*
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;*/
}

可见,这里大段的代码都是重复的,区别就是在超时时间的判断上使用了绝对时间,其实这里的deadline就和awaitNanos(long nanosTimeout)以及await(long time, TimeUnit unit)内部的deadline变量是等价的,另外就是在这个方法中,没有使用spinForTimeoutThreshold进行自旋优化,由于通常调用这个方法,目的就是设定一个较长的等待时间,不然使用上面的相对时间会更方便一点。

至此,AQS对于Condition接口的实现咱们就所有分析完了。

(完)

系列文章目录

相关文章
相关标签/搜索