扫描下方二维码或者微信搜索公众号
菜鸟飞呀飞
,便可关注微信公众号,阅读更多Spring源码分析
和Java并发编程
文章。java
队列同步器AQS是经过管程模型来实现的,在管程模型中,咱们提到了两个队列:
入口等待队列
和条件变量等待队列
。在AQS中同步队列
对应管程模型中的入口等待队列
,条件等待队列
对应管程模型中的条件变量等待队列
。关于AQS的同步队列的设计原理及源码实现能够阅读这两篇文章:队列同步器(AQS)的设计原理 和 队列同步器(AQS)源码分析。今天将详细分析AQS中等待队列的设计原理和源码。(关于管程的介绍能够参考这篇文章:管程:并发编程的基石 )node
互斥
与同步
,互斥指的是同一时刻只容许一个线程访问共享资源,这一点AQS的同步队列已经帮助咱们解决了。同步指的是线程间如何进行通讯和协做,那么AQS又是如何来解决同步问题的呢?答案就是今天的主角:Condition
。ConditionObject
是队列同步器AbstractQueuedSynchronizer(后面简称AQS)的一个内部类。它须要与Lock一块儿使用,经过Lock.newCondition()
来建立实例。await()、signal()、signalAll()
三个方法,它们分别与Object类的这三个方法对应,也是用来实现线程以前的通讯的,可是它们在功能和使用法方式上存在部分差别。具体差别能够参考下表。(表格来源于《Java并发编程的艺术》一书第5章第6节)对比项 | Object | Condition |
---|---|---|
使用前提 | 使用synchronize获取到锁 | 使用Lock实例的lock()方法获取到锁 |
使用方式 | object.wait()、object.notify()等 | 须要使用Lock接口的实例对象来建立,Lock.newCondition() |
等待队列个数 | 支持1个 | 能够支持多个,使用Lock实例new多个Condition便可 |
线程进入等待队列后是否响应中断 | 不支持 | 支持 |
超时等待 | 支持 | 支持 |
线程释放锁后等待到未来的某个时间点 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持,notify() | 支持,signal() |
唤醒等待队列中的全部线程 | 支持,notifyAll() | 支持,signalAll() |
属性名 | 做用 |
---|---|
Node prev | 同步队列中,当前节点的前一个节点,若是当前节点是同步队列的头结点,那么prev属性为null |
Node next | 同步队列中,当前节点的后一个节点,若是当前节点是同步队列的尾结点,那么next属性为null |
Node thread | 当前节点表明的线程,若是当前线程获取到了锁,那么当前线程所表明的节点必定处于同步队列的队首,且thread属性为null,至于为何要将其设置为null,这是AQS特地设计的。 |
int waitStatus | 当前线程的等待状态,有5种取值。0表示初始值,1表示线程被取消,-1表示当前线程处于等待状态,-2表示节点处于等待队列中,-3表示下一次共享式同步状态获取将会无条件地被传播下去 |
Node nextWaiter | 等待队列中,该节点的下一个节点 |
等待队列
的实现主要依靠的是Node节点中nextWaiter
属性和waitStatus
属性来实现的,其中waitStatus=-2
时,表示线程是处于等待队列中
。(同步队列依靠prev和next属性来实现双向链表)。Condition包含两个属性:firstWaiter
和lastWaiter
,分别表示等待队列的队首和队尾。等待队列遵循先进先出的原则(FIFO)。下图为Condition等待队列的示意图。添加到等待队列
中;而后释放锁;最后调用LockSopport.park()
方法将本身挂起。可使用以下示意图表示。firstWaiter
节点,而后将firstWaiter
节点加入到同步队列
中。示意图以下。全部Node节点移到同步队列中
。理解了上面的等待队列的数据结构和实现原理,接下来就结合源码看看具体的实现。接下来将分析await()方法和signal()方法的源码。编程
当调用condition.await()方法时,会调用到AbstractQueuedSynchornizer中的内部类ConditionObject的await()方法。该方法的源码以下。设计模式
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到等待队列中
Node node = addConditionWaiter();
// 彻底释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断同步节点是否在同步队列中,
// 若是在当前线程释放锁后,锁被其余线程抢到了,此时当前节点就不在同步队列中了。
// 若是锁没有被抢占到,节点就会再同步队列中(当前线程是持有锁的线程,因此它是头结点)
while (!isOnSyncQueue(node)) {
// 若是节点不在同步队列中,将当前线程park
LockSupport.park(this);
/** * 当被唤醒之后,接着从下面开始执行。醒来后会判断本身在等待过程当中有没有被中断过。 * checkInterruptWhileWaiting()方法返回0表示没有被中断过 * 返回-1表示须要抛出异常 * 返回1表示须要重置中断标识 */
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 若是线程在同步队列中,那么就尝试去获取锁,若是获取不到,就会加入到同步队列中,并阻塞
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 当条件等待队列中还有其余线程在等待时,须要判断条件等待队列中有没有线程被取消,若是有,则将它们清除
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
addConditionWaiter()
方法,将本身封装成一个Node后,加入到等待队列中。而后调用fullyRelease()
方法释放锁,接着经过判断当前线程是否在等待队列中,若是不在等待队列中,就将本身park
。lastWaiter
属性等于当前线程所表明的节点。(注意:此时若是等待队列没有进行初始化时,会先进行初始化)。addConditionWaiter()方法的源码以下。/** * Adds a new waiter to wait queue. * @return its new wait node * 将当前线程加入到等待队列中 */
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 建立一个节点,节点的waitStatus等于-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 若是等待队列尚未初始化,即等跌队列中尚未任何元素,那么此时firstWaiter和lastWaiter均为null
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node; // 令旧的队尾节点nextWaiter属性等于当前线程的节点,这样就维护了队列的先后关系
lastWaiter = node;
return node;
}
复制代码
fullyRelease()
方法,从方法名看,就知道它的做用是彻底释放锁
。这里为何要彻底释放锁呢?由于对于重入锁而言,锁可能被重入了屡次,此时同步变量state的值大于1
,而调用await()方法时,要让当前线程将锁释放掉,因此须要将state的值减为0,所以这里取名为fullyRelease()。fullyRelease()最终仍是调用AQS的release()
方法来释放锁。fullyRelease()方法的源码以下。final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取同步变量state的值
int savedState = getState();
// 释放锁,注意此时将同步变量的值传入进去了,若是是重入锁,且被重入过,那么此时savedState的值大于1
// 此时释放锁时,会将同步变量state的值减为0。(一般可重入锁在释放锁时,每次只会将state减1,重入了几回就要释放几回,在这里是一会儿所有释放)。
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 当线程没有获取到锁时,调用该方法会释放失败,会抛出异常。
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
复制代码
isOnSyncQueue()
判断当前线程的节点是否是在同步队列
中,若是isOnSyncQueue()方法返回true,表示节点在同步队列中,若是返回false,表示当前节点不在同步队列中。当返回false时,会进入到while循环中
,此时会调用LockSupport.park()
方法,让当前线程挂起。因为当前线程在释放锁之后会唤醒同步队列中的线程去抢锁,若是有线程抢到锁,那么当前线程就确定不会在同步队列了(同步队列的首节点变化了),因此此时isOnSyncQueue()方法大几率返回的是false,所以会进入到while()方法中。若是当前线程在同步队列中,或者从park()处醒来后(醒来后会出如今同步队列中,由于signal()或者signalAll()方法会将节点移到同步队列),就会执行到await()方法后面的逻辑,即acquireQueued()
方法,该方法就是去尝试获取锁,若是获取到就会返回,获取不到就阻塞。isOnSyncQueue()
的源码以及注释以下。final boolean isOnSyncQueue(Node node) {
// 若是节点的waitStatus=-2时,节点确定不在同步队列中,由于只有在等待队列时,才会为-2。
// 若是节点的前驱节点为空时,有两种状况:
// 1. 当前节点是同步队列的首节点,首节点是已经获取到锁的线程,能够认为线程不在同步队列中
// 2. 当前节点在等待队列中,等待队列中的节点在建立时,没有给prev属性赋值
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 在上一个判断的前提条件下,若是后置节点不为空,那么当前节点确定在同步队列中。
if (node.next != null) // If has successor, it must be on queue
return true;
/* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
// 以上状况均不属于,那么就从同步队列的尾部开始遍历,找到同步队列中是否含有node节点
return findNodeFromTail(node);
}
复制代码
1.
将本身加到等待队列;2.
释放锁;3.
将本身park()。在Condition接口中还提供了几个重载的await()方法,它们在await()方法的基础上添加了部分功能,例如超时等待、不响应中断的等待等功能,但大体逻辑和await()方法相似,有兴趣的朋友能够去研究下。condition.signal()
方法时,会调用到AbstractQueuedSynchornizer中的内部类ConditionObject的signal()方法
。该方法的源码以下。public final void signal() {
// 先判断当前线程有没有获取到锁,若是没有获取到锁就来调用signal()方法,就会抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
复制代码
doSignal()
方法中实现的。doSignal()方法的源码以下。private void doSignal(Node first) {
do {
// 令firstWaiter等于条件等待队列中的下一个节点。
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
/** *调用transferForSignal()方法,是将节点从条件等待队列中移到同步队列中. * 当transferForSignal()返回true时,表示节点被成功移到同步队列了。返回false,表示移动失败,当节点所表示的线程被取消时,会返回false * 当transferForSignal()返回true时,do...while循环结束。返回false时,继续。为何要这样呢? * 由于当transferForSignal()返回false表示条件等待队列中的,队列的头结点的状态时取消状态,不能将它移到同步队列中,随意须要继续从条件等待队列找没有被取消的节点。 */
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
复制代码
从等待队列中移除
,而后调用transferForSignal()
方法将其加入到同步队列
中,当transferForSignal()返回true时,表示节点被成功移到了同步队列中,返回false表示移动失败,只有一种状况会移动失败,那就是线程被取消了。transferForSignal()方法的源码以下。/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) * 将节点从条件等待队列移到同步队列 */
final boolean transferForSignal(Node node) {
/* * If cannot change waitStatus, the node has been cancelled. */
// 将节点的waitStatus的值从-2改成-1。这里若是出现CAS失败,说明节点的waitStatus值被修改过,在条件等待队列中,只有当线程被取消后,才会去修改waitStatus
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
// 当加入到同步队列后,须要将当前节点的前一个节点的waitStatus设置为-1,表示队列中还有线程在等待
// 若是前驱节点的waitStatus大于0表示线程被取消,须要将当前线程唤醒
// 或者修改前驱节点的waitStatus是失败,也须要去唤醒当前线程
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
复制代码
signalAll()
方法的做用就是唤醒等待队列中的全部节点
,而signal()方法只唤醒等待队列的第一个节点。当调用condition.signalAll()方法时,会调用到AbstractQueuedSynchornizer中的内部类ConditionObject的signalAll()方法。该方法的源码以下。public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 核心逻辑在doSignalAll()方法
doSignalAll(first);
}
复制代码
doSignalAll()
方法。doSignalAll()方法的源码以下。private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 经过do...while循环,遍历等待队列的全部节点
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
// transferForSignal()方法将节点移到到同步队列
transferForSignal(first);
first = next;
} while (first != null);
}
复制代码
do...while
循环,遍历等待队列的全部节点,循环调用transferForSignal()
方法,将等待队列中的节点所有移动到同步队列
中。Lock.newCondition()
方法就能建立Condition实例,屡次调用
Lock.newCondition()方法,那么就会建立多个条件等待队列
。LinkedBlockingQueue
就是经过Condition来实现的,有兴趣的朋友能够先去阅读下源码。