AQS的源码分析 <一> java
目录结构node
一、什么是CAS ?数据库
二、同步器类结构安全
三、CLH同步队列多线程
四、AQS中静态内部类Node并发
五、方法分析源码分析
5.一、acquire(int arg )ui
5.二、release(int arg) 释放锁this
六、总结线程
在多线程环境下,咱们通常会对临界区资源(共享资源)进行加锁,释放锁,保证同一时刻最多只有一个线程(独占模式),就如去公共厕所里,在使用一个小房间时会加锁避免本身在使用的时候,别人忽然闯进来同样,引发没必要要的麻烦,在使用完后,再打开锁,其余人才可以使用;还有生产者消费者模型中,线程之间要同步,须要等待和通知机制,来协调线程合做。那么这些是这么实现的?如可重入锁ReentrantLock, 读写锁ReadWriteLock, 信号量 Semaphore, 计数器CountDownLatch,这些都会涉及线程之间的协调同步,那么会有一个抽象的结构,将这些须要共用的功能抽离出来,统一来知足要求吗?咱们一块儿来看看AbstractQueuedSynchronizer 这个抽象类,如何来实现这些功能和其设计的巧妙, 咱们能看到Doug lea 大佬在不少地方使用的循环CAS操做(自旋锁)。
CAS 即 compare and swap 比较并交换, 涉及到三个参数,内存值V, 预期值A, 要修改的新值B, 拿着预期值A与内存值V比较,相等则符合预期,将内存值V更新为B, 不然不相等,不能更新V;
好比我想去果篮拿1个苹果,我预期篮子是5个,而果篮中实际有2个,被其余人早已偷偷吃了几个我还不知道!因而个人指望值5与果篮值实际2不符,那么我就不能更新篮子的苹果数量了。这和数据库中乐观锁机制同样。后面我想用一篇文章单独总结一下CAS, 如它存在的ABA问题以及加上版本号来解决ABA问题, 还有用CAS实现自旋锁等。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
AbstractQueuedSynchronizer (之后简称AQS)是一个抽象类,定义了一些通用功能以及子类须要根据自身特性重写实现的方法,有两个内部类 Node和ConditionObject, 以及2个链表结构的队列,一个是双向链表的同步队列,用来存放尝试获取锁,却未获取到锁需等待的线程,另外一个是单向链表的条件队列,用来存放某些线程已经获取到锁了,为了等待某些事件(如IO事件,mq消息等)主动放弃锁挂起等待条件的线程。
这里主要分析2个方法,分别是独占式获取锁 acquire, 释放锁release。
// 独占模式 EXCLUSIVE acquire(int arg) release(int arg) // 共享模式 SHARED acquireShared(int arg) releaseShared(int arg)
AQS 内部持有一个双向链表的队列(CLH队列,也称线程同步队列,用来存储尝试获取锁,未获取到等待的线程),AQS 持有head, tail节点,以及资源同步状态state 属性, 来表示有限的锁资源占用状况。
注:Head节点是一个空节点,是不关联线程的,由于Head节点表示当前已经占用资源在使用的线程,该线程已经不须要等待锁,在同步队列中做占位做用,它只是一个标志节点,其余线程才是对锁资源有需求,在队列中等待的,慢慢看来,后面代码会更有理解。
双向链表结构以下图:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 等待队列中的头节点,懒初始化,若是头节点存在,表示当前正在占用锁资源的线程。 * */ private transient volatile Node head; /** * 等待队列中尾节点 */ private transient volatile Node tail; /** * 同步状态,在独占模式下 state=0,表示锁空闲,可获取,state =1,则表示被其余线程占用,需等待 * 就如火车上的厕所,绿灯表示可用,红灯表示里面有人,须要等待 */ private volatile int state;
CLH双向链表中每一个node 中有关联的线程,节点状态信息,以及节点的先后指针,每一个属性有volatile修饰,可保证多线程之间的可见性。
static final class Node { // 获取锁的模式分为独占式和共享式 static final Node SHARED = new Node(); //独占式 static final Node EXCLUSIVE = null; // 如下是节点在等待队列中的5种状态,初始为0,即waitStatus的值 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 如下4个属性分别用volatile修饰,保证多线程之间可见性 //节点的状态 volatile int waitStatus; //前置节点 volatile Node prev; // 后置节点 volatile Node next; // 节点关联的线程 volatile Thread thread;
Node中在队列中的等待状态 waitStatus,初始为0,还有如下4种状态和其含义:
锁资源只有1个,同一时刻最多只有一个线程可获取到锁,独占式获取锁,其余未获取到锁的线程需在同步队列中等待。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //若是在等待过程当中,线程被标记中断了,响应中断 selfInterrupt(); }
变形为:
public final void acquire(int arg) { boolean ta = tryAcquire(arg) if(!ta) { Node aw = addWaiter(Node.EXCLUSIVE); boolean isInterrupt = acquireQueued(aw,arg); if(isInterrupt) { //若是在等待过程当中,线程被标记中断了,响应中断 selfInterrupt(); } } }
1) tryAcquire(arg): 尝试去获取锁资源,具体怎样获取锁由每一个子类实现,AQS不作处理,统一抛出 throw new UnsupportedOperationException() 异常;
2) addWaiter(Node.EXCLUSIVE):若是尝试获取锁失败,将该线程封装到一个新node节点中,添加到等待队列中
3) acquireQueued(node, arg) :若是在head节点后,再次尝试去获取锁,可能占用锁的线程已经释放资源了,若是再次获取失败,在队列中寻找到安全位置(告知前置节点,使用完资源唤醒一下本身)后,则在队列中挂起,等待被唤醒,返回布尔类型,表示线程是否被中断。
下面来分别查看以上3个方法的源码:
1)tryAcquire() 是尝试去获取锁,须要每一个具体实现类去实现,默认抛出异常
protected boolean tryAcquire(int arg) { //抛出不支持操做异常 throw new UnsupportedOperationException(); }
来看看 ReentrantLock 是怎样实现 tryAcquire() 去获取锁的,ReentrantLock 有公平方式和非公平两种方式,这里简单看看非公平方式的实现细节。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
内部调用nonfairTryAcquire 方法,代码以下:
final boolean nonfairTryAcquire(int acquires) { //1,获取当前线程对象 final Thread current = Thread.currentThread(); //2,获取同步状态state int c = getState(); //3, c为0表示锁可用,cas更新为1,compareAndSetState内部有unsafe类 调用本地方法(native修饰)执行原子更新操做,多个线程同时进来更新,只会有一个更新成功。更新成功就设置当前线程对象,用来标识当前线程拥有锁了,是有主的,其余线程在我没释放锁以前不可占用,也是用来支持可重入的。 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //4,若是当前线程等于当前已获取锁线程,可重入,同步状态 +1,返回true,容许再次获取锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
可重入锁 : 指线程在获取锁资源后,再次去获取同步锁,是容许再次得到的,否则存在产生死锁的风险,由上可知ReentrantLock 是支持可重入的,可重入屡次,每重入一次 state +1,那么重入n 次,也须要释放n次(同理,每释放一次,state -1 ),否则state 没法恢复到到空闲状态0,致使其余线程没法获取到锁。
2)addWaiter(Node mode),将当前线程封装为一个节点,添加到等待队列中。该方法分为2步操做,第一步利用CAS快速将节点入队,若是能入队成功,则返回,不然第二步执行 enq() 方法。
为何?
为了执行效率和安全性两方面考虑,因在多线程环境下,同一时间可能并发执行多个线程,若是此时有2个线程都在执行入队操做,就有可能其中一个线程执行CAS 失败,他引用 的tail节点已经被更新,须要从新获取新的,此时就须要执行enq() 循环轮询去获取最新的tail节点执行入队操做,直到成功。
如图所示:
线程A在刚获取到pred = tail后,cpu调度切换到线程B,线程B此时也执行入队操做,入队成功,队列长度+1, tail更新,此时cpu调度 线程A,线程A再执行 compareAndSetTail(pred, node) 操做就会失败。
快速添加到队尾失败了,有两种状况,
1.是tail节点 为空,队列为空,那么须要初始化队列,添加一个空的head节点,表示当前已经获取锁资源,正在执行的线程。
2.是在cas添加到队尾后面时,有其余线程也在操做,抢先入队成功,致使本身获取的tail节点不是最新的,那就轮询获取最新的tail执行更新。
流程图以下:
addWaiter(Node mode) 代码以下:
private Node addWaiter(Node mode) { /*1,建立 node节点,节点信息保存线程及模式 * Node.EXCLUSIVE for exclusive 独占模式 * Node.SHARED for shared 共享模式 */ Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; //2,若是队尾节点不为空,尝试将节点添加到 tail后面,创建双向连接 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //3,若是队尾节点为空,或者cas添加节点失败 enq(node); return node; }
enq(node) 节点入队操做,for无限循环尝试去添加节点到队尾,直至成功。
private Node enq(final Node node) { for (;;) { //每新的一次循环,获取的是最新的tail,因tail节点是volatile修饰的,多线程之间内存可见,每次更新都会被刷新到内存 Node t = tail; if (t == null) { // Must initialize //1,head节点是一个 node对象,可是没有绑定线程,表示当前已经获取到锁资源,正在执行的线程,此时首尾相同,会再次走一遍循环,添加当前节点到head节点后面 if (compareAndSetHead(new Node())) tail = head; } else { //2,轮询去设置节点到队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
boolean acquireQueued(final Node node, int arg) 返回线程是否被中断
在上一步方法将节点添加到同步队列后,执行该方法的目的是:
若是node的前置节点是head, 再次尝试一次获取锁资源,若是已经被释放了,那(线程)就去获取锁执行。若是不是头节点或者尝试获取锁资源失败,那就在队列找个位置挂起等待了 。
此时须要找一个安全点,确保前面的有效节点(没被取消的节点)的线程执行后,可以通知唤醒本身,当前节点线程才安心挂起等待。
流程图以下:
源代码以下:
final boolean acquireQueued(final Node node, int arg) { //默认是失败 boolean failed = true; try { //默认是没有被中断 boolean interrupted = false; //无限循环 for (;;) { final Node p = node.predecessor(); //1,若是前置节点是head节点,再去尝试获取锁,若是获取成功了,则将本身更新为head,此时当前线程能够执行了。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 此时返回 false,没被中断 return interrupted; } //2,1失败或前置不是head,那么要定位一个有效的位置去阻塞等待,前面有些节点多是被取消的,须要跳过这些节点,清除出队列,shouldParkAfterFailedAcquire就是来作这个事 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //3,标记中断标记,返回被中断过 interrupted = true; } } finally { // 4,正常状况下是一直会进行for循环,当跳出该循环,即出现异常,如被标记了中断,去调用阻塞,抛出中断异常,此时没有在队列中找到安全位置,所以将该节点移出队列 if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire 在失败获取锁资源后,寻找能够安全挂起的位置
若是前置节点被取消,一直向前找到节点 ws<=0 为止,注意蓝色箭头,指向的先后指针还在,可是node1对象已经没有被引用,下次gc会被回收,则出队。
流程图以下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //1,前置节点已是signal,在它释放资源后,会通知唤醒队列中下一个线程,因此当前位置就是安全阻塞等待点 return true; if (ws > 0) { // ws> 0,前置节点被取消 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 2,ws>0,那么就继续往前找,直到找到一个节点没被取消的,跳过了那些被取消的节点,这些节点后面会被GC pred.next = node; } else { //3,尝试将前置节点 状态更新为signal,ws= -1, 进行下一次轮询 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
在找到安全位置后,挂起当前线程,等待被唤醒,若是下次被唤醒,首先检查一下本身是被前置节点唤醒 仍是被中断唤醒的
private final boolean parkAndCheckInterrupt() { // 挂起当前线程 LockSupport.park(this); //当被唤醒,返回是否被中断标记位 return Thread.interrupted(); }
最后finally中,线程被中断,出现异常,取消节点;cancelAcquire(Node node) 将该节点移出等待队列,线程都被中断了,就再也不
须要去等待锁资源了。
出队有三种状况:
注: 我理解这里最终实现是要达到第3步的效果,可是在cancelAcquire方法里并无更新第2步蓝色箭头的前置指针,而是留给了其余线程在获取锁资源时,执行shouldParkAfterFailedAcquire方法查找安全位置挂起来实现的,这时 node 将没有再被引用,所以会被GC。
3, 被取消节点在head的后面,第二个节点
这里惟一作的就是去唤醒 node 的后继节点(若是为没被取消的节点),若是后继节点为空或者ws =1,那么就在队列从后向前遍历去唤醒一个有效节点,这个节点醒来作的事情是尝试去获取锁,而且将排在它队列前连续被取消的节点出队。因此这里更新节点的先后指针操做仍是交给其余线程来作的,在 shouldParkAfterFailedAcquire方法实现的。
源代码以下:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //1,将节点与线程解绑 node.thread = null; //2,跳过前面全部被取消的节点 // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; //3,获取前置节点后一个位置节点,不必定是 node,有多是跳过的被取消的节点,留做cas更新用 Node predNext = pred.next; //4,将当前节点状态改成取消 ws =1 node.waitStatus = Node.CANCELLED; //5,第一种状况:尾节点,将node前置节点更新为tail,并断开前置节点的next后置指针,node将无引用,被gc if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 第三种状况:node在中间位置,要作的就是让node前置节点 pred的next指针指向node后的没被取消的节点,跳过node if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 第二种状况,在head后面,那么要需向node后找没被取消节点,让head跳过node指向他,并唤醒该节点的线程。 unparkSuccessor(node); } node.next = node; // help GC } }
唤醒后继者,该方法在release 方法也被执行,在线程使用完资源后,去唤醒下一个等待该资源的线程去执行。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 状态置为0,初始状态,也不让她通知下一个节点,此时节点应该是取消状态1,是大于0的,为啥还需再判断一次?,这个方法不仅这里用,释放资源release也用,正常ws =-1,因此更新状态为0 //首先判断node后置节点,若是不行,再从尾向前遍历去找一个没被取消的节点 去唤醒其线程 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
public final boolean release(int arg) { //1,释放资源前提是已经获取到 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //2,head的waitStatus在独占模式应该是signal =-1 // 唤醒下一个节点,代码如上 unparkSuccessor(h); return true; } return false; }
一样,tryRelease() 与 tryAcquire() 同样,尝试释放锁,AQS都未具体实现,抛出异常,留个子类具体去实现。
AQS中代码:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
这里仍是看一下ReentrantLock 的具体实现
protected final boolean tryRelease(int releases) { //1,在没有可重入状况下,state =1, releases =1, 此时 c应该 =0 int c = getState() - releases; //2,判断当前释放锁线程是否等于拿到锁的线程,不等就抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //3,可见只有c=0,状况才可成功释放锁 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
注:因ReentrantLock可重入的锁,由上代码可见,只有在 state=0的状况,才可成功释放锁,常见错误场景:在使用ReentrantLock lock n次,却没有unlock相应的n次,致使没有成功释放锁。
一、 AQS的CLH队列的 pre指针可保证可靠性,next是保证不了的,看上面的代码分析,在遍历队列时,会向前遍从来查找安全点。
二、Condition 队列与 CLH同步队列二者的区别,同步队列中的全部线程是在等待获取锁的,Condition条件队列是某些以前已经获取到锁,由于要等待某个事件(如IO事件)或者与某个线程同步(等待另外一线程执行结果),主动调用await方法,主动释放锁后,将该线程放入条件队列中的,因此在条件队列中线程不是等待获取锁,而是在等某个条件,下一篇我会来分析AQS的重要机制- 等待通知,靠其内部类ConditionObject实现,await和 signal来阻塞与 通知机制,可替代传统Object类提供的wait和 notify功能,而AQS可有多个条件来分类,比之应用更加灵活。