收录于话题 #进阶架构师 | 并发编程专题 12个
点击上方“java进阶架构师”,选择右上角“置顶公众号”
20大进阶架构专题每日送达
咱们知道 synchronized 锁经过 Object 类的 wait()和 notify()方法实现线程间的等待通知机制,而比 synchronized 更灵活 Lock 锁一样也有实现等待通知机制的方式,那就是条件 Condition。本文将从如下几个方面介绍 Condition:java
1.1 Condition 类提供的方法
等待方法:node
// 当前线程进入等待状态,若是其余线程调用 condition 的 signal 或者 signalAll 方法而且当前线程获取 Lock 从 await 方法返回,若是在等待状态中被中断会抛出被中断异常 void await() throws InterruptedException // 当前线程进入等待状态直到被通知,中断或者超时 long awaitNanos(long nanosTimeout) // 同第二个方法,支持自定义时间单位 boolean await(long time, TimeUnit unit)throws InterruptedException // 当前线程进入等待状态直到被通知,中断或者到了某个时间 boolean awaitUntil(Date deadline) throws InterruptedException
唤醒方法:面试
// 唤醒一个等待在 condition 上的线程,将该线程从等待队列中转移到同步队列中,若是在同步队列中可以竞争到 Lock 则能够从等待方法中返回 void signal() // 与 1 的区别在于可以唤醒全部等待在 condition 上的线程 void signalAll()
1.2 使用举例
启动 waiter 和 signaler 两个线程。
waiter 线程获取到锁,检查 flag=false 不知足条件,执行 condition.await()方法将线程阻塞等待并释放锁。
signaler 线程获取到锁以后更改条件,将 flag 变为 true,执行 condition.signalAll()通知唤醒等待线程,释放锁。
waiter 线程被唤醒获取到锁,自旋检查 flag=true 知足条件,继续执行。编程
public class ConditionTest { private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); private static volatile boolean flag = false; public static void main(String[] args) { Thread waiter = new Thread(new waiter()); waiter.start(); Thread signaler = new Thread(new signaler()); signaler.start(); } static class waiter implements Runnable { @Override public void run() { lock.lock(); try { while (!flag) { System.out.println(Thread.currentThread().getName() + "当前条件不知足等待"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "接收到通知条件知足"); } finally { lock.unlock(); } } } static class signaler implements Runnable { @Override public void run() { lock.lock(); try { flag = true; condition.signalAll(); } finally { lock.unlock(); } } } }
输出结果:
Thread-0当前条件不知足等待
Thread-0接收到通知,条件知足markdown
Object 的 wait 和 notify/notify 是与 synchronized 配合完成线程间的等待/通知机制,是属于 Java 底层级别的。而 Condition 是语言级别的,具备更高的可控制性和扩展性。具体表现以下:
wait/notify 方式是响应中断的,当线程处于 Object.wait()的等待状态中,线程中断会抛出中断异常;Condition 有响应中断和不响应中断模式能够选择。
wait/notify 方式一个 synchronized 锁只有一个等待队列;一个 Lock 锁能够根据不一样的条件,new 多个 Condition 对象,每一个对象包含一个等待队列。架构
须要注意的是,Condition 同 wait/notify 同样,在等待与唤醒方法使用以前必须获取到该锁。并发
Tips:须要在理解 AQS 及 ReentrantLock 基础上阅读本文源码,给出这两篇的连接:
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
3.1 条件队列
首先看 Condition 对象的建立:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();ide
public Condition newCondition() {
return sync.newCondition();
}微服务
final ConditionObject newCondition() {
return new ConditionObject();
}
建立的 Condition 对象其实就是 ConditionObject 对象,ConditionObject 是 AbstractQueuedSynchronizer(AQS)的内部类,实现了 Condition 接口。
每一个 ConditionObject 对象都有一个条件等待队列,用于保存在该 Condition 对象上等待的线程。条件等待队列是一个单向链表,结点用的 AQS 的 Node 类,每一个结点包含线程、next 结点、结点状态。ConditionObject 经过持有头尾指针类管理条件队列。高并发
注意区分 AQS 的同步队列和 Condition 的条件队列。
线程抢锁失败时进入 AQS 同步队列,AQS 同步队列中的线程都是等待着随时准备抢锁的。
线程由于没有知足某一条件而调用 condition.await()方法以后进入 Condition 条件队列,Condition 条件队列中的线程只能等着,没有获取锁的机会。
当条件知足后调用 condition.signal()线程被唤醒,那么线程就从 Condition 条件队列移除,进入 AQS 同步队列,被赋予抢锁继续执行的机会。
条件队列源码:
public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter;// 头结点 private transient Node lastWaiter;// 尾结点 /** * 入队操做 */ private Node addConditionWaiter() { Node t = lastWaiter; // 若是尾结点取消等待了,将其清除出去,并检查整个条件队列将已取消的全部结点清除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters();// 这个方法会遍历整个条件队列,而后会将已取消的全部结点清除出队列 t = lastWaiter; } // 将当前线程构形成结点,加入队尾 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node;// 维护尾结点指针 return node; } /** * 遍历整个条件队列,清除已取消等待的结点 */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null;// 用于保存前一个结点 while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { // t结点状态不是Node.CONDITION,说明已经取消等待,删除 t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t;// 下次循环中t结点的前一个结点 t = next; } } } static final class Node { volatile Thread thread;// 每个节点对应一个线程 Node nextWaiter;// next结点 volatile int waitStatus;// 结点状态 static final int CONDITION = -2;// 结点状态:当前节点进入等待队列中 ... }
3.2 await()
当调用 condition.await()方法后会使得线程进入到条件队列,此时线程将被阻塞。当调用 condition.signal()方法后,线程从条件队列进入 AQS 同步队列排队等锁。线程在 AQS 中发生的事情这里就不介绍了,不明白的能够看下之前 AQS 的文章【原创】14|AQS 源码分析。
await()方法源码:
/** * 当前线程被阻塞,并加入条件队列 * 线程在AQS同步队列中被唤醒后尝试获取锁 */ public final void await() throws InterruptedException { // 响应打断 if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程构形成结点,加入条件队列队尾,上文详细分析了该方法 Node node = addConditionWaiter(); // 释放锁,线程阻塞前必须将锁释放,下文详解fullyRelease()方法 int savedState = fullyRelease(node); int interruptMode = 0; /* * 1.isOnSyncQueue()检查node是否在AQS同步队列中,不在同步队列中返回false,下文详解isOnSyncQueue()方法 * 2.若是node不在AQS同步队列中,将当前线程阻塞 * 3.当其余代码调用signal()方法,线程进入AQS同步队列后被唤醒,继续从这里阻塞的地方开始执行 * 4.注意这里while循环的自旋,线程被唤醒之后还要再检查一下node是否在AQS同步队列中 */ while (!isOnSyncQueue(node)) { // 检查node是否在AQS同步队列中 LockSupport.park(this); // 阻塞,线程被唤醒后从这里开始执行 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /* * 到这里,是当前线程在AQS同步队列中被唤醒了,尝试获取锁 * acquireQueued()方法抢锁,抢不到锁就在同步队列中阻塞 * acquireQueued()方法是AQS文章中详细重点讲解过的这里不详细分析了 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
fullyRelease()方法:
/** * 将node线程的锁所有释放 * “所有”是指屡次重入的状况,这里一次所有释放 */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 锁状态 if (release(savedState)) {// 释放锁 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } isOnSyncQueue()方法: /** * 检查node是否在AQS同步队列中,在同步队列中返回true */ final boolean isOnSyncQueue(Node node) { // 状态为Node.CONDITION条件等待状态,确定是在条件队列中,而不在同步队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是node已经有后继节点next,那确定是在同步队列了 if (node.next != null) return true; // 遍历同步队列,查看是否有与node相等的结点 return findNodeFromTail(node); } /** * 从同步队列的队尾开始从后往前遍历找,若是找到相等的,说明在同步队列,不然就是不在同步队列 */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
3.3 signal()
调用 condition.signal()方法后,线程从 Condition 条件队列移除,进入 AQS 同步队列排队等锁。
注意:正常状况下 signal 只是将线程从 Condition 条件队列转移到 AQS 同步队列,并无唤醒线程。线程的唤醒时机是 AQS 中线程的前驱节点释放锁以后。
public final void signal() { // 验证当前线程持有锁才能调用该方法 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * 从条件队列队头日后遍历,找出第一个须要转移的结点node,将node从条件队列转移到AQS同步队列 * 为何须要遍历找?由于前有些线程会取消等待,可是可能还在条件队列中 */ private void doSignal(Node first) { do { // 将first中条件队列中移除,将first的next结点做为头结点赋值给firstWaiter if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; /* * transferForSignal()将first结点加入AQS同步队列 * 若是first结点加入同步队列失败,是由于first结点取消了Node.CONDITION状态,缘由在下面transferForSignal()的讲解中说明 * 若是first结点加入同步队列失败,那么选择first后面的第一个结点进行转移,依此类推 */ } while (!transferForSignal(first) && // 将first结点加入AQS同步队列 (first = firstWaiter) != null); // first结点加入同步队列失败,选择first后面的结点进行转移 } /** * 将结点转移到同步队列 * @return true-表明成功转移;false-表明在signal以前,节点已经取消等待了 */ final boolean transferForSignal(Node node) { /* * CAS设置结点状态 * CAS失败说明此node的waitStatus已不是Node.CONDITION,说明节点已经取消。既然已经取消,也就不须要转移了,方法返回,转移后面一个节点 * CAS失败为何不是其余线程抢先操做了呢?由于这里还持有lock独占锁,只有当前线程能够访问。 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node);// 自旋进入同步队列的队尾 int ws = p.waitStatus; // 正常状况下不会走这里,这里是前驱节点取消或者 CAS 失败的状况 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } static final class Node { volatile Thread thread;// 每个结点对应一个线程 Node nextWaiter;// next结点 volatile int waitStatus;// 结点状态 static final int CONDITION = -2;// 结点状态:当前结点进入等待队列中 }
3.4 源码过程总结
ReentrantLock lock = new ReentrantLock();建立 lock 锁,对应生成 AQS 同步队列,一个 ReentrantLock 锁对应一个 AQS 同步队列。
Condition condition = lock.newCondition();建立 condition,对应生成 condition 条件队列。
线程 A 调用condition.await();,线程 A 阻塞并加入 condition 同步队列。
线程 B 调用condition.signal();,线程 A 阻塞从 condition1 同步队列转移到 AQS 同步队列的队尾。
当 AQS 队列中线程 A 的前驱节点线程执行完并释放锁时,将线程 A 唤醒。
线程 A 被唤醒以后抢锁,执行逻辑代码。
Condition 实现的生产者消费者问题。
class BoundedBuffer { final ReentrantLock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生产 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 队列已满,等待,直到 not full 才能继续生产 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去 } finally { lock.unlock(); } } // 消费 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去 return x; } finally { lock.unlock(); } } }
生产者线程调用 put()方法向队列中添加对象,当队列满时,生产者线程就阻塞等待。
消费者线程调用 take()方法取出队列中的对象,取出对象后队列能够添加对象了,通知被阻塞的生产者线程。
生产者线程被唤醒后,从阻塞的位置开始执行,继续向队列中添加对象。
一样,消费者取出队列中对象时,发现队列为空了也会阻塞等待,生产者线程添加对象以后会通知消费者线程。
Object 的 wait 和 notify/notify 是与 synchronized 配合完成线程间的等待/通知机制,而 Condition 与 Lock 配合完成等待通知机制。
Condition 比 wait 和 notify 具备更高的可控制性和扩展性,一个 Lock 锁能够有多个 Condition 条件,此外 Condition 还有响应中断和不响应中断模式能够选择。Condition 的使用与 wait/notify 同样,在等待与唤醒方法使用以前必须获取到锁。
Condition 的实现原理:每一个 condition 都有一个条件队列,调用 condition.await()方法将线程阻塞后线程就进入了条件队列,调用 condition.sigal()方法后线程从 condition 条件队列转移到 AQS 同步队列等锁,该线程的前一节点释放锁以后会唤醒该线程抢锁执行。
Condition 多用于实现的生产者消费者问题。
【原创】01|开篇获奖感言
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深刻理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】12|揭秘 CAS
【原创】13|LockSupport
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
【原创】16|公平锁与非公平锁
【原创】17|读写锁八讲(上)
【原创】18|读写锁八讲(下)
【原创】19|JDK8 新增锁 StampedLock
【原创】20|StampedLock 源码解析
———— e n d ————
微服务、高并发、JVM调优、面试专栏等20大进阶架构师专题请关注公众号【Java进阶架构师】后在菜单栏查看。
回复【架构】领取架构师视频一套。
原创历来不开赞扬是由于我以为你的“在看”,就是给我最好的赞扬^_^