Condition将Object监控器方法( wait , notify和notifyAll )分解为不一样的对象,从而经过与任意Lock实现结合使用,从而使每一个对象具备多个等待集。 Lock替换了synchronized方法和语句的使用,而Condition替换了Object监视器方法的使用。java
条件(也称为条件队列或条件变量)为一个线程暂停执行(“等待”)直到另外一线程通知某些状态条件如今可能为真提供了一种方法。 因为对该共享状态信息的访问发生在不一样的线程中,所以必须对其进行保护,所以某种形式的锁与该条件相关联。 等待条件提供的关键属性是它自动释放关联的锁并挂起当前线程,就像Object.wait同样。node
Condition实例从本质上绑定到锁。 要获取特定Lock实例的Condition实例,请使用其newCondition()方法数据结构
条件队列是一个单向链表,在该链表中咱们使用nextWaiter属性来串联链表。可是,就像在同步队列中不会使用nextWaiter属性来串联链表同样,在条件队列是中,也并不会用到prev, next属性,它们的值都为null。并发
队列的信息包含如下几个部分:ide
队列中节点的信息包含如下几个部分:源码分析
结构图:优化
注意:ui
在条件队列中,咱们只须要关注一个值便可那就是CONDITION。它表示线程处于正常的等待状态,而只要waitStatus不是CONDITION,咱们就认为线程再也不等待了,此时就要从条件队列中出队。this
每建立一个Condtion对象就会对应一个Condtion队列,每个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中线程
通常状况下,等待锁的同步队列和条件队列条件队列是相互独立的,彼此之间并无任何关系。可是,当咱们调用某个条件队列的signal方法时,会将某个或全部等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程同样须要去争锁,若是没有抢到,则一样要被加到等待锁的同步队列中去,此时节点就从条件队列中被转移到同步队列中
注意图中标红色的线
可是,这里尤为要注意的是,node是被一个一个转移过去的,哪怕咱们调用的是signalAll()方法也是一个一个转移过去的,而不是将整个条件队列接在同步队列的末尾。
同时要注意的是,咱们在同步队列中只使用prev、next来串联链表,而不使用nextWaiter;咱们在条件队列中只使用nextWaiter来串联链表,而不使用prev、next.事实上,它们就是两个使用了一样的Node数据结构的彻底独立的两种链表。
所以,将节点从条件队列中转移到同步队列中时,咱们须要断开原来的连接(nextWaiter),创建新的连接(prev, next),这某种程度上也是须要将节点一个一个地转移过去的缘由之一。
同步队列是等待锁的队列,当一个线程被包装成Node加到该队列中时,必然是没有获取到锁;当处于该队列中的节点获取到了锁,它将从该队列中移除(事实上移除操做是将获取到锁的节点设为新的dummy head,并将thread属性置为null)。
条件队列是等待在特定条件下的队列,由于调用await方法时,必然是已经得到了lock锁,因此在进入条件队列前线程必然是已经获取了锁;在被包装成Node扔进条件队列中后,线程将释放锁,而后挂起;当处于该队列中的线程被signal方法唤醒后,因为队列中的节点在以前挂起的时候已经释放了锁,因此必须先去再次的竞争锁,所以,该节点会被添加到同步队列中。所以,条件队列在出队时,线程并不持有锁。
条件队列:入队时已经持有了锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到同步队列
同步队列:入队时没有锁 -> 在队列中争锁 -> 离开队列时得到了锁
例如,假设咱们有一个有界缓冲区,它支持put和take方法。 若是尝试在空缓冲区上进行take ,则线程将阻塞,直到有可用项为止。 若是尝试在完整的缓冲区上进行put ,则线程将阻塞,直到有可用空间为止。 咱们但愿继续等待put线程,并在单独的等待集中take线程,以便咱们可使用仅当缓冲区中的项目或空间可用时才通知单个线程的优化。 这可使用两个Condition实例来实现一个典型的生产者-消费者模型。这里在同一个lock锁上,建立了两个条件队列fullCondition, notFullCondition。当队列已满,没有存储空间时,put方法在notFull条件上等待,直到队列不是满的;当队列空了,没有数据可读时,take方法在notEmpty条件上等待,直到队列不为空,而notEmpty.signal()和notFull.signal()则用来唤醒等待在这个条件上的线程。
public class BoundedQueue { /** * 生产者容器 */ private LinkedList<Object> buffer; /** * 容器最大值是多少 */ private int maxSize; /** * 锁 */ private Lock lock; /** * 满了 */ private Condition fullCondition; /** * 不满 */ private Condition notFullCondition; BoundedQueue(int maxSize) { this.maxSize = maxSize; buffer = new LinkedList<Object>(); lock = new ReentrantLock(); fullCondition = lock.newCondition(); notFullCondition = lock.newCondition(); } /** * 生产者 * * @param obj * @throws InterruptedException */ public void put(Object obj) throws InterruptedException { //获取锁 lock.lock(); try { while (maxSize == buffer.size()) { System.out.println(Thread.currentThread().getName() + "此时队列满了,添加的线程进入等待状态"); // 队列满了,添加的线程进入等待状态 notFullCondition.await(); } buffer.add(obj); //通知 fullCondition.signal(); } finally { lock.unlock(); } } /** * 消费者 * * @return * @throws InterruptedException */ public Object take() throws InterruptedException { Object obj; lock.lock(); try { while (buffer.size() == 0) { System.out.println(Thread.currentThread().getName() + "此时队列空了线程进入等待状态"); // 队列空了线程进入等待状态 fullCondition.await(); } obj = buffer.poll(); //通知 notFullCondition.signal(); } finally { lock.unlock(); } return obj; } public static void main(String[] args) { // 初始化最大能放2个元素的队列 BoundedQueue boundedQueue = new BoundedQueue(2); for (int i = 0; i < 3; i++) { Thread thread = new Thread(() -> { try { boundedQueue.put("元素"); System.out.println(Thread.currentThread().getName() + "生产了元素"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.setName("线程" + i); thread.start(); } for (int i = 0; i < 3; i++) { Thread thread = new Thread(() -> { try { boundedQueue.take(); System.out.println(Thread.currentThread().getName() + "消费了元素"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.setName("线程" + i); thread.start(); } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }
输出结果:
Condition接口中的方法
实现可中断条件等待,其实咱们以上案例是利用ReentrantLock来实现的生产者消费者案例,进去看源码发现其实实现该方法的是 AbstractQueuedSynchronizer 中ConditionObject实现的
将节点添加进同步队列中,并要么当即唤醒线程,要么等待前驱节点释放锁后将本身唤醒,不管怎样,被唤醒的线程要从哪里恢复执行呢?调用了await方法的地方
中断模式interruptMode这个变量记录中断事件,该变量有三个值:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加节点到条件队列中 Node node = addConditionWaiter(); // 释放当前线程所占用的锁,保存当前的锁状态 int savedState = fullyRelease(node); int interruptMode = 0; // 若是当前队列不在同步队列中,说明刚刚被await, 尚未人调用signal方法, // 则直接将当前线程挂起 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 线程挂起的地方 // 线程将在这里被挂起,中止运行 // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了 // 因此检查下线程被唤醒的缘由,若是是由于中断被唤醒,则跳出while循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 线程将在同步队列中利用进行acquireQueued方法进行“阻塞式”争锁, // 抢到锁就返回,抢不到锁就继续被挂起。所以,当await()方法返回时, // 必然是保证了当前线程已经持有了lock锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter() 方法是封装一个节点将该节点放入条件队列中
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 若是尾节点被cancel了,则先遍历整个链表,清除全部被cancel的节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 将当前线程包装成Node扔进条件队列 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 若是当前节点为空值那么新建立的node节点就是第一个等待节点 if (t == null) firstWaiter = node; // 若是当前节点不为空值那么新建立的node节点就加入到当前节点的尾部节点的下一个 else t.nextWaiter = node; lastWaiter = node; // 尾部节点指向当前节点 return node; // 返回新加入的节点 }
注意:
private void unlinkCancelledWaiters() { // 获取队列的头节点 Node t = firstWaiter; Node trail = null; // 当前节点不为空 while (t != null) { // 获取下一个节点 Node next = t.nextWaiter; // 若是当前节点不是条件节点 if (t.waitStatus != Node.CONDITION) { // 在队列中取消当前节点 t.nextWaiter = null; if (trail == null) // 队列的头节点是当前节点的下一个节点 firstWaiter = next; else // trail的 nextWaiter 指向当前节点t的下一个节点 // 由于此时t节点已经被取消了 trail.nextWaiter = next; // 若是t节点的下一个节点为空那么lastWaiter指向trail if (next == null) lastWaiter = trail; } else // 若是是条件节点 trail 指向当前节点 trail = t; // 循环赋值遍历 t = next; } }
fullyRelease(node) 方法释放当前线程所占用的锁
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 若是释放成功 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) // 节点的状态被设置成取消状态,从同步队列中移除 node.waitStatus = Node.CANCELLED; } } public final boolean release(int arg) { // 尝试获取锁,若是获取成功,唤醒后续线程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
线程唤醒后利用checkInterruptWhileWaiting方法检测中断模式
这里假设已经发生过中断,则Thread.interrupted()方法必然返回true,接下来就是用transferAfterCancelledWait进一步判断是否发生了signal。
// 检查是否有中断,若是在发出信号以前被中断,则返回THROW_IE; // 在发出信号以后,则返回REINTERRUPT;若是没有被中断,则返回0。 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
只要一个节点的waitStatus仍是Node.CONDITION,那就说明它尚未被signal过。
因为如今咱们分析状况1,则当前节点的waitStatus必然是Node.CONDITION,则会成功执行compareAndSetWaitStatus(node, Node.CONDITION, 0),将该节点的状态设置成0,而后调用enq(node)方法将当前节点添加进同步队列中,而后返回true。
注意: 咱们此时并无断开node的nextWaiter,因此最后必定不要忘记将这个连接断开。
再回到transferAfterCancelledWait调用处,可知,因为transferAfterCancelledWait将返回true,如今checkInterruptWhileWaiting将返回THROW_IE,这表示咱们在离开await方法时应当要抛出THROW_IE异常。
// .... while (!isOnSyncQueue(node)) { LockSupport.park(this); // 线程挂起的地方 // 线程将在这里被挂起,中止运行 // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了 // 因此检查下线程被唤醒的缘由,若是是由于中断被唤醒,则跳出while循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 线程将在同步队列中利用进行acquireQueued方法进行“阻塞式”争锁, // 抢到锁就返回,抢不到锁就继续被挂起。所以,当await()方法返回时, // 必然是保证了当前线程已经持有了lock锁 // 咱们这里假设它获取到了锁了,因为咱们这时 // 的interruptMode = THROW_IE,则会跳过if语句。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 遍历链表了,把链表中全部没有在等待的节点都拿出去,因此这里调用 // unlinkCancelledWaiters方法,该方法咱们在前面await()第一部分的分析 // 的时候已经讲过了,它就是简单的遍历链表,找到全部waitStatus // 不为CONDITION的节点,并把它们从队列中移除 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 这里咱们的interruptMode=THROW_IE,说明发生了中断, // 则将调用reportInterruptAfterWait if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } } // 在interruptMode=THROW_IE时,就是简单的抛出了一个InterruptedException private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
interruptMode如今为THROW_IE,则咱们将执行break,跳出while循环。接下来咱们将执行acquireQueued(node, savedState)进行争锁,注意,这里传入的须要获取锁的重入数量是savedState,即以前释放了多少,这里就须要再次获取多少
状况一总结:
所以:
由此能够看出,一个调用了await方法挂起的线程在被中断后不会当即抛出InterruptedException,而是会被添加到同步队列中去争锁,若是争不到,仍是会被挂起;
只有争到了锁以后,该线程才得以从同步队列和条件队列中移除,最后抛出InterruptedException。
因此说,一个调用了await方法的线程,即便被中断了,它依旧仍是会被阻塞住,直到它获取到锁以后才能返回,并在返回时抛出InterruptedException。中断对它意义更多的是体如今将它从条件队列中移除,加入到同步队列中去争锁,从这个层面上看,中断和signal的效果其实很像,所不一样的是,在await()方法返回后,若是是由于中断被唤醒,则await()方法须要抛出InterruptedException异常,表示是它是被非正常唤醒的(正常唤醒是指被signal唤醒)。
final boolean transferAfterCancelledWait(Node node) { // 线程A执行到这里,CAS操做将会失败 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } // 因为中断发生前,线程已经被signal了,则这里只须要等待线程成功进入同步便可 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
因为transferAfterCancelledWait返回了false,则checkInterruptWhileWaiting方法将返回REINTERRUPT,这说明咱们在退出该方法时只须要再次中断由于signal后条件队列加入到了同步队列中因此node.nextWaiter为空了,因此直接走到了reportInterruptAfterWait(interruptMode)方法
if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 这里咱们的interruptMode=THROW_IE,说明发生了中断, // 则将调用reportInterruptAfterWait if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } } // 在interruptMode=THROW_IE时,就是简单的抛出了一个InterruptedException private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); // 这里并无抛出中断异常,而只是将当前线程再中断一次。 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
状况二中的第一种状况总结:
最后咱们经过reportInterruptAfterWait将当前线程再次中断,可是不会抛出InterruptedException
b. 被唤醒时,并无发生中断,可是在抢锁的过程当中发生了中断
此状况就是已经被唤醒了那么isOnSyncQueue(node)返回true,在同步队列中了就,退出了while循环。
退出while循环后接下来仍是利用acquireQueued争锁,由于前面没有发生中断,则interruptMode=0,这时,若是在争锁的过程当中发生了中断,则acquireQueued将返回true,则此时interruptMode将变为REINTERRUPT。
接下是判断node.nextWaiter != null,因为在调用signal方法时已经将节点移出了队列,全部这个条件也不成立。
最后就是汇报中断状态了,此时interruptMode的值为REINTERRUPT,说明线程在被signal后又发生了中断,这个中断发生在抢锁的过程当中,这个中断来的太晚了,所以咱们只是再次自我中断一下。
状况二中的第二种状况总结:
3.状况三一直没发生中断
直接正常返回
await方法总结
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) // 发生了中断后线程依旧留在了条件队列中,将会再次被挂起 interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
因而可知,awaitUninterruptibly()全程忽略中断,即便是当前线程由于中断被唤醒,该方法也只是简单的记录中断状态,而后再次被挂起(由于并无并无任何操做将它添加到同步队列中)要使当前线程离开条件队列去争锁,则必须是发生了signal事件。
最后,当线程在获取锁的过程当中发生了中断,该方法也是不响应,只是在最终获取到锁返回时,再自我中断一下。能够看出,该方法和“中断发生于signal以后的”REINTERRUPT模式的await()方法很像
方法总结:
该方法几乎和await()方法同样,只是多了超时时间的处理该方法的主要设计思想是,若是设定的超时时间还没到,咱们就将线程挂起;超过等待的时间了,咱们就将线程从条件队列转移到同步对列中。
public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
在awaitNanos(long nanosTimeout)的基础上多了对于超时时间的时间单位的设置,可是在内部实现上仍是会把时间转成纳秒去执行。
能够看出,这两个方法主要的差异就体如今返回值上面,awaitNanos(long nanosTimeout)的返回值是剩余的超时时间,若是该值大于0,说明超时时间还没到,则说明该返回是由signal行为致使的,而await(long time, TimeUnit unit)的返回值就是transferAfterCancelledWait(node)的值,咱们知道,若是调用该方法时,node尚未被signal过则返回true,node已经被signal过了,则返回false。所以当await(long time, TimeUnit unit)方法返回true,则说明在超时时间到以前就已经发生过signal了,该方法的返回是由signal方法致使的而不是超时时间。
public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
awaitUntil(Date deadline)方法与上面的几种带超时的方法也基本相似,所不一样的是它的超时时间是一个绝对的时间
public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
只唤醒一个节点
public final void signal() { // getExclusiveOwnerThread() == Thread.currentThread(); 当前线 // 程是否是独占线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取第一个阻塞线程节点 Node first = firstWaiter; // 条件队列是否为空 if (first != null) doSignal(first); } // 遍历整个条件队列,找到第一个没有被cancelled的节点,并将它添加到条件队列的末尾 // 若是条件队列里面已经没有节点了,则将条件队列清空 private void doSignal(Node first) { do { // 将firstWaiter指向条件队列队头的下一个节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 将条件队列原来的队头从条件队列中断开,则此时该节点成为一个孤立的节点 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
方法总结:
调用signal()方法会从当前条件队列中取出第一个没有被cancel的节点添加到sync队列的末尾。
唤醒全部的节点
public final void signalAll() { // getExclusiveOwnerThread() == Thread.currentThread(); 当前线 // 程是否是独占线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取第一个阻塞线程节点 Node first = firstWaiter; // 条件队列是否为空 if (first != null) doSignalAll(first); } // 移除并转移全部节点 private void doSignalAll(Node first) { // 清空队列中全部数据 lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } // 将条件队列中的节点一个一个的遍历到同步队列中 final boolean transferForSignal(Node node) { // 若是该节点在调用signal方法前已经被取消了,则直接跳过这个节点 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 利用enq方法将该节点添加至同步队列的尾部 Node p = enq(node); // 返回的是前驱节点,将其设置SIGNAL以后,才会挂起 // 当前节点 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
在transferForSignal方法中,咱们先使用CAS操做将当前节点的waitStatus状态由CONDTION设为0,若是修改不成功,则说明该节点已经被CANCEL了,则咱们直接返回,操做下一个节点;若是修改为功,则说明咱们已经将该节点从等待的条件队列中成功“唤醒”了,但此时该节点对应的线程并无真正被唤醒,它还要和其余普通线程同样去争锁,所以它将被添加到同步队列的末尾等待获取锁 。
方法总结:
以上即是Condition的分析,下一篇文章将是并发容器类的分析,若有错误之处,帮忙指出及时更正,谢谢, 若是喜欢谢谢点赞加收藏加转发(转发注明出处谢谢!!!)