http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizerjava
通过本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的实现分析(上)的解读,相信不少读者已经对AbstractQueuedSynchronizer(下文简称AQS)的独占功能了然于胸,那么此次咱们经过对另外一个工具类:CountDownLatch的分析来解读AQS的另一个功能:共享功能。node
在开始解读AQS的共享功能前,咱们再重温一下CountDownLatch,CountDownLatch为java.util.concurrent包下的计数器工具类,常被用在多线程环境下,它在初始时须要指定一个计数器的大小,而后可被多个线程并发的实现减1操做,并在计数器为0后调用await方法的线程被唤醒,从而实现多线程间的协做。它在多线程环境下的基本使用方式为:安全
//main thread // 新建一个CountDownLatch,并指制定一个初始大小 CountDownLatch countDownLatch = new CountDownLatch(3); // 调用await方法后,main线程将阻塞在这里,直到countDownLatch 中的计数为0 countDownLatch.await(); System.out.println("over"); //thread1 // do something //........... //调用countDown方法,将计数减1 countDownLatch.countDown(); //thread2 // do something //........... //调用countDown方法,将计数减1 countDownLatch.countDown(); //thread3 // do something //........... //调用countDown方法,将计数减1 countDownLatch.countDown();
相关厂商内容数据结构
注意,线程thread 1,2,3各自调用 countDown后,countDownLatch 的计数为0,await方法返回,控制台输入“over”,在此以前main thread 会一直沉睡。多线程
能够看到CountDownLatch的做用相似于一个“栏栅”,在CountDownLatch的计数为0前,调用await方法的线程将一直阻塞,直到CountDownLatch计数为0,await方法才会返回,并发
而CountDownLatch的countDown()方法则通常由各个线程调用,实现CountDownLatch计数的减1。工具
知道了CountDownLatch的基本使用方式,咱们就从上述DEMO的第一行new CountDownLatch(3)开始,看看CountDownLatch是怎么实现的。oop
首先,看下CountDownLatch的构造方法:ui
和ReentrantLock相似,CountDownLatch内部也有一个叫作Sync的内部类,一样也是用它继承了AQS。spa
再看下Sync:
若是你看过本系列的上半部分,你对setState方法必定不会陌生,它是AQS的一个“状态位”,在不一样的场景下,表明不一样的含义,好比在ReentrantLock中,表示加锁的次数,在CountDownLatch中,则表示CountDownLatch的计数器的初始大小。
设置完计数器大小后CountDownLatch的构造方法返回,下面咱们再看下CountDownLatch的await()方法:
调用了Sync的acquireSharedInterruptibly方法,由于Sync是AQS子类的缘由,这里实际上是直接调用了AQS的acquireSharedInterruptibly方法:
从方法名上看,这个方法的调用是响应线程的打断的,因此在前两行会检查下线程是否被打断。接着,尝试着获取共享锁,小于0,表示获取失败,经过本系列的上半部分的解读, 咱们知道AQS在获取锁的思路是,先尝试直接获取锁,若是失败会将当前线程放在队列中,按照FIFO的原则等待锁。而对于共享锁也是这个思路,若是和独占锁一致,这里的tryAcquireShared应该是个空方法,留给子类去判断:
再看看CountDownLatch:
若是state变成0了,则返回1,表示获取成功,不然返回-1则表示获取失败。
看到这里,读者可能会发现, await方法的获取方式更像是在获取一个独占锁,那为何这里还会用tryAcquireShared呢?
回想下CountDownLatch的await方法是否是只能在主线程中调用,是否认的,CountDownLatch的await方法能够在多个线程中调用,当CountDownLatch的计数器为0后,调用await的方法都会依次返回。 也就是说能够多个线程同时在等待await方法返回,因此它被设计成了实现tryAcquireShared方法,获取的是一个共享锁,锁在全部调用await方法的线程间共享,因此叫共享锁。
回到acquireSharedInterruptibly方法:
若是获取共享锁失败(返回了-1,说明state不为0,也就是CountDownLatch的计数器还不为0),进入调用doAcquireSharedInterruptibly方法中,按照咱们上述的猜测,应该是要将当前线程放入到队列中去。
在这以前,咱们再回顾一下AQS队列的数据结构:AQS是一个双向链表,经过节点中的next,pre变量分别指向当前节点后一个节点和前一个节点。其中,每一个节点中都包含了一个线程和一个类型变量:表示当前节点是独占节点仍是共享节点,头节点中的线程为正在占有锁的线程,然后的全部节点的线程表示为正在等待获取锁的线程。以下图所示:
黄色节点为头节点,表示正在获取锁的节点,剩下的蓝色节点(Node一、Node二、Node3)为正在等待锁的节点,他们经过各自的next、pre变量分别指向先后节点,造成了AQS中的双向链表。每一个线程被加上类型(共享仍是独占)后即是一个Node, 也就是本文中说的节点。
再看看doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //将当前线程包装为类型为Node.SHARED的节点,标示这是一个共享节点。 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { //若是新建节点的前一个节点,就是Head,说明当前节点是AQS队列中等待获取锁的第一个节点, //按照FIFO的原则,能够直接尝试获取锁。 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); //获取成功,须要将当前节点设置为AQS队列中的第一个节点,这是AQS的规则//队列的头节点表示正在获取锁的节点 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //检查下是否须要将当前节点挂起 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这里有几点须要说明的:
1. setHeadAndPropagate方法:
首先,使用了CAS更换了头节点,而后,将当前节点的下一个节点取出来,若是一样是“shared”类型的,再作一个"releaseShared"操做。
看下doReleaseShared方法:
for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //若是当前节点是SIGNAL意味着,它正在等待一个信号, //或者说,它在等待被唤醒,所以作两件事,1是重置waitStatus标志位,2是重置成功后,唤醒下一个节点。 continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //若是自己头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。 //意味着须要将状态向后一个节点传播。 continue; // loop on failed CAS } if (h == head) // loop if head changed break; }
为何要这么作呢?这就是共享功能和独占功能最不同的地方,对于独占功能来讲,有且只有一个线程(一般只对应一个节点,拿ReentantLock举例,若是当前持有锁的线程重复调用lock()方法,那根据本系列上半部分咱们的介绍,咱们知道,会被包装成多个节点在AQS的队列中,因此用一个线程来描述更准确),可以获取锁,可是对于共享功能来讲。
共享的状态是能够被共享的,也就是意味着其余AQS队列中的其余节点也应能第一时间知道状态的变化。所以,一个节点获取到共享状态流程图是这样的:
好比如今有以下队列:
当Node1调用tryAcquireShared成功后,更换了头节点:
Node1变成了头节点而后调用unparkSuccessor()方法唤醒了Node二、Node2中持有的线程A出于上面流程图的park node的位置,
线程A被唤醒后,重复黄色线条的流程,从新检查调用tryAcquireShared方法,看可否成功,若是成功,则又更改头节点,重复以上步骤,以实现节点自身获取共享锁成功后,唤醒下一个共享类型节点的操做,实现共享状态的向后传递。
2.其实对于doAcquireShared方法,AQS还提供了集中相似的实现:
分别对应了:
比较特别的为最后一个doAcquireSharedNanos方法,咱们一块儿看下它怎么实现超时时间的控制的。
由于该方法和其他获取共享锁的方法逻辑是相似的,我用红色框圈出了它所不同的地方,也就是实现超时时间控制的地方。
能够看到,其实就是在进入方法时,计算出了一个“deadline”,每次循环的时候用当前时间和“deadline”比较,大于“dealine”说明超时时间已到,直接返回方法。
注意,最后一个红框中的这行代码:
nanosTimeout > spinForTimeoutThreshold
从变量的字面意思可知,这是拿超时时间和超时自旋的最小做比较,在这里Doug Lea把超时自旋的阈值设置成了1000ns,即只有超时时间大于1000ns才会去挂起线程,不然,再次循环,以实现“自旋”操做。这是“自旋”在AQS中的应用之处。
看完await方法,咱们再来看下countDown()方法:
调用了AQS的releaseShared方法,并传入了参数1:
一样先尝试去释放锁,tryReleaseShared一样为空方法,留给子类本身去实现,如下是CountDownLatch的内部类Sync的实现:
死循环更新state的值,实现state的减1操做,之因此用死循环是为了确保state值的更新成功。
从上文的分析中可知,若是state的值为0,在CountDownLatch中意味:全部的子线程已经执行完毕,这个时候能够唤醒调用await()方法的线程了,而这些线程正在AQS的队列中,并被挂起的,
因此下一步应该去唤醒AQS队列中的头节点了(AQS的队列为FIFO队列),而后由头节点去依次唤醒AQS队列中的其余共享节点。
若是tryReleaseShared返回true,进入doReleaseShared()方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //若是当前节点是SIGNAL意味着,它正在等待一个信号, //或者说,它在等待被唤醒,所以作两件事,1是重置waitStatus标志位,2是重置成功后,唤醒下一个节点。 continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //若是自己头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。 //意味着须要将状态向后一个节点传播。 continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
当线程被唤醒后,会从新尝试获取共享锁,而对于CountDownLatch线程获取共享锁判断依据是state是否为0,而这个时候显然state已经变成了0,所以能够顺利获取共享锁而且依次唤醒AQS队里中后面的节点及对应的线程。
本文从CountDownLatch入手,深刻分析了AQS关于共享锁方面的实现方式:
若是获取共享锁失败后,将请求共享锁的线程封装成Node对象放入AQS的队列中,并挂起Node对象对应的线程,实现请求锁线程的等待操做。待共享锁能够被获取后,从头节点开始,依次唤醒头节点及其之后的全部共享类型的节点。实现共享状态的传播。
这里有几点值得注意:
以上的分析都是从AQS子类的角度去看待AQS的部分功能的,而若是直接看待AQS,或许能够这么去解读:
首先,AQS并不关心“是什么锁”,对于AQS来讲它只是实现了一系列的用于判断“资源”是否能够访问的API,而且封装了在“访问资源”受限时将请求访问的线程的加入队列、挂起、唤醒等操做, AQS只关心“资源不能够访问时,怎么处理?”、“资源是能够被同时访问,仍是在同一时间只能被一个线程访问?”、“若是有线程等不及资源了,怎么从AQS的队列中退出?”等一系列围绕资源访问的问题,而至于“资源是否能够被访问?”这个问题则交给AQS的子类去实现。
当AQS的子类是实现独占功能时,例如ReentrantLock,“资源是否能够被访问”被定义为只要AQS的state变量不为0,而且持有锁的线程不是当前线程,则表明资源不能访问。
当AQS的子类是实现共享功能时,例如:CountDownLatch,“资源是否能够被访问”被定义为只要AQS的state变量不为0,说明资源不能访问。
这是典型的将规则和操做分开的设计思路:规则子类定义,操做逻辑由于具备公用性,放在父类中去封装。
固然,正式由于AQS只是关心“资源在什么条件下可被访问”,因此子类还能够同时使用AQS的共享功能和独占功能的API以实现更为复杂的功能。
好比:ReentrantReadWriteLock,咱们知道ReentrantReadWriteLock的中也有一个叫Sync的内部类继承了AQS,而AQS的队列能够同时存放共享锁和独占锁,对于ReentrantReadWriteLock来讲分别表明读锁和写锁,当队列中的头节点为读锁时,表明读操做能够执行,而写操做不能执行,所以请求写操做的线程会被挂起,当读操做依次推出后,写锁成为头节点,请求写操做的线程被唤醒,能够执行写操做,而此时的读请求将被封装成Node放入AQS的队列中。如此往复,实现读写锁的读写交替进行。
而本系列文章上半部分提到的FutureTask,其实思路也是:封装一个存放线程执行结果的变量A,使用AQS的独占API实现线程对变量A的独占访问,判断规则是,线程没有执行完毕:call()方法没有返回前,不能访问变量A,或者是超时时间没到前不能访问变量A(这就是FutureTask的get方法能够实现获取线程执行结果时,设置超时时间的缘由)。
综上所述,本系列文章从AQS独占锁和共享锁两个方面深刻分析了AQS的实现方式和独特的设计思路,但愿对读者有启发,下一篇文章,咱们将继续JDK 1.8下 J.U.C (java.util.concurrent)包中的其余工具类,敬请期待。
感谢郭蕾对本文的策划和审校。
给InfoQ中文站投稿或者参与内容翻译工做,请邮件至editors@cn.infoq.com。也欢迎你们经过新浪微博(@InfoQ)或者腾讯微博(@InfoQ)关注咱们,并与咱们的编辑和其余读者朋友交流。