系列传送门:java
Contition是一种广义上的条件队列,它利用await()和signal()为线程提供了一种更为灵活的等待/通知模式。node
图源:《Java并发编程的艺术》编程
Condition必需要配合Lock一块儿使用,由于对共享状态变量的访问发生在多线程环境下。c#
一个Condition的实例必须与一个Lock绑定,所以await和signal的调用必须在lock和unlock之间,有锁以后,才能使用condition嘛。以ReentrantLock为例,简单使用以下:多线程
public class ConditionTest { public static void main(String[] args) { final ReentrantLock lock = new ReentrantLock(); final Condition condition = lock.newCondition(); Thread thread1 = new Thread(() -> { String name = Thread.currentThread().getName(); lock.lock(); System.out.println(name + " <==成功获取到锁" + lock); try { System.out.println(name + " <==进入条件队列等待"); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " <==醒了"); lock.unlock(); System.out.println(name + " <==释放锁"); }, "等待线程"); thread1.start(); Thread thread2 = new Thread(() -> { String name = Thread.currentThread().getName(); lock.lock(); System.out.println(name + " ==>成功获取到锁" + lock); try { System.out.println("========== 这里演示await中的线程没有被signal的时候会一直等着 ==========="); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " ==>通知等待队列的线程"); condition.signal(); lock.unlock(); System.out.println(name + " ==>释放锁"); }, "通知线程"); thread2.start(); } } 等待线程 <==成功获取到锁java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 等待线程] 等待线程 <==进入条件队列等待 通知线程 ==>成功获取到锁java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 通知线程] ========== 这里演示await中的线程没有被signal的时候会一直等着 =========== 通知线程 ==>通知等待队列的线程 通知线程 ==>释放锁 等待线程 <==醒了 等待线程 <==释放锁
接下来咱们将从源码的角度分析上面这个流程,理解所谓条件队列的内涵。并发
AQS,Lock,Condition,ConditionObject
之间的关系:less
ConditionObject是AQS的内部类,实现了Condition接口,Lock中提供newCondition()方法,委托给内部AQS的实现Sync来建立ConditionObject对象,享受AQS对Condition的支持。oop
// ReentrantLock#newCondition public Condition newCondition() { return sync.newCondition(); } // Sync#newCondition final ConditionObject newCondition() { // 返回Contition的实现,定义在AQS中 return new ConditionObject(); }
ConditionObject用来结合锁实现线程同步,ConditionObject能够直接访问AQS对象内部的变量,好比state状态值和AQS队列。源码分析
ConditionObject是条件变量,每一个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,ConditionObject维护了首尾节点,没错这里的Node就是咱们以前在学习AQS的时候见到的那个Node,咱们会在下面回顾:post
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** 条件队列的第一个节点. */ private transient Node firstWaiter; /** 条件队列的最后一个节点. */ private transient Node lastWaiter; }
看到这里咱们须要明确这里的条件队列和咱们以前说的AQS同步队列是不同的:
这里着重说明一下,接下来的源码学习部分,咱们会将两个队列进行区分,涉及到同步队列和阻塞队列的描述,意味着是AQS的同步队列,而条件队列指的是Condition队列,望读者知晓。
这里咱们针对上面的demo来分析一下会更好理解一些:
为了简化,接下来我将用D表示等待线程,用T表示通知线程。
lock.lock()
方法,此时无竞争,【D】被加入到AQS同步队列中。condition.await()
方法,此时【D】被构建为等待节点并加入到condition对应的条件等待队列中,并从AQS同步队列中移除。condition.signal()
方法,这时condition对应的条件队列中只有一个节点【D】,因而【D】被取出,并被再次加入AQS的等待队列中。此时【D】并无被唤醒,只是单纯换了个位置。lock.unlock()
,释放锁锁以后,会唤醒AQS队列中的【D】,此时【D】真正被唤醒且执行。OK,lock -> await -> signal -> unlock
这一套流程相信已经大概可以理解,接下来咱们试着看看源码吧。
咱们这里再简单回顾一下AQS中Node类与Condition相关的字段:
// 记录当前线程的等待状态, volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // node存储的线程 volatile Thread thread; // 当前节点在Condition中等待队列上的下一个节点 Node nextWaiter;
waitStatus能够取五种状态:
固然,除了-2这个condition状态,其余的等待状态咱们以前都或多或少分析过,今天着重学习condition这个状态的意义。
咱们还能够看到一个Node类型的nextWaiter,它表示条件队列中当前节点的下一个节点,能够看出用以实现条件队列的单向链表。
调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。
其实就是从AQS同步队列的首节点,注意不是head,而是获取了锁的节点,移动到Condition的等待队列中。
了解这些以后,咱们直接来看看具体方法的源码:
public final void await() throws InterruptedException { // 这个方法是响应中断的 if (Thread.interrupted()) throw new InterruptedException(); // 添加到条件队列中 Node node = addConditionWaiter(); // 释放同步资源,也就是释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // 若是这个节点的线程不在同步队列中,说明该线程还不具有竞争锁的资格 while (!isOnSyncQueue(node)) { // 挂起线程 LockSupport.park(this); // 若是线程中断,退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 上面的循环退出有两种状况: // 1. isOnSyncQueue(node) 为true,即当前的node已经转移到阻塞队列了 // 2. checkInterruptWhileWaiting != 0, 表示线程中断 // 退出循环,被唤醒以后,进入阻塞队列,等待获取锁 acquireQueued if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter() 是将当前节点加入到条件队列中:
private Node addConditionWaiter() { Node t = lastWaiter; // 若是lastWaiter被取消了,将其清除 if (t != null && t.waitStatus != Node.CONDITION) { // 遍历整个条件队列,将已取消的全部节点清除出列 unlinkCancelledWaiters(); // t从新赋值一下,由于last可能改变了 t = lastWaiter; } //注意这里,node在初始化的时候,会指定ws为CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // t == null 表示队列此时为空,初始化firstWaiter if (t == null) firstWaiter = node; else t.nextWaiter = node;// 入队尾 lastWaiter = node;// 将尾指针指向新建的node return node; }
unlinkCancelledWaiters用于清除队列中已经取消等待的节点。
private void unlinkCancelledWaiters() { Node t = firstWaiter; // trail这里表示取消节点的前驱节点 Node trail = null; // t会从头至尾遍历这个单链表 while (t != null) { // next用于保存下一个 Node next = t.nextWaiter; // 若是发现当前这个节点 不是 condition了, 那么考虑移除它 // 下面是单链表的移除节点操做 简单来讲就是 trail.next = t.next if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; // 说明first就是否是condition了 if (trail == null) firstWaiter = next; else //trail.next = t.next trail.nextWaiter = next; // trail后面没东西,天然trail就是lastWaiter了 if (next == null) lastWaiter = trail; } // 当前节点是一直跟到不是condition节点的上一个 else trail = t; // 向后遍历 t = t.next t = next; } }
总结一下addConditionWaiter的过程:
将节点加入等待队列中后,就须要彻底释放线程拥有的独占锁了,彻底释放针对重入锁的状况。咱们能够拉到await()方法中看看,将会调用:int savedState = fullyRelease(node);
,这句话有什么内涵呢?
咱们看到这个方法返回了一个savedState变量,简单的理解就是保存状态。咱们知道重入锁的state由重入的次数,若是一个state为N,咱们能够认为它持有N把锁。
await()方法必须将state置0,也就是彻底释放锁,后面的线程才能获取到这把锁,置0以后,咱们须要用个变量标记一下,也就是这里的savedState。
这样它被从新唤醒的时候,咱们就知道,他须要获取savedState把锁。
final int fullyRelease(Node node) { boolean failed = true; try { // 获取当前的state值,重入次数 int savedState = getState(); // 释放N = savedState资源 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { // 若是获取失败,将会将节点设置为取消状态,并抛出异常 if (failed) node.waitStatus = Node.CANCELLED; } }
这里其实咱们就会明白开头说的:若是某个线程没有获取lock,就直接调用condition的await()方法,结果是什么呢,在release的时候抛出异常,而后节点被取消,以后节点进来的时候,将它清理掉。
ok,彻底释放锁以后,将会来到这几步,若是这个节点的线程不在同步队列中,说明该线程还不具有竞争锁的资格,将被一直挂起,这里的同步队列指的是AQS的阻塞队列。
int interruptMode = 0; // 若是这个节点的线程不在同步队列中,说明该线程还不具有竞争锁的资格,会一直挂起 while (!isOnSyncQueue(node)) { // 挂起线程 LockSupport.park(this); // 若是线程中断,退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
下面这个方法会判断节点是否是已经到阻塞队列中了,若是是的话,就直接返回true,这个方法的必要性是什么呢?
其实啊,这里须要提早说一下signal()方法,signal的做用和await()方法,将在等待队列中阻塞的节点移动到AQS同步队列中,这个方法就是说判断一下这个节点是否是移过去了。
final boolean isOnSyncQueue(Node node) { // 1. 节点的等待状态仍是condition表示还在等待队列中 // 2. node.prev == null 表示还没移到阻塞队列中[prev和next都是阻塞队列中用的] if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是node已经有了后继节点,表示已经在阻塞队列中了 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 来到这里的状况:ws != condition && node.prev != null && node.next == null // 想一想:为何node.prev != null不能做为判断不在阻塞队列的依据呢? // CAS首先设置node.prev 指向tail,这个时候node.prev 是不为null的,但CAS可能会失败 return findNodeFromTail(node); }
为何node.prev != null不能做为判断不在阻塞队列的依据呢?
官方给出了解答: 由于CAS的入队操做中,首先设置node.prev 指向tail,这个时候node.prev 是不为null的。你可以说他入队成功必定成功吗?不必定,由于CAS可能会失败,因此要findNodeFromTail(node)。
从阻塞队列的尾部向前遍历,若是找到这个node,表示它已经在了,那就返回true。
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { // 已经有了 if (t == node) return true; // 尾都没有,找啥呢,返回false if (t == null) return false; // 一直往前找 t = t.prev; } }
因为以前节点被加入等待队列将会一直阻塞,为了连贯性,咱们来看看唤醒它的signal方法吧:
以前说到,若是这个线程会在等待队列中等待,那么唤醒它的signal方法的流程是怎么样的呢,前面其实已经说了一丢丢了,咱们猜想,signal会将isOnSyncQueue方法的循环打破,接下来看看是否是这样子的。
public final void signal() { // 同样的,必须占有当前这个锁才能用signal方法 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
该方法会从头至尾遍历条件队列,找到须要移到同步队列的节点。
private void doSignal(Node first) { do { // firstWaiter 指向first的下一个 if ( (firstWaiter = first.nextWaiter) == null) // 若是first是最后一个且要被移除了,就将last置null lastWaiter = null; // first断绝与条件队列的链接 first.nextWaiter = null; // fisrt转移失败,就看看后面是否是须要的 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
这里的while循环表示,若是first没有转移成功,就接着判断first后面的节点是否是须要转移。
该方法将节点从条件队列转移到阻塞队列。
final boolean transferForSignal(Node node) { /* * CAS操做尝试将Condition的节点的ws改成0 * 若是失败,意味着:节点的ws已经不是CONDITION,说明节点已经被取消了 * 若是成功,则该节点的状态ws被改成0了 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * 经过enq方法将node自旋的方式加入同步队列队尾 * 这里放回的p是node在同步队列的前驱节点 */ Node p = enq(node); int ws = p.waitStatus; // ws大于0 的状况只有 cancenlled,表示node的前驱节点取消了争取锁,那直接唤醒node线程 // ws <= 0 会使用cas操做将前驱节点的ws置为signal,若是cas失败也会唤醒node if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } // 自旋的方式入队 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; // 返回的是node的前驱节点 return t; } } } }
ok,一旦signal以后,节点被成功转移到同步队列后,这时下面这个循环就会退出了,继续回到这里:
int interruptMode = 0; // 若是这个节点的线程不在同步队列中,说明该线程还不具有竞争锁的资格,会一直挂起 while (!isOnSyncQueue(node)) { // 挂起线程 LockSupport.park(this); // 若是线程中断,退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
interruptMode能够有如下几种取值:
/** await 返回的时候,须要从新设置中断状态 */ private static final int REINTERRUPT = 1; /** await 返回的时候,须要抛出 InterruptedException 异常 */ private static final int THROW_IE = -1; /** interruptMode取0的时候表示在await()期间,没有发生中断 */
说到这里咱们须要明白,LockSupport.park(this)
挂起的线程是何时唤醒的:
LockSupport.unpark(node.thread);
方法。唤醒以后,咱们能够看到调用checkInterruptWhileWaiting方法检查等待期间是否发生了中断,若是不为0表示确实在等待期间发生了中断。
但其实这个方法的返回结果用interruptMode变量接收,拥有更加丰富的内涵,它还可以判断中断的时机是否在signal以前。
该方法用于判断该线程是否在挂起期间发生了中断。
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ?// 若是处于中断状态,返回true,且将重置中断状态 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 若是中断了,判断什么时候中断 0; // 没有中断, 返回0 }
该方法判断什么时候中断,是否在signal以前。
final boolean transferAfterCancelledWait(Node node) { // 尝试使用CAS操做将node 的ws设置为0 // 若是成功,说明在signal方法以前中断就已经发生: // 缘由在于:signal若是在此以前发生,必然已经cas操做将ws设置为0了,这里不可能设置成功 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 就算中断了,也将节点入队 enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. * 这里就是signal以后发生的中断 * 可是signal可能还在进行转移中,这边自旋等一下它完成 */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }
这里的话,咱们仍是稍微总结一下:
接下来三个部分我将一一说明:
// 第一部分 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 第二部分 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 清除取消的节点 // 第三部分 if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
第一部分
signal唤醒的线程并不会当即获取到资源,从while循环退出后,会经过acquireQueued方法加入获取同步状态的竞争中。
// 第一部分 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
acquireQueued(node, savedState)
中node此时已经被加入同步队列了,savedState是以前存储的state。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; // } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued方法返回时,表示已经获取到了锁,且返回的是interrupted值,若是返回true,表示已经被中断。
接着判断interruptMode != THROW_IE
表示是在signal以后发生的中断,须要从新中断当前线程,将interruptMode设置为REINTERRUPT。
第二部分
// 第二部分 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 清除取消的节点
前面说了,signal会将节点移到同步队列中,最后一步须要和条件队列断开关系,也就是:node.nextWaiter = null
,但这是想象中比较正常的状况,若是在signal以前被中断,节点也会被加入同步队列中,这时实际上是没有调用这个断开关系的。
所以这边作一点处理, unlinkCancelledWaiters()
逻辑上面也说过了,能够回过头去看看,主要是清除队列中已经取消等待的节点。
第三部分
最后一个部分,就是对两种interruptMode的状况进行处理,看看代码就知道了:
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // signal 以前的中断, 须要抛出异常 if (interruptMode == THROW_IE) throw new InterruptedException(); // signal 以后发生的中断, 须要从新中断 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
带超时机制的await()方法有如下几个,简单看下便可:
咱们选最后一个来看看,主要看看和以前await()方法不同的地方:
public final boolean await(long time, TimeUnit unit) throws InterruptedException { // 计算等待的时间 long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 截止时间 final long deadline = System.nanoTime() + nanosTimeout; // 表示是否超时 boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { // 等待时间到了 if (nanosTimeout <= 0L) { // 这个方法返回true表示在这个方法内,已经将节点转移到阻塞队列中 // 返回false,表示signal已经发生,表示没有超时 timedout = transferAfterCancelledWait(node); break; } //spinForTimeoutThreshold 是AQS中的一个字段,若是超过1ms,使用parkNonos if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 更新一下还须要等待多久 nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } // 相比await() 针对中断少了抛出异常的操做,而是直接进行中断 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
最后以一个Java doc给的例子结尾吧:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); // condition 依赖于 lock 来产生 final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生产 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 队列已满,等待,直到 not full 才能继续生产 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去 } finally { lock.unlock(); } } // 消费 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去 return x; } finally { lock.unlock(); } } }
其实这个以前也说过,ArrayBlockingQueue就是采用了这种方式实现的生产者-消费者模式,若是你感兴趣,能够看看具体的实现细节哦。
isOnSyncQueue(Node node)
,若是在等待队列中,就一直等着,若是signal将它移到AQS队列中,则退出循环。