Java并发编程知识点总结(十二)——Condition中await和signal通知机制


若是尚未看过AQS源码的朋友,建议先看一下AQS的源码,由于原理分析中会涉及到一些AQS的一些方法,若是看过会比较好理解。若是尚未看过的朋友,能够看一下个人这篇文章: AQS源码分析之独占锁AQS源码分析之共享锁

(一)、Object和Condition方法比较

咱们都知道Object的wait和notify/notifyAll()方法是与对象监视器配合完成线程之间的等待/通讯机制,而Condition是与Lock配合完成等待通知机制,前者是java底层级别的,后者则是语言级别的。
二者也存在着必定的区别:java

  1. Condition可以支持不响应中断,而使用Object方式不支持;
  2. Condition可以支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个等待队列;
  3. Condition可以支持超时时间的设置,而Object不支持;

(二)、await()方法具体实现

2.1 底层数据结构

咱们先来看下Condition接口吧,只是简单的定义了一些经常使用方法:node

public interface Condition {
    void await() throws InterruptedException;//使线程处于等待状态,响应中断
    void awaitUninterruptibly(); //使线程处于等待状态,不响应中断
    long awaitNanos(long nanosTimeout) throws InterruptedException; //限定等待的时间,响应中断
    void signal();//唤醒一个等待的线程,从等待队列转移到同步队列
    void signalAll();//唤醒全部的线程,从等待队列转移到同步队列
}

而Condition接口的具体实现是在AQS中的,类名叫作ConditionObjectweb

public class ConditionObject implements Condition, java.io.Serializable

其实Condition对象内部是维护了一个等待队列的,等待队列的是用单向的链表构成的,当使用await()方法,线程就会被添加到等待队列的尾部。 以下图
在这里插入图片描述
同时,咱们须要注意的是ConditionObject中的两个属性,分别指向第一个等待的线程和最后一个等待的线程。数据结构

private transient Node firstWaiter;
        private transient Node lastWaiter;

2.2 await()方法具体实现

咱们先来看看await()方法的源码,具体以下:ide

public final void await() throws InterruptedException {
			//判断线程是否被中断
            if (Thread.interrupted())
                throw new InterruptedException();
            //将线程加入等待队列中
            Node node = addConditionWaiter();
            //充分释放持有的锁
            int savedState = fullyRelease(node);
            //中断标志
            int interruptMode = 0;
            //是否从等待队列中转移到同步队列中
            while (!isOnSyncQueue(node)) {
            	//若是还在等待队列中,阻塞该线程
                LockSupport.park(this);
                //判断是否被中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //这个是AQS中的方法,以前介绍过了,这里就不赘述了
            //尝试得到锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //unlinkCancelledWaiters遍历全部的节点,删除标记为取消的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            //查看是否中断
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

咱们经过上面的代码,有几点是须要注意的:svg

  1. addConditionWaiter()印证了咱们刚刚说的,这里维护着一个等待队列
  2. fullyRelease()是重点,它的主要功能是释放持有的锁,也就是说,要调用await()方法,必须先蚩尤锁,下面会具体进行解释。
  3. isOnSyncQueue()这个方法用来判断是否从等待队列加入到AQS的同步队列中,这也是个重点,下面会具体说

咱们先来看下addConditionWaiter()方法,源码分析

private Node addConditionWaiter() {
			//得到最后一个线程
            Node t = lastWaiter;
            // 判断最后一个线程是不是Condition状态
            if (t != null && t.waitStatus != Node.CONDITION) {
                //若是是,那就遍历一遍等待队列,清除掉全部不是condition状态的节点
                unlinkCancelledWaiters();
                //从新得到最后一个线程
                t = lastWaiter;
            }
            //根据当前线程的信息,建立一个节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //判断等待队列是不是空队列
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            //设置最后一个节点是当前线程
            lastWaiter = node;
            return node;
        }

咱们从上面的方法中就能够看到:addConditionWaiter()就是将当前线程加入到等待队列的最后一个位置。
接着,咱们来看fullyRelease方法,下面是它的源码。ui

final int fullyRelease(Node node) {
		//表示是否释放锁失败
        boolean failed = true;
        try {
        	//由于是重入锁,getState()得到重入了多少重
            int savedState = getState();
            //release方法是AQS中的方法,释放指定参数的锁
            if (release(savedState)) {
            	//释放成功
                failed = false;
                return savedState;
            } else {
            	//释放失败
                throw new IllegalMonitorStateException();
            }
        } finally {
        	//失败则标记为取消状态
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

经过上面fullyRelease方法,咱们能够看到首先会经过getState()方法,获取重入了多少锁,而后经过release方法释放掉。
接着咱们来看下isOnSyncQueue()方法this

final boolean isOnSyncQueue(Node node) {
		//判断当前状态是否为CONDITION,或者前驱节点是否为null
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
         //判断是否有后继节点
        if (node.next != null) // If has successor, it must be on queue
            return true;

        return findNodeFromTail(node);
    }

看完了这个源码,咱们再回到最开始的await()方法中的一个片断:spa

//是否从等待队列中转移到同步队列中
            while (!isOnSyncQueue(node)) {
            	//若是还在等待队列中,阻塞该线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

这个咱们前面已经讲过了。可是还有几点须要注意的:

  1. 这个while循环退出的条件有两个 1)、isOnSyncQueue()方法返回true 2)、线程被中断
  2. isOnSyncQueue()方法若是返回false,那么说明线程已经从等待队列中退出,而且加入到同步队列中了。具体示意图以下,和咱们前面说的其实同样。
    在这里插入图片描述
  3. 若是程序被中断了,一样也会退出这个while循环

至此,await()方法咱们就分析完了,后面的一些方法就比较容易了

(三)、await()其余方法实现

经过上面分析await()方法以后,其实await()带超时机制的方法其实实现也差很少,只是加上了一个超时等待的判断而已,若是超过限定时间了,就直接返回,例如:await(long time, TimeUnit unit)、awaitNanos(long nanosTimeout)。
下面咱们来看一下不响应中断的awaitUninterruptibly方法

public final void awaitUninterruptibly() {
			//将当前线程加入等待队列中
            Node node = addConditionWaiter();
            //充分释放锁
            int savedState = fullyRelease(node);
            //中断标志位
            boolean interrupted = false;
            //判断是否处于同步队列中
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

其实和上面的await()方法进行对比,awaitUninterruptibly和await()方法惟一的区别就是while循环中没有了由于中断而退出while循环的条件。

(四)、signal()方法实现原理

咱们来一块儿看下signal()方法的实现原理

public final void signal() {
 			//查看是否锁是否被当前线程独占
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //获取等待队列中的第一个线程
            Node first = firstWaiter;
            if (first != null)
            	//调用doSignal()方法唤醒线程
                doSignal(first);
        }

上面只是进行了简单的判断而已,调用了doSignal()方法:

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //将头节点从等待队列中移除
                first.nextWaiter = null;
                //调用transferForSignal对头节点进行唤醒
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

能够看到,真正的处理逻辑在transferForSignal中,下面来看一下

final boolean transferForSignal(Node node) {
		//尝试将Condition状态修改成0状态
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

		//enq是AQS中的方法,将节点添加到同步队列中的队尾
        Node p = enq(node);
        //得到状态
        int ws = p.waitStatus;
        //unpark()方法唤醒线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

从这个代码咱们能够清晰地看到,是如何将线程从等待队列移动到同步队列中的。
在这里插入图片描述

(五)、signalAll()方法实现原理

signalAll()方法是用来一次性将等待队列中的线程清空,所有逐一按顺序转移到同步队列当中,具体实现其实也是差很少,利用for循环遍历每个在等待队列中的节点。

private void doSignalAll(Node first) {
       		//清空变量,便于gc
            lastWaiter = firstWaiter = null;
            do {
            	//得到第二个节点
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                //这个方法前面已经讲过,添加等待队列的头节点到同步队列的尾部
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

(六)、Condition通知机制的具体实现

public static void main(String[] args) {
 		//锁实例
        ReentrantLock lock = new ReentrantLock();
        //Condition对象
        Condition condition = lock.newCondition();
		//线程1
        Thread thread1 = new Thread(){
            @Override
            public void run() {
                System.out.println("线程1还没执行await()方法");
                try {
                    lock.lock();
                    condition.await();
                    System.out.println("线程1被唤醒");
                    lock.unlock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
		//线程2
        Thread thread2 = new Thread(){
            @Override
            public void run() {
                System.out.println("线程2还没执行signal()方法");
                lock.lock();
                condition.signal();
                System.out.println("线程2尝试唤醒线程1");
                lock.unlock();
            }
        };
        thread1.start();
        thread2.start();

    }

在这里插入图片描述
经过上面的代码,咱们使用线程1调用await()方法进行等待,使用线程2调用signal()方法进行唤醒,执行结果也是如咱们所想。你们应该不难理解。

(七)、总结

在这里插入图片描述
咱们经过上面的图来从新梳理一遍流程:

  1. 线程1处于同步队列中,得到锁成功。
  2. 线程1在执行过程当中调用Condition.await()方法,线程1释放锁,进入等待队列中
  3. 当线程1成为等待队列头部时,而且其余线程调用signal()方法时,会从新进入到同步队列的尾部进行排队
  4. 线程1在同步队列中从新尝试获取锁。

参考文章:详解Condition的await和signal等待/通知机制