深刻理解Java并发框架AQS系列(一):线程
深刻理解Java并发框架AQS系列(二):AQS框架简介及锁概念
深刻理解Java并发框架AQS系列(三):独占锁(Exclusive Lock)html
优秀的源码就在那里java
通过了前面两章的铺垫,终于要切入正题了,本章也是整个AQS的核心之一api
从本章开始,咱们要精读AQS源码,在欣赏它的同时也要学会质疑它。固然本文不会带着你们逐行过源码(会有“只在此山中,云深不知处”的弊端),而是从功能入手,对其架构进行逐层剖析,在核心位置重点解读,并提出质疑;虽然AQS源码读起来比较“跳”,但我仍是建议你们花时间及精力去好好读它数据结构
本章咱们采用经典并发类ReentrantLock
来阐述独占锁多线程
独占锁,顾名思义,即在同一时刻,仅容许一个线程执行同步块代码。比如一伙儿人想要过河,但只有一根独木桥,且只能承受一人的重量架构
相信咱们平时写独占锁的程序大抵是这样的:并发
ReentrantLock lock = new ReentrantLock(); try { lock.lock(); doBusiness(); } finally { lock.unlock(); }
上述代码分为三部分:框架
lock.lock()
doBusiness()
lock.unlock()
加锁部分,必定是众矢之的,兵家争抢的要地,对于高并发的程序来讲,同一时刻,大量的线程争相涌入,而lock()
则保证只能有一个线程进入doBusiness()
逻辑,且在其执行完毕unlock()
方法以前,不能有其余线程进入。因此相对而言,unlock()
方法相对轻松,不用处理多线程的场景ide
waitStatus
本章中,咱们引入节点中一个关键的字段waitStatus
(后文简写为ws
),在独占锁模式中,可能会使用到的等待状态以下:高并发
0
ws
为0SIGNAL (-1)
SIGNAL
,即代表其后续节点处于(或即将处于)阻塞状态。因此当前节点在执行完同步代码或被取消后,必定要记得唤醒其后续节点CANCELLED (1)
tryAcquire
发生异常,都会致使当前节点取消。而当节点一旦取消,便永远不会再变为0
或者SIGNAL
状态了咱们先上一张ReentrantLock
加锁功能(非公平)的总体流程图,在并发或关键部分有注释
第一眼看上去,确实有点复杂,不过不用怕,咱们逐一分析解读后,它其实就是只纸老虎
大致上能够分为三大部分
按照正常的理解,可能只会有a、b两部分就够了,为何会有c呢?何时会发生异常?
当一个线程尝试加锁失败后,便会放入阻塞队列的队尾;这节咱们来讨论一下这个动做的细节
在加入阻塞队列以前,首先会查看头节点是否为null,若是是null的话,须要新建ws
为0的头结点,(为何在AQS初始化的时候,不直接新建头结点呢?其实因而可知做者细节处理的严谨,由于若是当咱们的独占锁并发度不大,在尝试加锁的过程当中,总能获取到锁,这时便不会向阻塞队列添加内容,假如初始化便新建头结点,会致使其白白占用内存空间而得不到有效利用)而后将当前节点添加至阻塞队列的尾部,固然头结点初始化、向尾部节点追加新节点都是经过CAS操做的。而阻塞队列呢,正如咱们前文说起的是一个FIFO的队列,且带有next
、prev
两个引用来标记前、后节点;咱们在阻塞队列中加入第一个节点后,阻塞队列的样子:
这一节属于独占锁很核心的部分,里面涉及ws
更改、线程挂起与唤醒、更换头结点等
咱们接着3.1继续,在节点进入调度后,首先检查下当前节点的前节点是否为head
节点,若是是的话,那么有一次尝试加锁的机会,加锁成功或失败将致使2个分支
咱们首先看加锁加锁成功的状况,一旦加锁成功,当前节点便从阻塞队列中“消失”(实际上是当前节点变为了头结点,而原头结点内存不可达,等待垃圾回收),当全部节点都加锁成功,阻塞队列便为空了,但并不表明阻塞队列的长度为0,由于有头结点的存在,因此空阻塞队列的长度是1
而加锁失败或者当前节点的前节点不是head
节点呢?是立刻将线程挂起吗?答案是不肯定的,要看前节点的ws
状态而定。而此步骤还有个隐藏任务:将当前节点以前的全部已取消节点从阻塞队列中剔除。
从上图中咱们看到,一个节点若是想正常进入挂起状态,那么必定要将前节点的ws
改成SIGNAL (-1)
状态,但若是前节点已经变为CANCELLED (1)
状态后,就要递归向前寻找第一个非CANCELLED
的节点。
针对“线程挂起并等待其余线程唤醒”,咱们提出2个问题
问题1
问题2
park/unpark
,即使是unpark发生在park以前,在执行park操做时,也会成功唤醒。这个特质区别于wait/notify
而针对阻塞队列的调度,还有一些没有解释的问题:
CANCELLED
状态的节点?SIGNAL
状态,但通过一段时间运行,前节点变为了CANCELLED
状态,岂不是致使当前节点永远没法被唤醒?要回答这两个问题,就要引出异常处理了
咱们首先讨论若是AQS不作异常处理能够吗? 不能够,例如第一个节点被唤醒后,在加锁阶段发生了异常,若是没有异常处理,这个异常节点将永远处于阻塞队列,成为“僵尸节点”,且后续节点也不会被唤起
官方标明可能会出现异常的部分,诸如“等待超时”、“打断”等,那若是咱们调用acquire()
方法,而非acquireInterruptibly()
、tryAcquireNanos(time)
是否是就不会出现异常?不是的,由于还有AQS下放给咱们本身实现的tryRelease()
等方法。咱们实现一个本身的AQS,并模拟tryRelease()
报错,看AQS可否正常应对
public class FindBugAQS { public volatile static int FLAG = 0; private static ThreadLocal<Integer> FLAG_STORE = new ThreadLocal<>(); private static ThreadLocal<Integer> TIMES = ThreadLocal.withInitial(() -> 0); private Sync sync = new Sync(); private static class Sync extends AbstractQueuedSynchronizer { private Sync() { setState(1); } public void lock() { FLAG_STORE.set(++FLAG); int state = getState(); if (state == 1 && compareAndSetState(state, 0)) { return; } acquire(1); } @Override protected boolean tryAcquire(int acquires) { if (FLAG_STORE.get() == 2) { Integer time = TIMES.get(); if (time == 0) { TIMES.set(1); } else { // 模拟发生异常,第二个节点在第二次访问tryAcquire方法时,将会扔出运行期异常 System.out.println("发生异常"); throw new RuntimeException("lkn aqs bug"); } } int state = getState(); if (state == 1 && compareAndSetState(state, 0)) { return true; } return false; } @Override protected final boolean tryRelease(int releases) { setState(1); return true; } public void unlock() { release(1); } } public void lock() { sync.lock(); } public void unlock() { sync.unlock(); } } // 测试用例以下: public class BugTest { private static volatile int number = 0; @Test public void test2() throws InterruptedException { List<Thread> list = Lists.newArrayList(); FindBugAQS aqs = new FindBugAQS(); Thread thread1 = new Thread(() -> { aqs.lock(); PubTools.sleep(5000); number++; aqs.unlock(); }); thread1.start(); list.add(thread1); PubTools.sleep(500); for (int i = 0; i < 4; i++) { Thread thread2 = new Thread(() -> { aqs.lock(); PubTools.sleep(500); number++; aqs.unlock(); }); thread2.start(); list.add(thread2); } for (Thread thread : list) { thread.join(); } System.out.println("number is " + number); } }
运行结果:
发生异常 Exception in thread "Thread-1" java.lang.RuntimeException: lkn aqs bug at org.xijiu.share.aqs.bug.FindBugAQS$Sync.tryAcquire(FindBugAQS.java:42) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:863) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) at org.xijiu.share.aqs.bug.FindBugAQS$Sync.lock(FindBugAQS.java:31) at org.xijiu.share.aqs.bug.FindBugAQS.lock(FindBugAQS.java:64) at org.xijiu.share.aqs.bug.BugTest.lambda$test2$2(BugTest.java:61) at java.lang.Thread.run(Thread.java:748) number is 4
咱们自定义了AQS实现类FindBugAQS.java
,模拟第二个节点在第二次访问tryAcquire
会扔出异常;而后启动5个线程,对number
进行累加。可见,最后的结果符合预期,AQS处理的很完美。那程序发生异常后,阻塞队列究竟如何应对?
举例说明吧,假定如今除去头结点外,阻塞队列中还有3个节点,当第1个节点被唤醒执行时,发生了异常,那么第1个节点会将ws
置为CANCELLED
,且将向后的链条打断(指向本身),但向前链条保持不变,并唤醒下一个节点
由上图可见,当某个节点响应中断/发生异常后,其会主动打断向后链条,但依旧保留向前的链条,这样作的目的是为了后续节点在寻找前节点时,能够找到标记为CANCELLED
状态的节点,而不是找到null
。至此便解答了3.2提出的两个问题
a、为何阻塞队列内有这么多CANCELLED
状态的节点?
CANCELLED
状态,但仍存在于阻塞队列中,直到正常执行的节点将其剔除b、当前节点在挂起前,前节点为SIGNAL
状态,但通过一段时间运行,前节点变为了CANCELLED
状态,岂不是致使当前节点永远没法被唤醒?
原本想针对“解锁逻辑”画一张流程图,但猛然发现解锁部分仅仅10行左右的代码,那就索性把源码贴上,逐一论述下
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
ReentrantLock
解锁源码protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
咱们发现当tryRelease()
方法返回true
时,AQS便会负责唤醒后续节点,由于ReentrantLock
支持了可重入的特性,因此当前线程的每次加锁都会对state
累加,而每次tryRelease()
方法则会对state
累减,直到state
变为初始状态0时,tryRelease()
方法才会返回true
,即唤醒下一个节点
解锁逻辑相对简洁,且不存在并发,本文再也不赘述
再次强调本文是经过ReentrantLock
的视角来分析独占锁,且主要分析的是ReentrantLock.lock()/unlock()
方法,目的是让你们对AQS总体的数据结构有个全面认识,方便后续在实现本身的并发框架时,明白api背后发生的事情,作到游刃有余
而像ReentrantLock
的lockInterruptibly()
、tryLock(TimeUnit)
或者其余独占锁的实现类,读者可自行阅读源码,原理相似,核心代码也是同样的