任意一个java对象,都拥有一组监视器方法,主要包括 wait(),wait(long timeout)、notify 以及 notifyAll 方法,这些方法与synchronized 同步关键字配合,能够实现等待/通知模式。Condition 接口也提供了类型Object的监视器方法,与Lock配合能够实现等待/通知模式。java
Condition 定义了等待/通知两种类型的方法,当前线程调用这些方法时,须要提早获取到Condition对象关联的锁。Condition对象由Lock对象(调用Lock对象的 newCondition方法)建立出来的。Condition是依赖Lock对象的。node
package com.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by cxx on 2018/1/18. */ public class ConditionOption { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditonWait() throws InterruptedException{ lock.lock(); try { condition.await(); }finally { lock.unlock(); } } public void conditonSignal() throws InterruptedException{ lock.lock(); try { condition.signal(); }finally { lock.unlock(); } } }
获取一个Condition 必须经过Lock的newCondition() 方法。有界队列是一种特殊的队列,当队列为空时,队列的获取操做将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操做将会阻塞插入线程,知道队列出现“空位。”数组
package com.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by cxx on 2018/1/18. */ public class BoundedQueue <T> { private Object[] items; private int addIndex,removeIndex,count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue(int size){ items = new Object[size]; } //添加一个元素,若是数组满,则添加线程进入等待状态,直到有空位 public void add(T t) throws InterruptedException{ lock.lock(); try { while (count == items.length){ notFull.await(); } items[addIndex] = t; if (++addIndex == items.length){ addIndex = 0; } ++count; notEmpty.signal(); }finally { lock.unlock(); } } //由头部删除一个元素,若是数组空,则删除线程进入等待状态,直到有添加元素。 public T remove() throws InterruptedException{ lock.lock(); try { while (count == 0){ notEmpty.await(); } Object x = items[removeIndex]; if (++removeIndex == items.length){ removeIndex = 0; } --count; notFull.signal(); return (T) x; }finally { lock.unlock(); } } }
在添加和删除方法中使用 while 循环而非 if判断,目的是防止过早或意外的通知,只有条件符合才可以退出循环。安全
ConditionObject 是同步器 AbstractQueuedSynchronizer的内部类,由于Condition的操做须要获取相关联的锁。每一个Condition对象都包含着一个队列,该队列是Condition对象实现 等待/通知功能的关键。并发
等待队列是一个FIFO的队列,在队列中的每一个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,若是一个线程调用了Condition.await()方法,那么该线程将会释放锁、构形成节点加入等待队列并进入等待状态。ui
一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。 Condition拥有首尾节点的引用,而新增节点只须要将原有的尾节点 nextWaiter 指向它,而且更新尾节点便可。上述节点应用更新的过程并无使用CAS保证,缘由在于调用 await()方法的线程一定是获取了锁的线程,这个过程是由锁来保证线程安全的。this
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列。线程
调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程必定获取了Condition相关联的锁。code
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法将会当前线程构形成节点并加入等待队列中,而后释放同步状态,唤醒同步队列中的后继节点,而后当前线程会进入等待状态。对象
调用 Condition的signal() 方法,将会唤醒在等待队列中等待时间最长的节点,在唤醒节点以前,会将节点移到同步队列中。
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
调用该方法的前置条件是当前线程必须获取了锁,能够看到signal()方法进行了IsHeldExclusively()检查,也就是当前线程必须是获取了锁的线程,接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程在使用LockSupport唤醒该节点的线程。
被唤醒后的线程,将从await()方法中的while循环中退出,进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
成功获取同步状态以后,被唤醒的线程就爱哪根葱先前调用的await()方法返回,此时该线程已经成功地获取了锁。
Condition的signalAll()方法,至关于对等待队列中的每一个节点均执行一次signal()方法,效果就是将等待队列中全部节点所有移动到同步队列中,并唤醒每一个节点的线程。