3.6 ConditionObject java
AbstractQueuedSynchronizer的内部类ConditionObject实现了Condition接口。Condition接口提供了跟Java语言内置的monitor机制相似的接口:await()/signal()/signalAll(),以及一些支持超时和回退的await版本。能够将任意个数的ConcitionObject关联到对应的synchronizer,例如经过调用ReentrantLock.newCondition()方法便可构造一个ConditionObject实例。每一个ConditionObject实例内部都维护一个ConditionQueue,该队列的元素跟AbstractQueuedSynchronizer的WaitQueue同样,都是Node对象。 node
ConditionObject的await()代码以下: 并发
Java代码
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
以上代码中,能够看出ConditionObject的await语义跟Java语言内置的monitor机制是很是类似的(详见:http://whitesock.iteye.com/blog/162344 )。首先addConditionWaiter()将当前线程加入到ConditionQueue中,而后fullyRelease(node)释放掉跟ConditionObject关联的synchronizer锁。若是某个线程在没有持有对应的synchronizer锁的状况下调用某个ConditionObject对象的await()方法,那么跟Object.wait()同样会抛出IllegalMonitorStateException。接下来while (!isOnSyncQueue(node)) {...}会保证在其它线程调用了该ConditionObject的signal()/siangalAll()以前,当前线程一直被阻塞(signal()/siangalAll()的行为稍后会介绍)。在被signal()/siangalAll()唤醒以后,await()经过acquireQueued(node, savedState)确保再次得到synchronizer的锁。 app
ConditionObject的signal()代码以下: 函数
Java代码
- public final void signal() {
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- Node first = firstWaiter;
- if (first != null)
- doSignal(first);
- }
-
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null;
- first.nextWaiter = null;
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
那么跟await()同样,若是某个线程在没有持有对应的synchronizer锁的状况下调用某个ConditionObject对象的signal()/siangalAll()方法,会抛出IllegalMonitorStateException。signal()主要的行为就是将ConditionQueue中对应的Node实例transfer到AbstractQueuedSynchronizer的WaitQueue中,以便在synchronizer release的过程当中,该Node对应的线程可能被唤醒。 ui
3.7 Timeout & Cancellation this
AbstractQueuedSynchronizer的acquireQueued()和doAcquire***()系列方法在acquire失败(超时或者中断)后,都会调用cancelAcquire(Node node)方法进行清理,其代码以下: atom
Java代码
- private void cancelAcquire(Node node) {
- // Ignore if node doesn't exist
- if (node == null)
- return;
-
- node.thread = null;
-
- // Skip cancelled predecessors
- Node pred = node.prev;
- while (pred.waitStatus > 0)
- node.prev = pred = pred.prev;
-
- // predNext is the apparent node to unsplice. CASes below will
- // fail if not, in which case, we lost race vs another cancel
- // or signal, so no further action is necessary.
- Node predNext = pred.next;
-
- // Can use unconditional write instead of CAS here.
- // After this atomic step, other Nodes can skip past us.
- // Before, we are free of interference from other threads.
- node.waitStatus = Node.CANCELLED;
-
- // If we are the tail, remove ourselves.
- if (node == tail && compareAndSetTail(node, pred)) {
- compareAndSetNext(pred, predNext, null);
- } else {
- // If successor needs signal, try to set pred's next-link
- // so it will get one. Otherwise wake it up to propagate.
- int ws;
- if (pred != head &&
- ((ws = pred.waitStatus) == Node.SIGNAL ||
- (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
- pred.thread != null) {
- Node next = node.next;
- if (next != null && next.waitStatus <= 0)
- compareAndSetNext(pred, predNext, next);
- } else {
- unparkSuccessor(node);
- }
-
- node.next = node; // help GC
- }
- }
须要注意的是, cancelAcquire(Node node)方法是可能会被并发调用。while (pred.waitStatus > 0) {...}这段循环的做用就是清除当前Node以前的已经被标记为取消的节点,可是head节点除外(由于head节点保证不会被标记为Node.CANCELLED)。这段循环初看起来有并发问题,可是推敲一下以后发现:循环过程当中函数参数node的waitStatus不会大于0,所以即便是多个线程并发执行这个循环,那么这些线程处理的都只是链表中互不重叠的一部分。接下来在node.waitStatus = Node.CANCELLED执行完毕以后,后续的操做都必需要避免并发问题。 spa
关于处理线程中断, ConditionObject的await()/signal()/signalAll()等方法符合JSR 133: Java Memory Model and Thread Specification Revision中规定的语义:若是中断在signal以前发生,那么await必须在从新得到synchronizer的锁以后,抛出InterruptedException;若是中断发生在signal以后发生,那么await必需要设定当前线程的中断状态,而且不能抛出InterruptedException。 .net
4 Reference
The java.util.concurrent Synchronizer Framework
The Art of Multiprocessor Programming