在一篇博客中,咱们看了下CopyOnWriteArrayList的源码,不是很难,里面用到了一个可重入的排他锁: ReentrantLock,这东西看上去和Synchronized差很少,可是和Synchronized是彻底不一样的东西。node
Synchronized锁的特性是JVM保证的,ReentrantLock锁的特性是上层的Java代码控制的。而ReentrantLock的基础就是AQS,事实上,不少并发容器都用了ReentrantLock,也就间接的用到了AQS,还有并发框架,如CountDownLatch,CyclicBarrier,Semaphore也都用到了AQS,可见AQS的重要性。安全
可是要想稍微深刻一点理解AQS实属不易,牵扯到很多东西,因此本篇博客将会拆分红两篇,第一篇将会介绍AQS的前置知识:LockSupport,AQS的核心概念,以及独占、共享模式下,AQS的核心源码解析等,第二篇将会介绍AQS对条件变量的支持,以及AQS的应用等。bash
要深刻一些学习AQS,首先要掌握一个前置知识:LockSupport。并发
LockSupport是一个工具类,它的主要做用是挂起和唤醒线程,它的底层是调用的native方法,这个咱们不去深刻探究,主要看下LockSupport的应用。框架
若是调用park方法的线程已经拿到了与LockSupport关联的许可证,调用park后,会当即返回,不然该线程会被阻塞,直到拿到了许可证。 若是一个线程调用了unpark方法,就会得到与LockSupport关联的许可证,若是该线程以前调用了park而被阻塞,那么会被唤醒,若是该线程以前没有调用park方法,那么调用park方法后,会马上返回。工具
public static void main(String[] args) {
System.out.println("Hello,LockSupport");
LockSupport.park();
System.out.println("Bye,LockSupport");
}
复制代码
运行结果: 学习
public static void main(String[] args) {
System.out.println("Hello,LockSupport");
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
System.out.println("Bye,LockSupport");
}
复制代码
运行结果: ui
public static void main(String[] args) {
Thread thread=new Thread(()->{
System.out.println("Hello,LockSupport");
LockSupport.park();
System.out.println("Bye,LockSupport");
});
thread.start();
LockSupport.unpark(thread);
}
复制代码
运行结果: this
可是不论是哪一种状况,最后的结果都是同样的。只是过程有些区别,第一种状况是 主线程调用了unpark方法后,让子线程拿到了许可证,子线程内部调用park后当即返回,第二种状况是子线程的park方法先调用到,由于目前尚未拿到许可证,因此被阻塞,随后主线程调用了unpark,让子线程拿到了许可证,子线程被返回。spa
和park方法相似,不一样之处在于多了个超时时间,若是调用parkNanos,线程被阻塞了,超过了nanos后,无论有没有得到许可,都会被返回。
public static void main(String[] args) {
System.out.println("Hello,LockSupport");
LockSupport.parkNanos(Integer.MAX_VALUE);
System.out.println("Bye,LockSupport");
}
复制代码
运行结果:
此方法是比较推荐使用的,由于使用它,能够经过jstack命令查看有关阻塞对象的信息。
public class Main {
public void test() {
LockSupport.park(this);
}
public static void main(String[] args) {
Main main = new Main();
main.test();
}
}
复制代码
使用jstack pid命令:
还有几个方法,就不一一介绍了。
有了上面的基础,咱们终于能够进入今天的正题了:AQS。
AQS的全称是AbstractQueuedSynchronizer,翻译是中文是抽象同步队列。刚接触AQS的时候,第一感受这个东西和抽象有关系,由于Abstract。。。后来发现,这个东西和抽象没有半毛钱关系,慢慢的,又有新的理解,这个东西和抽象还真的有点关系,由于它把实现同步队列的一些方法给抽象出来了,供其余上层组件重写或者复用。重点来了,其余上层组件须要重写其中的方法!再说的详细点,就是其余组件须要继承AbstractQueuedSynchronizer,对其中的部分方法进行重写。
咱们先来看下AQS的UML图:
咱们先要对AQS进行一个大概的介绍,了解下AQS中比较核心的东西。
AQS维护了一个FIFO的双向队列,什么是FIFO?就是先进先出的意思,双向队列就是上一个节点指向下一个节点的同时,下一个节点也指向上一个节点,咱们从AbstractQueuedSynchronizer关联的Node类中就能够看出来这一点:prev保存的是当前节点上一个node,next保存的是当前节点的下一个节点,有一个专业的名词,分别是前驱节点,后继节点,同时AbstractQueuedSynchronizer类有两个字段,一个是head,一个是tail,顾名思义,head保存了头节点,tail保存了尾节点。
Node类中的SHARED是用来标记该线程是获取共享资源时被放入等待队列的,EXCLUSIVE用来标记该线程是获取独占资源时被放入等待队列的,从这句话,咱们能够看出Node类其实就是保存了放入等待队列的线程,而有的线程是由于获取共享资源失败放入等待队列的,而有的线程是由于获取独占资源失败而被放入等待队列的,因此这里须要有一个标记去区分。
再啰嗦一句,FIFO双向队列其实就是AQS中的等待队列。
在Node类中,还有一个字段:waitStatus,它有五个取值,分别是:
在AbstractQueuedSynchronizer类中,有一个state字段,被标记为volatile,是为了保证可见性,这个字段的设计可厉害了。对于ReentrantLock来讲,state保存的是重入次数,对于ReentrantReadWriteLock来讲,state保存的是获取读锁的重入次数和写锁的重入次数。
AbstractQueuedSynchronizer类中,还有一个内部类:ConditionObject,用来提供条件变量的支持。
AQS提供了两种方式来获取资源,一种是独占模式,一种是共享模式。
上面提到过,须要去定义一个类去继承AbstractQueuedSynchronizer类,重写其中的方法,通常来讲
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
此方法是独占模式下获取资源的顶级方法,若是线程调用tryAcquire(arg)方法成功了,说明已经获取到了资源,直接返回,若是不成功,则将当前线程封装成waitStaus为Node.EXCLUSIVE的Node插入到AQS等待队列的尾部。
咱们来看下tryAcquire方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
复制代码
纳尼,直接报错了,这是什么鬼?别忘了,咱们须要重写这个方法。
咱们再来看下addWaiter(Node.EXCLUSIVE), arg)方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//封装成Node,新的Node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//把尾节点赋值给pred ,pred也就是尾节点了
if (pred != null) {//若是pred不为NULL
node.prev = pred;//pred赋值给新节点的前驱节点,也就是新节点的前驱节点是尾节点
if (compareAndSetTail(pred, node)) {//CAS,若是pred仍是尾节点,则把新节点设置成尾节点,设置成功后,进入if
pred.next = node;//把新节点赋值给pred的后继节点
return node;//返回新节点
}
}
enq(node);
return node;
}
复制代码
此方法先把线程封装成一个(Node.EXCLUSIVE的Node,先尝试把这个Node直接放入队尾,若是成功的话,直接返回,若是失败的话,调用enq(node)进行入队操做:
private Node enq(final Node node) {
for (;;) {//自旋
Node t = tail;//把尾节点赋值给t
//若是尾节点为空,则新建一个空的Node,用CAS把空的Node设置成头节点
//成功后,再把尾部节点也指向空的Node
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;//把尾节点赋值给传进来的node的前驱节点
if (compareAndSetTail(t, node)) {//CAS,若是t仍是尾部节点,则用传进来的node替换旧的尾部节点
t.next = node;//设置t的后继节点为传进来的node
return t;
}
}
}
}
复制代码
这个方法归纳的来讲,就是把获取资源失败的node放入AQS等待队列。
咱们再回到顶级方法看下acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//拿到node的前驱节点,赋值给p
if (p == head && tryAcquire(arg)) {//若是p已是头节点了,表明这个时候
//node是第二个节点,再次调用tryAcquire获取资源
setHead(node);//设置头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&//判断此node是否能够被park
parkAndCheckInterrupt())//park
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
又是CAS自旋,首先拿到node的前驱节点,赋值给p,若是p已是头节点了,表明这个时候node是第二个节点,再次尝试调用tryAcquire获取资源,若是成功,设置头节点为node,返回中断标记位,若是失败,先判断本身是否能够被park,若是能够的话,就park,等待unpark。
再来看下parkAndCheckInterrupt方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱节点的waitStatus,赋值给ws
if (ws == Node.SIGNAL)//若是是SIGNAL
return true;
if (ws > 0) {//若是是ws>0,则说明前驱节点被取消了,经过while循环,
//找到最近的一个没有取消的节点,排到后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//CAS设置前驱节点的waitStatus为SIGNAL
}
return false;
}
复制代码
若是前驱节点的waitStatus为SIGNAL,直接返回,若是前驱节点被取消了,则经过while循环,找到最近的一个没有被取消的节点,排到后面去,若是前驱节点处于其余状态,则经过CAS把前驱节点的waitStatus设置为SIGNAL。
再来看下parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
这方法就比较简单了,就是park本身,返回当前线程是否被中断。
咱们来为acquireQueued方法作一个总结: 找到一个安全点park本身,若是被唤醒了,检查本身是不是第二个节点,若是是的话,再次尝试获取资源,成功的话,就把本身设置为头节点。
好了,整个顶级的acquire核心内容已经分析完毕了,咱们来作一个总结:
最后,画个流程图帮助理解整个流程:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
此方法是独占模式下释放资源的顶级方法。 首先调用tryRelease方法,若是成功了,把头节点赋值给h,若是h不为null而且waitStatus 不等于0,调用unparkSuccessor方法,唤醒下一个node。
tryRelease:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
复制代码
此方法仍是直接报错,由于咱们须要重写。这里咱们须要尤为注意,此方法是判断资源是否被彻底释放了,若是锁是能够重入的,可能屡次得到了锁,因此必须把最后一个锁也释放了,这里才能返回ture,不然返回false。
unparkSuccessor:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;//拿到当前节点的waitStatus,赋值给ws
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//当前节点的下一个节点赋值给s
if (s == null || s.waitStatus > 0) {//若是s==null或者已经被取消了,就经过for循环找到下一个须要被唤醒的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
复制代码
此方法的核心就是唤醒下一个须要被唤醒的节点。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
该方法是共享模式下获取资源的顶级方法。 首先调用tryAcquireShared,来尝试获取资源,成功的话,则调用doAcquireShared,进入等待队列,直到获取了资源。
tryAcquireShared:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
复制代码
咱们须要重写tryAcquireShared方法。
doAcquireShared:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//入队
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//拿到当前节点的前驱节点,赋值给p
if (p == head) {//若是p是头节点
int r = tryAcquireShared(arg);//调用tryAcquireShared尝试获取资源
if (r >= 0) {
setHeadAndPropagate(node, r);//设置头节点,若是还有剩余资源,唤醒下一个节点
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
此方法和独占模式下的流程区别不大,最大的不一样在于setHeadAndPropagate方法,咱们来看看这个方法作了什么:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//设置头节点
//若是还有剩余资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//找到后继节点
if (s == null || s.isShared())
doReleaseShared();//调用doReleaseShared方法
}
}
复制代码
首先设置当前节点为头节点,若是还有剩余的资源,就找到后继节点,调用doReleaseShared方法,这个方法咱们后面再看,可是从方法名称来看,咱们能够知道它与释放共享资源有关。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
此方法是共享模式下释放资源的顶级方法。 tryReleaseShared方法仍是须要咱们去重写的,若是成功了,调用doReleaseShared方法:
private void doReleaseShared() {
for (;;) {
Node h = head;//把头节点赋值给h
if (h != null && h != tail) {
int ws = h.waitStatus;//拿到h的waitStatus赋值给ws
if (ws == Node.SIGNAL) {//若是为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//唤醒后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
复制代码
这个方法在共享模式下获取共享资源的顶级方法acquireShared中的doAcquireShared中的setHeadAndPropagate也会调用。
好了,独占模式,共享模式下的获取资源,释放资源核心流程已经分析完毕了。
细心的你,必定发如今AQS中还有acquireInterruptibly()/acquireSharedInterruptibly()这两个方法,这两个方法从名称上来看仅仅是多了一个Interruptibly,它们是会对中断进行响应的,而咱们上面介绍的acquire,acquireShared是忽略中断的。
本篇博客到这里就结束了,可是还有一块东西没有讲到:对条件变量的支持,这部份内容将放到下一篇博客再详细介绍。