并发编程中条件变量Condition的源码分析

扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,便可关注微信公众号,阅读更多Spring源码分析Java并发编程文章。java

微信公众号

队列同步器AQS是经过管程模型来实现的,在管程模型中,咱们提到了两个队列:入口等待队列条件变量等待队列。在AQS中同步队列对应管程模型中的入口等待队列条件等待队列对应管程模型中的条件变量等待队列。关于AQS的同步队列的设计原理及源码实现能够阅读这两篇文章:队列同步器(AQS)的设计原理队列同步器(AQS)源码分析。今天将详细分析AQS中等待队列的设计原理和源码。(关于管程的介绍能够参考这篇文章:管程:并发编程的基石node

简介

  • 在并发领域中须要解决的两个问题:互斥同步,互斥指的是同一时刻只容许一个线程访问共享资源,这一点AQS的同步队列已经帮助咱们解决了。同步指的是线程间如何进行通讯和协做,那么AQS又是如何来解决同步问题的呢?答案就是今天的主角:Condition
  • Condition是JUC包下的一个接口,它的一个实现类ConditionObject是队列同步器AbstractQueuedSynchronizer(后面简称AQS)的一个内部类。它须要与Lock一块儿使用,经过Lock.newCondition()来建立实例。
  • 在Object类中有三个native方法:wait()、notify()、notifyAll(),它们须要与synchronize关键字一块儿使用,用它来实现线程之间的通讯。而Condition提供了await()、signal()、signalAll()三个方法,它们分别与Object类的这三个方法对应,也是用来实现线程以前的通讯的,可是它们在功能和使用法方式上存在部分差别。具体差别能够参考下表。(表格来源于《Java并发编程的艺术》一书第5章第6节)
对比项 Object Condition
使用前提 使用synchronize获取到锁 使用Lock实例的lock()方法获取到锁
使用方式 object.wait()、object.notify()等 须要使用Lock接口的实例对象来建立,Lock.newCondition()
等待队列个数 支持1个 能够支持多个,使用Lock实例new多个Condition便可
线程进入等待队列后是否响应中断 不支持 支持
超时等待 支持 支持
线程释放锁后等待到未来的某个时间点 不支持 支持
唤醒等待队列中的一个线程 支持,notify() 支持,signal()
唤醒等待队列中的全部线程 支持,notifyAll() 支持,signalAll()

AQS中如何设计等待对列

数据结构

  • AQS中的同步队列的数据结构是Node元素造成的一个双向链表,一样,等待队列也是以Node元素造成的一个链表,只不过是单向链表。在 队列同步器(AQS)的设计原理 这篇文章中已经介绍过Node的属性了,今天再来复习一下它的属性以及用途。
属性名 做用
Node prev 同步队列中,当前节点的前一个节点,若是当前节点是同步队列的头结点,那么prev属性为null
Node next 同步队列中,当前节点的后一个节点,若是当前节点是同步队列的尾结点,那么next属性为null
Node thread 当前节点表明的线程,若是当前线程获取到了锁,那么当前线程所表明的节点必定处于同步队列的队首,且thread属性为null,至于为何要将其设置为null,这是AQS特地设计的。
int waitStatus 当前线程的等待状态,有5种取值。0表示初始值,1表示线程被取消,-1表示当前线程处于等待状态,-2表示节点处于等待队列中,-3表示下一次共享式同步状态获取将会无条件地被传播下去
Node nextWaiter 等待队列中,该节点的下一个节点
  • AQS等待队列的实现主要依靠的是Node节点中nextWaiter属性和waitStatus属性来实现的,其中waitStatus=-2时,表示线程是处于等待队列中。(同步队列依靠prev和next属性来实现双向链表)。Condition包含两个属性:firstWaiterlastWaiter,分别表示等待队列的队首和队尾。等待队列遵循先进先出的原则(FIFO)。下图为Condition等待队列的示意图。

等待队列示意图

实现原理

  • synchronize实现锁的原理也是经过管程模型实现的,可是synchronize实现的锁,只支持一个等待队列,而Condition能够支持多个等待队列。在AQS中,等待队列和同步队列的示意图以下。

同步队列和等待队列

  • 当线程经过Lock获取锁之后,调用Condition的await()方法时,当前线程会先将本身封装成一个Node节点,而后将这个节点添加到等待队列中;而后释放锁;最后调用LockSopport.park()方法将本身挂起。可使用以下示意图表示。

await()示意图

  • 当调用Condition的signal()方法时,首先会移除等待队列的firstWaiter节点,而后将firstWaiter节点加入到同步队列中。示意图以下。

signal()示意图

  • 若是是调用Condition的signalAll()方法,那么就会将Condition等待队列中全部Node节点移到同步队列中

源码分析

理解了上面的等待队列的数据结构和实现原理,接下来就结合源码看看具体的实现。接下来将分析await()方法和signal()方法的源码。编程

await()方法

当调用condition.await()方法时,会调用到AbstractQueuedSynchornizer中的内部类ConditionObject的await()方法。该方法的源码以下。设计模式

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程加入到等待队列中
    Node node = addConditionWaiter();
    // 彻底释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 判断同步节点是否在同步队列中,
    // 若是在当前线程释放锁后,锁被其余线程抢到了,此时当前节点就不在同步队列中了。
    // 若是锁没有被抢占到,节点就会再同步队列中(当前线程是持有锁的线程,因此它是头结点)
    while (!isOnSyncQueue(node)) {
        // 若是节点不在同步队列中,将当前线程park
        LockSupport.park(this);
        /** * 当被唤醒之后,接着从下面开始执行。醒来后会判断本身在等待过程当中有没有被中断过。 * checkInterruptWhileWaiting()方法返回0表示没有被中断过 * 返回-1表示须要抛出异常 * 返回1表示须要重置中断标识 */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 若是线程在同步队列中,那么就尝试去获取锁,若是获取不到,就会加入到同步队列中,并阻塞
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 当条件等待队列中还有其余线程在等待时,须要判断条件等待队列中有没有线程被取消,若是有,则将它们清除
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
复制代码
  • 在await()方法中,当前线程会先调用addConditionWaiter()方法,将本身封装成一个Node后,加入到等待队列中。而后调用fullyRelease()方法释放锁,接着经过判断当前线程是否在等待队列中,若是不在等待队列中,就将本身park
  • 在addConditionWaiter()方法中,主要干了两件事。一:将等待队列中处于取消状态的节点删除,即waitStatus=1的Node节点;二:将本身加入到等待队列,令Condition的lastWaiter属性等于当前线程所表明的节点。(注意:此时若是等待队列没有进行初始化时,会先进行初始化)。addConditionWaiter()方法的源码以下。
/** * Adds a new waiter to wait queue. * @return its new wait node * 将当前线程加入到等待队列中 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 建立一个节点,节点的waitStatus等于-2
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 若是等待队列尚未初始化,即等跌队列中尚未任何元素,那么此时firstWaiter和lastWaiter均为null
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node; // 令旧的队尾节点nextWaiter属性等于当前线程的节点,这样就维护了队列的先后关系
    lastWaiter = node;
    return node;
}
复制代码
  • 对于fullyRelease()方法,从方法名看,就知道它的做用是彻底释放锁。这里为何要彻底释放锁呢?由于对于重入锁而言,锁可能被重入了屡次,此时同步变量state的值大于1,而调用await()方法时,要让当前线程将锁释放掉,因此须要将state的值减为0,所以这里取名为fullyRelease()。fullyRelease()最终仍是调用AQS的release()方法来释放锁。fullyRelease()方法的源码以下。
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取同步变量state的值
        int savedState = getState();
        // 释放锁,注意此时将同步变量的值传入进去了,若是是重入锁,且被重入过,那么此时savedState的值大于1
        // 此时释放锁时,会将同步变量state的值减为0。(一般可重入锁在释放锁时,每次只会将state减1,重入了几回就要释放几回,在这里是一会儿所有释放)。
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 当线程没有获取到锁时,调用该方法会释放失败,会抛出异常。
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
复制代码
  • 在await()方法中,当释放完锁之后,紧接着经过isOnSyncQueue()判断当前线程的节点是否是在同步队列中,若是isOnSyncQueue()方法返回true,表示节点在同步队列中,若是返回false,表示当前节点不在同步队列中。当返回false时,会进入到while循环中,此时会调用LockSupport.park()方法,让当前线程挂起。因为当前线程在释放锁之后会唤醒同步队列中的线程去抢锁,若是有线程抢到锁,那么当前线程就确定不会在同步队列了(同步队列的首节点变化了),因此此时isOnSyncQueue()方法大几率返回的是false,所以会进入到while()方法中。若是当前线程在同步队列中,或者从park()处醒来后(醒来后会出如今同步队列中,由于signal()或者signalAll()方法会将节点移到同步队列),就会执行到await()方法后面的逻辑,即acquireQueued()方法,该方法就是去尝试获取锁,若是获取到就会返回,获取不到就阻塞。
  • isOnSyncQueue()的源码以及注释以下。
final boolean isOnSyncQueue(Node node) {
    // 若是节点的waitStatus=-2时,节点确定不在同步队列中,由于只有在等待队列时,才会为-2。
    // 若是节点的前驱节点为空时,有两种状况:
    // 1. 当前节点是同步队列的首节点,首节点是已经获取到锁的线程,能够认为线程不在同步队列中
    // 2. 当前节点在等待队列中,等待队列中的节点在建立时,没有给prev属性赋值
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 在上一个判断的前提条件下,若是后置节点不为空,那么当前节点确定在同步队列中。
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
    // 以上状况均不属于,那么就从同步队列的尾部开始遍历,找到同步队列中是否含有node节点
    return findNodeFromTail(node);
}
复制代码
  • 关于await()方法,总结起来就是三个步骤:1. 将本身加到等待队列;2. 释放锁;3. 将本身park()。在Condition接口中还提供了几个重载的await()方法,它们在await()方法的基础上添加了部分功能,例如超时等待、不响应中断的等待等功能,但大体逻辑和await()方法相似,有兴趣的朋友能够去研究下。

signal()方法

  • 当调用condition.signal()方法时,会调用到AbstractQueuedSynchornizer中的内部类ConditionObject的signal()方法。该方法的源码以下。
public final void signal() {
    // 先判断当前线程有没有获取到锁,若是没有获取到锁就来调用signal()方法,就会抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
复制代码
  • 在signal()方法中会判断当期线程是不是锁的拥有者,若是不是,就抛出异常。这就是为何说调用await()和signal()、signalAll()方法的前提条件是:当前线程获取到了锁,由于这几个方法在执行具体逻辑以前都判断了当前线程是否等于持有锁的线程。从代码中能够看到,signal()方法的具体逻辑是在doSignal()方法中实现的。doSignal()方法的源码以下。
private void doSignal(Node first) {
    do {
        // 令firstWaiter等于条件等待队列中的下一个节点。
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        /** *调用transferForSignal()方法,是将节点从条件等待队列中移到同步队列中. * 当transferForSignal()返回true时,表示节点被成功移到同步队列了。返回false,表示移动失败,当节点所表示的线程被取消时,会返回false * 当transferForSignal()返回true时,do...while循环结束。返回false时,继续。为何要这样呢? * 由于当transferForSignal()返回false表示条件等待队列中的,队列的头结点的状态时取消状态,不能将它移到同步队列中,随意须要继续从条件等待队列找没有被取消的节点。 */
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
复制代码
  • doSignal()方法的做用就是将等待队列的头节点从等待队列中移除,而后调用transferForSignal()方法将其加入到同步队列中,当transferForSignal()返回true时,表示节点被成功移到了同步队列中,返回false表示移动失败,只有一种状况会移动失败,那就是线程被取消了。transferForSignal()方法的源码以下。
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) * 将节点从条件等待队列移到同步队列 */
final boolean transferForSignal(Node node) {
    /* * If cannot change waitStatus, the node has been cancelled. */
    // 将节点的waitStatus的值从-2改成-1。这里若是出现CAS失败,说明节点的waitStatus值被修改过,在条件等待队列中,只有当线程被取消后,才会去修改waitStatus
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
    // 当加入到同步队列后,须要将当前节点的前一个节点的waitStatus设置为-1,表示队列中还有线程在等待
    // 若是前驱节点的waitStatus大于0表示线程被取消,须要将当前线程唤醒
    // 或者修改前驱节点的waitStatus是失败,也须要去唤醒当前线程
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
复制代码
  • signal()方法总结:将等待队列的头节点移动到同步队列中。细心的朋友可能会发现,调用signal()方法,并不会让当前线程释放锁,因此当线程调用完signal()方法后,当前线程会将本身的业务逻辑执行完,直到在当前线程中调用了lock.unlock()方法,才会释放锁。

signalAll()方法

  • signalAll()方法的做用就是唤醒等待队列中的全部节点,而signal()方法只唤醒等待队列的第一个节点。当调用condition.signalAll()方法时,会调用到AbstractQueuedSynchornizer中的内部类ConditionObject的signalAll()方法。该方法的源码以下。
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 核心逻辑在doSignalAll()方法
        doSignalAll(first);
}
复制代码
  • 在signalAll()方法中会调用doSignalAll()方法。doSignalAll()方法的源码以下。
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    // 经过do...while循环,遍历等待队列的全部节点
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        // transferForSignal()方法将节点移到到同步队列
        transferForSignal(first);
        first = next;
    } while (first != null);
}
复制代码
  • 能够发现,doSignalAll()方法经过do...while循环,遍历等待队列的全部节点,循环调用transferForSignal()方法,将等待队列中的节点所有移动到同步队列中。

总结

  • 本文主要介绍了AQS中等待队列的功能,对比了它与Object中对应方法的异同点。而后经过分析等待队列的数据结构,并结合示意图展现了await()方法和signal()方法的原理。最后经过具体的源码实现,详细分析了await()、signal()、signalAll()这三个方法。
  • Condition使用起来十分方便,开发人员只须要经过Lock.newCondition()方法就能建立Condition实例,屡次调用Lock.newCondition()方法,那么就会建立多个条件等待队列
  • Condition的应用十分普遍,咱们常常接触的有界队列LinkedBlockingQueue就是经过Condition来实现的,有兴趣的朋友能够先去阅读下源码。

相关推荐

微信公众号
相关文章
相关标签/搜索