通常咱们在使用锁的Condition时,咱们通常都是这么使用,以ReentrantLock为例,java
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); lock.lock(); try{ condition.await(); }finally{ lock.unlock(); } lock.lock(); try{ condition.signal(); }finally{ lock.unlock(); }
从上面能够知道,咱们调用Condition的await和signal方法必须是在获取获得锁的状况下,首先咱们以这个为基础,先不论是如何获取获得锁的,那么上面的程序在condition.await()时阻塞当前调用的线程,而调用 condition.signal()方法的时候可能唤起一个正在await阻塞的线程,我这里说的是可能不是必定。为何这么说,咱们来看下await()方法主要作了什么事情。node
下面是await 方法的在jdk8的源码,less
下面是await 方法的在jdk8的源码, public final void await() throws InterruptedException { if (Thread.interrupted()) // ① throw new InterruptedException(); Node node = addConditionWaiter(); //② int savedState = fullyRelease(node); //③ int interruptMode = 0; while (!isOnSyncQueue(node)) { //④ LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // ⑤ break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //⑥ interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled ⑦ unlinkCancelledWaiters(); if (interruptMode != 0) // ⑧ reportInterruptAfterWait(interruptMode); }
执行过程以下,
① ,判断当前显示是否被interrupt? 是的话,会抛出中断异常InterruptedException,
② ,调用addConditionWaiter方法,主要是new 一个以当前线程为数据的节点Node,而后添加到condition条件队列里。
③ ,调用fullyRelease方法,该方法内部调用release方法,而release方法主要是在AQS队列里面唤起第一个节点(即head的next节点)的线程(若是head后继节点存在的话)。这时,在ASQ同步器内至少有2个活动的线程(一个是当前线程(多是头节点),另外一个是唤起的线程(若是存在))。若是唤起失败,会抛异常IllegalMonitorStateException。注:当存在唤起的线程的时候,这个线程就能够去争取获取锁。
④ ,经过isOnSyncQueue方法判断该节点是否在AQS队列中,当调用await时候,确定不在AQS上,由于addConditionWaiter方法是new 一个新的Node.接着会进入while循环里面。调用 LockSupport.park(this);阻塞当前线程,至关于释放锁。
⑤ ,当当前线程被唤起的时候(多是 另外一个await线程唤起的第一个节点多是这个线程,或者在signal下,前驱节点已经cancel时,第一个firstWaiter节点是该当前节点),须要判断是否被中断,存储在interruptMode,若是被中断则break,不然checkInterruptWhileWaiting返回0,那么会接着判断node节点是否在AQS中,若是仍是不在的话,park当前线程,不然跳出while循环。那么node节点是何时被加入到AQS上的,答案是在signal方法上。
⑥ ,当node节点在AQS队列时,咱们须要获取锁,只有当前线程的节点Node在AQS队列上,才能去争取锁。争取锁就是经过调用acquireQueued方法。等下来分析下acquireQueued方法。
⑦ , 若是当前节点的node.nextWaiter不为空,说明还有其余线程在该condition上,而且当前的线程已经获取锁,接着清除条件队列上的cancel类型节点
⑧ , 若是interruptMode 是InterruptedException类型或者REINTERRUPT类型。则进行相应的抛中断异常或者线程自我中断标志位设置。优化
接着,来分析下signal方法ui
public final void signal() { if (!isHeldExclusively()) // ① throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); // ② }
执行过程以下,
① ,若是当前线程不是持有该condition的锁,那么执行抛IllegalMonitorStateException异常。
② ,调用doSignal方法,而且条件队列的首节点传入。this
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) // ① lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // ② }
① ,这种条件队列firstWaiter指针为next节点,由于当前的节点须要被移除条件队列,而且next节点为空,那么lastWaiter置为null,说明是空条件队列,接着把first.nextWaiter=null,说明移除了条件队列
② ,在这里有2步操做,一是transferForSignal,二是 first = firstWaiter,若是咱们当前first节点入AQS队列成功,那么transferForSignal返回true,则doSignal的while循环结束,
若是当前的first节点入AQS返回失败,则须要next的节点从新signal,保证有一个成功的firstWaiter节点入AQS队列。接着来看下transferForSignal 方法主要作了什么事情。线程
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // ① return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); // ② int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // ③ LockSupport.unpark(node.thread); return true; }
① ,首先传入的节点node是条件队列的第一个节点(在外部已经移除),改变其状态CONDITION,为初始状态0,若是改变失败,说明该节点已经不是条件节点,直接返回false,doSignal方法从新调用新firstWaiter节点入AQS队列,
② ,把首节点入AQS节点,enq()方法返回的是入节点的前驱节点。从这里核心方法能够知道,signal() 方法的做用其实只是把等待队列中第一个非取消节点转移到AQS的同步队列尾部。转移后的节点极可能正在在同步队列阻塞着,何时唤醒,取决于它的前驱节点是不是头节点。
③ ,若是当前前驱节点的waitStatus>0(说明是CANCELLED状态),前驱节点已经Canncel(说明前驱节点已经中断等状况),则能够调用LockSupport.unpark(node.thread)唤起线程,则await方法的park返回能够当即返回,预先将AQS同步队列中取消的节点移除掉,而不用等到获取同步状态失败的时候再去判断了,起到必定的优化做用。指针
最后来分析下获取锁方法 acquireQueued blog
执行以下:队列
final boolean acquireQueued(final Node node, int arg) { // ① boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //② setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // ③ interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
① ,传入node 为须要获取锁的节点,arg为以前state状态.
② ,若是传入的节点的前驱节点是head节点,说明当前节点是AQS队列的首节点,能够尝试去获取锁,即咱们须要是要实现的同步语义方法tryAcquire,若是同步语义获取锁成功,则设置当前头节点为头节点。
这里注意返回的值得语义是是否发生中断,而不是获取锁是否成功。
③ ,调用 shouldParkAfterFailedAcquire方法,该方法用来判断获取锁失败后是否须要park当前线程,若是须要park线程,则接着判断该线程是否有中断标志。
接着咱们来看下shouldParkAfterFailedAcquire 方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // ① /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { // ② /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //③ } return false; }
① 前驱节点pred的waitStatus为SIGNAL,说明当前节点有效,返回true,表明须要park当前线程,② 前驱节点已经取消,则删除去取消掉的前驱节点,返回false,外面继续for循环获取锁③ 处在该条件语句的前驱节点的waitStatus一定是 0,或者是传播PROPAGATE,则设置传播节点为SIGNAL,而后返回false,则接着去for循环获取锁,而且失败的时候,调用shouldParkAfterFailedAcquire时知道前驱为SIGNAL(以前由③设置),则须要park线程。从这里能够知道transferForSignal方法中,!compareAndSetWaitStatus(p, ws, Node.SIGNAL)语句,若是对其前驱节点设置Node.SIGNAL失败,则不须要等到acquireQueued去判断是否须要park线程,直接unpark线程便可