AbstractQueueSynchronized的缩写,也叫抽象的队列式同步器。定义了一套多线程访问共享资源的同步器框架。html
字如其名,他是一个抽象类,因此大部分同步类都是继承于它,而后重写部分方法便可。java
好比说ReentrantLock/Semanode
phore/CountDownLatch都是AQS的具体实现类。多线程
AQS维护了一个共享资源State和一个FIFO的等待队列,当有多个线程争抢资源的时候就会阻塞进入此队列。app
线程在争抢State这个共享资源的时候,会被封装成一个Node节点,也就是说在AQS的等待队列里面的元素都是Node类型的对象。框架
PS:阻塞队列中,不包括Head节点。函数
在了解AQS以前,咱们先了解下Node内部是怎样的,咱们先来看看源码oop
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;
// ======== 下面的几个int常量是给waitStatus用的 ===========
// 代码此线程取消了争抢这个锁
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 官方的描述是,其表示当前node的后继节点对应的线程须要被唤醒
//通俗的话来讲就是,若是A节点被设置为SIGNAL,假如B是A的后继节点,那么B须要依赖A节点来唤醒才能拿到锁
static final int SIGNAL = -1;
// 本文不分析condition,因此略过吧,下一篇文章会介绍这个
static final int CONDITION = -2;
// 一样的不分析,略过吧
static final int PROPAGATE = -3;
// =====================================================
// 取值为上面的一、-一、-二、-3,或者0(之后会讲到)
// 这么理解,暂时只须要知道若是这个值大于0表明此线程取消了等待,
// ps: 半天抢不到锁,不抢了,ReentrantLock是能够指定timeouot的
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是本线程
volatile Thread thread;
}
复制代码
//头结点,当前持有锁的线程
private transient volatile Node head;
//尾节点,每次有新的节点进来,都要放在尾节点后面
private transient volatile Node tail;
//当前锁的状态,值为0的时候,表示共享资源没有被占用,1的时候表示有一个线程占用,若是大于1则表示重入
private volatile int state;
// 表明当前持有独占锁的线程,举个最重要的使用例子,由于锁能够重入
// reentrantLock.lock()能够嵌套调用屡次,因此每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread;
复制代码
由于AQS只是提供了一个模板,那么具体资源的获取方式和释放方式就由具体的实现类来决定。源码分析
下面咱们跟着看源码一块儿看看AQS究竟是什么东西post
对于State的访问,AQS定义了如下3种方式
上面咱们说过,具体的同步实现器就是实现资源state的获取和释放的方式就行了,关于队列的维护,Node节点的入队出队或者获取资源失败等操做,AQS已经实现好。
自定义的同步器只要实现如下方法,就能够实现出不一样的同步器
PS:以上的方法在AQS上是没有实现的,只有在具体的同步类实现器才会实现。
以独占模式的ReentractLock的公平锁为例子
其实在每一个具体的同步类(独占模式)的操做资源的接口中,最终调用的是AQS的acquire方法(好比说ReentractLock的公平锁)
因此咱们看acquire的方法具体是怎么实现,至于其余不一样的同步器的方法调用,也差很少都理解了。
关于解释都在代码里面了
public final void acquire(int arg) {//arg = 1,表示同步器想要1个state资源
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// tryAcquire,顾名思义,就是先尝试获取一下,若是获取成功,就返回true,那么获取资源也就结束了,不然,就把当前线程设置为独占模式(EXCLUSIVE),压到阻塞队列中。
// addWaiter就是把当前线程封装成Node对象,而且设置为独占模式(EXCLUSIVE),加入阻塞队列
复制代码
下面继续看tryAcquire的源码
注意:这里用ReentranctLock只是为了方便举例子,不一样的同步器实现不一样的方法而已.
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取资源
int c = getState();
//若是c为0,说明资源没有线程占用,则能够去抢
if (c == 0) {
//既然是公平锁,那么确定就讲究先来后到
//hasQueuedPredecessors先看看前面有没有Node节点在等待,若是没有,就经过CAS去获取一下
//在这里存在着线程竞争,因此有可能成功有可能失败,若是成功得到资源,那么compareAndSetState返回true,不然false
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//到这里说明前面没有线程在等待而且成功抢占到临界资源
//因此就设置当前线程为占有资源的线程,方便后面判断重入
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//到这里说明是重入的问题
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//到这里就是临界资源被占用,并且不是重入的状况,也就是说head节点都还没释放资源!
return false;
}
复制代码
下面继续看addWaiter的源码和acquireQueued的源码
private Node addWaiter(Node mode) {//传入的是Node.EXCLUSIVE
//将当前线程封装成Node对象,并设置为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//找到尾节点
Node pred = tail;
//若是找到队尾,说明队列不为空(若是只有head,其实队列式为空)
if (pred != null) {
//把队尾设置为插入节点的前缀节点
node.prev = pred;
//经过CAS操做,将传入的线程放到队尾,这里用CAS操做,是由于此时可能会有多个线程插入队尾,因此在此时队尾元素是不太肯定的
if (compareAndSetTail(pred, node)) {
//进入这里,说明当前队尾元素就是当前线程,设置前缀节点就行了!
pred.next = node;
return node;
}
}
//若是代码执行到这一步,说明有两种状况
//1. 如今队列为空
//2. 将当前线程表明的节点插入队列的时候,有其余线程也要插入该队列而且成功成为队尾元素.
enq(node);
return node;
}
/*enq函数------------------分界线------------------------------------*/
private Node enq(final Node node) {//传入的是当前线程所表明的节点
for (;;) {//自旋
Node t = tail; //找到队尾元素
if (t == null) { // Must initialize
//到这里表明队列为空,那么经过CAS操做加入头结点,此时仍是可能有多个线程会跑到这里竞争
if (compareAndSetHead(new Node()))
/* 到这里说明当前线程设置head成功(竞争成功),注意,这个head是直接新建的,此时waitStatus == 0(到后面会说) 虽然设置了head,tail仍是null,所设置一下tail,让它不为null,方便下次for循环执行else语句从而进行将当前线程表明的节点设置在head后面,本身跟着思路走一下。 */
tail = head;
} else {
/* 到达这里也要分下状况 1. 是addWaiter想把当前线程加入队尾失败的时候 2. 是上个if语句设置head节点成功以后,下一次for循环了 不过上面的两种状况,都是想要把当前线程设置为队尾节点,也是经过CAS操做。由于此时也是有多个线程竞争的,若是成功就设置成功,若是失败就自旋操做,不断地尝试设置为队尾节点。 */
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
经过上面的简要分析,咱们知道addWaiter最终的结果就是返回一个插入队尾或者head后面的节点。接下来acquireQueued就是插入队列的线程进行等待,若是轮到这个线程去拿资源了就去拿。至关于在饭堂排队打菜同样。一个窗口只能为一位学生打菜,没轮到打菜的同窗能够休息作其余事情。
解释一下:若是acquireQueued函数返回true,则表示会进入中断处理,不会进行挂起,也就是说打菜的同窗不会休息,因此通常是返回false的
final boolean acquireQueued(final Node node, int arg) {//传入的是当前已经加入队尾的节点和想要获取的State
boolean failed = true;
try {
boolean interrupted = false;//默认设置没有中断
for (;;) {//这里也是自旋
//获取传入这个节点的前驱节点
final Node p = node.predecessor();
/* tryAcquire 若是p是head,也就是说当前这个线程的节点是阻塞队列的第一个节点,那么就去尝试获取state,毕竟这个是队列嘛,先来先到。有可能成功,也有可能失败。 由于head节点表示的是当前正在拥有资源的线程,不知道可否成功是由于不知道head节点有没有释放资源,其实在ReentractLock的tryAcquire就是判断state是否为0,若是为0,则表示没有线程拥有该资源,也就是说head节点释放了该资源,那么便可获取。 还有一个缘由就是在enq的时候,若是队列没有节点,也就是初始化head节点的时候,没有设置任何线程,也就是说head没有占用资源,那么当前线程做为阻塞队列的对头,能够去尝试去获取state,万一得了呢?! */
if (p == head && tryAcquire(arg)) {
//到这里是当前线程获取state成功,将当前节点设置为head节点,
setHead(node);
p.next = null; // help GC,让以前的head方便被JVM回收
failed = false;//表示获取state成功
return interrupted;//表示期间有没有被中断过
}
//到这里是说明 要么表明当前线程的节点不是阻塞队列的头结点 要么尝试获取state资源失败
//不论是哪一种状况,说明当前加入节点的线程想要知道本身此时的状态是什么,如果休息,可是谁告诉我下一次到我了?若不是休息,那么就找到能够休息的地方或者说到我打菜了。因此这里就用了waitStatus的变量表示
//
/* 若是有A Node对象,直接排在A后面,队列是这样的 A<=>B<=>C 1. 若是A的waitStatus = -1 ,表示说A 若是占用了state资源,那么排队在A后面的第一个Node节点(B节点)能够先休息(B线程挂起)了,若是A释放了资源那么就会唤醒B,也就是A对B说,你先去休息吧,我好了就叫你 2 .若是 A的waitStatu = 1,表示说A这个线程已经不想排队获取这个资源了,这里设置这个值主要是方便当前节点找到可让它能够他安心休息的地方。 3. waitStatu = 0 表示A是初始化状态 */
//这里是 主要是为了找到node能够休息的地方。若是找到就休息,若是找不到,那么说明node前面就是head了,下一次循环检查能不能获取资源就行了!
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//---------------setHead源码-------------------
private void setHead(Node node) {
//将head指针指向传入的节点
head = node;
//这里设置head节点的线程为null,同步实现器在实现tryAcquire成功的时候会把当前线程保存下来的
node.thread = null;
//这里是当前node的前缀
node.prev = null;
}
//---------------
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//传入的是前驱节点和当前节点
//获取前驱节点的状态,方便当前节点的接下来的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//进到这里就说明waitStatus = -1,也就说明node应该能够休息了,也就是线程挂起
/* * This node has already set status asking a release * to signal it, so it can safely park. */
return true;
if (ws > 0) {
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
//这里就是说前驱节点已经放弃排队了,当前节点往前找看看能不能找到没有取消排队的节点,看看能不能排在他们后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
//到这里颇有多是waitStatus为0的分支,上面咱们都没有设置waitStatus
// 咱们在enq的时候用了new Node()和在addWaiter刚开始的时候也是用了 new Node(mode,arg),这两处都是添加tail的时候
// 若是没有设置waitStatus的时候,是默认为0的,也就是说是初始化状态
// 若是到达这里前驱节点都是tail,咱们就要将队尾的状态设置为-1,让传进来的node节点能够找到休息点。或者是已经释放资源的head,那么下次node能够变为head了!!
// 设置可能会失败,由于这里也会有线程竞争,竞争不过,这里也是经过自旋,直到能找到休息点为止。
//也有多是pre节点已是head节点了,尚未释放state资源,此时pre(head)的waitStatus == -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//这里为何返回false呢,是由于若是这时候head已是node的pre,那么若是到这里head已经释放完资源以后,node下一次就能够直接获取资源啦!若是head还没释放资源,那么下一次node就直接去休息。
// 若是返回的是true的话,若是这时候head已是node的pre,head已经释放完资源,那么到后面线程节点就挂起,那么谁来唤醒node节点?
return false;
}
//--------------------------
private final boolean parkAndCheckInterrupt() {
//若是shouldParkAfterFailedAcquire返回的是true,如果false则不会执行到这里!
LockSupport.park(this);//线程从这里挂起,若是被唤醒和中断那么继续从这里往下执行。
return Thread.interrupted();
}
复制代码
从上面的解释咱们知道,若是挂起等待的线程须要获取资源,是须要前缀节点的waitStatus为SIGNAL的线程唤醒的,也就是head节点。
在独占模式中,具体的同步器实现类最终用到的是AQS的release方法,开始的时候说过了,具体的同步实现器只要实现tryRelease方法便可。
好比说ReentranctLock的unlock
public final boolean release(int arg) {
if (tryRelease(arg)) {//这里是先尝试释放一下资源,通常均可以释放成功,除了屡次重入但只释放一次的状况。
Node h = head;
//这里判断的是 阻塞队列是否还存在和head节点是不是tail节点,由于以前说过,队列的尾节点的waitStatus是为0的
if (h != null && h.waitStatus != 0)
//到这里就说明head节点已经释放成功啦,就先去叫醒后面的直接节点去抢资源吧
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
protected final boolean tryRelease(int releases) {
//对state的操做就是释放资源
int c = getState() - releases;
//若是执行释放操做的不是所拥有资源的线程,抛出异常。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//判断是否是有嵌套锁
if (c == 0) {
//若是到达这里,说明临界资源已经得到自由啦,没有线程占用它啦!因此设置free = true
free = true;
//同时会把拥有资源的线程设置为null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
复制代码
private void unparkSuccessor(Node node) {//传入的是head节点
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
int ws = node.waitStatus;
//这里设置一下head的waitStatus,由于以前除了有节点加入队列的时候会把head节点ws = -1,基本没有其余地方设置,因此这里基本都是为-1的,CAS设置为0主要是head后面的直接节点不会挂起等待。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
//下面的代码是若是阻塞队列有取消等待的节点,那么就把他们移除阻塞队伍,找到真正想要获取资源在等待的head后面的直接节点。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//到这里就说明找到了那个节点,也就是head后面的第一个没有取消等待的节点,这个节点可能已经挂起或者还在挂起的过程当中,反正都会执行唤醒线程的函数。这样若是是挂起的线程,就继续执行下一次自旋,下一次自旋确定拿到锁,进行操做。由于已经知足了是1. 唤醒的节点是阻塞队列的第一个节点,2. head节点已经释放资源了!
LockSupport.unpark(s.thread);
}
复制代码