上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充。ReentrantLock#newCondition()能够建立Condition,在ReentrantLock加锁过程当中能够利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活"。Condition经常使用在基于异步通讯的同步机制实现中,好比dubbo中的请求和获取应答结果的实现。java
Condition中主要的方法有2个node
这里的await(),signal()必须在ReentrantLock#lock()和ReentrantLock#unlock()之间调用。异步
Condition的实现也是利用AbstractQueuedSynchronizer队列来实现,await()在被调用后先将当前线程加入到等待队列中,而后释放锁,最后阻塞当前线程。signal()在被调用后会先获取等待队列中第一个节点,并将这个节点转化成ReentrantLock中的节点并加入到同步阻塞队列的结尾,这样此节点的上个节点线程释放锁后会激活此节点线程取来获取锁。源码分析
await()源码以下ui
public final void await() throws InterruptedException { //判断是否当前线程是否被中断中断则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //加入等待队列 Node node = addConditionWaiter(); //释放当前线程锁 int savedState = fullyRelease(node); int interruptMode = 0; //判断是否在同步阻塞队列,若是不在一直循环到被加入 while (!isOnSyncQueue(node)) { //阻塞当前线程 LockSupport.park(this); //判断是否被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //获取锁,若是获取中被中断则设置中断状态 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清除等待队列中被"激活"的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //若是当前线程被中断,处理中断逻辑 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
主要分如下几步this
(1)先判断是否当前线程是否被中断中断则抛出中断异常若是未中断调用addConditionWaiter()加入等待队列线程
(2)调用fullyRelease(node)释放锁使同步阻塞队列的下个节点线程能获取锁。code
(3)调用isOnSyncQueue(node)判断是否在同步阻塞队列,这里的加入同步阻塞队列操做是在另外一个线程调用signal()后加入,若是不在同步阻塞队列会进行阻塞直到被激活。队列
(4)若是被激活而后调用checkInterruptWhileWaiting(node)判断是否被中断并获取中断模式。get
(5)继续调用isOnSyncQueue(node)判断是否在同步阻塞队列。
(6)是则调用acquireQueued(node, savedState) 获取锁,这里若是获取不到也会被阻塞,获取不到缘由是在第一次调用isOnSyncQueue(node)前,可能另外一个线程已经调用signal()后加入到同步阻塞队列,而后调用acquireQueued(node, savedState) 获取不到锁并阻塞。acquireQueued(node, savedState)也会返回当前线程是否被中断,若是被中断设置中断模式。
(7)在激活后调用unlinkCancelledWaiters()清理等待队列的已经被激活的节点。
(8)最后判断当前线程是否被中断,若是被中断则对中断线程作处理。
下面来看下addConditionWaiter()实现
private Node addConditionWaiter() { //获取等待队列尾部节点 Node t = lastWaiter; //若是尾部状态不为CONDITION,若是已经被"激活",清理之,而后从新获取尾部节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //建立以当前线程为基础的节点,并将节点模式设置成CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); //若是尾节点不存在,说明队列为空,将头节点设置成当前节点 if (t == null) firstWaiter = node; //若是尾节点存在,将此节点设置成尾节点的下个节点 else t.nextWaiter = node; //将尾节点设置成当前节点 lastWaiter = node; return node; }
addConditionWaiter()的逻辑很简单,就是建立以当前线程为基础的节点并把节点加入等待队列的尾部待其余线程处理。
下面来看下fullyRelease(Node node)实现
final int fullyRelease(Node node) { boolean failed = true; try { //获取阻塞队列中当前线程节点的锁状态值 int savedState = getState(); //释放当前线程节点锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { //释放失败讲节点等待状态设置成关闭 if (failed) node.waitStatus = Node.CANCELLED; } }
调用getState()先获取阻塞队列中当前线程节点的锁状态值,这个值可能大于1表示屡次重入,而后调用release(savedState)释放全部锁,若是释放成功返回锁状态值。
下面来看下isOnSyncQueue(Node node)实现
final boolean isOnSyncQueue(Node node) { //判断当前节点是不是CONDITION或者前置节点是否为空若是为空直接返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //若是下个节点存在,则在同步阻塞队列中返回true if (node.next != null) // If has successor, it must be on queue return true; //遍历查找当前节点是否在同步阻塞队列中 return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
此方法的功能是查找当前节点是否在同步阻塞队列中,方法先是快速判断,判断不了再进行遍历查找。
下面看下checkInterruptWhileWaiting(Node node)实现
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
此方法在线程被激活后被调用,主要功能就是判断被激活的线程是否被中断。此方法会返回2种中断状态THROW_IE和REINTERRUPT,THROW_IE是调用signal()前被中断返回,REINTERRUPT在调用signal()后被中断返回。 此方法先判断是否被标记中断,是的话再调用transferAfterCancelledWait(node)取判断是那种中断状态,transferAfterCancelledWait(node)方法分2步
若是使用await()方法上面2步实际上是没什么做用其最后必定会返回false,由于await()被激活只能调用 signal()方法,而signal()方法确定已经将节点加入到同步阻塞队列中。因此以上逻辑是给await(long time, TimeUnit unit)等带超时激活方法用的。
acquireQueued(node, savedState)方法再上一章节已经讲过这边就不重复了,下面分析下unlinkCancelledWaiters()方法
private void unlinkCancelledWaiters() { //获取等待队列头节点 Node t = firstWaiter; Node trail = null; while (t != null) { //获取下个节点 Node next = t.nextWaiter; //若是状态不为CONDITION说明已经加入阻塞队列须要清理掉 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else //获取下个节点 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
此方法就是从头开始查找状态不为CONDITION的节点并清理,状态不为CONDITION节点说明此节点已经加入到阻塞队列,已经不须要维护。
下面来看下reportInterruptAfterWait(interruptMode)方法
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //若是是THROW_IE模式直接抛出异常 if (interruptMode == THROW_IE) throw new InterruptedException(); //若是是REINTERRUPT模式标记线程中断由上层处理中断 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
此方法处理中断逻辑。若是是THROW_IE模式直接抛出异常,若是是REINTERRUPT模式标记线程中断由上层处理中断。
signal()源码以下
public final void signal() { //是否当前线程持有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //通知"激活"头节点线程 if (first != null) doSignal(first); }
先调用isHeldExclusively()判断锁是否被当前线程持有,而后检查等待队列是否为空,不为空就是能够取第一个节点调用doSignal(first)去"激活",这里激活不是真正的激活而只是将节点加入到同步阻塞队列尾部,因此上下文中带""的激活都是这种解释。
下面看下isHeldExclusively()实现
protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
实现就是比较下当前线程和持有锁的线程是否同一个
下面看下doSignal(first)的实现
private void doSignal(Node first) { do { //头指头后移一位,若是后面的节点为空,则将尾指头也指向空,说明队列为空了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //清空头节点的下个节点 first.nextWaiter = null; //若是"激活"失败者取下个继续,直到成功或者遍历完 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
此方法就是取当前头节点一直去尝试"激活",直到成功或者遍历完。
下面来看下transferForSignal(first)方法
final boolean transferForSignal(Node node) { //将CONDITION状态设置成0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //加入到同步阻塞队列 Node p = enq(node); int ws = p.waitStatus; //状态异常直接激活 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
这个方法主要就是把节点加入到同步阻塞队列的,真正的激活则是调用unlock()去处理。