详解Condition的await和signal等待/通知机制

1.Condition简介

任何一个java对象都自然继承于Object类,在线程间实现通讯的每每会应用到Object的几个方法,好比wait(),wait(long timeout),wait(long timeout, int nanos)与notify(),notifyAll()几个方法实现等待/通知机制,一样的, 在java Lock体系下依然会有一样的方法实现等待/通知机制。从总体上来看Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,而Condition与Lock配合完成等待通知机制,前者是java底层级别的,后者是语言级别的,具备更高的可控制性和扩展性。二者除了在使用方式上不一样外,在功能特性上仍是有不少的不一样:java

  1. Condition可以支持不响应中断,而经过使用Object方式不支持;
  2. Condition可以支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
  3. Condition可以支持超时时间的设置,而Object不支持

参照Object的wait和notify/notifyAll方法,Condition也提供了一样的方法:node

针对Object的wait方法面试

  1. void await() throws InterruptedException:当前线程进入等待状态,若是其余线程调用condition的signal或者signalAll方法而且当前线程获取Lock从await方法返回,若是在等待状态中被中断会抛出被中断异常;
  2. long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时
  3. boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位
  4. boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间

针对Object的notify/notifyAll方法编程

  1. void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,若是在同步队列中可以竞争到Lock则能够从等待方法中返回。
  2. void signalAll():与1的区别在于可以唤醒全部等待在condition上的线程

2.Condition实现原理分析

2.1 等待队列

要想可以深刻的掌握condition仍是应该知道它的实现原理,如今咱们一块儿来看看condiiton的源码。建立一个condition对象是经过lock.newCondition(),而这个方法其实是会new出一个ConditionObject对象,该类是AQS(AQS的实现原理的文章)的一个内部类,有兴趣能够去看看。前面咱们说过,condition是要和lock配合使用的也就是condition和Lock是绑定在一块儿的,而lock的实现原理又依赖于AQS,天然而然ConditionObject做为AQS的一个内部类无可厚非。咱们知道在锁机制的实现上,AQS内部维护了一个同步队列,若是是独占式锁的话,全部获取锁失败的线程的尾插入到同步队列,一样的,condition内部也是使用一样的方式,内部维护了一个 等待队列,全部调用condition.await方法的线程会加入到等待队列中,而且线程状态转换为等待状态。另外注意到ConditionObject中有两个成员变量:并发

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
复制代码

这样咱们就能够看出来ConditionObject经过持有等待队列的头尾指针来管理等待队列。主要注意的是Node类复用了在AQS中的Node类,其节点状态和相关属性能够去看AQS的实现原理的文章,若是您仔细看完这篇文章对condition的理解易如反掌,对lock体系的实现也会有一个质的提高。Node类有这样一个属性:less

//后继节点
Node nextWaiter;
复制代码

进一步说明,等待队列是一个单向队列,而在以前说AQS时知道同步队列是一个双向队列。接下来咱们用一个demo,经过debug进去看是否是符合咱们的猜测:ide

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        });
        thread.start();
    }
}
复制代码

这段代码没有任何实际意义,甚至很臭,只是想说明下咱们刚才所想的。新建了10个线程,没有线程先获取锁,而后调用condition.await方法释放锁将当前线程加入到等待队列中,经过debug控制当走到第10个线程的时候查看firstWaiter即等待队列中的头结点,debug模式下情景图以下:post

debug模式下情景图

从这个图咱们能够很清楚的看到这样几点:1. 调用condition.await方法后线程依次尾插入到等待队列中,如图队列中的线程引用依次为Thread-0,Thread-1,Thread-2....Thread-8;2. 等待队列是一个单向队列。经过咱们的猜测而后进行实验验证,咱们能够得出等待队列的示意图以下图所示:学习

等待队列的示意图

同时还有一点须要注意的是:咱们能够屡次调用lock.newCondition()方法建立多个condition对象,也就是一个lock能够持有多个等待队列。而在以前利用Object的方式其实是指在对象Object对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列。示意图以下:ui

AQS持有多个Condition.png

如图所示,ConditionObject是AQS的内部类,所以每一个ConditionObject可以访问到AQS提供的方法,至关于每一个Condition都拥有所属同步器的引用。

2.2 await实现原理

当调用condition.await()方法后会使得当前获取lock的线程进入到等待队列,若是该线程可以从await()方法返回的话必定是该线程获取了与condition相关联的lock。接下来,咱们仍是从源码的角度去看,只有熟悉了源码的逻辑咱们的理解才是最深的。await()方法源码为:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
	// 1. 将当前线程包装成Node,尾插入到等待队列中
    Node node = addConditionWaiter();
	// 2. 释放当前线程所占用的lock,在释放的过程当中会唤醒同步队列中的下一个节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
		// 3. 当前线程进入到等待状态
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
	// 4. 自旋等待获取到同步状态(即获取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
	// 5. 处理被中断的状况
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
复制代码

代码的主要逻辑请看注释,咱们都知道当当前线程调用condition.await()方法后,会使得当前线程释放lock而后加入到等待队列中,直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去,直到得到了lock后才会从await方法返回,或者在等待时被中断会作中断处理。那么关于这个实现过程咱们会有这样几个问题:1. 是怎样将当前线程添加到等待队列中去的?2.释放锁的过程?3.怎样才能从await方法退出?而这段代码的逻辑就是告诉咱们这三个问题的答案。具体请看注释,在第1步中调用addConditionWaiter将当前线程添加到等待队列中,该方法源码为:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
	//将当前线程包装成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
		//尾插入
        t.nextWaiter = node;
	//更新lastWaiter
    lastWaiter = node;
    return node;
}
复制代码

这段代码就很容易理解了,将当前节点包装成Node,若是等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,不然,更新lastWaiter(尾节点)便可。就是经过尾插入的方式将当前线程封装的Node插入到等待队列中便可,同时能够看出等待队列是一个不带头结点的链式队列,以前咱们学习AQS时知道同步队列是一个带头结点的链式队列,这是二者的一个区别。将当前节点插入到等待对列以后,会使当前线程释放lock,由fullyRelease方法实现,fullyRelease源码为:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
			//成功释放同步状态
            failed = false;
            return savedState;
        } else {
			//不成功释放同步状态抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
复制代码

这段代码就很容易理解了,调用AQS的模板方法release方法释放AQS的同步状态而且唤醒在同步队列中头结点的后继节点引用的线程,若是释放成功则正常返回,若失败的话就抛出异常。到目前为止,这两段代码已经解决了前面的两个问题的答案了,还剩下第三个问题,怎样从await方法退出?如今回过头再来看await方法有这样一段逻辑:

while (!isOnSyncQueue(node)) {
	// 3. 当前线程进入到等待状态
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
复制代码

很显然,当线程第一次调用condition.await()方法时,会进入到这个while()循环中,而后经过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件天然而然的是要先退出这个while循环,出口就只剩下两个地方:1. 逻辑走到break退出while循环;2. while循环中的逻辑判断为false。再看代码出现第1种状况的条件是当前等待的线程被中断后代码会走到break退出,第二种状况是当前节点被移动到了同步队列中(即另外线程调用的condition的signal或者signalAll方法),while中逻辑判断为false后结束while循环。总结下,就是当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后 ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用acquireQueued(node, savedState),这个方法在介绍AQS的底层实现时说过了,若感兴趣的话能够去看这篇文章,该方法的做用是在自旋过程当中线程不断尝试获取同步状态,直至成功(线程获取到lock)。这样也说明了退出await方法必须是已经得到了condition引用(关联)的lock。到目前为止,开头的三个问题咱们经过阅读源码的方式已经彻底找到了答案,也对await方法的理解加深。await方法示意图以下图:

await方法示意图

如图,调用condition.await方法的线程必须是已经得到了lock,也就是当前线程是同步队列中的头结点。调用该方法后会使得当前线程所封装的Node尾插入到等待队列中。

超时机制的支持

condition还额外支持了超时机制,使用者可调用方法awaitNanos,awaitUtil。这两个方法的实现原理,基本上与AQS中的tryAcquire方法一模一样,关于tryAcquire能够仔细阅读这篇文章的第3.4部分

不响应中断的支持

要想不响应中断能够调用condition.awaitUninterruptibly()方法,该方法的源码为:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}
复制代码

这段方法与上面的await方法基本一致,只不过减小了对中断的处理,并省略了reportInterruptAfterWait方法抛被中断的异常。

2.3 signal/signalAll实现原理

调用condition的signal或者signalAll方法能够将等待队列中等待时间最长的节点移动到同步队列中,使得该节点可以有机会得到lock。按照等待队列是先进先出(FIFO)的,因此等待队列的头节点必然会是等待时间最长的节点,也就是每次调用condition的signal方法是将头节点移动到同步队列中。咱们来经过看源码的方式来看这样的猜测是否是对的,signal方法源码为:

public final void signal() {
    //1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 获取等待队列中第一个节点,以后的操做都是针对这个节点
	Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
复制代码

signal方法首先会检测当前线程是否已经获取lock,若是没有获取lock会直接抛出异常,若是获取的话再获得等待队列的头指针引用的节点,以后的操做的doSignal方法也是基于该节点。下面咱们来看看doSignal方法作了些什么事情,doSignal方法源码为:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
		//1. 将头结点从等待队列中移除
        first.nextWaiter = null;
		//2. while中transferForSignal方法对头结点作真正的处理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
复制代码

具体逻辑请看注释,真正对头节点作处理的逻辑在transferForSignal放,该方法源码为:

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
	//1. 更新状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
	//2.将该节点移入到同步队列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
复制代码

关键逻辑请看注释,这段代码主要作了两件事情1.将头结点的状态更改成CONDITION;2.调用enq方法,将该节点尾插入到同步队列中,关于enq方法请看AQS的底层实现这篇文章。如今咱们能够得出结论:调用condition的signal的前提条件是当前线程已经获取了lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出。signal执行示意图以下图:

signal执行示意图

signalAll

sigllAll与sigal方法的区别体如今doSignalAll方法上,前面咱们已经知道doSignal方法只会对等待队列的头节点进行操做,,而doSignalAll的源码为:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
复制代码

该方法只不过期间等待队列中的每个节点都移入到同步队列中,即“通知”当前调用condition.await()方法的每个线程。

3. await与signal/signalAll的结合思考

文章开篇提到等待/通知机制,经过使用condition提供的await和signal/signalAll方法就能够实现这种机制,而这种机制可以解决最经典的问题就是“生产者与消费者问题”,关于“生产者消费者问题”以后会用单独的一篇文章进行讲解,这也是面试的高频考点。await和signal和signalAll方法就像一个开关控制着线程A(等待方)和线程B(通知方)。它们之间的关系能够用下面一个图来表现得更加贴切:

condition下的等待通知机制.png

如图,线程awaitThread先经过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另外一个线程signalThread经过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法,使得线程awaitThread可以有机会移入到同步队列中,当其余线程释放lock后使得线程awaitThread可以有机会获取lock,从而使得线程awaitThread可以从await方法中退出执行后续操做。若是awaitThread获取lock失败会直接进入到同步队列

3. 一个例子

咱们用一个很简单的例子说说condition的用法:

public class AwaitSignal {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new waiter());
        waiter.start();
        Thread signaler = new Thread(new signaler());
        signaler.start();
    }

    static class waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "当前条件不知足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知条件知足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}
复制代码

输出结果为:

Thread-0当前条件不知足等待
Thread-0接收到通知,条件知足
复制代码

开启了两个线程waiter和signaler,waiter线程开始执行的时候因为条件不知足,执行condition.await方法使该线程进入等待状态同时释放锁,signaler线程获取到锁以后更改条件,并通知全部的等待线程后释放锁。这时,waiter线程获取到锁,并因为signaler线程更改了条件此时相对于waiter来讲条件知足,继续执行。

参考文献

《java并发编程的艺术》

相关文章
相关标签/搜索