关于AbstractQueuedSynchronizerhtml
JDK1.5以后引入了并发包java.util.concurrent,大大提升了Java程序的并发性能。关于java.util.concurrent包我总结以下:java
能够说AbstractQueuedSynchronizer是并发类的重中之重。其实以前在ReentrantLock实现原理深刻探究一文中已经有结合ReentrantLock详细解读过AbstractQueuedSynchronizer,但限于当时水平缘由,回看一年半前的此文,感受对于AbstractQueuedSynchronizer的解读理解还不够深,所以这里更新一篇文章,再次解读AbstractQueuedSynchronizer的数据结构即相关源码实现,本文基于JDK1.7版本。node
AbstactQueuedSynchronizer的基本数据结构算法
AbstractQueuedSynchronizer的基本数据结构为Node,关于Node,JDK做者写了详细的注释,这里我大体总结几点:数据结构
下面我用一张表格总结一下Node中持有哪些变量且每一个变量的含义:并发
关于SIGNAL、CANCELLED、CONDITION、PROPAGATE四个状态,JDK源码的注释中一样有了详细的解读,再用一张表格总结一下:app
AbstractQueuedSynchronizer供子类实现的方法ide
AbstractQueuedSynchzonizer是基于模板模式的实现,不过它的模板模式写法有点特别,整个类中没有任何一个abstract的抽象方法,取而代之的是,须要子类去实现的那些方法经过一个方法体抛出UnsupportedOperationException异常来让子类知道。性能
AbstractQueuedSynchronizer类中一共有五处方法供子类实现,用表格总结一下:ui
这里的acquire很差翻译,因此就直接原词放上来了,由于acquire是一个动词,后面并无带宾语,所以不知道具体acquire的是什么。按照我我的理解,acquire的意思应当是根据状态字段state去获取一个执行当前动做的资格。
好比ReentrantLock的lock()方法最终会调用acquire方法,那么:
这种理解我认为应当是比较准确的。
独占模式acquire实现流程
有了上面的这些基础,咱们看一下独占式acquire的实现流程,主要是在线程acquire失败后,是如何构建数据结构的,先看理论,以后再用一个例子画图说明。
看一下AbstractQuueuedSynchronizer的acquire方法实现流程,acquire方法是用于独占模式下进行操做的:
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
tryAcquire方法前面说过了,是子类实现的一个方法,若是tryAcquire返回的是true(成功),即代表当前线程得到了一个执行当前动做的资格,天然也就不须要构建数据结构进行阻塞等待。
若是tryAcquire方法返回的是false,那么当前线程没有得到执行当前动做的资格,接着执行"acquireQueued(addWaiter(Node.EXCLUSIVE), arg))"这句代码,这句话很明显,它是由两步构成的:
分别看一下每一步作了什么。
addWaiter
先看第一步,addWaiter作了什么,从传入的参数Node.EXCLUSIVE咱们知道这是独占模式的:
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 // Try the fast path of enq; backup to full enq on failure 4 Node prev = tail; 5 if (prev != null) { 6 node.prev = prev; 7 if (compareAndSetTail(prev, node)) { 8 prev.next = node; 9 return node; 10 } 11 } 12 enq(node); 13 return node; 14 }
首先看第4行~第11行的代码,得到当前数据结构中的尾节点,若是有尾节点,那么先获取这个节点认为它是前驱节点prev,而后:
所以在数据结构中有节点的状况下,全部新增节点都是做为尾节点插入数据结构。从注释上来看,这段逻辑的存在的意义是以最短路径O(1)的效果完成快速入队,以最大化减少开销。
假如当前节点没有被设置为尾节点,那么执行enq方法:
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 if (t == null) { // Must initialize 5 if (compareAndSetHead(new Node())) 6 tail = head; 7 } else { 8 node.prev = t; 9 if (compareAndSetTail(t, node)) { 10 t.next = node; 11 return t; 12 } 13 } 14 } 15 }
这段代码的逻辑为:
看完了代码,用一张图表示一下AbstractQueuedSynchronizer的总体数据结构(比较简单,就不本身画了,网上随便找了一张图):
acquireQueued
队列构建好了,下一步就是在必要的时候从队列里面拿出一个Node了,这就是acquireQueued方法,顾名思义,从队列里面acquire。看下acquireQueued方法的实现:
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.prevecessor(); 7 if (p == head && tryAcquire(arg)) { 8 setHead(node); 9 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) && 14 parkAndCheckInterrupt()) 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
这段代码描述了几件事:
看一下第一步shouldParkAfterFailedAcquire代码作了什么:
1 private static boolean shouldParkAfterFailedAcquire(Node prev, Node node) { 2 int ws = prev.waitStatus; 3 if (ws == Node.SIGNAL) 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park. 7 */ 8 return true; 9 if (ws > 0) { 10 /* 11 * prevecessor was cancelled. Skip over prevecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = prev = prev.prev; 16 } while (prev.waitStatus > 0); 17 prev.next = node; 18 } else { 19 /* 20 * waitStatus must be 0 or PROPAGATE. Indicate that we 21 * need a signal, but don't park yet. Caller will need to 22 * retry to make sure it cannot acquire before parking. 23 */ 24 compareAndSetWaitStatus(prev, ws, Node.SIGNAL); 25 } 26 return false; 27 }
这里每一个节点判断它前驱节点的状态,若是:
若是判断判断应当park,那么parkAndCheckInterrupt方法:
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this); 3 return Thread.interrupted(); 4 }
利用LockSupport的park方法让当前线程阻塞。
独占模式release流程
上面整理了独占模式的acquire流程,看到了等待的Node是如何构建成一个数据结构的,下面看一下释放的时候作了什么,release方法的实现为:
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); 6 return true; 7 } 8 return false; 9 }
tryRelease一样是子类去实现的,表示当前动做我执行完了,要释放我执行当前动做的资格,讲这个资格让给其它线程,而后tryRelease释放成功,获取到head节点,若是head节点的waitStatus不为0的话,执行unparkSuccessor方法,顾名思义unparkSuccessor意为unpark头结点的继承者,方法实现为:
1 private void unparkSuccessor(Node node) { 2 /* 3 * If status is negative (i.e., possibly needing signal) try 4 * to clear in anticipation of signalling. It is OK if this 5 * fails or if status is changed by waiting thread. 6 */ 7 int ws = node.waitStatus; 8 if (ws < 0) 9 compareAndSetWaitStatus(node, ws, 0); 10 11 /* 12 * Thread to unpark is held in successor, which is normally 13 * just the next node. But if cancelled or apparently null, 14 * traverse backwards from tail to find the actual 15 * non-cancelled successor. 16 */ 17 Node s = node.next; 18 if (s == null || s.waitStatus > 0) { 19 s = null; 20 for (Node t = tail; t != null && t != node; t = t.prev) 21 if (t.waitStatus <= 0) 22 s = t; 23 } 24 if (s != null) 25 LockSupport.unpark(s.thread); 26 }
这段代码比较好理解,整理一下流程:
最后,若是拿到了一个不等于null的节点s,就利用LockSupport的unpark方法让它取消阻塞。
实战举例:数据结构构建
上面的例子讲解地过于理论,下面利用ReentrantLock举个例子,可是这里不讲ReentrantLock实现原理,只是利用ReentrantLock研究AbstractQueuedSynchronizer的acquire和release。示例代码为:
1 /** 2 * @author 五月的仓颉http://www.cnblogs.com/xrq730/p/7056614.html 3 */ 4 public class AbstractQueuedSynchronizerTest { 5 6 @Test 7 public void testAbstractQueuedSynchronizer() { 8 Lock lock = new ReentrantLock(); 9 10 Runnable runnable0 = new ReentrantLockThread(lock); 11 Thread thread0 = new Thread(runnable0); 12 thread0.setName("线程0"); 13 14 Runnable runnable1 = new ReentrantLockThread(lock); 15 Thread thread1 = new Thread(runnable1); 16 thread1.setName("线程1"); 17 18 Runnable runnable2 = new ReentrantLockThread(lock); 19 Thread thread2 = new Thread(runnable2); 20 thread2.setName("线程2"); 21 22 thread0.start(); 23 thread1.start(); 24 thread2.start(); 25 26 for (;;); 27 } 28 29 private class ReentrantLockThread implements Runnable { 30 31 private Lock lock; 32 33 public ReentrantLockThread(Lock lock) { 34 this.lock = lock; 35 } 36 37 @Override 38 public void run() { 39 try { 40 lock.lock(); 41 for (;;); 42 } finally { 43 lock.unlock(); 44 } 45 } 46 47 } 48 49 }
所有是死循环,至关于第一条线程(线程0)acquire成功以后,后两条线程(线程一、线程2)阻塞,下面的代码就不考虑后两条线程谁先谁后的问题,就一条线程(线程1)流程执行到底、另外一条线程(线程2)流程执行到底这么分析了。
这里再把addWaiter和enq两个方法源码贴一下:
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 // Try the fast path of enq; backup to full enq on failure 4 Node prev = tail; 5 if (prev != null) { 6 node.prev = prev; 7 if (compareAndSetTail(prev, node)) { 8 prev.next = node; 9 return node; 10 } 11 } 12 enq(node); 13 return node; 14 }
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 if (t == null) { // Must initialize 5 if (compareAndSetHead(new Node())) 6 tail = head; 7 } else { 8 node.prev = t; 9 if (compareAndSetTail(t, node)) { 10 t.next = node; 11 return t; 12 } 13 } 14 } 15 }
首先第一个acquire失败的线程1,因为此时整个数据结构中么没有任何数据,所以addWaiter方法第4行中拿到的prev=tail为空,执行enq方法,首先第3行获取tail,第4行判断到tail是null,所以头结点new一个Node出来经过CAS算法设置为数据结构的head,tail一样也是这个Node,此时数据结构为:
为了方便描述,prev和next,我给每一个Node随便加了一个地址。接着继续enq,由于enq内是一个死循环,因此继续第3行获取tail,new了一个空的Node以后tail就有了,执行else判断,经过第8行~第10行代码将当前线程对应的Node追加到数据结构尾部,那么当前构建的数据结构为:
这样,线程1对应的Node被加入数据结构,成为数据结构的tail,而数据结构的head是一个什么都没有的空Node。
接着线程2也acquire失败了,线程2既然acquire失败,那也要准备被加入数据结构中,继续先执行addWaiter方法,因为此时已经有了tail,所以不须要执行enq方法,能够直接将当前Node添加到数据结构尾部,那么当前构建的数据结构为:
至此,两个阻塞的线程构建的三个Node已经所有归位。
实战举例:线程阻塞
上述流程只是描述了构建数据结构的过程,并无描述线程一、线程2阻塞的流程,所以接着继续用实际例子看一下线程一、线程2如何阻塞。贴一下acquireQueued、shouldParkAfterFailedAcquire两个方法源码:
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.prevecessor(); 7 if (p == head && tryAcquire(arg)) { 8 setHead(node); 9 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) && 14 parkAndCheckInterrupt()) 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
1 private static boolean shouldParkAfterFailedAcquire(Node prev, Node node) { 2 int ws = prev.waitStatus; 3 if (ws == Node.SIGNAL) 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park. 7 */ 8 return true; 9 if (ws > 0) { 10 /* 11 * prevecessor was cancelled. Skip over prevecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = prev = prev.prev; 16 } while (prev.waitStatus > 0); 17 prev.next = node; 18 } else { 19 /* 20 * waitStatus must be 0 or PROPAGATE. Indicate that we 21 * need a signal, but don't park yet. Caller will need to 22 * retry to make sure it cannot acquire before parking. 23 */ 24 compareAndSetWaitStatus(prev, ws, Node.SIGNAL); 25 } 26 return false; 27 }
首先是线程1,它的前驱节点是head节点,在它tryAcquire成功的状况下,执行第8行~第11行的代码。作几件事情:
所以,若是线程1执行tryAcquire成功,那么数据结构将变为:
从上述流程能够总结到:只有前驱节点为head的节点会尝试tryAcquire,其他都不会,结合后面的release选继承者的方式,保证了先acquire失败的线程会优先从阻塞状态中解除去从新acquire。这是一种公平的acquire方式,由于它遵循"先到先得"原则,可是咱们能够动动手脚让这种公平变为非公平,好比ReentrantLock默认的非公平模式,这个留在后面说。
那若是线程1执行tryAcquire失败,那么要执行shouldParkAfterFailedAcquire方法了,shouldParkAfterFailedAcquire拿线程1的前驱节点也就是head节点的waitStatus作了一个判断,由于waitStatus=0,所以执行第18行~第20行的逻辑,将head的waitStatus设置为SIGNAL即-1,而后方法返回false,数据结构变为:
看到这里就一个变化:head的waitStatus从0变成了-1。既然shouldParkAfterFailedAcquire返回false,acquireQueued的第13行~第14行的判断天然不经过,继续走for(;;)循环,若是tryAcquire失败显然又来到了shouldParkAfterFailedAcquire方法,此时线程1对应的Node的前驱节点head节点的waitStatus已经变为了SIGNAL即-1,所以执行第4行~第8行的代码,直接返回true出去。
shouldParkAfterFailedAcquire返回true,parkAndCheckInterrupt直接调用LockSupport的park方法:
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this); 3 return Thread.interrupted(); 4 }
至此线程1阻塞,线程2阻塞的流程与线程1阻塞的流程相同,能够本身分析一下。
另外再提一个问题,不知道你们会不会想:
我认为这是AbstractQueuedSynchronizer开发人员作了相似自旋的操做。由于不少时候获取acquire进行操做的时间很短,阻塞会引发上下文的切换,而很短期就从阻塞状态解除,这样相对会比较耗费性能。
所以咱们看到线程1自构建完毕Node加入数据结构到阻塞,一共尝试了两次tryAcquire,若是其中有一次成功,那么线程1就没有必要被阻塞,提高了性能。