在咱们能够深刻学习AbstractQueuedSynchronizer(AQS)以前,必须具有了volatile、CAS和模板方法设计模式的知识,本文主要想从AQS的产生背景、设计和结构、源代码实现及AQS应用这4个方面来学习下AQSnode
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。算法
一、AQS产生背景编程
经过JCP的JSR166规范,Jdk1.5开始引入了j.u.c包,这个包提供了一系列支持并发的组件。这些组件是一系列的同步器,这些同步器主要维护着如下几个功能:内部同步状态的管理(例如表示一个锁的状态是获取仍是释放),同步状态的更新和检查操做,且至少有一个方法会致使调用线程在同步状态被获取时阻塞,以及在其余线程改变这个同步状态时解除线程的阻塞。上述的这些的实际例子包括:互斥排它锁的不一样形式、读写锁、信号量、屏障、Future、事件指示器以及传送队列等。能够看下这里的4.2的图便能理解j.u.c包的组件构成。设计模式
几乎任一同步器均可以用来实现其余形式的同步器。例如,能够用可重入锁实现信号量或者用信号量实现可重入锁。可是,这样作带来的复杂性、开销及不灵活使j.u.c最多只能是一个二流工程,且缺少吸引力。若是任何这样的构造方式不能在本质上比其余形式更简洁,那么开发者就不该该随意地选择其中的某个来构建另外一个同步器。所以,JSR166基于AQS类创建了一个小框架,这个框架为构造同步器提供一种通用的机制,而且被j.u.c包中大部分类使用,同时不少用户也能够用它来定义本身的同步器。这个就是j.u.c的做者Doug Lea大神的初衷,经过提供AQS这个基础组件来构建j.u.c的各类工具类,至此就能够理解AQS的产生背景了。安全
二、AQS的设计和结构数据结构
2.1 设计思想并发
同步器的核心方法是acquire和release操做,其背后的思想也比较简洁明确。acquire操做是这样的:app
while (当前同步器的状态不容许获取操做) {框架
若是当前线程不在队列中,则将其插入队列异步
阻塞当前线程
}
若是线程位于队列中,则将其移出队列
release操做是这样的:
更新同步器的状态
if (新的状态容许某个被阻塞的线程获取成功)
解除队列中一个或多个线程的阻塞状态
从这两个操做中的思想中咱们能够提取出三大关键操做:同步器的状态变动、线程阻塞和释放、插入和移出队列。因此为了实现这两个操做,须要协调三大关键操做引伸出来的三个基本组件:
·同步器状态的原子性管理;
·线程阻塞与解除阻塞;
·队列的管理;
由这三个基本组件,咱们来看j.u.c是怎么设计的。
2.1.1 同步状态
AQS类使用单个int(32位)来保存同步状态,并暴露出getState、setState以及compareAndSet操做来读取和更新这个同步状态。其中属性state被声明为volatile,而且经过使用CAS指令来实现compareAndSetState,使得当且仅当同步状态拥有一个一致的指望值的时候,才会被原子地设置成新值,这样就达到了同步状态的原子性管理,确保了同步状态的原子性、可见性和有序性。
基于AQS的具体实现类(如锁、信号量等)必须根据暴露出的状态相关的方法定义tryAcquire和tryRelease方法,以控制acquire和release操做。当同步状态知足时,tryAcquire方法必须返回true,而当新的同步状态容许后续acquire时,tryRelease方法也必须返回true。这些方法都接受一个int类型的参数用于传递想要的状态。
2.1.2 阻塞
直到JSR166,阻塞线程和解除线程阻塞都是基于Java的内置管程,没有其它非基于Java内置管程的API能够用来达到阻塞线程和解除线程阻塞。惟一能够选择的是Thread.suspend和Thread.resume,可是它们都有没法解决的竞态问题,因此也无法用,目前该方法基本已被抛弃。具体不能用的缘由能够官方给出的答复。
j.u.c.locks包提供了LockSupport类来解决这个问题。方法LockSupport.park阻塞当前线程直到有个LockSupport.unpark方法被调用。unpark的调用是没有被计数的,所以在一个park调用前屡次调用unpark方法只会解除一个park操做。另外,它们做用于每一个线程而不是每一个同步器。一个线程在一个新的同步器上调用park操做可能会当即返回,由于在此以前能够有多余的unpark操做。可是,在缺乏一个unpark操做时,下一次调用park就会阻塞。虽然能够显式地取消多余的unpark调用,但并不值得这样作。在须要的时候屡次调用park会更高效。park方法一样支持可选的相对或绝对的超时设置,以及与JVM的Thread.interrupt结合 ,可经过中断来unpark一个线程。
2.1.3 队列
整个框架的核心就是如何管理线程阻塞队列,该队列是严格的FIFO队列,所以不支持线程优先级的同步。同步队列的最佳选择是自身没有使用底层锁来构造的非阻塞数据结构,业界主要有两种选择,一种是MCS锁,另外一种是CLH锁。其中CLH通常用于自旋,可是相比MCS,CLH更容易实现取消和超时,因此同步队列选择了CLH做为实现的基础。
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。
CLH队列实际并不那么像队列,它的出队和入队与实际的业务使用场景密切相关。它是一个链表队列,经过AQS的两个字段head(头节点)和tail(尾节点)来存取,这两个字段是volatile类型,初始化的时候都指向了一个空节点。以下图:
入队操做:CLH队列是FIFO队列,故新的节点到来的时候,是要插入到当前队列的尾节点以后。试想一下,当一个线程成功地获取了同步状态,其余线程将没法获取到同步状态,转而被构形成为节点并加入到同步队列中,而这个加入队列的过程必需要保证线程安全,所以同步器提供了一个CAS方法,它须要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与以前的尾节点创建关联。入队操做示意图大体以下:
出队操做:由于遵循FIFO规则,因此能成功获取到AQS同步状态的一定是首节点,首节点的线程在释放同步状态时,会唤醒后续节点,然后续节点会在获取AQS同步状态成功的时候将本身设置为首节点。设置首节点是由获取同步成功的线程来完成的,因为只能有一个线程能够获取到同步状态,因此设置首节点的方法不须要像入队这样的CAS操做,只须要将首节点设置为原首节点的后续节点同时断开原节点、后续节点的引用便可。出队操做示意图大体以下:
这一小节只是简单的描述了队列的大概,目的是为了表达清楚队列的设计框架,实际上CLH队列已经和初始的CLH队列已经发生了一些变化,具体的能够看查看资料中Doug Lea的那篇论文中的3.3 Queues。
2.1.4 条件队列
上一节的队列实际上是AQS的同步队列,这一节的队列是条件队列,队列的管理除了有同步队列,还有条件队列。AQS只有一个同步队列,可是能够有多个条件队列。AQS框架提供了一个ConditionObject类,给维护独占同步的类以及实现Lock接口的类使用。
ConditionObject类实现了Condition接口,Condition接口提供了相似Object管程式的方法,如await、signal和signalAll操做,还扩展了带有超时、检测和监控的方法。ConditionObject类有效地将条件与其它同步操做结合到了一块儿。该类只支持Java风格的管程访问规则,这些规则中,当且仅当当前线程持有锁且要操做的条件(condition)属于该锁时,条件操做才是合法的。这样,一个ConditionObject关联到一个ReentrantLock上就表现的跟内置的管程(经过Object.wait等)同样了。二者的不一样仅仅在于方法的名称、额外的功能以及用户能够为每一个锁声明多个条件。
ConditionObject类和AQS共用了内部节点,有本身单独的条件队列。signal操做是经过将节点从条件队列转移到同步队列中来实现的,没有必要在须要唤醒的线程从新获取到锁以前将其唤醒。signal操做大体示意图以下:
await操做就是当前线程节点从同步队列进入条件队列进行等待,大体示意图以下:
实现这些操做主要复杂在,因超时或Thread.interrupt致使取消了条件等待时,该如何处理。await和signal几乎同时发生就会有竞态问题,最终的结果遵守内置管程相关的规范。JSR133修订之后,就要求若是中断发生在signal操做以前,await方法必须在从新获取到锁后,抛出InterruptedException。可是,若是中断发生在signal后,await必须返回且不抛异常,同时设置线程的中断状态。
2.2 方法结构
若是咱们理解了上一节的设计思路,咱们大体就能知道AQS的主要数据结构了。
进而再来看下AQS的主要方法及其做用。
看到这,咱们对AQS的数据结构应该基本上有一个大体的认识,有了这个基本面的认识,咱们就能够来看下AQS的源代码。
三、AQS的源代码实现
主要经过独占式同步状态的获取和释放、共享式同步状态的获取和释放来看下AQS是如何实现的。
3.1 独占式同步状态的获取和释放
独占式同步状态调用的方法是acquire,代码以下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工做,其主要逻辑是:首先调用子类实现的tryAcquire方法,该方法保证线程安全的获取同步状态,若是同步状态获取失败,则构造独占式同步节点(同一时刻只能有一个线程成功获取同步状态)并经过addWaiter方法将该节点加入到同步队列的尾部,最后调用acquireQueued方法,使得该节点以自旋的方式获取同步状态。若是获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
下面来首先来看下节点构造和加入同步队列是如何实现的。代码以下:
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。
private Node addWaiter(Node mode) {
// 当前线程构形成Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试快速在尾节点后新增节点 提高算法效率 先将尾节点指向pred
Node pred = tail;
if (pred != null) {
//尾节点不为空 当前线程节点的前驱节点指向尾节点
node.prev = pred;
//并发处理 尾节点有可能已经不是以前的节点 因此须要CAS更新
if (compareAndSetTail(pred, node)) {
//CAS更新成功 当前线程为尾节点 原先尾节点的后续节点就是当前节点
pred.next = node;
return node;
}
}
//第一个入队的节点或者是尾节点后续节点新增失败时进入enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾节点为空 第一次入队 设置头尾节点一致 同步队列的初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
//全部的线程节点在构造完成第一个节点后 依次加入到同步队列中
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
节点进入同步队列以后,就进入了一个自旋的过程,每一个线程节点都在自省地观察,当条件知足,获取到了同步状态,就能够从这个自旋过程当中退出,不然依旧留在这个自旋过程当中并会阻塞节点的线程,代码以下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前线程节点的前驱节点
final Node p = node.predecessor();
//前驱节点为头节点且成功获取同步状态
if (p == head && tryAcquire(arg)) {
//设置当前节点为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//是否阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。
再来看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt是怎么来阻塞当前线程的,代码以下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱节点的状态决定后续节点的行为
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*前驱节点为-1 后续节点能够被阻塞
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*前驱节点是初始或者共享状态就设置为-1 使后续节点阻塞
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//阻塞线程
LockSupport.park(this);
return Thread.interrupted();
}
节点自旋的过程大体示意图以下,其实就是对图2、图三的补充。
图六 节点自旋获取队列同步状态
整个独占式获取同步状态的流程图大体以下:
图七 独占式获取同步状态
当同步状态获取成功以后,当前线程从acquire方法返回,对于锁这种并发组件而言,就意味着当前线程获取了锁。有获取同步状态的方法,就存在其对应的释放方法,该方法为release,如今来看下这个方法的实现,代码以下:
public final boolean release(int arg) {
if (tryRelease(arg)) {//同步状态释放成功
Node h = head;
if (h != null && h.waitStatus != 0)
//直接释放头节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*寻找符合条件的后续节点
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒后续节点
LockSupport.unpark(s.thread);
}
独占式释放是很是简单并且明确的。
总结下独占式同步状态的获取和释放:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease方法释放同步状态,而后唤醒头节点的后继节点。
3.2 共享式同步状态的获取和释放
共享式同步状态调用的方法是acquireShared,代码以下:
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。
public final void acquireShared(int arg) {
//获取同步状态的返回值大于等于0时表示能够获取同步状态
//小于0时表示能够获取不到同步状态 须要进入队列等待
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//和独占式同样的入队操做
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//前驱结点为头节点且成功获取同步状态 可退出自旋
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//退出自旋的节点变成首节点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
与独占式同样,共享式获取也须要释放同步状态,经过调用releaseShared方法能够释放同步状态,代码以下:
public final boolean releaseShared(int arg) {
//释放同步状态
if (tryReleaseShared(arg)) {
//唤醒后续等待的节点
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
//自旋
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后续节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor方法和独占式是同样的。
四、AQS应用
AQS被大量的应用在了同步工具上。
ReentrantLock:ReentrantLock类使用AQS同步状态来保存锁重复持有的次数。当锁被一个线程获取时,ReentrantLock也会记录下当前得到锁的线程标识,以便检查是不是重复获取,以及当错误的线程试图进行解锁操做时检测是否存在非法状态异常。ReentrantLock也使用了AQS提供的ConditionObject,还向外暴露了其它监控和监测相关的方法。
ReentrantReadWriteLock:ReentrantReadWriteLock类使用AQS同步状态中的16位来保存写锁持有的次数,剩下的16位用来保存读锁的持有次数。WriteLock的构建方式同ReentrantLock。ReadLock则经过使用acquireShared方法来支持同时容许多个读线程。
Semaphore:Semaphore类(信号量)使用AQS同步状态来保存信号量的当前计数。它里面定义的acquireShared方法会减小计数,或当计数为非正值时阻塞线程;tryRelease方法会增长计数,在计数为正值时还要解除线程的阻塞。
CountDownLatch:CountDownLatch类使用AQS同步状态来表示计数。当该计数为0时,全部的acquire操做(对应到CountDownLatch中就是await方法)才能经过。
FutureTask:FutureTask类使用AQS同步状态来表示某个异步计算任务的运行状态(初始化、运行中、被取消和完成)。设置(FutureTask的set方法)或取消(FutureTask的cancel方法)一个FutureTask时会调用AQS的release操做,等待计算结果的线程的阻塞解除是经过AQS的acquire操做实现的。
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。
SynchronousQueues:SynchronousQueues类使用了内部的等待节点,这些节点能够用于协调生产者和消费者。同时,它使用AQS同步状态来控制当某个消费者消费当前一项时,容许一个生产者继续生产,反之亦然。
除了这些j.u.c提供的工具,还能够基于AQS自定义符合本身需求的同步器。