Java并发系列[4]----AbstractQueuedSynchronizer源码分析之条件队列

经过前面三篇的分析,咱们深刻了解了AbstractQueuedSynchronizer的内部结构和一些设计理念,知道了AbstractQueuedSynchronizer内部维护了一个同步状态和两个排队区,这两个排队区分别是同步队列和条件队列。咱们仍是拿公共厕所作比喻,同步队列是主要的排队区,若是公共厕所没开放,全部想要进入厕所的人都得在这里排队。而条件队列主要是为条件等待设置的,咱们想象一下若是一我的经过排队终于成功获取锁进入了厕所,但在方便以前发现本身没带手纸,碰到这种状况虽然很无奈,可是它也必须接受这个事实,这时它只好乖乖的出去先准备好手纸(进入条件队列等待),固然在出去以前还得把锁给释放了好让其余人可以进来,在准备好了手纸(条件知足)以后它又得从新回到同步队列中去排队。固然进入房间的人并不都是由于没带手纸,可能还有其余一些缘由必须中断操做先去条件队列中去排队,因此条件队列能够有多个,依不一样的等待条件而设置不一样的条件队列。条件队列是一条单向链表,Condition接口定义了条件队列中的全部操做,AbstractQueuedSynchronizer内部的ConditionObject类实现了Condition接口,下面咱们看看Condition接口都定义了哪些操做。node

 1 public interface Condition {
 2     
 3     //响应线程中断的条件等待
 4     void await() throws InterruptedException;
 5     
 6     //不响应线程中断的条件等待
 7     void awaitUninterruptibly();
 8     
 9     //设置相对时间的条件等待(不进行自旋)
10     long awaitNanos(long nanosTimeout) throws InterruptedException;
11     
12     //设置相对时间的条件等待(进行自旋)
13     boolean await(long time, TimeUnit unit) throws InterruptedException;
14     
15     //设置绝对时间的条件等待
16     boolean awaitUntil(Date deadline) throws InterruptedException;
17     
18     //唤醒条件队列中的头结点
19     void signal();
20     
21     //唤醒条件队列的全部结点
22     void signalAll();
23     
24 }

Condition接口虽然定义了这么多方法,但总共就分为两类,以await开头的是线程进入条件队列等待的方法,以signal开头的是将条件队列中的线程“唤醒”的方法。这里要注意的是,调用signal方法可能唤醒线程也可能不会唤醒线程,何时会唤醒线程这得看状况,后面会讲到,可是调用signal方法必定会将线程从条件队列中移到同步队列尾部。这里为了叙述方便,咱们先暂时不纠结这么多,统一称signal方法为唤醒条件队列线程的操做。你们注意看一下,await方法分为5种,分别是响应线程中断等待,不响应线程中断等待,设置相对时间不自旋等待,设置相对时间自旋等待,设置绝对时间等待;signal方法只有2种,分别是只唤醒条件队列头结点和唤醒条件队列全部结点的操做。同一类的方法基本上是相通的,因为篇幅所限,咱们不可能也不须要将这些方法所有仔细的讲到,只须要将一个表明方法搞懂了再看其余方法就可以举一反三。因此在本文中我只会细讲await方法和signal方法,其余方法不细讲但会贴出源码来以供你们参考。源码分析

1. 响应线程中断的条件等待学习

 1 //响应线程中断的条件等待
 2 public final void await() throws InterruptedException {
 3     //若是线程被中断则抛出异常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //将当前线程添加到条件队列尾部
 8     Node node = addConditionWaiter();
 9     //在进入条件等待以前先彻底释放锁
10     int savedState = fullyRelease(node);
11     int interruptMode = 0;
12     //线程一直在while循环里进行条件等待
13     while (!isOnSyncQueue(node)) {
14         //进行条件等待的线程都在这里被挂起, 线程被唤醒的状况有如下几种:
15         //1.同步队列的前继结点已取消
16         //2.设置同步队列的前继结点的状态为SIGNAL失败
17         //3.前继结点释放锁后唤醒当前结点
18         LockSupport.park(this);
19         //当前线程醒来后立马检查是否被中断, 若是是则表明结点取消条件等待, 此时须要将结点移出条件队列
20         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
21             break;
22         }
23     }
24     //线程醒来后就会以独占模式获取锁
25     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
26         interruptMode = REINTERRUPT;
27     }
28     //这步操做主要为防止线程在signal以前中断而致使没与条件队列断绝联系
29     if (node.nextWaiter != null) {
30         unlinkCancelledWaiters();
31     }
32     //根据中断模式进行响应的中断处理
33     if (interruptMode != 0) {
34         reportInterruptAfterWait(interruptMode);
35     }
36 }

当线程调用await方法的时候,首先会将当前线程包装成node结点放入条件队列尾部。在addConditionWaiter方法中,若是发现条件队列尾结点已取消就会调用unlinkCancelledWaiters方法将条件队列全部的已取消结点清空。这步操做是插入结点的准备工做,那么确保了尾结点的状态也是CONDITION以后,就会新建一个node结点将当前线程包装起来而后放入条件队列尾部。注意,这个过程只是将结点添加到同步队列尾部而没有挂起线程哦。ui

第二步:彻底将锁释放this

 1 //彻底释放锁
 2 final int fullyRelease(Node node) {
 3     boolean failed = true;
 4     try {
 5         //获取当前的同步状态
 6         int savedState = getState();
 7         //使用当前的同步状态去释放锁
 8         if (release(savedState)) {
 9             failed = false;
10             //若是释放锁成功就返回当前同步状态
11             return savedState;
12         } else {
13             //若是释放锁失败就抛出运行时异常
14             throw new IllegalMonitorStateException();
15         }
16     } finally {
17         //保证没有成功释放锁就将该结点设置为取消状态
18         if (failed) {
19             node.waitStatus = Node.CANCELLED;
20         }
21     }
22 }

将当前线程包装成结点添加到条件队列尾部后,紧接着就调用fullyRelease方法释放锁。注意,方法名为fullyRelease也就这步操做会彻底的释放锁,由于锁是可重入的,因此在进行条件等待前须要将锁所有释放了,否则的话别人就获取不了锁了。若是释放锁失败的话就会抛出一个运行时异常,若是成功释放了锁的话就返回以前的同步状态。spa

第三步:进行条件等待线程

 1 //线程一直在while循环里进行条件等待
 2 while (!isOnSyncQueue(node)) {
 3     //进行条件等待的线程都在这里被挂起, 线程被唤醒的状况有如下几种:
 4     //1.同步队列的前继结点已取消
 5     //2.设置同步队列的前继结点的状态为SIGNAL失败
 6     //3.前继结点释放锁后唤醒当前结点
 7     LockSupport.park(this);
 8     //当前线程醒来后立马检查是否被中断, 若是是则表明结点取消条件等待, 此时须要将结点移出条件队列
 9     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
10         break;
11     }
12 }
13 
14 //检查条件等待时的线程中断状况
15 private int checkInterruptWhileWaiting(Node node) {
16     //中断请求在signal操做以前:THROW_IE
17     //中断请求在signal操做以后:REINTERRUPT
18     //期间没有收到任何中断请求:0
19     return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
20 }
21 
22 //将取消条件等待的结点从条件队列转移到同步队列中
23 final boolean transferAfterCancelledWait(Node node) {
24     //若是这步CAS操做成功的话就代表中断发生在signal方法以前
25     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
26         //状态修改为功后就将该结点放入同步队列尾部
27         enq(node);
28         return true;
29     }
30     //到这里代表CAS操做失败, 说明中断发生在signal方法以后
31     while (!isOnSyncQueue(node)) {
32         //若是sinal方法尚未将结点转移到同步队列, 就经过自旋等待一下
33         Thread.yield();
34     }
35     return false;
36 }

在以上两个操做完成了以后就会进入while循环,能够看到while循环里面首先调用LockSupport.park(this)将线程挂起了,因此线程就会一直在这里阻塞。在调用signal方法后仅仅只是将结点从条件队列转移到同步队列中去,至于会不会唤醒线程须要看状况。若是转移结点时发现同步队列中的前继结点已取消,或者是更新前继结点的状态为SIGNAL失败,这两种状况都会当即唤醒线程,不然的话在signal方法结束时就不会去唤醒已在同步队列中的线程,而是等到它的前继结点来唤醒。固然,线程阻塞在这里除了能够调用signal方法唤醒以外,线程还能够响应中断,若是线程在这里收到中断请求就会继续往下执行。能够看到线程醒来后会立刻检查是不是因为中断唤醒的仍是经过signal方法唤醒的,若是是由于中断唤醒的一样会将这个结点转移到同步队列中去,只不过是经过调用transferAfterCancelledWait方法来实现的。最后执行完这一步以后就会返回中断状况并跳出while循环。设计

第四步:结点移出条件队列后的操做code

 1 //线程醒来后就会以独占模式获取锁
 2 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
 3     interruptMode = REINTERRUPT;
 4 }
 5 //这步操做主要为防止线程在signal以前中断而致使没与条件队列断绝联系
 6 if (node.nextWaiter != null) {
 7     unlinkCancelledWaiters();
 8 }
 9 //根据中断模式进行响应的中断处理
10 if (interruptMode != 0) {
11     reportInterruptAfterWait(interruptMode);
12 }
13 
14 //结束条件等待后根据中断状况作出相应处理
15 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
16     //若是中断模式是THROW_IE就抛出异常
17     if (interruptMode == THROW_IE) {
18         throw new InterruptedException();
19     //若是中断模式是REINTERRUPT就本身挂起
20     } else if (interruptMode == REINTERRUPT) {
21         selfInterrupt();
22     }
23 }

当线程终止了while循环也就是条件等待后,就会回到同步队列中。不论是由于调用signal方法回去的仍是由于线程中断致使的,结点最终都会在同步队列中。这时就会调用acquireQueued方法执行在同步队列中获取锁的操做,这个方法咱们在独占模式这一篇已经详细的讲过。也就是说,结点从条件队列出来后又是乖乖的走独占模式下获取锁的那一套,等这个结点再次得到锁以后,就会调用reportInterruptAfterWait方法来根据这期间的中断状况作出相应的响应。若是中断发生在signal方法以前,interruptMode就为THROW_IE,再次得到锁后就抛出异常;若是中断发生在signal方法以后,interruptMode就为REINTERRUPT,再次得到锁后就从新中断。blog

2.不响应线程中断的条件等待

 1 //不响应线程中断的条件等待
 2 public final void awaitUninterruptibly() {
 3     //将当前线程添加到条件队列尾部
 4     Node node = addConditionWaiter();
 5     //彻底释放锁并返回当前同步状态
 6     int savedState = fullyRelease(node);
 7     boolean interrupted = false;
 8     //结点一直在while循环里进行条件等待
 9     while (!isOnSyncQueue(node)) {
10         //条件队列中全部的线程都在这里被挂起
11         LockSupport.park(this);
12         //线程醒来发现中断并不会立刻去响应
13         if (Thread.interrupted()) {
14             interrupted = true;
15         }
16     }
17     if (acquireQueued(node, savedState) || interrupted) {
18         //在这里响应全部中断请求, 知足如下两个条件之一就会将本身挂起
19         //1.线程在条件等待时收到中断请求
20         //2.线程在acquireQueued方法里收到中断请求
21         selfInterrupt();
22     }
23 }

3.设置相对时间的条件等待(不进行自旋)

 1 //设置定时条件等待(相对时间), 不进行自旋等待
 2 public final long awaitNanos(long nanosTimeout) throws InterruptedException {
 3     //若是线程被中断则抛出异常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //将当前线程添加到条件队列尾部
 8     Node node = addConditionWaiter();
 9     //在进入条件等待以前先彻底释放锁
10     int savedState = fullyRelease(node);
11     long lastTime = System.nanoTime();
12     int interruptMode = 0;
13     while (!isOnSyncQueue(node)) {
14         //判断超时时间是否用完了
15         if (nanosTimeout <= 0L) {
16             //若是已超时就须要执行取消条件等待操做
17             transferAfterCancelledWait(node);
18             break;
19         }
20         //将当前线程挂起一段时间, 线程在这期间可能被唤醒, 也可能本身醒来
21         LockSupport.parkNanos(this, nanosTimeout);
22         //线程醒来后先检查中断信息
23         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
24             break;
25         }
26         long now = System.nanoTime();
27         //超时时间每次减去条件等待的时间
28         nanosTimeout -= now - lastTime;
29         lastTime = now;
30     }
31     //线程醒来后就会以独占模式获取锁
32     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
33         interruptMode = REINTERRUPT;
34     }
35     //因为transferAfterCancelledWait方法没有把nextWaiter置空, 全部这里要再清理一遍
36     if (node.nextWaiter != null) {
37         unlinkCancelledWaiters();
38     }
39     //根据中断模式进行响应的中断处理
40     if (interruptMode != 0) {
41         reportInterruptAfterWait(interruptMode);
42     }
43     //返回剩余时间
44     return nanosTimeout - (System.nanoTime() - lastTime);
45 }

4.设置相对时间的条件等待(进行自旋)

 1 //设置定时条件等待(相对时间), 进行自旋等待
 2 public final boolean await(long time, TimeUnit unit) throws InterruptedException {
 3     if (unit == null) { throw new NullPointerException(); }
 4     //获取超时时间的毫秒数
 5     long nanosTimeout = unit.toNanos(time);
 6     //若是线程被中断则抛出异常
 7     if (Thread.interrupted()) { throw new InterruptedException(); }
 8     //将当前线程添加条件队列尾部
 9     Node node = addConditionWaiter();
10     //在进入条件等待以前先彻底释放锁
11     int savedState = fullyRelease(node);
12     //获取当前时间的毫秒数
13     long lastTime = System.nanoTime();
14     boolean timedout = false;
15     int interruptMode = 0;
16     while (!isOnSyncQueue(node)) {
17         //若是超时就须要执行取消条件等待操做
18         if (nanosTimeout <= 0L) {
19             timedout = transferAfterCancelledWait(node);
20             break;
21         }
22         //若是超时时间大于自旋时间, 就将线程挂起一段时间
23         if (nanosTimeout >= spinForTimeoutThreshold) {
24             LockSupport.parkNanos(this, nanosTimeout);
25         }
26         //线程醒来后先检查中断信息
27         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
28             break;
29         }
30         long now = System.nanoTime();
31         //超时时间每次减去条件等待的时间
32         nanosTimeout -= now - lastTime;
33         lastTime = now;
34     }
35     //线程醒来后就会以独占模式获取锁
36     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
37         interruptMode = REINTERRUPT;
38     }
39     //因为transferAfterCancelledWait方法没有把nextWaiter置空, 全部这里要再清理一遍
40     if (node.nextWaiter != null) {
41         unlinkCancelledWaiters();
42     }
43     //根据中断模式进行响应的中断处理
44     if (interruptMode != 0) {
45         reportInterruptAfterWait(interruptMode);
46     }
47     //返回是否超时标志
48     return !timedout;
49 }

5.设置绝对时间的条件等待

 1 //设置定时条件等待(绝对时间)
 2 public final boolean awaitUntil(Date deadline) throws InterruptedException {
 3     if (deadline == null) { throw new NullPointerException(); } 
 4     //获取绝对时间的毫秒数
 5     long abstime = deadline.getTime();
 6     //若是线程被中断则抛出异常
 7     if (Thread.interrupted()) { throw new InterruptedException(); }
 8     //将当前线程添加到条件队列尾部
 9     Node node = addConditionWaiter();
10     //在进入条件等待以前先彻底释放锁
11     int savedState = fullyRelease(node);
12     boolean timedout = false;
13     int interruptMode = 0;
14     while (!isOnSyncQueue(node)) {
15         //若是超时就须要执行取消条件等待操做
16         if (System.currentTimeMillis() > abstime) {
17             timedout = transferAfterCancelledWait(node);
18             break;
19         }
20         //将线程挂起一段时间, 期间线程可能被唤醒, 也可能到了点本身醒来
21         LockSupport.parkUntil(this, abstime);
22         //线程醒来后先检查中断信息
23         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
24             break;
25         }
26     }
27     //线程醒来后就会以独占模式获取锁
28     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
29         interruptMode = REINTERRUPT;
30     }
31     //因为transferAfterCancelledWait方法没有把nextWaiter置空, 全部这里要再清理一遍
32     if (node.nextWaiter != null) {
33         unlinkCancelledWaiters();
34     }
35     //根据中断模式进行响应的中断处理
36     if (interruptMode != 0) {
37         reportInterruptAfterWait(interruptMode);
38     }
39     //返回是否超时标志
40     return !timedout;
41 }

6.唤醒条件队列中的头结点

 1 //唤醒条件队列中的下一个结点
 2 public final void signal() {
 3     //判断当前线程是否持有锁
 4     if (!isHeldExclusively()) {
 5         throw new IllegalMonitorStateException();
 6     }
 7     Node first = firstWaiter;
 8     //若是条件队列中有排队者
 9     if (first != null) {
10         //唤醒条件队列中的头结点
11         doSignal(first);
12     }
13 }
14 
15 //唤醒条件队列中的头结点
16 private void doSignal(Node first) {
17     do {
18         //1.将firstWaiter引用向后移动一位
19         if ( (firstWaiter = first.nextWaiter) == null) {
20             lastWaiter = null;
21         }
22         //2.将头结点的后继结点引用置空
23         first.nextWaiter = null;
24         //3.将头结点转移到同步队列, 转移完成后有可能唤醒线程
25         //4.若是transferForSignal操做失败就去唤醒下一个结点
26     } while (!transferForSignal(first) && (first = firstWaiter) != null);
27 }
28 
29 //将指定结点从条件队列转移到同步队列中
30 final boolean transferForSignal(Node node) {
31     //将等待状态从CONDITION设置为0
32     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
33         //若是更新状态的操做失败就直接返回false
34         //多是transferAfterCancelledWait方法先将状态改变了, 致使这步CAS操做失败
35         return false;
36     }
37     //将该结点添加到同步队列尾部
38     Node p = enq(node);
39     int ws = p.waitStatus;
40     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
41         //出现如下状况就会唤醒当前线程
42         //1.前继结点是取消状态
43         //2.更新前继结点的状态为SIGNAL操做失败
44         LockSupport.unpark(node.thread);
45     }
46     return true;
47 }

能够看到signal方法最终的核心就是去调用transferForSignal方法,在transferForSignal方法中首先会用CAS操做将结点的状态从CONDITION设置为0,而后再调用enq方法将该结点添加到同步队列尾部。咱们再看到接下来的if判断语句,这个判断语句主要是用来判断何时会去唤醒线程,出现这两种状况就会当即唤醒线程,一种是当发现前继结点的状态是取消状态时,还有一种是更新前继结点的状态失败时。这两种状况都会立刻去唤醒线程,不然的话就仅仅只是将结点从条件队列中转移到同步队列中就完了,而不会立马去唤醒结点中的线程。signalAll方法也大体相似,只不过它是去循环遍历条件队列中的全部结点,并将它们转移到同步队列,转移结点的方法也仍是调用transferForSignal方法。

7.唤醒条件队列的全部结点

 1 //唤醒条件队列后面的所有结点
 2 public final void signalAll() {
 3     //判断当前线程是否持有锁
 4     if (!isHeldExclusively()) {
 5         throw new IllegalMonitorStateException();
 6     }
 7     //获取条件队列头结点
 8     Node first = firstWaiter;
 9     if (first != null) {
10         //唤醒条件队列的全部结点
11         doSignalAll(first);
12     }
13 }
14 
15 //唤醒条件队列的全部结点
16 private void doSignalAll(Node first) {
17     //先把头结点和尾结点的引用置空
18     lastWaiter = firstWaiter = null;
19     do {
20         //先获取后继结点的引用
21         Node next = first.nextWaiter;
22         //把即将转移的结点的后继引用置空
23         first.nextWaiter = null;
24         //将结点从条件队列转移到同步队列
25         transferForSignal(first);
26         //将引用指向下一个结点
27         first = next;
28     } while (first != null);
29 }

至此,咱们整个的AbstractQueuedSynchronizer源码分析就结束了,相信经过这四篇的分析,你们能更好的掌握并理解AQS。这个类确实很重要,由于它是其余不少同步类的基石,因为笔者水平和表达能力有限,若是哪些地方没有表述清楚的,或者理解不到位的,还请广大读者们可以及时指正,共同探讨学习。可在下方留言阅读中所遇到的问题,若是有须要AQS注释源码的也可联系笔者索取。

注:以上所有分析基于JDK1.7,不一样版本间会有差别,读者须要注意

相关文章
相关标签/搜索