先来回顾一下java中的等待/通知机制java
咱们有时会遇到这样的场景:线程A执行到某个点的时候,由于某个条件condition不知足,须要线程A暂停;等到线程B修改了条件condition,使condition知足了线程A的要求时,A再继续执行。node
最简单的实现方法就是将condition设为一个volatile的变量,当A线程检测到条件不知足时就自旋,相似下面:安全
public class Test { private static volatile int condition = 0; public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { while (!(condition == 1)) { // 条件不知足,自旋 } System.out.println("a executed"); } }); A.start(); Thread.sleep(2000); condition = 1; } }
这种方式的问题在于自旋很是耗费CPU资源,固然若是在自旋的代码块里加入Thread.sleep(time)将会减轻CPU资源的消耗,可是若是time设的太大,A线程就不能及时响应condition的变化,若是设的过小,依然会形成CPU的消耗。less
所以,java在Object类里提供了wait()和notify()方法,使用方法以下:ide
class Test1 { private static volatile int condition = 0; private static final Object lock = new Object(); public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { synchronized (lock) { while (!(condition == 1)) { try { lock.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } System.out.println("a executed by notify"); } } }); A.start(); Thread.sleep(2000); condition = 1; synchronized (lock) { lock.notify(); } } }
经过代码能够看出,在使用一个对象的wait()、notify()方法前必需要获取这个对象的锁。源码分析
当线程A调用了lock对象的wait()方法后,线程A将释放持有的lock对象的锁,而后将本身挂起,直到有其余线程调用notify()/notifyAll()方法或被中断。能够看到在lock.wait()前面检测condition条件的时候使用了一个while循环而不是if,那是由于当有其余线程把condition修改成知足A线程的要求并调用notify()后,A线程会从新等待获取锁,获取到锁后才从lock.wait()方法返回,而在A线程等待锁的过程当中,condition是有可能再次变化的。ui
由于wait()、notify()是和synchronized配合使用的,所以若是使用了显示锁Lock,就不能用了。因此显示锁要提供本身的等待/通知机制,Condition应运而生。this
咱们用Condition实现上面的例子:spa
class Test2 { private static volatile int condition = 0; private static Lock lock = new ReentrantLock(); private static Condition lockCondition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { lock.lock(); try { while (!(condition == 1)) { lockCondition.await(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } System.out.println("a executed by condition"); } }); A.start(); Thread.sleep(2000); condition = 1; lock.lock(); try { lockCondition.signal(); } finally { lock.unlock(); } } }
能够看到经过 lock.newCondition() 能够得到到 lock 对应的一个Condition对象lockCondition ,lockCondition的await()、signal()方法分别对应以前的Object的wait()和notify()方法。总体上和Object的等待通知是相似的。线程
上面咱们看到了Condition实现的等待通知和Object的等待通知是很是相似的,而Condition提供的等待通知功能更强大,最重要的一点是,一个lock对象能够经过屡次调用 lock.newCondition() 获取多个Condition对象,也就是说,在一个lock对象上,能够有多个等待队列,而Object的等待通知在一个Object上,只能有一个等待队列。用下面的例子说明,下面的代码实现了一个阻塞队列,当队列已满时,add操做被阻塞有其余线程经过remove方法删除元素;当队列已空时,remove操做被阻塞直到有其余线程经过add方法添加元素。
public class BoundedQueue1<T> { public List<T> q; //这个列表用来存队列的元素 private int maxSize; //队列的最大长度 private Lock lock = new ReentrantLock(); private Condition addConditoin = lock.newCondition(); private Condition removeConditoin = lock.newCondition(); public BoundedQueue1(int size) { q = new ArrayList<>(size); maxSize = size; } public void add(T e) { lock.lock(); try { while (q.size() == maxSize) { addConditoin.await(); } q.add(e); removeConditoin.signal(); //执行了添加操做后唤醒因队列空被阻塞的删除操做 } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } } public T remove() { lock.lock(); try { while (q.size() == 0) { removeConditoin.await(); } T e = q.remove(0); addConditoin.signal(); //执行删除操做后唤醒因队列满而被阻塞的添加操做 return e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { lock.unlock(); } } }
下面来分析Condition源码
以前咱们介绍AQS的时候说过,AQS的同步排队用了一个隐式的双向队列,同步队列的每一个节点是一个AbstractQueuedSynchronizer.Node实例。
Node的主要字段有:
状态 | 值 | 含义 |
CANCELLED | 1 | 当前节点由于超时或中断被取消同步状态获取,该节点进入该状态不会再变化 |
SIGNAL | -1 | 标识后继的节点处于阻塞状态,当前节点在释放同步状态或被取消时,须要通知后继节点继续运行。每一个节点在阻塞前,须要标记其前驱节点的状态为SIGNAL。 |
CONDITION | -2 | 标识当前节点是做为等待队列节点使用的。 |
PROPAGATE | -3 | |
0 | 0 | 初始状态 |
Condition实现等待的时候内部也有一个等待队列,等待队列是一个隐式的单向队列,等待队列中的每个节点也是一个AbstractQueuedSynchronizer.Node实例。
每一个Condition对象中保存了firstWaiter和lastWaiter做为队列首节点和尾节点,每一个节点使用Node.nextWaiter保存下一个节点的引用,所以等待队列是一个单向队列。
每当一个线程调用Condition.await()方法,那么该线程会释放锁,构形成一个Node节点加入到等待队列的队尾。
Condition.await()方法的源码以下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //构造一个新的等待队列Node加入到队尾 int savedState = fullyRelease(node); //释放当前线程的独占锁,无论重入几回,都把state释放为0 int interruptMode = 0;
//若是当前节点没有在同步队列上,即尚未被signal,则将当前线程阻塞 while (!isOnSyncQueue(node)) { LockSupport.park(this);
//后面的蓝色代码都是和中断相关的,主要是区分两种中断:是在被signal前中断仍是在被signal后中断,若是是被signal前就被中断则抛出 InterruptedException,不然执行 Thread.currentThread().interrupt(); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断则直接退出自旋 break; }
//退出了上面自旋说明当前节点已经在同步队列上,可是当前节点不必定在同步队列队首。acquireQueued将阻塞直到当前节点成为队首,即当前线程得到了锁。而后await()方法就能够退出了,让线程继续执行await()后的代码。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } final boolean isOnSyncQueue(Node node) {
//若是当前节点状态是CONDITION或node.prev是null,则证实当前节点在等待队列上而不是同步队列上。之因此能够用node.prev来判断,是由于一个节点若是要加入同步队列,在加入前就会设置好prev字段。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
//若是node.next不为null,则必定在同步队列上,由于node.next是在节点加入同步队列后设置的 if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //前面的两个判断没有返回的话,就从同步队列队尾遍历一个一个看是否是当前节点。 } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
Condition.signal() 方法的源码以下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //若是同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。 Node first = firstWaiter; if (first != null) doSignal(first); //通知等待队列队首的节点。 }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && //transferForSignal方法尝试唤醒当前节点,若是唤醒失败,则继续尝试唤醒当前节点的后继节点。 (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //若是当前节点状态为CONDITION,则将状态改成0准备加入同步队列;若是当前状态不为CONDITION,说明该节点等待已被中断,则该方法返回false,doSignal()方法会继续尝试唤醒当前节点的后继节点 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); //将节点加入同步队列,返回的p是节点在同步队列中的先驱节点 int ws = p.waitStatus;
//若是先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就当即唤醒当前节点对应的线程,线程被唤醒后会执行acquireQueued方法,该方法会从新尝试将节点的先驱状态设为SIGNAL并再次park线程;若是当前设置前驱节点状态为SIGNAL成功,那么就不须要立刻唤醒线程了,当它的前驱节点成为同步队列的首节点且释放同步状态后,会自动唤醒它。
//其实笔者认为这里不加这个判断条件应该也是能够的。只是对于CAS修改前驱节点状态为SIGNAL成功这种状况来讲,若是不加这个判断条件,提早唤醒了线程,等进入acquireQueued方法了节点发现本身的前驱不是首节点,还要再阻塞,等到其前驱节点成为首节点并释放锁时再唤醒一次;而若是加了这个条件,线程被唤醒的时候它的前驱节点确定是首节点了,线程就有机会直接获取同步状态从而避免二次阻塞,节省了硬件资源。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
总的来讲,Condition的本质就是等待队列和同步队列的交互:
当一个持有锁的线程调用Condition.await()时,它会执行如下步骤:
当一个持有锁的线程调用Condition.signal()时,它会执行如下操做:
从等待队列的队首开始,尝试对队首节点执行唤醒操做;若是节点CANCELLED,就尝试唤醒下一个节点;若是再CANCELLED则继续迭代。
对每一个节点执行唤醒操做时,首先将节点加入同步队列,此时await()操做的步骤3的解锁条件就已经开启了。而后分两种状况讨论:
若是知道Object的等待通知机制,Condition的使用是比较容易掌握的,由于和Object等待通知的使用基本一致。
对Condition的源码理解,主要就是理解等待队列,等待队列能够类比同步队列,并且等待队列比同步队列要简单,由于等待队列是单向队列,同步队列是双向队列。
如下是笔者对等待队列是单向队列、同步队列是双向队列的一些思考,欢迎提出不一样意见:
之因此同步队列要设计成双向的,是由于在同步队列中,节点唤醒是接力式的,由每个节点唤醒它的下一个节点,若是是由next指针获取下一个节点,是有可能获取失败的,由于虚拟队列每添加一个节点,是先用CAS把tail设置为新节点,而后才修改原tail的next指针到新节点的。所以用next向后遍历是不安全的,可是若是在设置新节点为tail前,为新节点设置prev,则能够保证从tail往前遍历是安全的。所以要安全的获取一个节点Node的下一个节点,先要看next是否是null,若是是null,还要从tail往前遍历看看能不能遍历到Node。
而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,所以每次要执行唤醒操做的时候,直接唤醒等待队列的首节点就好了。等待队列的实现中不须要遍历队列,所以也不须要prev指针。