AQS的java的同窗面试常常被问到的一个问题。不少同窗被面到这个问题的时候,一脸蒙圈。可是说实话,这个AQS对于java的同窗来讲应该是一个比较重要的知识,由于咱们不少并发的对象都是基于这个实现的,因此考察java同窗的并发知识的功底,问这个AQS也是一个质量比较好的问题。java
AQS的全称是AbstractQueuedSynchronizer(文中就叫:抽象队列),说白了其实就是一个抽象类。node
是咱们并发包里面的基石,重入锁,读写锁,不少并发的工具都是基于它实现的。因此理解好AQS是什么东西对于掌握好并发知识是有帮助的。面试
不过,咱们若是分析AQS的时候,直接就读AQS的代码有点无聊,咱们就分析ReentantLock的源码,由于咱们平时真正使用的时候咱们使用的是ReentantLock,而不是AQS。可是ReentantLock就是基于AQS实现的。并发
在分析AQS以前,我先给你们解析一下AQS的原理。以下图所示:工具
图展现的一个ReentantLocak(基于非公平锁分析)加锁的过程。ui
加锁失败了之后线程二就加入到队列里面去(AQS内部实现了一个双向队列)this
AQS重入加锁大概就是这么个原理。咱们接下来分析一下它的底层源码。我就以ReentantLocak为例写个例子。spa
public class ReentrantLockDemo { ReentrantLock reentrantLock = new ReentrantLock(); int sum=0; public void count(){ //这个是加锁的代码 //咱们这次主要基于非公平锁分析源码,等到合适的时机再给你们 //解释公平锁和非公平锁的区别。 //首先咱们开始分析lock的方法。 reentrantLock.lock(); for (int i = 0; i < 10; i++) { sum++; System.out.println(sum); } //这个是咱们释放锁的代码 reentrantLock.unlock(); } } }
final void lock() { //线程一第一次进来之间使用cas操做修改state的值 //这句代码的语义就是当前state的值是否为0,若是是0,那么就把修改成1。 if (compareAndSetState(0, 1)) //设置当前线程为本身,其实线程一第一次进来加锁的时候,到这儿就加锁成功了!! setExclusiveOwnerThread(Thread.currentThread()); else //若是线程1第二次进来 //那么由于state不是0了,因此会cas操做失败 //因此会走这个方法 acquire(1); } 接下来咱们分析一下,线程一第二次进来是如何加锁的: public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 咱们慢慢分析这段代码 首先应该分析的是:tryAcquire(arg)方法 因此咱们要想分析acquire方法,那么先分析里面的tryAcouire方法 //执行的是这个方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
这个方法调用的是以下的方法: final boolean nonfairTryAcquire(int acquires) { //获取当前线程,当前线程固然是线程一喽 final Thread current = Thread.currentThread(); //获取当前的state 那么state 是1 int c = getState(); //若是c == 0 //其实咱们知道,代码之因此走到这儿就是由于前面c != 0 //可是jdk的源码为了健壮性,因此这儿仍是再次判断了一下 //意思就是若是当前的state是0,那么直接加锁就能够了。 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若是发现上一个加锁也是本身 //那么直接进行重入加锁就能够了 else if (current == getExclusiveOwnerThread()) { // 1+1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //修改状态值为2,可重入加锁成功,返回true. setState(nextc); return true; } return false; } 若是上个结果返回true以后。咱们再回过头来看这个方法: public final void acquire(int arg) { //(!true) 结果就是false //那么代码就不继续执行了 //也就是说若是是重入加锁,那么这儿加锁成功之后就退出去了。 //换句话说,若是是重入加锁,代码执行到这儿重入加锁也就成功了!! if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 如今咱们的的 线程是1,state=2 接下来咱们在分析一个状况,线程2进来了。咱们的代码又是如何走的。 首先state这个时候不是0了,那么直接走的是acquire方法。 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //直接走这个方法 acquire(1); } //咱们再次分析这段代码 //咱们分析一下这个方法。tryAcquire //经过咱们前面的分析咱们知道,若是重入加锁成功了,那么这儿直接返回的是true //可是若是发现当前的线程 和 里面加锁的线程不是同一个线程 //那么重入加锁失败。这儿就会返回来false //若是tryAcquire返回的是false。那么 (!false) = true //代码就会运行到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //接下来咱们分析一下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这个方法 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 分析acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这个方法 咱们首先须要分析addWariter这个方法。注意Node.EXCLUSIVE这个值是null //进入到这个方法的时候咱们进入到了 类 AbstractQueuedSynchonzied里面。 private Node addWaiter(Node mode) { //根据当前线程建立了一个Node //这个node的nextWaiter = mode = null Node node = new Node(Thread.currentThread(), mode); //tail一开始就是等于mull Node pred = tail; //因此第一次进来等式不成立 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //第一次进来代码走的是这儿 enq(node); return node; } //调的这个方法,建立来的参数的是当前线程的Node private Node enq(final Node node) { for (;;) {//自旋 //第一次指针有变化 //第二次进来指针仍是有变化 Node t = tail; //注意咱们第一次进来,知足这个条件,因此t==null,是知足条件的 //第二次进来,那么这个时候t就不等于Null了,因此这儿的这个条件就不知足了 //去执行else语句 if (t == null) { // Must initialize //使用cas设置一个head头 if (compareAndSetHead(new Node())) //指针要发生变化 tail = head; //接着代码就会执行到这儿,你们必定要注意这儿。 //代码执行到这儿之后,由于这是一个死循环,因此 //执行到这儿之后再次运行。 } else { //当前的线程的prev指向t node.prev = t; //使用cas操做设置队列的尾部 //这个cas的意思是当前的tail是否就是t,若是是t //那么就把值修改成当前node,那很明显,当前的tail就是t //因此这个就把当前node设置为tail if (compareAndSetTail(t, node)) { t.next = node; //返回头结点结束这个死循环 return t; } } } } 到目前为止指针变化以下:
接下来假设线程三要进来了。 若是线程三进来,确定就会走到这段代码。 private Node addWaiter(Node mode) { //建立线程三Node Node node = new Node(Thread.currentThread(), mode); //pred指向tail Node pred = tail; //此次pred就不等于null if (pred != null) { //当前的node.pred指向 tail node.prev = pred; //判断当前pred是否是tail,若是是 //就把当前node设置为tail,当前tail确定就是pred if (compareAndSetTail(pred, node)) { //pred.netxt指向了线程三的node pred.next = node; //返回当前node return node; } } enq(node); return node; } 致使到此队列的指针变化以下:
到目前为止咱们分析清楚了addWaiter的方法,可是不要忘记了咱们的目标。咱们是分析 acquireQueued的这个方法。咱们只须要知道若是加锁失败了,那么就会调用addWaiter方法,addWaiter方法返回来的是当前的node acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //须要注意一下咱们这儿是死循环 for (;;) { //获取当前节点的上一个节点,那么咱们线程三的 //上一个节点是线程二 final Node p = node.predecessor(); //线程二不知足这儿条件 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //若是上面一个条件不知足的话,接下来就会调用以下的方法 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //传进来的参数是上一个节点和当前节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //当一个节点刚床架你的时候 waitStatus默认值应该是0 int ws = pred.waitStatus; //这个条件不知足 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; //这个条件不知足 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //那么就会直接执行这个方法。 //这个方法就会把上一个节点的waitStatus 设置为SINGAL //也就是-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //而后返回false return false; }
咱们再回过头来分析上一段代码: final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //刚刚咱们知道这儿的返回值是false //那么若是是false的话,if条件就不知足了。 //不知足了之后再次执行for循环。 //继续执行shouldParkAfterFailedAcquire方法 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 当前的队列指针状况以下:
//再次执行这个方法 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取状态,如今ws=-1 int ws = pred.waitStatus; //符合这个提交 if (ws == Node.SIGNAL) //结果返回true return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //这个shouldParkAfterFailedAcquire方法结果为true了之后 //那么接下来执行parkAndCheckInterrupt方法 //因此接下来咱们分析一下parkAndCheckInterrupt方法。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { //这儿操做就比较简单了,这儿就直接把线程挂起,线程就停在这儿不动了 //必需要等另一个线程去执行unpark操做代码才能往下执行。 LockSupport.park(this); return Thread.interrupted(); } 由于代码执行到这儿就已经卡住了。因此咱们回到源头看到如下,最终会让哪段代码卡住。 //这个地方是调动acquireQueued方法致使代码卡住,因此这儿的代码也会卡住不动。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
到目前为止咱们分析了一个加锁失败的线程进去到队列之后的状况。.net
咱们如今能够解释一下公平锁和非公平锁的区别了。
咱们以前的全部的代码分析的都是非公平锁的,非公平锁最大的特色就是在这儿。线程
final void lock() { //加锁的时候不分青红皂白,也无论队列里面是否有人在排着队 //上来就是直接加锁,因此咱们想一下,假设咱们虽然如今队列里面有线程在排队加锁 //可是恰好当前的独占锁释放锁了,新进来的这个线程就加锁成功了。也就是插队成功了。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 若是是公平锁的话,加锁的时候走的是这个逻辑: static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { //调用这个方法 acquire(1); } public final void acquire(int arg) { //首先执行的是这儿的tryAcquire方法 //其实到这儿的代码跟咱们以前看到的代码是同样的。 //可是在往下实现就跟非公平锁那儿不同了 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); int c = getState(); //若是真的当前的state=0 if (c == 0) { //须要这儿进行判断,咱们先单独把hasQueuedPredecessors //代码拿出来分析一下,分析后获得,这个方法是判断队列里面 //是否还有节点了。若是还有节点,那么这个方法就返回true //!true 就是false,代码就不走这儿了。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //这儿返回false return false; } } public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //若是队列里面还有节点 //若是还有节点那么就返回来true return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
这样的话,咱们再回过头来看这段代码: public final void acquire(int arg) { // tryAcquire方法返回来的是false,那么!flase的结果就是等于true if (!tryAcquire(arg) && //而后接下来就是走这个方法,那么这个方法,就是跟咱们分析的同样了。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
因此综上所述,咱们的公平队列就是每次在加锁的时候,先判断队列里面是否有线程,若是有就加到队列后面,若是没有,那么就直接加锁成功。 接下来咱们再分析一个场景,就是重入锁释放锁的逻辑。 public void unlock() { sync.release(1); } 接下来调用的是这段代码: public final boolean release(int arg) { //重点调用的是这个方法 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { //加锁如今是咱们的线程一进行重入锁的释放,一开始state的值2 //如今传进来的参数releases 是1 //那么c的值是1 int c = getState() - releases; //若是释放锁的线程不是当前独占锁的线程,那么就会报错。 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是把整个重入锁都释放完了,那么其实c==0 //可是第一次释放重入锁的时候,这儿c是1 if (c == 0) { free = true; //若是整个重入锁都释放了,那么就放当前的独占锁置为null setExclusiveOwnerThread(null); } //更改线程的重入锁的个数 setState(c); //若是整个锁都释放完了,那么返回的是true //若是只是释放了一部分,那么返回的是false。 return free; }
public final boolean release(int arg) { //若是线程一有两个锁重入,当前只是减小了一个 //锁重入,那么tryRelease返回值是false。那么这个条件就不知足 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } //对两个锁重入,由于第一次进来,那么直接返回的是false。 return false; } 其实若是是只释放了线程一的第一个重入锁,那么这个返回值也没什么意义,咱们看到的是,就是把state的值减一了。 接下来咱们继续分析,若是线程一,再释放一个重入锁,也就是state由1变为0了。 咱们回过头来再分析以下代码: public final boolean release(int arg) { //条件知足 if (tryRelease(arg)) { Node h = head; //我这儿的分析是h.waitStatus就是等于0 //可是若是这个等于0的话,咱们的等于就走不下去了,可见这儿的值应该不等于零。 //这样咱们就执行里面的uparkSuccessor方法 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { //获取waitStatus状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //获取到第一个节点 Node s = node.next; //在目前咱们虚拟的环境中,s!=null if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //这儿直接unpakr把线程唤醒 LockSupport.unpark(s.thread); } 接下来咱们分析一下unpank之后代码如何走: 其实咱们的代码以前卡住了,而后unpark之后会致使代码继续往下执行。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //以前咱们的代码卡在这个地方。 //由于如今线程被唤醒了,因此这儿就能够继续往下执行。 //咱们要注意这儿是一个死循环。 //因此继续重复执行。 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //重复执行到这儿,获取当前节点的上一个节点 //当前节点的上一个节点固然是head了 final Node p = node.predecessor(); //p==head条件知足,因此接着就执行tryAcquire //这个方法就 开始加锁了,修改当前加锁线程的名字 //把state 改成了1 if (p == head && tryAcquire(arg)) { //而后把当前线程的node设置为head setHead(node); //把当前线程的Node 置为null进行垃圾回收 p.next = null; // help GC failed = false; //返回状态 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }