前两篇文章分别介绍了 AQS 框架中的独占模式和共享模式,本篇将介绍 AQS 对 Condition 接口的实现。
在阅读本篇以前,建议先了解 AQS 中的数据结构和独占模式的实现原理。java
JUC 经过 Lock 和 Condition 两个接口实现管程(Monitor),其中 Lock 用于解决互斥问题,而 Condition 用于解决同步问题,而 AQS 对 Lock 和 Condition 接口的实现提供了一个基础的框架。node
本文基于 jdk1.8.0_91
Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。 c#
Condition 实现能够提供不一样于 Object 监视器方法的行为和语义,好比受保证的通知排序,或者在执行通知时不须要保持一个锁。
若是某个实现提供了这样特殊的语义,则该实现必须记录这些语义。 segmentfault
方法摘要:数组
// 形成当前线程在接到信号或被中断以前一直处于等待状态。 void await() // 形成当前线程在接到信号、被中断或到达指定等待时间以前一直处于等待状态。 boolean await(long time, TimeUnit unit) // 形成当前线程在接到信号、被中断或到达指定等待时间以前一直处于等待状态。 long awaitNanos(long nanosTimeout) // 形成当前线程在接到信号以前一直处于等待状态。 void awaitUninterruptibly() // 形成当前线程在接到信号、被中断或到达指定最后期限以前一直处于等待状态。 boolean awaitUntil(Date deadline) // 唤醒一个等待线程。 void signal() // 唤醒全部等待线程。 void signalAll()
Condition 本质上是一个队列(称为条件队列),线程等待某个条件成立时,在队列中阻塞,直到其余线程检查条件成立后来通知它。
对于同一个锁,只会存在一个同步队列,可是可能会有多个条件队列,只有在使用了 Condition 才会存在条件队列。数据结构
AQS 中对条件队列的使用:框架
当线程获取锁以后,执行 Condition.await()
会释放锁并进入条件队列,阻塞等待直到被其余线程唤醒。
当其余线程执行 Condition.signal()
唤醒当前线程时,当前线程会从条件队列转移到同步队列来等待再次获取锁。
当前线程再一次获取锁以后,须要在 while 循环中判断条件是否成立,若不成立需从新执行 Condition.await()
去等待。less
Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例得到 Condition 实例,请使用其 newCondition() 方法。 ui
Java 官方文档提供 Condition 接口的使用示例:this
对于一个有界阻塞数组,当数组非满时才能够往数组中存放数据,不然阻塞;当数据非空时才能够往数组中取元素,不然阻塞。
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生产者方法,往数组里面写数据 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 阻塞直到非满 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 通知非空 } finally { lock.unlock(); } } // 消费者方法,从数组里面拿数据 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 阻塞直到非空 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 通知非满 return x; } finally { lock.unlock(); } } }
在 JDK 的实现中,独占模式才可以使用 Condition,共享模式不支持 Condition。
由于 AQS 的内部类 ConditionObject 只支持独占模式。
java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() { return new ConditionObject(); }
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#newCondition
public Condition newCondition() { return sync.newCondition(); }
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#newCondition
/** * Throws {@code UnsupportedOperationException} because * {@code ReadLocks} do not support conditions. * * @throws UnsupportedOperationException always */ public Condition newCondition() { throw new UnsupportedOperationException(); }
代码流程:
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 将当前线程封装成节点存入条件队列 int savedState = fullyRelease(node); // 释放已经持有的锁(就是在调用 Condition#await 以前持有的 Lock#lock 锁),并返回释放前的锁状态 int interruptMode = 0; while (!isOnSyncQueue(node)) { // 检查节点是否在同步队列上 LockSupport.park(this); // 节点还在条件队列中,则阻塞 // 节点从阻塞中被唤醒(condition#signal,Thread#interrupt),检查中断状态,设置中断处理模式 // 补充:被 condition#signal 唤醒后的线程会从条件队列转移到同步队列(先出队再入队) // 补充:若在条件队列中就发生了中断,也会被转移到同步队列(不出队,只入队,见 checkInterruptWhileWaiting -> transferAfterCancelledWait) if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 在同步队列等待获取资源直到成功,判断设置中断处理模式 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled // nextWaiter不为空,说明当前节点是由 Thread#interrupt 唤醒的(condition#signal 唤醒阻塞节点会设置nextWaiter为空) // 此时当前节点同时存在于同步队列、条件队列上!可是 waitStatus 不是 CONDITION // 须要清除条件队列中已取消的节点 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); // 处理中断:抛异常,或者补上中断状态 }
注意:
将当前线程封装为节点(waitStatus 为 CONDITION),添加到条件队列尾部。
若条件队列不存在则进行初始化,把当前节点做为头节点(不使用 dummy node)。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 清理条件队列中已取消的尾节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 构建节点,尾插法 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
释放当前线程已持有的锁/资源,返回释放以前的锁/资源。
若未持有锁,报错。
这里存在 BUG:报错以前,当前线程已经加入到条件队列之中了,会致使条件队列存储无效的节点数据。
应该将是否持有锁的校验提早到 addConditionWaiter 以前,JDK 11 中已修复该问题。
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease
final int fullyRelease(Node node) { // 释放当前线程已持有的锁 boolean failed = true; try { int savedState = getState(); // 获取 volatile 的 state,独占模式下表示当前线程锁持有的所有锁 if (release(savedState)) { // 释放所有的锁 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); // 未持有锁,报错 } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
判断节点是否在同步队列上。
java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); // 从尾节点向前遍历查找 } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
1. 若是 waitStatus == CONDITION
说明必定是位于条件队列上。
从条件队列入队,构造节点的时候默认就为 CONDITION 状态。
将节点从条件队列转移到同步队列,首先会 CAS 设置 waitStatus 状态为 CONDITION,再执行入队操做。
2. node.prev == null
说明必定是位于条件队列上。
同步队列只有头节点符合 node.prev == null
,可是同步队列的头节点是 dummy node,其 thread 为空。
也就是说,来调用 isOnSyncQueue 方法且符合 node.prev == null
条件的节点,只多是位于条件队列上的节点。
3. 若是 node.next != null
说明必定是处于同步队列上。
节点加入同步队列是个复合操做,最后一步是设置 node.next,当 node.next != null
说明入队操做已执行完成。
4. 若是以上都没法判断节点是否位于同步队列,则遍历链表查找节点。
存在 node.prev != null
可是节点尚未彻底入队成功的状况,由于入队操做设置 prev -> tail -> next 是非原子操做。
因此须要从 tail 向前遍历,才能准确判断 node 是否位于同步队列上。
调用 findNodeFromTail 方法前,节点通常位于尾节点附近,不会遍历过多节点。
阻塞在 Condition#await 的线程被唤醒以后,调用 checkInterruptWhileWaiting 来检查是不是由线程中断唤醒的。
若是是由线程中断唤醒的,须要进一步判断如何处理中断:
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { // 校验当前线程是否被中断,并清除线程的中断状态 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; // 若是线程未被中断,返回0 }
若是阻塞在 Condition#await 的线程是被中断唤醒的,执行 transferAfterCancelledWait 判断发生中断发生时节点所在的位置。
若是是位于条件队列,则将其添加到同步队列,返回 true;不然返回 false。
如何判断中断发生时节点所在的位置?
java.util.concurrent.locks.AbstractQueuedSynchronizer#transferAfterCancelledWait
/** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. * * @param node the node * @return true if cancelled before the node was signalled */ final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 经过 CAS 成功与否来判断节点位置 enq(node); // 若是CAS成功,说明节点是位于条件队列,须要将它添加到同步队列 return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ // 条件队列上的节点获得通知(Condition#signal)以后,会添加到同步队列中去。 while (!isOnSyncQueue(node)) // 这里循环检测,直到确认节点已经成功添加到同步队列中。 Thread.yield(); return false; }
在 Condition#await 方法中,当线程从阻塞中被线程中断唤醒后,判断节点是位于条件队列中,除了将节点加入同步队列以外,还须要将节点从条件队列中移除。
官方的说明:
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ private void unlinkCancelledWaiters() { // 清除条件队列中状态不为CONDITION的节点 Node t = firstWaiter; // 游标节点,记录当前遍历的节点 Node trail = null; // 游标节点,记录遍历过的最后一个有效节点(状态为CONDITION) while (t != null) { // 从条件队列的头节点开始遍历(下面注释用next表明下一个节点) Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { // 当前t为无效节点 t.nextWaiter = null; if (trail == null) // 首次遍历到t为有效节点时,才会初始化trail firstWaiter = next; // 设置t.next为新的头节点(下一次循环校验t.next:若t.next无效,则把t.next.next设为新的头节点) else trail.nextWaiter = next; // 设置trail.next为t.next(把t出队,下一次循环校验t.next:若t.next无效,则把t.next.next设为trail.next) if (next == null) lastWaiter = trail; // 设置trail为新的尾节点 } else // 当前t为有效节点 trail = t; t = next; // 继续遍历t.next } }
Condition#await 执行到最后,从阻塞中被唤醒且从新取得锁,判断 interruptMode != 0,即 Condition#await 整个过程当中发生过中断,须要对中断进行统一处理。
具体见设置 interruptMode 的代码:checkInterruptWhileWaiting
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#reportInterruptAfterWait
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
java.util.concurrent.locks.AbstractQueuedSynchronizer#selfInterrupt
/** * Convenience method to interrupt current thread. */ static void selfInterrupt() { Thread.currentThread().interrupt(); }
在 Condition 条件上阻塞,具备超时时间。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitNanos
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; // 等待截止的时间戳 int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { // 已超时,检查节点所在位置,判断是否把节点加入同步队列 transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) // 大于时间阈值,进行阻塞;小于时间阈值,进行自旋 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 从阻塞中唤醒后,检查是否发生中断,若有中断则结束自旋 break; nanosTimeout = deadline - System.nanoTime(); // 剩余等待时间 } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); // 返回剩余的阻塞时间 }
java.util.concurrent.locks.AbstractQueuedSynchronizer#spinForTimeoutThreshold
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;
在 Condition 条件上阻塞,只能被 Condition#signal 唤醒。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitUninterruptibly
/** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
唤醒在 Condition#await 上等待最久的线程。
把条件队列的头节点出队,把它加入同步队列,并唤醒节点中的线程。
被唤醒的线程从 Condition#await 中醒来后,执行 AbstractQueuedSynchronizer#acquireQueued 等待再次获取锁。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) // 未持有独占锁,报错 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); // 唤醒队首节点(等待时间最长) }
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { // 把条件队列的头节点转移到同步队列 do { if ( (firstWaiter = first.nextWaiter) == null) // 当前节点的后继节点做为新的头节点(出队),若为空,说明队列为空 lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 把当前节点转移到同步队列(入队),并唤醒节点上的线程(说明条件队列的头节点不是dummy node) (first = firstWaiter) != null); // 转移失败,取最新的firstWaiter,若不为空则重试,若为空,说明队列为空 }
java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
/** * Transfers a node from a condition queue onto sync queue. // 将一个节点从条件队列转移到同步队列 * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. // 条件队列上的节点状态不为CONDITION,说明是已取消 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); // 添加到同步队列,返回上一个节点 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 检查上一个节点,发现不具有唤醒当前节点条件,则当即唤醒当前节点 return true; // 补充:node.thread 从 Condition#await 之中被唤醒,后续执行 acquireQueued 尝试获取锁 }
遍历条件队列,依次唤醒全部节点。
全部节点都会迁移到同步队列等待获取锁。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignalAll
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
相关阅读:
阅读 JDK 源码:AQS 中的独占模式
阅读 JDK 源码:AQS 中的共享模式
阅读 JDK 源码:AQS 对 Condition 的实现
做者:Sumkor