AbstractQueuedSynchronizer AQS源码分析

申明:jdk版本为1.8java

AbstractQueuedSynchronizer是jdk中实现锁的一个抽象类,有排他和共享两种模式。node

咱们这里先看排他模式,共享模式后面结合java.util.concurrent.locks.ReentrantReadWriteLock单独写一篇随笔。并发

后面还会分析可重入锁java.util.concurrent.locks.ReentrantLock。ui

言归正传,一块儿来看看吧。this

AQS中主要是经过state变量来控制锁状态的,子类经过重写下面的某些方法并控制state值来达到获取锁或者解锁效果:spa

通常状况下要实现本身的锁,会实现java.util.concurrent.locks.Lock接口,并经过内部类继承自java.util.concurrent.locks.AbstractQueuedSynchronizer实现。形以下面这样:线程

好了,AQS的扩展就先到这里,后面讲具体的锁时再详细分析。3d

下面分析AQS自己的代码实现。code

核心方法是java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(int)获取指定数量的锁(实际就是给state加多少的问题)以及java.util.concurrent.locks.AbstractQueuedSynchronizer.release(int)释放指定数量的锁(实际就是给state减多少的问题)对象

acquire

代码以及注释说明以下;

看注释就明白了,这是以排他模式获取锁,并忽略线程中断,与acquireInterruptibly相对应。acquireInterruptibly在遇到线程被中断时会抛出InterruptedException异常。

这里会先调用一次tryAcquire尝试获取锁,若是获取失败,就会新建一个Node并加到双向链表尾部,该Node会在自旋里面重复地尝试直到得到锁为止,过程当中线程可能会阻塞或者不阻塞,这点彻底取决于Node中的waitStatus状态值,这一点后面会详细分析。

tryAcquire是子类实现的,负责操做state值,经过state值的控制,子类能够实现各类各样的锁,父类的主要逻辑就是控制好Node的链表,线程的阻塞、唤醒等。

下面看下addWaiter方法,方法的逻辑就是用一个Node对象包装当前线程,并将Node加入到双向链表的尾部:

acquire采用的是排他模式,这里的入参是Node.EXCLUSIVE,建立一个排他模式的Node。大部分状况下tail是不为null的,看注释就知道了,

这里外部加了层判断尝试以最快方式将新建的Node挂到链表尾部。

由于是并发环境,因此compareAndSetTail有可能失败,失败的话就进入enq方法,以自旋方式往链表尾部添加。

addWaiter完以后将刚刚新建的Node传入acquireQueued,自旋尝试得到锁:

经过上面代码可知,若是node的前任node是head,而且tryAcquire得到锁成功,就将当前node设为head并返回是不是由于线程中断而从阻塞状态唤醒的。

不然,shouldParkAfterFailedAcquire,先检查是否要阻塞当前线程,若是须要阻塞,则调用parkAndCheckInterrupt,阻塞线程,

并在唤醒后检查是不是终端致使的线程唤醒,同时设置interrupted = true

下面分别看下这两个方法:

shouldParkAfterFailedAcquire:

根据前任节点的状态决定是否应该阻塞,若是前任节点状态为Node.SIGNAL就直接阻塞当前节点对应的线程,不然检查前任节点状态是否为cancelled,

若是是的话,就将前面全部状态为cancelled的节点剔除,剩下的逻辑就是前任状态设置为Node.SIGNAL,总结一句话,在自旋过程当中若是没有成功得到锁

的状况下,兜兜转转最终会返回true,阻塞当前线程。

node状态描述以下:

返回true后就会执行parkAndCheckInterrupt方法,以下:

调用LockSupport.park阻塞当前线程,LockSupport的分析见个人另一篇随笔。

这里唤醒通常经过LockSupport.unpark唤醒,注意,线程的interrupt方法也会唤醒该线程,因此这里调用了Thread.interrupted()静态方法

判断唤醒方式。唤醒后就会继续进入自旋尝试得到锁。

release

释放指定数量的锁,调用子类tryRelease,将State减去入参对应的数值,若是为0,则表示锁完全释放,进入unparkSuccessor方法,唤醒head后面节点对应的线程。

 正常状况下唤醒head后面一个节点的thread,可是,若是nextNode为cancel状态,就从tail往前找最前面符合条件的node。

至此,锁的获取、释放都讲完了,还有一个涉及到共享模式的那一路逻辑后面结合java.util.concurrent.locks.ReentrantReadWriteLock再分析。

ConditionObject

首先,一把AQS锁能够new多个ConditionObject对象,调用await和signal必须在锁的lock和unlock中间,也就是必须得到锁的状况下才能调用,不然会抛出异常。

相似synchronized中调用监视器的wait和notify,不一样的是AQS同一把锁能够有多个conditionObject。

下面详细分析下AQS的await和signal。

await

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
       // 检查线程是否已经中断
if (Thread.interrupted()) throw new InterruptedException();
       // 将当前线程包装进Node,状态为Node.CONDITION,并加到条件队列末尾 Node node
= addConditionWaiter();
       // 释放掉该线程获取的全部锁状态
int savedState = fullyRelease(node); int interruptMode = 0;
       // 循环等待当前node直到该node从条件队列转入等待队列
       // 或者该线程被中断,也就是checkInterruptWhileWaiting返回0
while (!isOnSyncQueue(node)) {
         // 若是尚未转移到等待队列则阻塞当前线程
         
// 这里的线程唤醒有一下几种状况:
         // 1.调用signal后转移到等待队列并排队到该node正常唤醒
         // 2.线程中断唤醒
         // 3.signal时调用transferForSignal入队后检查发现前驱节点取消了,或者对前驱节点设置状态为Node.SIGNAL时CAS失败
         LockSupport.park(this);
         // 唤醒后检查状态,是否被中断唤醒,若是不是返回0,若是是,再看是在signal以前仍是以后,
         // 以前的话返回THROW_IE,await结束后抛出InterruptedException异常
         // 以后的话返回REINTERRUPT,须要从新设置中断标志
         // 值得说明的是,线程被中断唤醒后,无论处在条件队列的什么位置,都会直接将对应node转移到等待队列
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
}
       // 分析checkInterruptWhileWaiting可知,即便线程中断也会转移到等待队列,因此这里acquireQueued自旋排队获取锁
       // 若是acquireQueued返回true则说明等待过程当中发生了中断,若是不是signal以前发生的中断,须要从新设置中断状态以便外部逻辑感知到
       // 由于checkInterruptWhileWaiting和parkAndCheckInterrupt中都是调用的Thread.interrupted静态方法,这个方法会清除中断状态
       if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
       // 正常signal的node会doSignal的时候设置nextWaiter = null,
       //因此这里的条件知足状况就是在直到得到锁以前都没有调用signal(由于signal开始执行就设置了
nextWaiter = null)
       if (node.nextWaiter != null) // clean up if cancelled
         // 在上述状况下,将node从条件队列中移除
          unlinkCancelledWaiters();
      // 若是发生中断,则根据中断状况向外反馈
      if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
   }

整个主线逻辑就是上面分析的那样,下面详解下关键的方法:

checkInterruptWhileWaiting,检查是不是中断致使的线程唤醒并返回后续如何处理的标志:

THROW_IE:抛出异常

REINTERRUPT:从新设置中断标志

从上面代码可知,若是不是中断,则返回0,从新进入while循环判断是否在等待队列中,若是是中断,则判断中断时机:

这里的中断时机主要针对的signal以前仍是以后,由于在signal中会先将node状态设置为0再将node转移到等待队列,以下图所示:

由于都是CAS操做,因此这里经过判断可否将状态由CONDITION设置为0判断是在signal以前仍是以后,若是设置成功,说明在signal操做以前,则将node加入到等待队列并返回true,对应的状态是THROW_IE,若是设置失败,说明signal中的CAS操做抢先了,那就while循环等待调用signal的线程将该node转移到等待队列,对应的状态是REINTERRUPT。

备注:ConditionObject在jdk的BlockingQueue中有很好应用,能够结合一块儿看下效果更佳。

相关文章
相关标签/搜索