咱们都知道Object的wait和notify/notifyAll()方法是与对象监视器配合完成线程之间的等待/通讯机制,而Condition是与Lock配合完成等待通知机制,前者是java底层级别的,后者则是语言级别的。
二者也存在着必定的区别:java
咱们先来看下Condition接口吧,只是简单的定义了一些经常使用方法:node
public interface Condition { void await() throws InterruptedException;//使线程处于等待状态,响应中断 void awaitUninterruptibly(); //使线程处于等待状态,不响应中断 long awaitNanos(long nanosTimeout) throws InterruptedException; //限定等待的时间,响应中断 void signal();//唤醒一个等待的线程,从等待队列转移到同步队列 void signalAll();//唤醒全部的线程,从等待队列转移到同步队列 }
而Condition接口的具体实现是在AQS中的,类名叫作ConditionObjectweb
public class ConditionObject implements Condition, java.io.Serializable
其实Condition对象内部是维护了一个等待队列的,等待队列的是用单向的链表构成的,当使用await()方法,线程就会被添加到等待队列的尾部。 以下图
同时,咱们须要注意的是ConditionObject中的两个属性,分别指向第一个等待的线程和最后一个等待的线程。数据结构
private transient Node firstWaiter; private transient Node lastWaiter;
咱们先来看看await()方法的源码,具体以下:ide
public final void await() throws InterruptedException { //判断线程是否被中断 if (Thread.interrupted()) throw new InterruptedException(); //将线程加入等待队列中 Node node = addConditionWaiter(); //充分释放持有的锁 int savedState = fullyRelease(node); //中断标志 int interruptMode = 0; //是否从等待队列中转移到同步队列中 while (!isOnSyncQueue(node)) { //若是还在等待队列中,阻塞该线程 LockSupport.park(this); //判断是否被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //这个是AQS中的方法,以前介绍过了,这里就不赘述了 //尝试得到锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //unlinkCancelledWaiters遍历全部的节点,删除标记为取消的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //查看是否中断 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
咱们经过上面的代码,有几点是须要注意的:svg
咱们先来看下addConditionWaiter()方法,源码分析
private Node addConditionWaiter() { //得到最后一个线程 Node t = lastWaiter; // 判断最后一个线程是不是Condition状态 if (t != null && t.waitStatus != Node.CONDITION) { //若是是,那就遍历一遍等待队列,清除掉全部不是condition状态的节点 unlinkCancelledWaiters(); //从新得到最后一个线程 t = lastWaiter; } //根据当前线程的信息,建立一个节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); //判断等待队列是不是空队列 if (t == null) firstWaiter = node; else t.nextWaiter = node; //设置最后一个节点是当前线程 lastWaiter = node; return node; }
咱们从上面的方法中就能够看到:addConditionWaiter()就是将当前线程加入到等待队列的最后一个位置。
接着,咱们来看fullyRelease方法,下面是它的源码。ui
final int fullyRelease(Node node) { //表示是否释放锁失败 boolean failed = true; try { //由于是重入锁,getState()得到重入了多少重 int savedState = getState(); //release方法是AQS中的方法,释放指定参数的锁 if (release(savedState)) { //释放成功 failed = false; return savedState; } else { //释放失败 throw new IllegalMonitorStateException(); } } finally { //失败则标记为取消状态 if (failed) node.waitStatus = Node.CANCELLED; } }
经过上面fullyRelease方法,咱们能够看到首先会经过getState()方法,获取重入了多少锁,而后经过release方法释放掉。
接着咱们来看下isOnSyncQueue()方法this
final boolean isOnSyncQueue(Node node) { //判断当前状态是否为CONDITION,或者前驱节点是否为null if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //判断是否有后继节点 if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); }
看完了这个源码,咱们再回到最开始的await()方法中的一个片断:spa
//是否从等待队列中转移到同步队列中 while (!isOnSyncQueue(node)) { //若是还在等待队列中,阻塞该线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
这个咱们前面已经讲过了。可是还有几点须要注意的:
至此,await()方法咱们就分析完了,后面的一些方法就比较容易了
经过上面分析await()方法以后,其实await()带超时机制的方法其实实现也差很少,只是加上了一个超时等待的判断而已,若是超过限定时间了,就直接返回,例如:await(long time, TimeUnit unit)、awaitNanos(long nanosTimeout)。
下面咱们来看一下不响应中断的awaitUninterruptibly方法
public final void awaitUninterruptibly() { //将当前线程加入等待队列中 Node node = addConditionWaiter(); //充分释放锁 int savedState = fullyRelease(node); //中断标志位 boolean interrupted = false; //判断是否处于同步队列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
其实和上面的await()方法进行对比,awaitUninterruptibly和await()方法惟一的区别就是while循环中没有了由于中断而退出while循环的条件。
咱们来一块儿看下signal()方法的实现原理
public final void signal() { //查看是否锁是否被当前线程独占 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //获取等待队列中的第一个线程 Node first = firstWaiter; if (first != null) //调用doSignal()方法唤醒线程 doSignal(first); }
上面只是进行了简单的判断而已,调用了doSignal()方法:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //将头节点从等待队列中移除 first.nextWaiter = null; //调用transferForSignal对头节点进行唤醒 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
能够看到,真正的处理逻辑在transferForSignal中,下面来看一下
final boolean transferForSignal(Node node) { //尝试将Condition状态修改成0状态 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //enq是AQS中的方法,将节点添加到同步队列中的队尾 Node p = enq(node); //得到状态 int ws = p.waitStatus; //unpark()方法唤醒线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
从这个代码咱们能够清晰地看到,是如何将线程从等待队列移动到同步队列中的。
signalAll()方法是用来一次性将等待队列中的线程清空,所有逐一按顺序转移到同步队列当中,具体实现其实也是差很少,利用for循环遍历每个在等待队列中的节点。
private void doSignalAll(Node first) { //清空变量,便于gc lastWaiter = firstWaiter = null; do { //得到第二个节点 Node next = first.nextWaiter; first.nextWaiter = null; //这个方法前面已经讲过,添加等待队列的头节点到同步队列的尾部 transferForSignal(first); first = next; } while (first != null); }
public static void main(String[] args) { //锁实例 ReentrantLock lock = new ReentrantLock(); //Condition对象 Condition condition = lock.newCondition(); //线程1 Thread thread1 = new Thread(){ @Override public void run() { System.out.println("线程1还没执行await()方法"); try { lock.lock(); condition.await(); System.out.println("线程1被唤醒"); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } } }; //线程2 Thread thread2 = new Thread(){ @Override public void run() { System.out.println("线程2还没执行signal()方法"); lock.lock(); condition.signal(); System.out.println("线程2尝试唤醒线程1"); lock.unlock(); } }; thread1.start(); thread2.start(); }
经过上面的代码,咱们使用线程1调用await()方法进行等待,使用线程2调用signal()方法进行唤醒,执行结果也是如咱们所想。你们应该不难理解。
咱们经过上面的图来从新梳理一遍流程: