该文章属于《Java并发编程》系列文章,若是想了解更多,请点击《Java并发编程之总目录》node
在上篇文章《Java并发编程之锁机制之Lock接口》中,咱们已经了解了,Java下整个Lock接口下实现的锁机制是经过AQS(这里咱们将AbstractQueuedSynchronizer 或AbstractQueuedLongSynchronizer统称为AQS)
与Condition来实现的。那下面咱们就来具体了解AQS的内部细节与实现原理。编程
PS:该篇文章会以
AbstractQueuedSynchronizer
来进行讲解,对AbstractQueuedLongSynchronizer有兴趣的小伙伴,能够自行查看相关资料。安全
抽象队列同步器AbstractQueuedSynchronizer (如下都简称AQS),是用来构建锁或者其余同步组件的基础框架,它使用了一个int成员变量来表示同步状态,经过内置的FIFO(first-in-first-out)同步队列来控制获取共享资源的线程。bash
该类被设计为大多数同步组件的基类,这些同步组件都依赖于单个原子值(int)来控制同步状态,子类必需要定义获取获取同步与释放状态的方法,在AQS中提供了三种方法getState()
、setState(int newState)
及compareAndSetState(int expect, int update)
来进行操做。同时子类应该为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既能够支持独占式地获取同步状态,也能够支持共享式地获取同步状态,这样就能够方便实现不一样类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。多线程
AQS的设计是基于模板方法模式的,也就是说,使用者须要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。并发
在子类实现自定义同步组件的时候,须要经过AQS提供的如下三个方法,来获取与释放同步状态。框架
当咱们实现自定义同步组件时,将会调用AQS对外提供的方法同步状态与释放的方法,固然这些方法内部会调用其子类的模板方法。这里将对外提供的方法分为了两类,具体以下所示:函数
响应中断
,若是当前没有获取到同步状态,那么就会进入等待队列,若是当前线程被中断(Thread().interrupt()
),那么该方法将会抛出InterruptedException。并返回在acquireInterruptibly(int arg)的基础上
,增长了超时限制,若是当前线程没有获取到同步状态,那么将返回fase,反之返回true。与独占式获取的主要区别是在同一时刻能够有多个线程获取到同步状态。
在acquireShared(int arg)的基本逻辑相同
,增长了响应中断。在acquireSharedInterruptibly的基础上
,增长了超时限制。在了解了AQS中的针对不一样方式获取与释放同步状态(独占式与共享式
)与修改同步状态的方法后,如今咱们来了解AQS中具体的实现及其内部原理。工具
在上文中咱们提到AQS中主要经过一个FIFO(first-in-first-out)来控制线程的同步。那么在实际程序中,AQS会将获取同步状态的线程构形成一个Node节点,并将该节点加入到队列中。若是该线程获取同步状态失败会阻塞该线程,当同步状态释放时,会把头节点中的线程唤醒,使其尝试获取同步状态。oop
下面咱们就经过实际代码来了解Node节点中存储的信息。Node节点具体实现以下:
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
复制代码
Node节点是AQS中的静态内部类
,下面分别对其中的属性(注意其属性都用volatile 关键字进行修饰
)进行介绍。
经过上文的描述咱们大概了解了Node节点中存储的数据与信息,如今咱们来看看整个AQS下同步队列的结构。具体以下图所示:
head
指针指向队列中的头节点,一个
tail
指针指向队列中的尾节点。
当一个线程成功获取了同步状态(或者锁),其余线程没法获取到同步状态,这个时候会将该线程构形成Node节点,并加入到同步队列中,而这个加入队列的过程必需要确保线程安全,因此在AQS中提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Nodeupdate)
,它须要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与以前的尾节点创建关联。具体过程以下图所示:
在AQS中的同步队列中,头节点是获取同步状态成功的节点,头节点的线程会在释放同步状态时,将会唤醒其下一个节点,而下一个节点会在获取同步状态成功时将本身设置为头节点,具体过程以下图所示:
上图中,虚线部分为以前head指向的节点
。由于设置头节点是获取同步状态成功的线程来完成的,因为只有一个线程可以成功获取到同步状态,所以设置头节点的方法并不须要CAS来进行保证,只须要将原头节点的next指向断开就好了。
如今咱们已经了解了AQS中同步队列的头节点与尾节点的设置过程。如今咱们根据实际代码进行分析,由于涉及到不一样状态对同步状态的获取(独占式与共享式
),因此下面会分别对这两种状态进行讲解。
经过acquire(int arg)
方法咱们能够获取到同步状态,可是须要注意的是该方法并不会响应线程的中断与获取同步状态的超时机制。同时即便当前线程已经中断了,经过该方法放入的同步队列的Node节点(该线程构造的Node),也不会从同步队列中移除。具体代码以下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
在该方法中,主要经过子类重写的方法tryAcquire(arg)
来获取同步状态,若是获取同步状态失败,则会将请求线程构造独占式Node节点(Node.EXCLUSIVE),同时将该线程加入同步队列的尾部(由于AQS中的队列是FIFO类型)。接着咱们查看addWaiter(Node mode)方法具体细节:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//将该线程构形成Node节点
Node pred = tail;
if (pred != null) {//尝试将尾指针 tail 指向当前线程构造的Node节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//若是成功,那么将尾指针以前指向的节点的next指向 当前线程构造的Node节点
pred.next = node;
return node;
}
}
enq(node);//若是当前尾指针为null,则调用enq(final Node node)方法
return node;
}
复制代码
在该方法中,主要分为两个步骤:
接下来咱们继续查看enq(final Node node)方法。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {//若是当前尾指针为null,那么尝试将头指针 head指向当前线程构造的Node节点
if (compareAndSetHead(new Node()))
tail = head;
} else {//若是当前尾指针(tail)不为null,那么尝试将尾指针 tail 指向当前线程构造的Node节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
在enq(final Node node)方法中,经过死循环(你也能够叫作自旋)
的方式来保证节点的正确的添加。接下来,咱们继续查看acquireQueued(final Node node, int arg)方法的处理。该方法才是整个多线程竞争同步状态的关键,你们必定要注意看!!!
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//获取该节点的上一节点
//若是上一节点是head锁指向的节点,且该节点获取同步状态成功
if (p == head && tryAcquire(arg)) {
//设置head指向该节点,
setHead(node);
p.next = null; // 将上一节点的next指向断开
failed = false;
return interrupted;
}
//判断获取同步状态失败的线程是否须要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//阻塞并判断当前线程是否已经中断了
interrupted = true;
}
} finally {
if (failed)
//若是线程中断了,那么就将该线程从同步队列中移除,同时唤醒下一节点
cancelAcquire(node);
}
}
复制代码
在该方法中主要分为三个步骤:
死循环(你也能够叫作自旋)
的方式来获取同步状态,若是当前节点的上一节点是head指向的节点
且该节点获取同步状态成功
,那么会设置head指向该节点 ,同时将上一节点的next指向断开。
shouldParkAfterFailedAcquire(Node pred, Node node)
方法来判断是须要否阻塞当前线程,若是该方法返回true
,则调用parkAndCheckInterrupt()
方法来阻塞线程。若是该方法返回false
,那么该方法内部会把当前节点的上一节点的状态修改成Node.SINGAL。cancelAcquire(Node node)
方法将该线程(对应的Node节点)从同步队列中移除,同时唤醒下一节点。下面咱们接着来看shouldParkAfterFailedAcquire(Node pred, Node node)
方法,看看具体的阻塞具体逻辑,代码以下所示:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//上一节点已经设置状态请求释放信号,所以当前节点能够安全地阻塞
return true;
if (ws > 0) {
//上一节点,已经被中断或者超时,那么接跳过全部状态为Node.CANCELLED
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//其余状态,则调用cas操做设置状态为Node.SINGAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
在该方法中会获取上一节点的状态(waitStatus)
,而后进行下面的三个步骤的判断。
(函数 return true)
。(函数 return false)
。(函数 return false)
。当shouldParkAfterFailedAcquire(Node pred, Node node)
方法返回true
时,接着会调用parkAndCheckInterrupt()方法来阻塞当前线程。该方法的返回值为当前线程是否中断。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
在该方法中,主要阻塞线程的方法是经过LockSupport(在后面的文章中会具体介绍)的park来阻塞当前线程。
经过对独占式获取同步状态的理解,咱们知道 acquireQueued(final Node node, int arg)方法中最终会执行finally
语句块中的代码,来判断当前线程是否已经中断。若是中断,则经过那么cancelAcquire(Node node)方法将该线程从同步队列中移除
。那么接下来咱们来看看该方法的具体实现。具体代码以下:
private void cancelAcquire(Node node) {
//若是当前节点已经不存在直接返回
if (node == null)
return;
//(1)将该节点对应的线程置为null
node.thread = null;
//(2)跳过当前节点以前已经取消的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//获取在(2)操做以后,节点的下一个节点
Node predNext = pred.next;
//(3)将当前中断的线程对应节点状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
//(4)若是当前中断的节点是尾节点,那么则将尾节点从新指向
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
//(5)若是中断的节点的上一个节点的状态,为SINGAL或者即将为SINGAL,
//那么将该当前中断节点移除
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);//(6)将该节点移除,同时唤醒下一个节点
}
node.next = node; // help GC
}
}
复制代码
观察上诉代码,咱们能够知道该方法干了如下这几件事
(1)将中断线程对应的节点对应的线程置为null
(2)跳过当前节点以前已经取消的节点(咱们已经知道在Node.waitStatus的枚举中,只有CANCELLED 大于0 )
(3)将当前中断的线程对应节点状态设置为CANCELLED
(4)在(2)的前提下,若是当前中断的节点是尾节点
,那么经过CAS操做将尾节点指向(2)操做后的的节点。
不是尾节点
,且当前中断的节点的上一个节点的状态,为SINGAL或者即将为SINGAL,那么将该当前中断节点移除。unparkSuccessor(Node node)
方法将该节点移除,同时唤醒下一个节点。具体代码以下:private void unparkSuccessor(Node node) {
//重置该节点为初始状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取中断节点的下一节点
Node s = node.next;
//判断下一节点的状态,若是为Node.CANCELED状态
if (s == null || s.waitStatus > 0) {
s = null;
//则经过尾节点向前遍历,获取最近的waitStatus<=0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//若是该节点不会null,则唤醒该节点中的线程。
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
这里为了方便你们理解,我仍是将图补充了出来,(图片有可能不是很清晰,建议你们点击浏览大图),
当线程获取同步状态成功并执行相应逻辑后,须要释放同步状态,使得后继线程节点可以继续获取同步状态,经过调用AQS的relase(int arg)方法,能够释放同步状态。具体代码以下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
在该方法中,会调用模板方法tryRelease(int arg)
,也就是说同步状态的释放逻辑,是须要用户来本身定义的。当tryRelease(int arg)
方法返回true后,若是当前头节点不为null且头节点waitStatus!=0,接着会调用unparkSuccessor(Node node)
方法来唤醒下一节点(使其尝试获取同步状态)
。关于unparkSuccessor(Node node)方法,上文已经分析过了,这里就再也不进行描述了。
共享式获取与独占式获取最主要的区别在于同一时刻是否能有多个线程同时获取到同步状态
。以文件的读写为例,若是一个程序在对文件进行读操做
,那么这一时刻对于文件的写操做均会被阻塞
。而其余读操做可以同时进行
。若是对文件进行写操做
,那么这一时刻其余的读写操做都会被阻塞
,写操做要求对资源的独占式访问,而读操做能够是共享访问的。
在了解了共享式同步状态获取与独占式获取同步状态的区别后,如今咱们来看一看共享式获取的相关方法。在AQS中经过 acquireShared(int arg)方法来实现的。具体代码以下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
在该方法内部会调用模板方法tryAcquireShared(int arg)
,同独占式获取获取同步同步状态同样,也是须要用户自定义的。当tryAcquireShared(int arg)
方法返回值小于0时,表示没有获取到同步状态,则调用doAcquireShared(int arg)
方法获取同步状态。反之,已经获取同步状态成功,则不进行任何的操做。关于doAcquireShared(int arg)
方法具体实现以下所示:
private void doAcquireShared(int arg) {
//(1)添加共享式节点在AQS中FIFO队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//(2)自旋获取同步状态
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//当获取同步状态成功后,设置head指针
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//(3)判断线程是否须要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//(4)若是线程已经中断,则唤醒下一节点
if (failed)
cancelAcquire(node);
}
}
复制代码
总体来看,共享式获取的逻辑与独占式获取的逻辑几乎同样,仍是如下几个步骤:
(1)添加共享式节点在AQS中FIFO队列中,这里须要注意节点的构造为 addWaiter(Node.SHARED)
,其中 Node.SHARED为Node类中的静态常量(static final Node SHARED = new Node())
,且经过addWaiter(Node.SHARED)方法构造的节点状态为初始状态,也就是waitStatus= 0
。
(2)自旋获取同步状态,若是当前节点的上一节点为head节点,其获取同步状态成功,那么将调用setHeadAndPropagate(node, r);
,从新设置head指向当前节点。同时从新设置该节点状态waitStutas = Node.PROPAGATE(共享状态)
,而后直接退出doAcquireShared(int arg)方法。具体状况以下图所示:
shouldParkAfterFailedAcquire(Node pred, Node node)
方法来判断是须要否阻塞当前线程,若是该方法返回true
,则调用parkAndCheckInterrupt()
方法来阻塞线程。若是该方法返回false
,那么该方法内部会把当前节点的上一节点的状态修改成Node.SINGAL。具体状况以下图所示:前面咱们提到了,共享式与独占式获取同步状态的主要不一样在于其设置head指针的方式不一样
,下面咱们就来看看共享式设置head指针的方法setHeadAndPropagate(Node node, int propagate)
。具体代码以下:
private void setHeadAndPropagate(Node node, int propagate) {
//(1)设置head 指针,指向该节点
Node h = head; // Record old head for check below
setHead(node);
//(2)判断是否执行doReleaseShared();
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//若是当前节点的下一节点是共享式获取同步状态节点,则调用doReleaseShared()方法
if (s == null || s.isShared())
doReleaseShared();
}
}
复制代码
在setHeadAndPropagate(Node node, int propagate)方法中有两个参数。 第一个参数node
是当前共享式获取同步状态的线程节点。 第二个参数propagate
(中文意思,繁殖、传播)是共享式获取同步状态线程节点的个数。
其主要逻辑步骤分为如下两个步骤:
从中咱们能够看出在共享式获取中,Head节点老是指向最进获取成功的线程节点!!!
if (s == null || s.isShared())
,其中 s为当前节点的下一节点(也就是说同一时刻有可能会有多个线程同时访问)。当该条件为true时,会调用doReleaseShared()方法。关于怎么判断下一节点是不是否共享式线程节点,具体逻辑以下://在共享式访问中,当前节点为SHARED类型
final Node node = addWaiter(Node.SHARED);
//在调用addWaiter 内部会调用Node构造方法,其中会将nextWaiter设置为Node.SHARED。
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//SHARED为Node类静态类
final boolean isShared() {
return nextWaiter == SHARED;
}
复制代码
下面咱们继续查看doReleaseShared()
方法的具体实现,具体代码以下所示:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//(1)从上图中,咱们能够得知在共享式的同步队列中,若是存在堵塞节点,
//那么head所指向的节点状态确定为Node.SINGAL,
//经过CAS操做将head所指向的节点状态设置为初始状态,若是成功就唤醒head下一个阻塞的线程
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒下一节点线程,上文分析过该方法,这里就不在讲了
}
//(2)表示该节点线程已经获取共享状态成功,则经过CAS操做将该线程节点状态设置为Node.PROPAGATE
//从上图中,咱们能够得知在共享式的同步队列中,
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) //若是head指针发生改变一直循环,不然跳出循环
break;
}
}
复制代码
从代码中咱们能够看出该方法主要分为两个步骤:
那么head所指向的节点状态确定为Node.SINGAL
,经过CAS操做将head所指向的节点状态设置为初始状态,若是成功就唤醒head下一个阻塞的线程节点,反之继续循环。waitStatus = Node.PROPAGATE
,若是CAS操做失败,就一直循环。当线程获取同步状态成功并执行相应逻辑后,须要释放同步状态,使得后继线程节点可以继续获取同步状态,经过调用AQS的releaseShared(int arg)方法,能够释放同步状态。具体代码以下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
由于独占式与共享式超时获取同步状态,与其自己的非超时获取同步状态逻辑几乎同样。因此下面就以独占式超时获取同步状态的相应逻辑进行讲解。
在独占式超时获取同步状态中,会调用tryAcquireNanos(int arg, long nanosTimeout)
方法,其中具体nanosTimeout参数为你传入的超时时间(单位纳秒
),具体代码以下所示:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
复制代码
观察代码,咱们能够得知若是当前线程已经中断,会直接抛出InterruptedException
,若是当前线程可以获取同步状态( 调用tryAcquire(arg)),那么就会直接返回,若是当前线程获取同步状态失败,则调用doAcquireNanos(int arg, long nanosTimeout)
方法来超时获取同步状态。那下面咱们接着来看该方法具体代码实现,代码以下图所示:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//(1)计算超时等待的结束时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//(2)若是获取同步状态成功,直接返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//若是获取同步状态失败,计算的剩下的时间
nanosTimeout = deadline - System.nanoTime();
//(3)若是超时直接退出
if (nanosTimeout <= 0L)
return false;
//(4)若是没有超时,且nanosTimeout大于spinForTimeoutThreshold(1000纳秒)时,
//则让线程等待nanosTimeout (剩下的时间,单位:纳秒。)
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//(5)若是当前线程被中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
整个方法为如下几个步骤:
nanosTimeout是否大于
spinForTimeoutThreshold(1000纳秒),若是大于,则经过 LockSupport.parkNanos(this, nanosTimeout)
方法让线程等待相应时间。(该方法会在根据传入的nanosTimeout
时间,等待相应时间后返回。),若是nanosTimeout小于等于
spinForTimeoutThreshold时,将不会使该线程进行超时等待,而是进入快速的自旋过程。缘由在于,很是短的超时等待没法作到十分精确,若是这时再进行超时等待,相反会让nanosTimeout的超时从总体上表现得反而不精确。所以,在超时很是短的场景下,线程会进入无条件的快速自旋。直接抛出InterruptedException
。到如今咱们基本了解了整个AQS的内部结构与其独占式与共享式获取同步状态的实现,可是其中涉及到的线程的阻塞、等待、唤醒(与LockSupport工具类相关)相关知识点咱们都没有具体介绍,后续的文章会对LockSupport工具
以及后期关于锁相关的等待/通知模式相关的Condition接口
进行介绍。但愿你们继续保持着学习的动力~~。