上一篇咱们解读了LockSupport的源码,其中有提到JUC的一块重要基石是AbstractQueuedSynchronizer类,简称AQS,那么这一篇就正式学习这个类。因为我也是以学代练,确定有不少地方理解的不够到位,欢迎你们留言讨论哈!仍是友情提示,本文的分析的JDK版本是8。node
为何要在开篇就介绍AQS
的工做原理呢?由于先对一些知识点有个大概了解,能够帮咱们在看源码时更容易理解一些,作到有的放矢,事半功倍。安全
这里我总结了三个比较关键的点,须要咱们知道的。bash
AQS
的实现思路仍是很清晰的,使用一个state来维护竞争状态,使用CAS来安全的更新state,获取锁失败的线程进入等待队列unpark,锁被释放后,从队列中唤醒一个线程来继续尝试获取锁。并发
AQS支持独占和共享二种模式,独占模式相对容易理解一些,光说不练假把式,咱们先利用AQS实现一个独占锁SmartLock来加深理解。ide
public class SmartLock {
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else {
return false;
}
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
setExclusiveOwnerThread(null);
return true;
}
}
private Sync mSync;
public SmartLock() {
mSync = new Sync();
}
public void lock() {
mSync.acquire(1);
}
public void unLock() {
mSync.release(1);
}
}
复制代码
咱们新建一个内部类Sync继承AQS,重写它的tryAcquire和tryRelease方法,能够理解为它们分别对应独占模式下的尝试获取锁和尝试释放锁,返回true表示成功,false表示失败。oop
这里咱们能够停下来想一下,既然AQS内部有一个state能够利用,那咱们能够这样设定游戏规则,state=1时表示锁被占用,state=0表示锁没有被某个线程持有。学习
protected boolean tryAcquire(int arg) {
// 先判断当前持有锁的线程是否是本线程,若是是,直接返回true,因此咱们这个锁是支持可冲入的
if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
// CAS的方式更新state,只有当state=0时会成功更新为1
if (compareAndSetState(0, 1)) {
// 当前线程已经获取了锁,设置为Owner thread
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else {
// 返回true,当前线程会被加入等待队列中
return false;
}
}
protected boolean tryRelease(int arg) {
// 状态更新为0,
setState(0);
// Owner thread设置为null
setExclusiveOwnerThread(null);
return true;
}
复制代码
咱们在SmartLock类中定义二个方法lock和unLock,分别调用acquire和release便可,这里的参数没有用到,传1便可。测试
public void lock() {
mSync.acquire(1);
}
public void unLock() {
mSync.release(1);
}
复制代码
咱们用SmartLock来实现一个线程安全的累加器,逻辑很简单就是提供一个increase方法,对counter进行++操做,咱们知道++操做不是原子的,因此咱们用SmartLock来保证原子性。ui
public class SmartAdder {
private volatile int counter;
private SmartLock lock;
public SmartAdder() {
lock = new SmartLock();
}
public void increase() {
lock.lock();
try {
counter++;
} finally {
lock.unLock();
}
}
public int getCounter() {
return this.counter;
}
}
复制代码
咱们写一段测试的case来验证一下,新建了一个固定有20个核心线程的线程池,而后提交了40个累加任务,每一个任务循环100000次,这样获得的正确结果应该是4000000。this
public static void main(String[] args) {
int threadCount = 20;
int addCount = 100000;
SmartAdder smartAdder = new SmartAdder();
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount * 2; i++) {
executorService.submit(() -> {
for (int j = 0; j < addCount; j++) {
smartAdder.increase();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count:" + smartAdder.getCounter());
executorService.shutdown();
}
// 打印结果
count:4000000
复制代码
打印就结果验证了咱们的SmartLock确实可以正常工做,这样一个简单的互斥锁就完成了,其实也并不复杂嘛!其中CountDonwLatch也是JUC提供的一个并发同步类,关于它的用法后面会详解,这里你们只须要知道await可让当前线程等待线程池中的任务执行完成便可。
有了前面的铺垫,咱们如今先看下AQS中独占模式的acquire和release二个方法的具体实现。
先看acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
复制代码
能够看到acquire是一个final方法,咱们无法重写它,可是有预留一个tryAcquire方法让咱们重写,咱们在上面的SmartLock类中也是重写了tryAcquire该方法,若是tryAcquire返回false,会调用acquireQueued方法,它的参数是addWaiter(Node.EXCLUSIVE)的结果,咱们先来具体跟进看一下addWaiter的实现。
private Node addWaiter(Node mode) {
// 新建一个Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 若是队列的尾标兵tail不为空,将新加入的node插入到队尾,并更新tail
if (pred != null) {
node.prev = pred;
// 若是CAS设置tail成功,直接返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若是tail为空,或者CAS设置tail失败
enq(node);
return node;
}
复制代码
这里的思路就是将新建的node插入到队尾,可是因为到考虑到线程安全的问题,采用了CAS更新,若是更新失败,调用enq方法,继续跟进看一下实现。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 若是检查到队列没有初始化,先执行初始化,注意head对头是标兵
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 在循环中执行CAS插入尾部操做
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
因此看下来,addWaiter逻辑也很清晰,就是要将当前线程,封装为node插入到队列尾部。再看下acquireQueued的实现
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 返回node前驱
final Node p = node.predecessor();
// 若是前驱是head,说明当前队列中该线程排在第一顺位,再次调用tryAcquire
// 由于后面调用的parkAndCheckInterrupt会让线程等待,当锁被release时,线程会被unpark
// 因此从新tryAcquire来获取锁,若是获取成功,会将当前node设为头结点,至关于将当前
// node从队列中删除了,由于头结点只是一个标兵,
if (p == head && tryAcquire(arg)) {
setHead(node);
// 这里之因此能够直接将.next置为null,而没有考虑node的next,由于是刚加入的node
// 它在队尾,而又是head的next,说明队列中就它一个,直接将head.next = null就能够了
p.next = null; // help GC
failed = false;
return interrupted;
}
// 先对head设置waitStatus标示位,而后park线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
复制代码
shouldParkAfterFailedAcquire这个方法颇有意思,它是将head的waitStatus设为SINGLE,用来标识有任务须要被唤醒,在后面unpark的时候会检查该标识位。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取node的pre的waitStatus,
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 若是已是Node.SIGNAL,能够安全的park,直接返回
return true;
if (ws > 0) {
// 说明pred被取消了,并发时会出现这种状况,由于咱们没有加锁
// 继续向前查找waitStatus <= 0的node,
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将pred的waitStatus设为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
再看下parkAndCheckInterrupt这个方法的实现
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
比较简单,直接调用了LockSupport.park,因此AQS中让线程等待的方式就是park,这也就是为何咱们前一篇文章要分析LockSupport源码的缘由。
那么线程park等待了,那固然就要有唤醒,咱们看下AQS中release的实现。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
一样的AQS中release是一个final方法,不能被重写,咱们能够重写tryRelease方法。当head不为空,切waitStatus不为0时,调用unparkSuccessor方法,跟进去看下实现
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 先将waitStatus设为0
compareAndSetWaitStatus(node, ws, 0);
// 通常须要被唤醒的是node.next,可是若是next的node被取消了,或者waitStatus>0,这时候这里的
// 策略是从尾部开始从新选择一个node来unpark
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// unpark唤醒线程
LockSupport.unpark(s.thread);
}
复制代码
release的实现相对简单一些,前面介绍tryAcquire失败后,会将当前线程插入到等待队列中时,而后将head的waitStatus置为SINGAL,那么在release时,会先检查这个标识,而后unpark,这里有个小细节,若是head.next被取消了或者waitStatus>0,会从队列的尾部开始往前查找到第一个符合条件的node来unpark。
这里有个细节你们要注意,release只是将队列中第一个知足条件等待的线程唤醒,因此接下来的逻辑仍是在acquireQueued方法中,继续尝试调用tryAcquire,若是成功,则会被出队列(当前节点设为头结点),线程继续执行,不然继续等待。
介绍完了独占模式,再来看下共享模式。与独占模式相似,AQS也对共享模式提供了模板方法。分别是acquireShared和releaseShared,它们也都是final的,咱们可以重写的方法是tryAcquireShared和tryReleaseShared。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
复制代码
acquireShared先调用了tryAcquireShared方法,若是返回值小于0, doAcquireShared一样构建SHARED类型的Node加入等待队列。这里要提一下,tryAcquireShared方法使咱们须要重写的,注意它的返回值是int类型的,而上面咱们分析独占模式tryAcquire的返回值是boolean,由于在共享模式下这个返回值须要有三种状态,因此须要是int类型。
好,咱们继续看下doAcquireShared的实现
// 添加当前节点到队列,与独占模式相似,再也不赘述
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前驱
final Node p = node.predecessor();
if (p == head) {
// 再次尝试获取共享锁
int r = tryAcquireShared(arg);
// 若是获取成功
if (r >= 0) {
// 检查队列中后续的节点,是否须要被唤醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// park让线程等待,与独占模式相似,再也不赘述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// propagate > 0, 或者 当前头结点或者当前节点node的waitStatus < 0时,调用doReleaseShared
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
复制代码
大部分的逻辑跟独占模式相似,可是多了一个检查后续节点是否须要被唤醒的逻辑。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
能够看到doReleaseShared是在一个循环中,若是在调用中,head发生了变化,继续循环,不然挑出循环,而在都独占模式下,没有这样的并发问题,因此独占模式下不须要循环,另外干活的就是unparkSuccessor方法,它来唤醒等待的线程,上面在分析独占模式时已经分析过了,这里再也不赘述。
文章先是概述了一下AQS的基本实现原理,而后利用AQS实现了一个简单的互斥锁,最后详细分析了AQS中独占和共享二种模式的关键方法的实现。以上。