回忆 synchronized 关键字,它配合 Object 的 wait()、notify() 系列方法能够实现等待/通知模式。java
对于 Lock,经过 Condition 也能够实现等待/通知模式。node
Condition 是一个接口。sql
Condition 接口的实现类是 Lock(AQS)中的 ConditionObject。架构
Lock 接口中有个 newCondition() 方法,经过这个方法能够得到 Condition 对象(其实就是 ConditionObject)。并发
所以,经过 Lock 对象能够得到 Condition 对象。分布式
Lock lock = new ReentrantLock(); Condition c1 = lock.newCondition(); Condition c2 = lock.newCondition();
ConditionObject 类是 AQS 的内部类,实现了 Condition 接口。高并发
public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; ...
能够看到,等待队列和同步队列同样,使用的都是同步器 AQS 中的节点类 Node。性能
一样拥有首节点和尾节点,学习
每一个 Condition 对象都包含着一个 FIFO 队列。ui
结构图:
调用 Condition 的 await() 方法会使线程进入等待队列,并释放锁,线程状态变为等待状态。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //释放同步状态(锁) int savedState = fullyRelease(node); int interruptMode = 0; //判断节点是否放入同步对列 while (!isOnSyncQueue(node)) { //阻塞 LockSupport.park(this); //若是已经中断了,则退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
分析上述方法的大概过程:
addConditionWaiter() 方法:
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { //清除条件队列中全部状态不为Condition的节点 unlinkCancelledWaiters(); t = lastWaiter; } //将该线程建立节点,放入等待队列 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
过程分析:同步队列的首节点移动到等待队列。加入尾节点以前会清除全部状态不为 Condition 的节点。
调用 Condition 的 signal() 方法,能够唤醒等待队列的首节点(等待时间最长),唤醒以前会将该节点移动到同步队列中。
public final void signal() { //判断是否获取了锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
过程:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
过程:
final boolean transferForSignal(Node node) { //将节点状态变为0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将该节点加入同步队列 Node p = enq(node); int ws = p.waitStatus; //若是结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
调用同步器的 enq 方法,将节点移动到同步队列,
知足条件后使用 LockSupport 唤醒该线程。
当 Condition 调用 signalAll() 方法:
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
能够看到 doSignalAll() 方法使用了 do-while 循环来唤醒每个等待队列中的节点,直到 first 为 null 时,中止循环。
一句话总结 signalAll() 的做用:将等待队列中的所有节点移动到同步队列中,并唤醒每一个节点的线程。
整个过程能够分为三步:
第一步:一个线程获取锁后,经过调用 Condition 的 await() 方法,会将当前线程先加入到等待队列中,并释放锁。而后就在 await() 中的一个 while 循环中判断节点是否已经在同步队列,是则尝试获取锁,不然一直阻塞。
第二步:当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,而后经过 doSignal(Node first) 方法将节点移动到同步队列,并唤醒节点中的线程。
第三步:被唤醒的线程,将从 await() 中的 while 循环中退出来,而后调用 acquireQueued() 方法竞争同步状态。竞争成功则退出 await() 方法,继续执行。
欢迎工做一到五年的Java工程师朋友们加入Java架构开发: 855835163 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!