Java并发编程,Condition的await和signal等待通知机制

Condition简介

Object类是Java中全部类的父类, 在线程间实现通讯的每每会应用到Object的几个方法: wait(),wait(long timeout),wait(long timeout, int nanos)与notify(),notifyAll() 实现等待/通知机制,一样的, 在Java Lock体系下依然会有一样的方法实现等待/通知机制。 从总体上来看Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,Condition与Lock配合完成等待/通知机制, 前者是Java底层级别的,后者是语言级别的,具备更高的可控制性和扩展性。 二者除了在使用方式上不一样外,在功能特性上仍是有不少的不一样:node

  • Condition可以支持不响应中断,而经过使用Object方式不支持git

  • Condition可以支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个github

  • Condition可以支持超时时间的设置,而Object不支持bash

参照Object的wait和notify/notifyAll方法,Condition也提供了一样的方法:并发

  • 针对Object的wait方法学习

void await() throws InterruptedException//当前线程进入等待状态,若是在等待状态中被中断会抛出被中断异常long awaitNanos(long nanosTimeout)//当前线程进入等待状态直到被通知,中断或者超时boolean await(long time, TimeUnit unit)throws InterruptedException//同第二种,支持自定义时间单位boolean awaitUntil(Date deadline) throws InterruptedException//当前线程进入等待状态直到被通知,中断或者到了某个时间复制代码
  • 针对Object的notify/notifyAll方法ui

void signal()//唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,若是在同步队列中可以竞争到Lock则能够从等待方法中返回。void signalAll()//与1的区别在于可以唤醒全部等待在condition上的线程复制代码

Condition实现原理分析

等待队列

建立一个Condition对象是经过lock.newCondition(), 而这个方法其实是会建立ConditionObject对象,该类是AQS的一个内部类。 Condition是要和Lock配合使用的也就是Condition和Lock是绑定在一块儿的,而lock的实现原理又依赖于AQS, 天然而然ConditionObject做为AQS的一个内部类无可厚非。 咱们知道在锁机制的实现上,AQS内部维护了一个同步队列,若是是独占式锁的话, 全部获取锁失败的线程的尾插入到同步队列, 一样的,Condition内部也是使用一样的方式,内部维护了一个等待队列, 全部调用condition.await方法的线程会加入到等待队列中,而且线程状态转换为等待状态。 另外注意到ConditionObject中有两个成员变量:this

/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;复制代码

ConditionObject经过持有等待队列的头尾指针来管理等待队列。 注意Node类复用了在AQS中的Node类,Node类有这样一个属性:spa

//后继节点Node nextWaiter;复制代码

等待队列是一个单向队列,而在以前说AQS时知道同步队列是一个双向队列。线程

等待队列示意图:

13_00.png


注意: 咱们能够屡次调用lock.newCondition()方法建立多个Condition对象,也就是一个lLock能够持有多个等待队列。 利用Object的方式其实是指在对象Object对象监视器上只能拥有一个同步队列和一个等待队列; 并发包中的Lock拥有一个同步队列和多个等待队列。示意图以下:

13_01.png


ConditionObject是AQS的内部类, 所以每一个ConditionObject可以访问到AQS提供的方法,至关于每一个Condition都拥有所属同步器的引用。

await实现原理

当调用condition.await()方法后会使得当前获取lock的线程进入到等待队列, 若是该线程可以从await()方法返回的话必定是该线程获取了与condition相关联的lock。 await()方法源码以下:

public final void await() throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();	// 1. 将当前线程包装成Node,尾插法插入到等待队列中
    Node node = addConditionWaiter();	// 2. 释放当前线程所占用的lock,在释放的过程当中会唤醒同步队列中的下一个节点
    int savedState = fullyRelease(node);    int interruptMode = 0;    while (!isOnSyncQueue(node)) {	// 3. 当前线程进入到等待状态
        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;
    }	// 4. 自旋等待获取到同步状态(即获取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();	// 5. 处理被中断的状况
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}复制代码

当前线程调用condition.await()方法后,会使得当前线程释放lock而后加入到等待队列中, 直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去, 直到得到了lock后才会从await方法返回,或者在等待时被中断会作中断处理。

addConditionWaiter()将当前线程添加到等待队列中,其源码以下:

private Node addConditionWaiter() {    Node t = lastWaiter;    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }	//将当前线程包装成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);    if (t == null) //t==null,同步队列为空的状况
        firstWaiter = node;    else
//尾插法
        t.nextWaiter = node;	//更新lastWaiter
    lastWaiter = node;    return node;
}复制代码

这里经过尾插法将当前线程封装的Node插入到等待队列中, 同时能够看出等待队列是一个不带头结点的链式队列,以前咱们学习AQS时知道同步队列是一个带头结点的链式队列。

将当前节点插入到等待对列以后,使用fullyRelease(0)方法释放当前线程释放lock,源码以下:

final int fullyRelease(Node node) {    boolean failed = true;    try {        int savedState = getState();        if (release(savedState)) {	//成功释放同步状态
            failed = false;            return savedState;
        } else {	//不成功释放同步状态抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}复制代码

调用AQS的模板方法release()方法释放AQS的同步状态而且唤醒在同步队列中头结点的后继节点引用的线程, 若是释放成功则正常返回,若失败的话就抛出异常。

如何从await()方法中退出?再看await()方法有这样一段代码:

while (!isOnSyncQueue(node)) {	// 3. 当前线程进入到等待状态
    LockSupport.park(this);    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)        break;
}复制代码

当线程第一次调用condition.await()方法时, 会进入到这个while()循环中,而后经过LockSupport.park(this)方法使得当前线程进入等待状态, 那么要想退出这个await方法就要先退出这个while循环,退出while循环的出口有2个:


  1. break退出while循环


  1. while循环中的逻辑判断为false

第1种状况的条件是当前等待的线程被中断后会走到break退出,

第2种状况是当前节点被移动到了同步队列中,(即另外线程调用的condition的signal或者signalAll方法), while中逻辑判断为false后结束while循环。

当退出while循环后就会调用acquireQueued(node, savedState),该方法的做用是 在自旋过程当中线程不断尝试获取同步状态,直至成功(线程获取到lock)。

这样就说明了退出await方法必须是已经得到了Condition引用(关联)的Lock。

await方法示意图以下:

13_02.png


调用condition.await方法的线程必须是已经得到了lock,也就是当前线程是同步队列中的头结点。 调用该方法后会使得当前线程所封装的Node尾插入到等待队列中。

超时机制的支持

condition还额外支持了超时机制,使用者可调用方法awaitNanos,awaitUtil。 这两个方法的实现原理,基本上与AQS中的tryAcquire方法一模一样。

不响应中断的支持

调用condition.awaitUninterruptibly()方法,该方法的源码为:

public final void awaitUninterruptibly() {    Node node = addConditionWaiter();    int savedState = fullyRelease(node);    boolean interrupted = false;    while (!isOnSyncQueue(node)) {        LockSupport.park(this);        if (Thread.interrupted())
            interrupted = true;
    }    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}复制代码

与上面的await方法基本一致,只不过减小了对中断的处理, 并省略了reportInterruptAfterWait方法抛被中断的异常。

signal和signalAll实现原理

调用Condition的signal或者signalAll方法能够将 等待队列中等待时间最长的节点移动到同步队列中,使得该节点可以有机会得到lock。 按照等待队列是先进先出(FIFO)的, 因此等待队列的头节点必然会是等待时间最长的节点, 也就是每次调用condition的signal方法是将头节点移动到同步队列中。 signal()源码以下:

public final void signal() {    //1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    //2. 获取等待队列中第一个节点,以后的操做都是针对这个节点
Node first = firstWaiter;    if (first != null)
        doSignal(first);
}复制代码

signal方法首先会检测当前线程是否已经获取lock, 若是没有获取lock会直接抛出异常,若是获取的话再获得等待队列的头指针引用的节点,doSignal方法也是基于该节点。 doSignal方法源码以下:

private void doSignal(Node first) {    do {        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;	//1. 将头结点从等待队列中移除
        first.nextWaiter = null;	//2. while中transferForSignal方法对头结点作真正的处理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}复制代码

真正对头节点作处理的是transferForSignal(),该方法源码以下:

final boolean transferForSignal(Node node) {	//1. 更新状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        return false;	//2.将该节点移入到同步队列中去
    Node p = enq(node);    int ws = p.waitStatus;    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread);    return true;
}复制代码

这段代码主要作了两件事情:

  • 1.将头结点的状态更改成CONDITION

  • 2.调用enq方法,将该节点尾插入到同步队列中

调用condition的signal的前提条件是 当前线程已经获取了lock,该方法会使得等待队列中的头节点(等待时间最长的那个节点)移入到同步队列, 而移入到同步队列后才有机会使得等待线程被唤醒, 即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出。

signal方法示意图以下:

13_03.png


signalAll

sigllAll与sigal方法的区别体如今doSignalAll方法上。doSignalAll()的源码以下:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;    do {        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}复制代码

doSignal方法只会对等待队列的头节点进行操做,而doSignalAll方法将等待队列中的每个节点都移入到同步队列中, 即“通知”当前调用condition.await()方法的每个线程。

await与signal和signalAll的结合

await和signal和signalAll方法就像一个开关控制着线程A(等待方)和线程B(通知方)。 它们之间的关系能够用下面一个图来表现得更加贴切:

13_04.png


线程awaitThread先经过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列, 而另外一个线程signalThread经过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法, 使得线程awaitThread可以有机会移入到同步队列中, 当其余线程释放lock后使得线程awaitThread可以有机会获取lock, 从而使得线程awaitThread可以从await方法中退出,而后执行后续操做。 若是awaitThread获取lock失败会直接进入到同步队列。

相关文章
相关标签/搜索