目录java
在使用内置锁synchronized时,经过调用java.lang.Objec中定义的监视器方法,主要有wait()、wait(long timeout)、notify()和notifyAll()方法,能够实现等待/通知模式。Codition接口中也定义了相似的监视器方法,与显示锁Lock配合使用也能够实现等待/通知模式。
当线程须要利用Condition对象进行等待时,须要提早获取到Condition对象关联的显示锁Lock对象,使用案例以下:node
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); //等待 public void coditionWait() throws InterruptedException { lock.lock(); try { condition.await(); }finally { lock.unlock(); } } //通知 public void coditionSignal() throws InterruptedException { lock.lock(); try { condition.signal(); }finally { lock.unlock(); } }
Condition接口由同步器AbstractQueuedSynchronizer内部类ConditionObject提供实现,而显示锁Lock对象实现时内部类Sync会继承AQS,从而把Condition对象与Lock对象关联起来。并发
在上一篇博客中介绍到为了处理多个线程竞争同一把锁,同步器AQS中维护了一个先入先出的双向同步队列,让竞争失败的线程进入同步队列等待。一样,AQS在实现Condition接口也维护了一个先入先出的单向等待队列,当一个与Lock对象关联的Condition对象调用await方法,得到锁的线程就要释放锁,并推出同步队列head头节点,进入condition等待队列。condition队列规定了头节点firstWaiter和尾节点lastWaiter。less
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; }
AQS中构建等待队列复用了内部类Node结点类源码分析
static final class Node { //等待状态 volatile int waitStatus; //前驱结点 volatile Node prev; //后继节点 volatile Node next; //等待获取锁的线程 volatile Thread thread; //condition队列的后继节点 Node nextWaiter; }
从上图能够发现,Condition等待队列是一个先入先出的单向链表,从链表尾部加入元素,头部移出链表。使用nextWaiter指向下一个等待节点,构成链表的基本元素是节点Node,复用了AQS中的Node类,nextWaiter并不仅仅在Condition链表指向下一个等待节点。这是Node类定义nextWaiter的注释:ui
Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive,we save a field by using special value to indicate sharedmode.this
大意是只有独占锁才会关联Condition队列,经过nextWaiter变量在构成同步队列节点标识同步锁是独占锁仍是共享锁,从如下方法能够看出AQS使用nextWaiter来表示锁:线程
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; //判断是不是共享锁 final boolean isShared() { return nextWaiter == SHARED; } //构建同步队列节点,nextWaiter标识同步锁是独占锁仍是共享锁 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //构建等待队列节点,nextWaiter指向单向链表下一个节点 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }
从以上分析能够看出:AQS复用了Node类来构建同步队列和等待队列,Node用来构建同步队列节点,nextWaiter标识同步锁是独占锁仍是共享锁;Node用来构建等待队列节点,nextWaiter指向单向链表下一个节点。刚开始看这一部分时,对我形成了很大的困扰,因此特意写出来。code
await实现等待考虑到了中断,若当前线程等待期间发生中断,抛出InterruptedException异常。线程在等待期间会被阻塞,直到发生中断或者Condition对象调用signal方法。基本流程:首先将node加入condition队列,而后释放锁,挂起当前线程等待唤醒,唤醒后线程从新进入同步队列并调用acquireQueued获取锁。流程图以下:
对象
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //将当前线程加入Condition等待队列 Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; //判断当前线程是否在同步队列中 while (!isOnSyncQueue(node)) { //阻塞当前线程 LockSupport.park(this); //在阻塞的过程当中发生中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //被其余线程唤醒,退出Condition等待队列加入同步队列 //获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //若是尾节点lastWaiter等待状态是CANCELLED,将队列全部CANCELLED节点清除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //以当前线程构成节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); //尾节点为空,等待队列为空,进行初始化,当前节点是等待队列的头节点 if (t == null) firstWaiter = node; //不然添加到等待队列的尾部,当前节点是等待队列新的lastWaiter else t.nextWaiter = node; lastWaiter = node; return node; } //unlinkCancelledWaiters方法遍历CONDITION队列,删除状态为CANCELLED的节点。 private void unlinkCancelledWaiters() { //首节点 Node t = firstWaiter; //保存遍历节点前驱节点的引用 Node trail = null; //单向链表从前日后遍历 while (t != null) { //下一个节点 Node next = t.nextWaiter; //节点t的waitStatus为CANCELLED if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
final int fullyRelease(Node node) { boolean failed = true; try { //获取同步状态 int savedState = getState(); //若是是重入锁,要屡次释放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
final boolean isOnSyncQueue(Node node) { //转移到同步队列,CONDITION状态会被清除 //同步队列prev表示前驱结点,不为null if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //同步队列next表示后继节点,不为null if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ //遍历同步队列,一个一个找 return findNodeFromTail(node); }
//表示从等待状态退出时会从新产生一个中断,但不会抛出异常 private static final int REINTERRUPT = 1; //从等待状态退出时抛出InterruptedException异常 private static final int THROW_IE = -1; /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //产生异常 if (interruptMode == THROW_IE) throw new InterruptedException(); //产生中断 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
检查当前线程是否占据独占锁,唤醒等待在当前Condition对象等待最久的线程(等待队列的头节点)
public final void signal() { //检查当前线程是否占据独占锁,若是不是没有权限唤醒等待线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //若是CAS设置失败,说明节点在signal以前被取消了,返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //CAS设置成功,入队 //插入节点的前驱节点 Node p = enq(node); //前驱节点的等待状态 int ws = p.waitStatus; //若是p等待状态为CANECLLED或对p进行CAS设置失败,则唤醒线程,让node中线程进入acquireQueued方法。不然 //因为前驱节点等待状态为signal,由同步器唤醒线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
将等待队列全部节点依次转移到同步队列末尾。
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { //first节点从condition队列移出 Node next = first.nextWaiter; first.nextWaiter = null; //first节点加入同步队列 transferForSignal(first); //更新first节点指向 first = next; } while (first != null); }
以上是对AQS中内部类ConditionObject对Condition接口实现的简单分析。