点赞的靓仔,你时人群中最闪耀的光芒
AQS,英文全称AbstractQueuedSynchronizer,直接翻译为抽象的队列同步器。是JDK1.5出现的一个用于解决并发问题的工具类,由大名鼎鼎的Doug Lea打造,与synchornized关键字不一样的是,AQS是经过代码解决并发问题。html
并发问题是指在多线程运行环境下,共享资源安全的问题。
如今的银行帐户,经过银行卡和手机银行均可以操做帐户, 若是咱们同时拿着银行卡和存折去银行搞事情,会怎么样呢?java
package demo.pattren.aqs; public class Money { /** * 假设如今帐户有1000块钱 */ private int money = 1000; /** * 取钱 */ public void drawMoney(){ this.money--; } public static void main(String[] args) throws InterruptedException { Money money = new Money(); for(int i=0; i<1000; i++){ new Thread(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } money.drawMoney(); },i + "").start(); } Thread.sleep(2000); System.out.println("当前帐户余额:" + money.money); } }
这样想着是否是立刻能够去银行搞一波事情? 哈哈,你想太多了,若是能这样搞,银行早破产了。咱们主要是来分析一下出现这个问题的缘由,JVM内存是JMM结构的,每一个线程操做的数据是从主内存中复制的一个和备份,而多个线程就会存在多个备份,当线程中的备份数据被修改时,会将值刷新到主内存,好比多个线程同时获取到了帐户的余额为500元,A线程存钱100,线程A将600刷新到主内存,$\color{red}{主内存并不会主动通知其余线程此时值已经被修改}$,因此主内存的值此时与其余线程的值是不一样的,若是其余线程再操做帐户余额,是在500的基础上进行的,这显然不是咱们想要的结果。node
JDK提供了多种解决多线程安全的方式。安全
volatile是JDK提供的关键字,用来修饰变量,volatile修饰的变量可以保证多个线程下的可见性,如上个案例,A修改了帐户的余额,而后将最新的值刷新到主内存,此时主内存会将最新的值同步到其余线程。
volatile解决了多线程下数据读取一致的问题,$\color{red}{即保证可见性,可是其并不能保证写操做的原子性}$,
当多个线程同时写操做的时候,即多个线程同时去将线程中最新的值刷新到主内存,将会出现问题。
经过volatile关键字修饰money变量,发下并不能解决线程安全问题。多线程
原子操做类是JDK提供的一系列保证原子操做的工具类,原子类能够保证多线程环境下对其值的操做是安全的。并发
package demo.pattren.aqs; import java.util.concurrent.atomic.AtomicInteger; public class AtomicMoney { /** * 假设如今帐户有1000块钱 */ private AtomicInteger money = new AtomicInteger(1000); /** * 取钱 */ public void drawMoney(){ //AtomicInteger的自减操做 this.money.getAndDecrement(); } public static void main(String[] args) throws InterruptedException { AtomicMoney money = new AtomicMoney(); for(int i=0; i<1000; i++){ new Thread(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } money.drawMoney(); },i + "").start(); } Thread.sleep(2000); System.out.println("当前帐户余额:" + money.money); } }
屡次测试结果都是0,与预期一致。原子操做类是使用CAS(Compare and swap 比较并替换)的机制来保证操做的原子性,相对于锁,他的并发性更高。jvm
synchronized关键字是jvm层面来保证线程安全的,经过在代码块先后添加monitorenter与monitorexit命令来保证线程的安全,并且在JDK1.6对synchronized关键字作了较大的优化,性能有了较大的提高。能够肯定的是,经过synchronized确定能够保证线程安全,因此使用synchronized也是很好的选择,固然synchronized锁的升级不可逆特征,致使在高并发下性能是不能很好的保证。高并发
终于迎来了本篇文章的主角,前面的内容,其实与文章的主题AQS并无直接的关联,就简单带过。前面不少都是JVM层面来保证线程安全的,而AQS则是彻底经过代码层面来处理线程安全的。
(PS:小节标题明明是Lock锁,怎么写AQS了,骗我读书少)工具
博主怕挨打,正在全力解释中~。先上类图压场!
如上图,左边是抽象队列同步器,而右边则是使用队列同步器实现的功能——锁、信号量、发令枪等。
能够先不看源码,我们本身思考,要以纯代码的方式实现应当考虑哪些问题?性能
对于一、2两点,难度应带不大,而三、4两点如何去设计呢?咱们经过伪代码预演操做流程。
在业务端,是这样操做的。
加锁 {须要被锁住的代码} 释放锁
加锁与释放锁的逻辑
if(state == 0) 获取到锁 set(state == 1) else 继续等待 while(true){ if(state == 0) 再次尝试获取锁 }
这样设计以后,整个操做流程再次变成了串行操做。
这和咱们去食堂排队打饭是同样的,食堂不可能为每一个学生都开放一个窗口,因此多个学生就会争抢有限的窗口,若是没有必定的控制,那么食堂每到吃饭的时候都是乱套的,一群学生围着窗口同时去打饭,想一想都是多么的恐怖。而由此出现了排队的机制,一个窗口同一时间打饭的人只能有一个,当前一我的离开窗口后,后面排队的学生才能去打饭。
下面咱们深刻JDK源码,领略大师级的代码设计。
业务调用代码:
package demo.aqs; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockMoney { Lock lock = new ReentrantLock(); /** * 假设如今帐户有1000块钱 */ private int money = 1000; //private int money = 1000; /** * 取钱 */ public void drawMoney(){ lock.lock(); this.money--; lock.unlock(); } public static void main(String[] args) throws InterruptedException { LockMoney money = new LockMoney(); for(int i=0; i<1000; i++){ new Thread(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } money.drawMoney(); },i + "").start(); } Thread.sleep(2000); System.out.println("当前帐户余额:" + money.money); } }
追踪Lock方法:
直接看源码基本一下子就晕车,我尝试绘制出lock方法的调用链路。而后结合源码解释。
你们跟着箭头走一遍源码,多多少少可以体会到AQS的实现机制。
final void lock() { //CAS尝试将state从0更新为1,更新成功则执行if下面的代码。 if (compareAndSetState(0, 1)) //获取锁成功,执行线程执行 setExclusiveOwnerThread(Thread.currentThread()); else //获取锁失败,线程入队列 acquire(1); }
看到这段代码,是否是瞬间明白前面提到的一、2两点问题。首先compareAndSetState方法是使用Unsafe直接操做内存而且使用乐观锁的方式,可以保证有且仅有一个线程可以操做成功,是多线程安全的。即设置将state设置为1成功的线程可以抢占到锁(线程互斥),而没有设置成功的线程将进行入队操做(排队等候),这样感受瞬间明朗了许多,那咱们接着往下看。
public final void acquire(int arg) { //tryAcquire失败而且acquireQueued成功,则调用selfInterrupt if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //当线程获取锁失败而且线程阻塞失败会中断线程 selfInterrupt(); }
AbstractQueuedSynchronizor的tryAcquire方法,其最终调用到了Sync的nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取当前锁的状态值 int c = getState(); // state = 0,表示当前锁为空闲状态,其实这一段代码和前面lock的方法是同样的 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //不等于0 则判断当前线程是否为持有锁的线程,若是是则执行代码,这里解决了重入锁问题 else if (current == getExclusiveOwnerThread()) { //当前状态值 + 1(能够看前面的传参) int nextc = c + acquires; // 囧, 这里是超出了int的最大值才会出现这样的状况 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //更新state的值 setState(nextc); return true; } return false; }
经过阅读源码,能够发现,tryAcquire方法在当前线程获取锁成功或者是重入锁的状况下返回true,不然返回false。而同时这个方法解决了上面提到的第4点锁重入的问题。ok,感受愈来愈接近真相了,接着看addWaiter方法。
理解addWaiter方法的代码,先看方法中用的得Node对象。 Node对象是对Thread对象的封装,使其具备线程的功能,同时他还有prev、next等属性。那么很明了,Node是一个链表结构的对象
//前一个结点 volatile Node prev; //下一个结点 volatile Node next;
同时AbstractQueuedSynchronizor中包含head、tail属性
//Node链表的头结点 private transient volatile Node head; //Node链表的尾结点 private transient volatile Node tail;
private Node addWaiter(Node mode) { //将当前线程包装为Node对象 Node node = new Node(Thread.currentThread(), mode); //获取尾节点,当这段代码第一次运行的时候,并无尾结点 //因此确定值为null,那么会执行下面的enq方法 Node pred = tail; //当再次运行代码的时候,尾结点再也不为null(enq方法初始化了尾结点,能够先往下看enq方法源码) if (pred != null) { //当前结点的前置结点指向以前的尾结点 node.prev = pred; //CAS尝试将尾结点从pred设置为node if (compareAndSetTail(pred, node)) { //设置成功则将pred的next结点执行node pred.next = node; return node; } } enq(node); return node; }
上面的解释听着有点绕脑壳。
不着急,咱们先看enq方法
private Node enq(final Node node) { //死循环 for (;;) { //获取尾结点 Node t = tail; //尾结点为空,则初始化尾结点和头结点为同一个新建立的Node对象 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //将当前结点设为为尾结点,并将前一个尾结点的next指向当前结点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; //退出循环 return t; } } } }
enq具体作了什么事情呢:
这里须要一些时间 + 空间的想象力,但若是对链表结构比较熟悉的话,这里理解也是不太困难的。
咱们动态的想想执行过程:
final boolean acquireQueued(final Node node, int arg) { //局部变量 boolean failed = true; try { //局部变量 boolean interrupted = false; //死循环 for (;;) { //获取前置结点 final Node p = node.predecessor(); //前置结点为head而且尝试获取锁成功,则不阻塞 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //阻塞操做 , 判断是否应该阻塞 而且 阻塞是否成功 if ( //是否在抢占锁失败后阻塞 shouldParkAfterFailedAcquire(p, node) && //Unsafe操做使线程阻塞 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
//Node pred 前置结点, Node node 当前结点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前置结点的等待状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 唤醒信号,即前结点正常,就设置waitStatus为SIGNAL,表示前置结点能够唤醒当前结点,那 * 么当前结点才会安心的被阻塞(若是前置结点不正常,可能就会致使本身不能被唤醒,那确定不 * 能安心睡觉的) */ return true; if (ws > 0) { /* * 找到前置结点中waitStatus <= 0 的Node结点并设置为当前结点的前置结点 * 此状态表示结点不是处于正常状态,那么将他从链表中删除,直到找到状态正常的结点 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 当waitStatus = 0 或者 PROPAGATE(-3) 时,CAS设置值为SIGNAL(-1) * 此状态表示线程正常,但没有设置唤醒,通常为tail的前一个结点,那么须要将其设置为可唤醒 * 状态(SIGNAL) */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
图解以下。
至此,咱们了解了AQS对须要等待的线程存储的过程。
而AQS的解锁以及公平锁、非公平锁,共享锁、独享锁等后续跟上。
参考资料:
https://www.cnblogs.com/water...
https://www.jianshu.com/p/d61...