原创做品,能够转载,可是请标注出处地址:http://www.javashuo.com/article/p-fyiwubsl-e.htmlhtml
AbstractQueuedSynchronizer简称为AQS,是并发包中用于实现并发工具的基础类,很是明显,它是一个抽象类。java
它提供了一个依赖于FIFO队列的框架用于实现各类阻塞锁与同步器。node
它依赖于一个int值来表示状态,并定义了获取和修改该状态值的原子方法,具体的同步器须要实现该抽象类,而且使用它定义的这些原子方法来操做状态值。并发
它的实现类通常做为待实现的同步器的静态内部类而存在,用来提供一些方法来实现同步器的功能。框架
咱们能够将其看做是基础的同步器,并非具体的某一个同步器,而是同步器的一个抽象。工具
首先来看看其继承体系:源码分析
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {}
能够看到它继承了AbstractOwnableSynchronizer抽象类,这个类很简单,咱们能够总体来看看:ui
// 就是一个简单的独占式同步器,持有被独占拥有的线程 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; // 供子类调用的构造器 protected AbstractOwnableSynchronizer() { } // 表示独占拥有的线程,下面是其get和set方法 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
静态内部类Node用于将要加入同步队列的线程封装成为队列节点。这个队列采用双向链表实现,支持先进先出。this
static final class Node {}
该静态内部类被final修饰,代表做者但愿其不被继承修改。线程
static final class Node { // 两个节点标记,用于标识节点对应的线程获取锁的模式,是共享式获取,仍是独享式获取 static final Node SHARED = new Node();// 共享模式的节点标记 static final Node EXCLUSIVE = null;// 独享模式的节点标记 // 四个节点状态,其实还有一个状态为0-表示当前节点在同步队列中,等待着获取锁 static final int CANCELLED = 1;// 表示当前节点封装的线程被中断或者超时 static final int SIGNAL = -1;// 表示当前节点的后继节点须要被唤醒(unpark) static final int CONDITION = -2;// 表示当前节点位于等待队列中,在等待条件知足 static final int PROPAGATE = -3;// 表示当前场景下后续的acquireShared可以得以执行?? // 节点状态,其值就是上面定义的这四个状态值再加上0 volatile int waitStatus; // 同步队列的节点指针 volatile Node prev;// 双向链表中节点指向前节点的指针 volatile Node next;// 双向链表中节点指向后节点的指针 // 节点封装的执行线程 volatile Thread thread; // 等待队列的节点指针 Node nextWaiter;// 单向链表中节点指向后节点的指针 }
节点状态:
- 0:默认状态,表示节点是同步队列中等待获取锁的线程的节点
- 1:CANCELLED,表示节点被取消,缘由多是超时或者被中断,一旦置于该状态,则再也不改变
- -1:SIGNAL,表示当前节点的后继节点被阻塞(或即将被阻塞)(使用park),所以当前线程释放锁或者被取消执行时须要唤醒(unpark)后继节点
- -2:CONDITION,表示当前节点位于等待队列中,当节点被转移到同步队列的时候,状态值会被更新为0
- -3:PROPAGATE,表示持续的传播releaseShared操做
static final class Node { Node() {} Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
三个构造器各有用处:
- Node():用户初始化头结点,或者建立共享标记SHARED
- Node(Thread thread, Node mode):给同步队列添加新节点时使用,用于构造新节点
- Node(Thread thread, int waitStatus):给等待队列添加新节点时使用,用于构造新节点
注意:上面的构造器中的mode(模式)属于Node类型,它有两种模式SHARED和EXCLUSIVE,分别表示共享模式和独享模式。而waitStatus表示的是节点状态。
static final class Node { // 校验当前节点是不是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取前置节点,必须为非null final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } }
方法解析:
isShared方法主要用于校验当前节点的锁获取模式,是共享仍是独享,实现方式采用nextWaiter与SHARED比较,参照上面的第二个构造器的实现,咱们能够知道在新增一个节点的时候,会对节点的nextWaiter进行赋值,而所赋的值正好是新增节点的模式标记,能够说nextWaiter持有节点的模式标记,那么拿其来与SHARED进行比较就是很显然的事情了。
predecessor方法用于获取前置节点,主要是在当前置节点不可为null时使用,这样当前置节点为null,就会抛出空指针。
Condition并不是AQS中的内部类,而是其内部类ConditionObject的父接口,为了后面的ConditionObject,咱们提早了解下Condition。
Condition是一个接口,旨在定义一系列针对获取锁的线程的操做,实现相似于Object类中wait/notify的功能。咱们经过其方法定义能够明显感受到这一点。
public interface Condition { // 使当前线程等待,知道被唤醒或者中断,注意须要在临界区使用,执行该方法以后该线程持有的锁将被释放,线程处于等待状态 // 四种状况下会退出等待状态:被signal唤醒,被signalAll唤醒,被interrupt唤醒(须要当前线程能够响应中断),发生伪唤醒 void await() throws InterruptedException; // 使当前线程等待,直到被唤醒(不响应中断),注意要在临界区使用,执行该方法以后该线程持有的锁将被释放,线程处于等待状态 // 三种状况下会退出等待状态:被signal唤醒,被signalAll唤醒,发生伪唤醒 void awaitUninterruptibly(); // 使当前线程等待,知道被唤醒或者中断或者超时,注意须要在临界区使用,执行该方法以后该线程持有的锁将被释放,线程处于等待状态 // 五种状况下会退出等待状态:被signal唤醒,被signalAll唤醒,被interrupt唤醒(须要当前线程能够响应中断),超时,发生伪唤醒 // nanosTimeout表示当前线程要等待的时间长度 // 该方法返回一个正数表示线程被提早唤醒,返回一个负数或0表示等待超时 long awaitNanos(long nanosTimeout) throws InterruptedException; // 同上,不一样在于上面的只能传参为纳秒值,该方法能够经过单位随便传值 boolean await(long time, TimeUnit unit) throws InterruptedException; // 使当前线程等待,知道被唤醒或者中断或者过了截止日期,注意须要在临界区使用,执行该方法以后该线程持有的锁将被释放,线程处于等待状态 // 退出等待状态的状况同上,只是这里传参为一个固定的时间点,线程等待到这个时间点将自动苏醒 boolean awaitUntil(Date deadline) throws InterruptedException; // 唤醒等待队列中的一个线程,该线程从await返回时必须获取到锁 void signal(); // 唤醒等待队列中的全部线程,每一个线程从await返回时必须获取到锁 void signalAll(); }
ConditionObject是Condition的实现类,在AQS中以普通内部类的方式存在。
ConditionObject内部维护了一个单向链表实现的等待队列,队列的节点与AQS中同步队列的节点类型一致,均为上面的内部类Node类型。
下面咱们来仔细看看这个类:
public class ConditionObject implements Condition, java.io.Serializable {}
该类实现了Condition接口和Serializable接口,拥有序列化功能
public class ConditionObject implements Condition, java.io.Serializable { // 序列化ID private static final long serialVersionUID = 1173984872572414699L; // 等待队列头结点指针 private transient Node firstWaiter; // 等待队列尾节点指针 private transient Node lastWaiter; // 中断模式 private static final int REINTERRUPT = 1;// 退出等待队列时从新中断 private static final int THROW_IE = -1;// 退出等待队列时抛出InterruptedException异常 }
咱们能够看到类的五个字段中除了三个静态字段以外,剩下的两个被transient修饰,也就是说虽然该类支持序列化,可是序列化无值。
ConditionObject中的公共方法其实就是对Condition接口中定义方法的实现,下面咱们逐个分析:
await():
public class ConditionObject implements Condition, java.io.Serializable { public final void await() throws InterruptedException { // 1-响应中断,同时会清除中断标记 if (Thread.interrupted()) throw new InterruptedException(); // 2-将当前线程封装成Node节点并添加到等待队列尾部 Node node = addConditionWaiter(); // 3-释放当前线程所占用的lock,在释放的过程当中会唤醒同步队列中的下一个节点 int savedState = fullyRelease(node); int interruptMode = 0; // 4-阻塞当前线程,直到被中断或者被唤醒 while (!isOnSyncQueue(node)) {// 校验当前线程是否被唤醒(是否被转移到同步队列),若是已唤醒则退出循环 LockSupport.park(this);// 阻塞当前线程 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 校验当前线程是否被中断 break;// 若是被中断则退出循环 } // 5-自旋等待获取到同步状态(即获取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 6-处理被中断的状况 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } }方法解析:
- 第一步:优先响应中断,首先校验当前线程是否被中断,若是被中断则抛出InterruptedException异常,不然下一步;
- 第二步:调用addConditionWaiter()方法,目的是将当前线程封装成为Node节点并添加到等待队列的尾部,源码以下:
public class ConditionObject implements Condition, java.io.Serializable { private Node addConditionWaiter() { Node t = lastWaiter;// 保存尾节点 // If lastWaiter is cancelled, clean out. // 若是尾节点线程被取消,则清除之 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters();// 清除等待队列中全部的被取消的线程节点 t = lastWaiter; } // 将当前线程封装成为等待队列的Node节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) // 若是等待队列为空,则将新节点做为头节点 firstWaiter = node; else // 不然将新节点做为新的尾节点添加到等待队列中 t.nextWaiter = node; // 更新尾节点指针 lastWaiter = node; return node; } }这个方法里面除了封装节点和添加节点以外,还有针对等待队列进行清理的流程,主要是为了清理被取消的线程节点
- 第三步:调用fullyRelease(node)方法,用于释放当前线程所持有的锁并唤醒同步队列的下一节点,详情可见AQS方法解析部分;
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 获取同步状态state值 // 执行release方法,尝试释放当前线程持有的共享状态,并唤醒下一个线程 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } }
第四步:调用LockSupport.park(this)阻塞当前线程,一但消除被中断后者线程被唤醒转移到同步队列,则退出循环,继续下一步;
这里涉及到一个中断模式的问题。中断模式以前提到过,有两种:REINTERRUPT和THROW_IE,分别表示针对被中断的线程在退出等待队列时的处理方式,前者从新中断,后者则抛出异常。
此处interruptMode表示的就是中断模式的值,初始赋值为0,而后经过checkInterruptWhileWaiting(node)方法不断的进行校验,其源码以下:public class ConditionObject implements Condition, java.io.Serializable { private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } }若是线程被中断则经过方法transferAfterCancelledWait(node)判断线程是不是在被唤醒以前被中断,若是是则返回true,不然返回false;若是返回true则采用THROW_IN模式,不然采用REINTERRUPT模式。不管是上面的哪种模式都表明线程被中断了,那么此处interruptMode就再也不是0,那么条件成立,break退出循环。除此以外transferAfterCancelledWait(node)方法不管返回true仍是false,都会将现场节点转移到同步队列中
- 第五步:当前线程已经被转移到同步队列中,而后开始自旋以获取同步状态,待其获取到同步状态(锁)以后,返回该线程是否被中断,若是被中断,再根据其中断模式进行整理,如何整理呢,主要就是若是当前中断模式是THROW_IE模式,则保持不变,不然一概修改为REINTERRUPT模式,以后会再次进行一次同步队列节点清理。
第六步:最后针对不一样的中断模式进行中断处理,若是是THROW_IN则抛出异常,若是是REINTERRUPT则再次进行中断。
awaitNanos(long):
public class ConditionObject implements Condition, java.io.Serializable { public final long awaitNanos(long nanosTimeout) throws InterruptedException { // 1-优先响应中断 if (Thread.interrupted()) throw new InterruptedException(); // 2-将当前线程封装成Node节点并添加到等待队列尾部 Node node = addConditionWaiter(); // 3-释放当前线程所占用的lock,在释放的过程当中会唤醒同步队列中的下一个节点 int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout;// 计算截止时间点 int interruptMode = 0; // 4-阻塞当前线程,直到被中断或者被唤醒或者超时 // 4-1 校验当前线程是否被唤醒,若是没有进入循环体 while (!isOnSyncQueue(node)) { // 4-2 若是超时时间小于等于0,则表示线程当即超时,而后进行线程节点转移处理,并结束循环 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node);// 转移线程节点 break; } // 4-3 若是超时设置时间nanosTimeout大于等于spinForTimeoutThreshold,则进行定时阻塞当前线程 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 4-4 若是线程被中断,则转移线程到同步队列,并结束循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 每次循环都会计算新的nanosTimeout值,而后在下次循环的时候设置阻塞的时限 nanosTimeout = deadline - System.nanoTime(); } // 5-自旋等待获取到同步状态(即获取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); // 6-处理被中断的状况 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } }方法解析:
这个方法的流程与上面的await基本一致,只是在第4步中添加了关于超时判断的逻辑,这里就着重看一下这一部分,其他部分再也不赘述。
包括两个部分的内容,第一是开始的校验,若是设置的超时时间小于等于0,表示线程等待当即超时,而后当即转移到同步队列尾部,尝试获取锁;第二是若是设置的超时时间大于等于spinForTimeoutThreshold的值,则将当前线程阻塞指定的时间,这个时间会随着循环的次数不断的减少。
另外的两个等待方法awaitUntil(Date deadline)和await(long time, TimeUnit unit)就再也不赘述了,原理彻底一致,有一个不一样的是awaitUninterruptibly()方法:
awaitUninterruptibly():
public class ConditionObject implements Condition, java.io.Serializable { public final void awaitUninterruptibly() { // 1-将当前线程封装成Node节点并添加到等待队列尾部 Node node = addConditionWaiter(); // 2-释放当前线程所占用的lock,在释放的过程当中会唤醒同步队列中的下一个节点 int savedState = fullyRelease(node); boolean interrupted = false; // 3-阻塞当前线程,直到被唤醒 while (!isOnSyncQueue(node)) { LockSupport.park(this);// 阻塞当前线程 if (Thread.interrupted()) interrupted = true; } // 4-自旋尝试获取同步锁 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } }其实就是不响应中断的等待方法,从源码中能够看出,虽然不响应中断,可是仍然保存着中断标志。
下面就来看看唤醒的方法:
signal():
public class ConditionObject implements Condition, java.io.Serializable { public final void signal() { // 1-校验当前线程时候独享式持有共享锁,若是不持有则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter;// 保存等待队列首节点 // 2-若是队列不为空,则执行头节点唤醒操做 if (first != null) doSignal(first); } private void doSignal(Node first) { do { // 3-若是等待队列只有一个节点,则将lastWaiter更新为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 4-尝试将线程节点从等待队列转移到同步队列,若是成功则结束循环,若是失败则再次判断firstWaiter首节点是否为null,若是不是null,则再次循环,不然结束循环 } while (!transferForSignal(first) && (first = firstWaiter) != null); } }方法解析:
- 第一步:校验当前线程时候独享式持有共享锁,若是不持有则抛出异常
- 第二步:若是队列不为空,则执行头节点唤醒操做
- 第三步:若是等待队列只有一个节点(头节点),则将lastWaiter更新为null
- 第四步:尝试将线程节点从等待队列转移到同步队列,若是成功则结束循环,若是失败则再次判断firstWaiter首节点是否为null,若是不是null,则再次循环,不然结束循环
signalAll():
public class ConditionObject implements Condition, java.io.Serializable { public final void signalAll() { // 1-校验当前线程时候独享式持有共享锁,若是不持有则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 2-若是队列不为空,则执行节点唤醒操做 if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null;// 要唤醒全部线程节点,那么等待队列就是被清空,那么就将这两个指针置为null // 3-针对等待队列中的节点一个一个进行唤醒操做 do { Node next = first.nextWaiter;// 保存二节点 first.nextWaiter = null; transferForSignal(first);// 将首节点转移到同步队列 first = next;// 重置首节点,将二节点做为新的首节点 } while (first != null); } }
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe();// 注入Unsafe实例 private static final long stateOffset;// 同步状态偏移量 private static final long headOffset;// 等待队列的头结点偏移量 private static final long tailOffset;// 等待队列的尾节点偏移量 private static final long waitStatusOffset;// 节点等待状态偏移量 private static final long nextOffset;// 节点的下级节点偏移量 static { try { // 获取这五个字段的内存偏移量并保存到各自的字段中 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } }
从这一部份内容能够看出来AQS底层和ConcurrentHashMap同样是使用CAS来实现原子操做的。
这一部分就是引入Unsafe来实现原子以上几个字段的原子更新。知道便可。
AQS中字段很少,以下所示:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private transient volatile Node head;// 等待队列的头结点 private transient volatile Node tail;// 等待队列的尾节点 private volatile int state;// 同步状态,初始为0,获取锁时会加1,释放锁时减1,当重入锁时也会加1 static final long spinForTimeoutThreshold = 1000L;// 自旋时限1000纳秒 }
这里的head和tail分别指向的是同步器的同步队列的头结点与尾节点。这个同步队列采用双向链表实现,其节点就是以前介绍的内部类中的Node类型。
state表示同步状态,初始为0,表示未被持有,当其被某线程持有时,就会增长1,并且这个也是实现重入的基础,当该线程再次获取当前锁时,只须要state加1便可,每释放一个锁,state-1,直到state等于0时,该同步锁为彻底释放。
spinForTimeoutThreshold是一个内置的快速自旋时限,当设置的超时时间小于这个值的时候,无需再执行等待设置,直接进入快速自旋便可,缘由在于 spinForTimeoutThreshold 已经很是小了,很是短的时间等待没法作到十分精确,若是这时再次进行超时等待,相反会让nanosTimeout 的超时从总体上面表现得不是那么精确,因此在超时很是短的场景中,AQS会进行无条件的快速自旋。
AQS中的方法能够粗分为四类:获取同步状态方法、释放同步状态方法、队列检验方法、队列监控方法,咱们罗列一个表格来简单介绍下这些方法:
分类 | 序号 | 方法 | 说明 | 备注 |
---|---|---|---|---|
获取同步状态方法 | 1 | final void acquire(int arg) | 独享获取同步状态,不响应中断 | |
获取同步状态方法 | 2 | final void acquireInterruptibly(int arg) | 独享获取同步状态,响应中断 | |
获取同步状态方法 | 3 | final boolean tryAcquireNanos(int arg, long nanosTimeout) | 独享获取同步状态,响应中断,响应超时 | |
获取同步状态方法 | 4 | final void acquireShared(int arg) | 共享获取同步状态,不响应中断 | |
获取同步状态方法 | 5 | final void acquireSharedInterruptibly(int arg) | 共享获取同步状态,响应中断 | |
获取同步状态方法 | 6 | final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 共享获取同步状态,响应中断,响应超时 | |
释放同步状态方法 | 7 | final boolean release(int arg) | 独享释放同步状态 | |
释放同步状态方法 | 8 | final void acquireShared(int arg) | 共享释放同步状态 | |
队列检验方法 | 9 | final boolean hasQueuedThreads() | 校验同步队列中是否有线程在等待获取同步状态 | |
队列检验方法 | 10 | final boolean hasContended() | 校验是否有线程争用过此同步器(同步队列是否为空) | |
队列检验方法 | 11 | final boolean isQueued(Thread thread) | 校验给定线程是否在同步队列之上 | |
队列检验方法 | 12 | final boolean hasQueuedPredecessors() | 校验是否有线程等待获取同步状态比当前线程时间长(同步队列中是都有前节点) | |
队列检验方法 | 13 | final boolean owns(ConditionObject condition) | 校验给定的condition是不是使用当前同步器做为锁 | |
队列检验方法 | 14 | final boolean hasWaiters(ConditionObject condition) | 校验等待队列是否有等待线程 | |
队列监控方法 | 15 | final int getWaitQueueLength(ConditionObject condition) | 获取等待队列中线程数量 | |
队列监控方法 | 16 | final Collection
|
获取等待队列中等待线程的集合 | |
队列监控方法 | 17 | final Thread getFirstQueuedThread() | 获取同步队列中的头节点线程 | |
队列监控方法 | 18 | final int getQueueLength() | 获取同步队列中线程数量 | |
队列监控方法 | 19 | final Collection
|
获取同步队列中线程的集合 | |
队列监控方法 | 20 | final Collection
|
获取同步队列中欲独享获取同步状态的线程集合 | |
队列监控方法 | 21 | final Collection
|
获取同步队列中欲共享获取同步状态的线程集合 |
这些方法中重点就是获取同步状态方法和释放同步状态方法,下面咱们也重点就看下这些个方法的实现:
该方法表示独享式获取同步状态,但不响应中断,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } }
该方法中调用了四个方法来完成功能,依次为:
解析:首先尝试独享式获取同步状态,若是获取到了就结束,
若是未获取到则将线程封装成为Node节点并添加到同步队列尾部,而后自旋以获取同步状态,
一旦获取到同步状态,退出自旋,并返回当前线程在自旋期间是否被中断过,若是被中断过则再次自我中断,
为何须要再次自我中断呢,这只是为了保留中断现场,由于在自旋结束进行中断校验时使用的是Thread.interrupted(),
该方法会致使中断状态被清除。
tryAcquire方法是一个模板方法,须要在AQS的子类中实现,默认的实现只是抛出了一个异常
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } }
addWaiter方法源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private Node addWaiter(Node mode) { // 将当前线程与同步状态获取模式封装成为Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 尝试快速进行一次enq操做,将新节点设置为同步地列尾节点, // 若是成功会结束方法但若是不成功,能够由下面的enq方法来执行, // 这个enq方法能够经过无限循环的方法直到执行成功 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 将新节点添加到同步队列中 enq(node); return node; } // 将新节点添加到同步队列中 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 这一步主要是针对同步队列未初始化时进行的初始化操做,初始化完成后下次循环就会执行新节点的添加操做 if (compareAndSetHead(new Node())) tail = head; } else { // 将以前的尾节点设置为新节点的前节点,而后原子更新尾节点为新节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } }
解析:很明显上面的addWaiter方法中出现了添加新节点到同步队列的逻辑,而在以后的enq方法中再次出现,
主要目的就是为了能在执行enq方法以前能够先进行一次尝试,看可否一次执行成功,若成功,则皆大欢喜,
没必要走下面的逻辑,若不成功,再走enq方法,来经过无限循环的方式强制执行成功。因此前面的逻辑能够当作是一次简单的enq操做。
acquireQueued方法源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { final boolean acquireQueued(final Node node, int arg) { boolean failed = true;// 默认失败 try { boolean interrupted = false;// 中断标记 for (;;) {// 无限循环以自旋 final Node p = node.predecessor();// 获取前置节点 // 若是前节点是头节点,而且当前线程获取同步状态成功,则将当前节点置为头节点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC,这里去除之前的节点对当前节点的引用,当前节点对象再也不被使用后能够被GC清理 failed = false;// 表示成功 return interrupted; } // 若是前置节点不是头节点,或者当前节点线程未获取到同步状态,则将尝试将前置节点状态更新为SIGNAL,并阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } }
解析:以无限循环的方法自旋,每次循环都尝试独享式获取同步状态,若是获取到了同步状态,
那么将当前节点置为头节点;若是前置节点不是头节点或者未获取到同步状态则尝试将前置节点的状态更新为SIGNAL,并阻塞当前线程(park),
这种状况下,当前线程须要被唤醒才能继续执行,当被唤醒以后能够再次循环,尝试获取同步状态,若是不成功,将会再次阻塞,等待再次被唤醒。
AbstractQueuedSynchronizer方法源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;// 获取前置节点的状态 if (ws == Node.SIGNAL) // 表示后置线程节点(当前节点须要被唤醒) return true; if (ws > 0) { // 表示前置节点线程被取消,那么清理被取消的线程节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 尝试将前置节点的状态置为SIGNAL,只有置为SIGNAL以后才能返回true. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } }
解析:这个方法主要目的就是为了将前置节点状态置为SIGNAL,这个状态意思是它后面的那个节点被阻塞了,
须要被唤醒,可见这个状态就是一个标记,标记着后面节点须要被唤醒。
parkAndCheckInterrupt方法源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private final boolean parkAndCheckInterrupt() { LockSupport.park(this);// 阻塞当前线程 return Thread.interrupted(); } }
解析:一旦线程执行到这一步,那么当前线程就会阻塞,后面的return暂时就不会执行。只有在被唤醒以后才能接着返回中断校验的结果。
总结:acquire方法首先尝试独享式获取同步状态(tryAcquire),获取失败的状况下须要将当前线程封装成为一个Node节点,
而后首先尝试将其设置为同步队列的为节点,若是失败,则自旋直到成功为止,而后进行自旋判断当前节点是否第二节点,若是是,
则尝试获取同步状态,若是成功,将当前节点置为头节点;不然若是当前节点不是第二节点,或者获取同步状态失败,
则将前置节点状态置为SIGNAL,而后阻塞(park)当前线程,等待被唤醒,唤醒以后会重复自旋,判断节点是否第二节点和尝试获取同步状态,
若是还不成功,那么就再次阻塞...
该方法表示独享式获取同步状态,响应中断,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireInterruptibly(int arg) throws InterruptedException { // 中断校验,会清除中断状态 if (Thread.interrupted()) throw new InterruptedException(); // 尝试独享式获取同步状态,若是失败则尝试中断的获取。 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 中断的获取同步状态 private void doAcquireInterruptibly(int arg) throws InterruptedException { // 首先将当前线程封装成为Node节点,并保存到同步队列尾部 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 自旋,逻辑桶acquire for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
解析:一开始就进行中断校验,若是未被中断则尝试独享式获取同步状态,获取失败后则封装线程为Node节点并保存到同步队列,而后自旋,逻辑与acquire种的acquireQueued方法逻辑一致,再也不赘述。
该方法表示独享式获取同步状态,响应中断,响应超时,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 首先响应中断,进行中断校验,若被中断,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);// 超时获取 } // 超时获取 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是超时时间小于等于0,则直接超时,返回false if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout;// 计算截止时间点 final Node node = addWaiter(Node.EXCLUSIVE);// 封装线程节点,并添加到同步队列 boolean failed = true; try { for (;;) {// 自旋 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime();// 计算剩余超时时间 // 若是剩余超时时间小于等于0,这说明超时,返回false if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) &&// 将前置节点状态置为SIGNAL nanosTimeout > spinForTimeoutThreshold)// 剩余超时时间大于快速自旋时限(1000纳秒) LockSupport.parkNanos(this, nanosTimeout);// 限时阻塞当前线程,超时时间为剩余超时时间 // 再次响应中断,进行中断校验,若被中断直接抛出异常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
spinForTimeoutThreshold:这是系统内置的一个常量,设置为1000纳秒,这是一个很短的时间,若是要阻塞的剩余时间小于这个值,就没有必要再执行阻塞,直接进入快速自旋过程。
解析:总体逻辑基本与前面的两种相似,不一样之处在于增长了针对超时时间处理的逻辑。
与acquireInterruptibly相似,一开始就进行中断校验,若被中断则抛出异常,不然尝试独享式获取同步状态,
获取成功,则返回true,若是获取失败,则将线程封装成Node节点保存到同步队列,而后计算截止时间点(当前时间+超时时间),
而后开始自旋,自旋的逻辑中前半部分与以前相同,只有在前置节点不是头节点或者获取同步状态失败的状况下逻辑发生了改变,
先计算剩余超时时间nanosTimeout(截止时间点-当前时间),而后将前置节点的状态置为SIGNAL,判断剩余超时时间是否大于
spinForTimeoutThreshold,若是大于则限时阻塞当前线程,不然快速自旋便可。
该方法表示共享式获取同步状态,不响应中断,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } }
解析:首先尝试共享式获取同步状态,若是获取失败(返回负值),则执行doAcquireShared方法。
tryAcquireShared方法源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } }
该方法是一个模板方法,须要子类来完善逻辑。但大体意义以下,若是获取失败返回负数(-1),若是是该同步状态被首次共享获取成功,返回0,非首次获取成功,则返回正数(1)
doAcquireShared方法源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private void doAcquireShared(int arg) { // 将线程封装成功节点,保存到同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) {// 自旋 final Node p = node.predecessor();// 获取前置节点 if (p == head) { // 若是前置节点为头节点 int r = tryAcquireShared(arg); if (r >= 0) { // 若是成功获取到同步状态,则将当前节点置为头节点,并进行传播唤醒 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 若是前置节点非头节点或者获取同步状态失败,则将前置节点设置为SIGNAL,而后阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 预存原始头节点 setHead(node);// 将当前节点置为头节点 // propagate可为0或1,0表示同步状态被首次获取,1表示被屡次获取 // h为原始头节点 // head为新头节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next;// 获取下级节点s // 若是后继节点不存在或者后继节点是共享式的,则唤醒后继节点 if (s == null || s.isShared()) doReleaseShared();// 唤醒后继节点 } } }
解析:该方法的逻辑相对于acquireQueued只是稍有变更,大体意思是相同的。不一样之处在于此处涉及到一个传播(Propagate)。
所谓的传播,实际上是在当前节点共享式获取到同步状态以后,检查其后置节点是否也是在等待共享式获取同步状态,如果,则将唤醒其后置节点。
doReleaseShared源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private void doReleaseShared() { for (;;) {// 自旋 Node h = head;// 获取头节点 if (h != null && h != tail) {// 若是队列中存在多个节点的话 int ws = h.waitStatus;// 头节点状态ws // 若是头节点状态为SIGNAL,则将其 if (ws == Node.SIGNAL) {// 说明其后继节点线程被阻塞,须要唤醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 首先将头节点状态重置为0 continue;// 若是重置头节点状态操做失败则重试 unparkSuccessor(h);// 而后进行后继节点唤醒 } // 若是头节点状态为0,则将其状态更新为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;// 头节点更新操做失败则重试 } if (h == head) break;// 头节点发生变化则退出自旋 } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next;// 获取后继节点s if (s == null || s.waitStatus > 0) { // 若是s为null或者其状态为取消,则从后遍历队列节点,找到node节点以后的首个未被取消的节点t,赋给s s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);// 执行s节点线程的唤醒操做 } }
解析:doReleaseShared方法被两处调用,一为此处,另外一为releaseShared方法,这个是用来共享式释放同步状态的方法。
doReleaseShared方法的做用就是为了唤醒后继节点,主要逻辑以下:首先获取头节点的状态ws,若是ws是SIGNAL,
表示后继节点须要被唤醒,而后自旋将头节点状态更新为0,并执行后继节点唤醒操做,这里要确保唤醒的是头节点以后首个
未被取消的线程节点,唤醒以后,后继节点的线程开始继续执行,当前线程也继续执行;若是ws是0,则将头节点的状态更新为PROPAGATE,
来确保同步状态能够顺利传播(由于若是ws为SIGNAL,会自动唤醒下一个节点,而0则不会,全部将其更新为PROPAGATE,表示共享式获取的传播)
被唤醒的线程会重置头节点,一旦重置,当前线程在最后校验头节点那一步就会成功,而后执行break退出自旋。通常来讲这里唤醒的主要目的是为了唤醒一个共享式获取同步状态的线程节点,它会直接获取到同步状态;但也存在特殊状况,好比
这个节点线程被取消了,致使唤醒了一个独享式获取的线程节点,那么在这个线程被唤醒后尝试独享式获取同步状态的时候会获取不到
(由于同步状态被共享式获取的线程持久,并且多是多个)从而再次进入阻塞。其实唤醒的主要来源仍是靠同步状态释放操做来发起的。
该方法表示共享式获取同步状态,响应中断,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 首先响应中断 if (Thread.interrupted()) throw new InterruptedException(); // 尝试共享式获取同步状态,失败则执行doAcquireSharedInterruptibly方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 可中断的共享式获取同步状态 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 首先封装线程节点,保存到同步队列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {// 自旋 final Node p = node.predecessor();// 获取前置节点 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 若是发生了中断则抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
解析:这个方法与acquireShared几乎一致,仅仅是在处理中断的问题上有点区别,因此再也不赘述。
该方法表示共享式获取同步状态,响应中断,响应超时,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是超时时间小于等于0,则直接超时,返回false if (nanosTimeout <= 0L) return false; // 计算超时截止时间点(当前时间+超时时间) final long deadline = System.nanoTime() + nanosTimeout; // 封装节点并保存队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {// 自旋 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } // 计算剩余的超时时间 nanosTimeout = deadline - System.nanoTime(); // 若是剩余超时时间小于等于0,直接超时,返回false if (nanosTimeout <= 0L) return false; // 将前置节点置为SIGNAL,而后校验剩余超时时间,若是不足spinForTimeoutThreshold,则进入快速自旋,不然执行阻塞 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 再次响应中断 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
解析:基本雷同,能够参考共享式获取同步状态的方法和独享式响应中断超时的获取方法。
该方法表示独享式释放同步状态,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean release(int arg) { // 首先尝试独享式释放同步状态 if (tryRelease(arg)) { Node h = head;// 头节点 // 头节点存在且状态不为0,则唤醒其后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } // 释放失败返回false return false; } }
解析:首先调用tryRelease来尝试独享式释放同步状态,若是成功,则根据头节点的状态来决定是否唤醒后继节点,
头节点为0则不唤醒。唤醒操做经过调用unparkSuccessor方法来实现,具体逻辑以前已有描述,这里总结一下:
其实就是唤醒头节点以后的首个未被取消的节点线程,这个线程多是独享式的也多是共享式的。
tryRelease源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } }
解析:tryRelease方法是一个模板方法,一样须要子类来实现。
该方法表示共享式释放同步状态,源码以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean releaseShared(int arg) { // 尝试共享式释放同步状态,成功后唤醒后继节点 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } }
解析:很简单,其中的doReleaseShared方法咱们也了解了。
tryReleaseShared源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } }
解析:和前面的那几个模板方法同样,须要子类来实现。
剩下的方法都是一些校验和监控的方法,并不涉及重点逻辑,再也不赘述,下面作一个总结
总结:
- AQS同步器内部维护了一个底层为双向链表的同步队列,用于保存那些获取同步状态失败的线程。每一个AQS同步器还能够关联多个Condition,其中每一个Condition内部维护了一个底层为单向链表的等待队列,用于保存那些基于特定条件而陷入等待的线程。
- 内部类Node描述的是同步队列和等待队列中节点的类型。节点有两点须要注意,那就是节点的模式与状态
- 节点模式:
- EXCLUSIVE:独享式
- SHARED:共享式
- 节点状态:
- 0:初始状态,该状态下不会唤醒后继节点
- CANCELLED(1):取消状态,节点线程被中断或超时
- SIGNAL(-1):唤醒状态,表示该节点的后继节点被阻塞,须要唤醒
- CONDITION(-2):表示当前节点位于等待队列中,在等待条件知足
- PROPAGATE(-3):表示共享式获取同步状态的传播
- 内部类ConditionObject是Condition的实现类,做为附着在同步器上的一个功能,可用可不用;它提供了一些方法来执行等待和唤醒操做:
- 等待操做:
- await():响应中断
- awaitNanos(long):响应中断,响应超时
- awaitUninterruptibly():不响应中断,不响应超时
- 唤醒操做:
- signal()
- signalAll()
- AQS同步器提供了多个方法历来辅助实现同步状态的获取与释放:
- 独享式获取:
- acquire(int):不响应中断,不响应超时
- acquireInterruptibly(int):响应中断
- tryAcquireNanos(int, long):响应中断,响应超时
- 独享式释放:
- release(int)
- 共享式获取:
- acquireShared(int):不响应中断,不响应超时
- acquireSharedInterruptibly(int):响应中断
- tryAcquireSharedNanos(int, long):响应中断,响应超时
- 共享式释放:
- releaseShared(int)
参考: