Condition
是一个接口,AbstractQueuedSynchronizer 中的ConditionObject
内部类实现了这个接口。Condition
声明了一组等待/通知
的方法,这些方法的功能与Object
中的wait/notify/notifyAll
等方法类似。这二者相同的地方在于,它们所提供的等待/通知
方法均是为了协同线程的运行秩序。只不过,Object 中的方法须要配合 synchronized 关键字使用,而 Condition 中的方法则要配合锁对象使用,并经过newCondition
方法获取实现类对象。除此以外,Condition 接口中声明的方法功能上更为丰富一些。好比,Condition 声明了具备不响应中断和超时功能的等待接口,这些都是 Object wait 方法所不具有的。html
本篇文章是上一篇文章AbstractQueuedSynchronizer 原理分析 - 独占/共享模式的续篇,在学习 Condition 的原理前,建议你们先去了解 AbstractQueuedSynchronizer 同步队列相关原理。本篇文章会涉及到同步队列相关知识,这些知识在上一篇文章分析过。java
关于Condition
的简介这里先说到这,接下来分析一下Condition
实现类ConditionObject
的原理。node
ConditionObject
是经过基于单链表的条件队列来管理等待线程的。线程在调用await
方法进行等待时,会释放同步状态。同时线程将会被封装到一个等待节点中,并将节点置入条件队列尾部进行等待。当有线程在获取独占锁的状况下调用signal
或singalAll
方法时,队列中的等待线程将会被唤醒,从新竞争锁。另外,须要说明的是,一个锁对象可同时建立多个 ConditionObject 对象,这意味着多个竞争同一独占锁的线程可在不一样的条件队列中进行等待。在唤醒时,可唤醒指定条件队列中的线程。其大体示意图以下:并发
以上就是 ConditionObject 所实现的等待/通知机制的大体原理,并非很难理解。固然,在具体的实现中,则考虑的更为细致一些。相关细节将会在接下来一章中进行说明,继续往下看吧。源码分析
ConditionObject 中实现了几种不一样的等待方法,每种方法均有它本身的特色。好比await()
会响应中断,而awaitUninterruptibly()
则不响应中断。await(long, TimeUnit)
则会在响应中断的基础上,新增了超时功能。除此以外,还有一些等待方法,这里就不一一列举了。学习
在本节中,我将主要分析await()
的方法实现。其余的等待方法大同小异,就不一一分析了,有兴趣的朋友能够本身看一下。好了,接下来进入源码分析阶段。ui
/** * await 是一个响应中断的等待方法,主要逻辑流程以下: * 1. 若是线程中断了,抛出 InterruptedException 异常 * 2. 将线程封装到节点对象里,并将节点添加到条件队列尾部 * 3. 保存并彻底释放同步状态,保存下来的同步状态在从新竞争锁时会用到 * 4. 线程进入等待状态,直到被通知或中断才会恢复运行 * 5. 使用第3步保存的同步状态去竞争独占锁 */ public final void await() throws InterruptedException { // 线程中断,则抛出中断异常,对应步骤1 if (Thread.interrupted()) throw new InterruptedException(); // 添加等待节点到条件队列尾部,对应步骤2 Node node = addConditionWaiter(); // 保存并彻底释放同步状态,对应步骤3。此方法的意义会在后面详细说明。 int savedState = fullyRelease(node); int interruptMode = 0; /* * 判断节点是否在同步队列上,若是不在则阻塞线程。 * 循环结束的条件: * 1. 其余线程调用 singal/singalAll,node 将会被转移到同步队列上。node 对应线程将 * 会在获取同步状态的过程当中被唤醒,并走出 while 循环。 * 2. 线程在阻塞过程当中产生中断 */ while (!isOnSyncQueue(node)) { // 调用 LockSupport.park 阻塞当前线程,对应步骤4 LockSupport.park(this); /* * 检测中断模式,这里有两种中断模式,以下: * THROW_IE: * 中断在 node 转移到同步队列“前”发生,须要当前线程自行将 node 转移到同步队 * 列中,并在随后抛出 InterruptedException 异常。 * * REINTERRUPT: * 中断在 node 转移到同步队列“期间”或“以后”发生,此时代表有线程正在调用 * singal/singalAll 转移节点。在该种中断模式下,再次设置线程的中断状态。 * 向后传递中断标志,由后续代码去处理中断。 */ if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /* * 被转移到同步队列的节点 node 将在 acquireQueued 方法中从新获取同步状态,注意这里 * 的这里的 savedState 是上面调用 fullyRelease 所返回的值,与此对应,能够把这里的 * acquireQueued 做用理解为 fullyAcquire(并不存在这个方法)。 * * 若是上面的 while 循环没有产生中断,则 interruptMode = 0。但 acquireQueued 方法 * 可能会产生中断,产生中断时返回 true。这里仍将 interruptMode 设为 REINTERRUPT, * 目的是继续向后传递中断,acquireQueued 不会处理中断。 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; /* * 正常经过 singal/singalAll 转移节点到同步队列时,nextWaiter 引用会被置空。 * 若发生线程产生中断(THROW_IE)或 fullyRelease 方法出现错误等异常状况, * 该引用则不会被置空 */ if (node.nextWaiter != null) // clean up if cancelled // 清理等待状态非 CONDITION 的节点 unlinkCancelledWaiters(); if (interruptMode != 0) /* * 根据 interruptMode 以为中断的处理方式: * THROW_IE:抛出 InterruptedException 异常 * REINTERRUPT:从新设置线程中断标志 */ reportInterruptAfterWait(interruptMode); } /** 将当先线程封装成节点,并将节点添加到条件队列尾部 */ private Node addConditionWaiter() { Node t = lastWaiter; /* * 清理等待状态为 CANCELLED 的节点。fullyRelease 内部调用 release 发生异常或释放同步状 * 态失败时,节点的等待状态会被设置为 CANCELLED。因此这里要清理一下已取消的节点 */ if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 建立节点,并将节点置于队列尾部 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** 清理等待状态为 CANCELLED 的节点 */ private void unlinkCancelledWaiters() { Node t = firstWaiter; // 指向上一个等待状态为非 CANCELLED 的节点 Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; /* * trail 为 null,代表 next 以前的节点等待状态均为 CANCELLED,此时更新 * firstWaiter 引用的指向。 * trail 不为 null,代表 next 以前有节点的等待状态为 CONDITION,这时将 * trail.nextWaiter 指向 next 节点。 */ if (trail == null) firstWaiter = next; else trail.nextWaiter = next; // next 为 null,代表遍历到条件队列尾部了,此时将 lastWaiter 指向 trail if (next == null) lastWaiter = trail; } else // t.waitStatus = Node.CONDITION,则将 trail 指向 t trail = t; t = next; } } /** * 这个方法用于彻底释放同步状态。这里解释一下彻底释放的缘由:为了不死锁的产生,锁的实现上 * 通常应该支持重入功能。对应的场景就是一个线程在不释放锁的状况下能够屡次调用同一把锁的 * lock 方法进行加锁,且不会加锁失败,如失败必然致使致使死锁。锁的实现类可经过 AQS 中的整型成员 * 变量 state 记录加锁次数,每次加锁,将 state++。每次 unlock 方法释放锁时,则将 state--, * 直至 state = 0,线程彻底释放锁。用这种方式便可实现了锁的重入功能。 */ final int fullyRelease(Node node) { boolean failed = true; try { // 获取同步状态数值 int savedState = getState(); // 调用 release 释放指定数量的同步状态 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { // 若是 relase 出现异常或释放同步状态失败,此处将 node 的等待状态设为 CANCELLED if (failed) node.waitStatus = Node.CANCELLED; } } /** 该方法用于判断节点 node 是否在同步队列上 */ final boolean isOnSyncQueue(Node node) { /* * 节点在同步队列上时,其状态可能为 0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一, * 但不会为 CONDITION,因此可已经过节点的等待状态来判断节点所处的队列。 * * node.prev 仅会在节点获取同步状态后,调用 setHead 方法将本身设为头结点时被置为 * null,因此只要节点在同步队列上,node.prev 必定不会为 null */ if (node.waitStatus == Node.CONDITION || node.prev == null) return false; /* * 若是节点后继被为 null,则代表节点在同步队列上。由于条件队列使用的是 nextWaiter 指 * 向后继节点的,条件队列上节点的 next 指针均为 null。但仅以 node.next != null 条 * 件判定节点在同步队列是不充分的。节点在入队过程当中,是先设置 node.prev,后设置 * node.next。若是设置完 node.prev 后,线程被切换了,此时 node.next 仍然为 * null,但此时 node 确实已经在同步队列上了,因此这里还须要进行后续的判断。 */ if (node.next != null) return true; // 在同步队列上,从后向前查找 node 节点 return findNodeFromTail(node); } /** 因为同步队列上的的节点 prev 引用不会为空,因此这里从后向前查找 node 节点 */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } /** 检测线程在等待期间是否发生了中断 */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * 判断中断发生的时机,分为两种: * 1. 中断在节点被转移到同步队列前发生,此时返回 true * 2. 中断在节点被转移到同步队列期间或以后发生,此时返回 false */ final boolean transferAfterCancelledWait(Node node) { // 中断在节点被转移到同步队列前发生,此时自行将节点转移到同步队列上,并返回 true if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 调用 enq 将节点转移到同步队列中 enq(node); return true; } /* * 若是上面的条件分支失败了,则代表已经有线程在调用 signal/signalAll 方法了,这两个 * 方法会先将节点等待状态由 CONDITION 设置为 0 后,再调用 enq 方法转移节点。下面判断节 * 点是否已经在同步队列上的缘由是,signal/signalAll 方法可能仅设置了等待状态,还没 * 来得及转移节点就被切换走了。因此这里用自旋的方式判断 signal/signalAll 是否已经完 * 成了转移操做。这种状况代表了中断发生在节点被转移到同步队列期间。 */ while (!isOnSyncQueue(node)) Thread.yield(); } // 中断在节点被转移到同步队列期间或以后发生,返回 false return false; } /** * 根据中断类型作出相应的处理: * THROW_IE:抛出 InterruptedException 异常 * REINTERRUPT:从新设置中断标志,向后传递中断 */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** 中断线程 */ static void selfInterrupt() { Thread.currentThread().interrupt(); }
/** 将条件队列中的头结点转移到同步队列中 */ public final void signal() { // 检查线程是否获取了独占锁,未获取独占锁调用 signal 方法是不容许的 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 将头结点转移到同步队列中 doSignal(first); } private void doSignal(Node first) { do { /* * 将 firstWaiter 指向 first 节点的 nextWaiter 节点,while 循环将会用到更新后的 * firstWaiter 做为判断条件。 */ if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 将头结点从条件队列中移除 first.nextWaiter = null; /* * 调用 transferForSignal 将节点转移到同步队列中,若是失败,且 firstWaiter * 不为 null,则再次进行尝试。transferForSignal 成功了,while 循环就结束了。 */ } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** 这个方法用于将条件队列中的节点转移到同步队列中 */ final boolean transferForSignal(Node node) { /* * 若是将节点的等待状态由 CONDITION 设为 0 失败,则代表节点被取消。 * 由于 transferForSignal 中不存在线程竞争的问题,因此下面的 CAS * 失败的惟一缘由是节点的等待状态为 CANCELLED。 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 调用 enq 方法将 node 转移到同步队列中,并返回 node 的前驱节点 p Node p = enq(node); int ws = p.waitStatus; /* * 若是前驱节点的等待状态 ws > 0,则代表前驱节点处于取消状态,此时应唤醒 node 对应的 * 线程去获取同步状态。若是 ws <= 0,这里经过 CAS 将节点 p 的等待设为 SIGNAL。 * 这样,节点 p 在释放同步状态后,才会唤醒后继节点 node。若是 CAS 设置失败,则应当即 * 唤醒 node 节点对应的线程。以避免因 node 没有被唤醒致使同步队列挂掉。关于同步队列的相关的 * 知识,请参考个人另外一篇文章“AbstractQueuedSynchronizer 原理分析 - 独占/共享模式”, * 连接为:http://t.cn/RuERpHl */ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
看完了 signal 方法的分析,下面再来看看 signalAll 的源码分析,以下:this
public final void signalAll() { // 检查线程是否获取了独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; /* * 将条件队列中全部的节点转移到同步队列中。与 doSignal 方法略有不一样,主要区别在 * while 循环的循环条件上,下的循环只有在条件队列中没节点后才终止。 */ do { Node next = first.nextWaiter; // 将 first 节点从条件队列中移除 first.nextWaiter = null; // 转移节点到同步队列上 transferForSignal(first); first = next; } while (first != null); }
在我阅读 ConditionObject 源码时发现了一个问题 - await 方法居然没有作同步控制。而在 signal 和 signalAll 方法开头都会调用 isHeldExclusively 检测线程是否已经获取了独占锁,未获取独占锁调用这两个方法会抛出异常。但在 await 方法中,却没有进行相关的检测。若是在正确的使用方式下调用 await 方法是不会出现问题的,所谓正确的使用方式指的是在获取锁的状况下调用 await 方法。但若是没获取锁就调用该方法,就会产生线程竞争的状况,这将会对条件队列的结构形成破坏。这里再来看一下新增节点的方法源码,以下:spa
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 存在竞争时将会致使节点入队出错 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
假如如今有线程 t1 和 t2,对应节点 node1 和 node2。线程 t1 获取了锁,而 t2 未获取锁,此时条件队列为空,即 firstWaiter = lastWaiter = null。演绎一下会致使条件队列被破坏的场景,以下:线程
if (t == null)
,两个线程都认为 if 条件知足如上,若是线程是按照上面的顺序执行,这会致使队列被破坏。firstWaiter 本应该指向 node1,但结果却指向了 node2,node1 被排挤出了队列。这样会致使什么问题呢?这样可能会致使线程 t1 一直阻塞下去。由于 signal/signalAll 是从条件队列头部转移节点的,但 node1 不在队列中,因此 node1 没法被转移到同步队列上。在不出现中断的状况下,node1 对应的线程 t1 会被永久阻塞住。
这里未对 await 方法进行同步控制,致使条件队列出现问题,应该算 ConditionObject 实现上的一个缺陷了。关于这个缺陷,博客园博主 活在夢裡 在他的文章 AbstractQueuedSynchronizer源码解读--续篇之Condition 中也提到了。并向 JDK 开发者提了一个 BUG,BUG 连接为 JDK-8187408,有兴趣的同窗能够去看看。
到这里,Condition 的原理就分析完了。分析完 Condition 原理,关于 AbstractQueuedSynchronizer 的分析也就结束了。整体来讲,经过分析 AQS 并写成博客,使我对 AQS 的原理有了更深入的认识。AQS 是 JDK 中锁和其余并发组件实现的基础,弄懂 AQS 原理对后续在分析各类锁和其余同步组件大有裨益。
AQS 自己实现比较复杂,要处理各类各样的状况。做为类库,AQS 要考虑和处理各类可能的状况,实现起来可谓很是复杂。不只如此,AQS 还很好的封装了同步队列的管理,线程的阻塞与唤醒等基础操做,大大下降了继承类实现同步控制功能的复杂度。因此,在本文的最后,再次向 AQS 的做者,Java 大师Doug Lea
致敬。
好了,本文到此结束,谢谢你们阅读。
本文在知识共享许可协议 4.0 下发布,转载需在明显位置处注明出处
做者:coolblog
本文同步发布在个人我的博客: http://www.coolblog.xyz
本做品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。