本文基于JDK1.8java
AQS即AbstractQueuedSynchronizer
,队列同步器,他是构建众多同步组件的基础框架,如ReentrantLock
、ReentrantReadWriteLock
等,是J.U.C并发包的核心基础组件。node
AQS框架基于模板方法设计模式构建,子类经过继承它,实现它提供的抽象方法来管理同步状态。算法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 序列化版本号 private static final long serialVersionUID = 7373984972572414691L; /** * 建立AQS实例,初始化state为0,为子类提供 */ protected AbstractQueuedSynchronizer() { } /*---------------- 同步队列构成 ---------------*/ // 等待队列节点类型 static final class Node { //...省略 } /** * 除了初始化以外,它只能经过setHead方法进行修改。注意:若是head存在,它的waitStatus保证不会被取消 */ private transient volatile Node head; /** * 等待队列的尾部,懒初始化,以后只在enq方法加入新节点时修改 */ private transient volatile Node tail; /*---------------- 同步状态相关 ---------------*/ /** * volatile修饰, 标识同步状态,state为0表示锁空闲,state>0表示锁被持有,能够大于1,表示被重入 */ private volatile int state; /** * 返回当前同步状态 */ protected final int getState() { return state; } /** * 设置同步状态 */ protected final void setState(int newState) { state = newState; } /** * 利用CAS操做更新state值 */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } static final long spinForTimeoutThreshold = 1000L; // 这部分和CAS有关 // 获取Unsafe实例 private static final Unsafe unsafe = Unsafe.getUnsafe(); // 记录state在AQS类中的偏移值 private static final long stateOffset; static { try { // 初始化state变量的偏移值 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); } catch (Exception ex) { throw new Error(ex); } } }
以上包括AQS的基本字段,比较核心的就是两个部分:编程
接下来咱们将一一学习这些内容。c#
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer,它提供了设置或获取独占锁的拥有者线程的功能。设计模式
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; // 自己是抽象类,该构造方法实为为子类提供 protected AbstractOwnableSynchronizer() { } /* 互斥模式同步下的当前线程 */ private transient Thread exclusiveOwnerThread; /* 设置当前拥有独占访问的线程。锁的拥有线程,null 参数表示没有线程拥有访问。 * 此方法不另外施加任何同步或 volatile 字段访问。 */ protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } /* 返回由 setExclusiveOwnerThread 最后设置的线程; * 若是从未设置,则返回 null。 * 此方法不另外施加任何同步或 volatile 字段访问。 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
这里exclusiveOwnerThread字段用来判断当前线程是否是持有锁,由于锁能够重入嘛,所以就会产生下面这样的伪代码:安全
if (currThread == getExclusiveOwnerThread()) { state++; }
tips:同步队列被称为CLH队列,是Craig,Landin,Hagersten的合称。多线程
AQS经过内置的FIFO同步双向队列来完成资源获取线程的排队工做,内部经过节点head【其实是虚拟节点,真正的第一个线程在head.next的位置】和tail记录队首和队尾元素,队列元素类型为Node。并发
CLU同步队列的结构以下,具体的操做以后再作总结:框架
图源:【AQS】核心实现
/** * 等待队列中的节点类 */ static final class Node { /** 标识共享式节点 */ static final Node SHARED = new Node(); /** 标识独占式节点 */ static final Node EXCLUSIVE = null; /** ------------ 等待状态 ---------------*/ /** 表示该线程放弃对锁的争夺 */ static final int CANCELLED = 1; /** 当前node的后继节点对应的线程须要被唤醒 */ static final int SIGNAL = -1; /** 线程在条件队列中等待 */ static final int CONDITION = -2; /** 释放共享资源时须要通知其余节点 */ static final int PROPAGATE = -3; /** waitStatus == 0 表示不是以上任何一种 */ // 记录当前线程的等待状态,以上五种 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // node存储的线程 volatile Thread thread; // 当前节点在Condition中等待队列上的下一个节点 Node nextWaiter; // 判断是否为共享式获取同步状态 final boolean isShared() { return nextWaiter == SHARED; } /** * 为何不直接判断prev,而是用p变量判断呢? * 避免并发的状况下,prev判断完为null,刚好被修改 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 用于SHARED的建立 Node() { } // 用于addWaiter(Node mode)方法,指定模式的 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 用于addConditionWaiter()方法 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
注释中已经标注各个等待状态的取值和表示,这边再总结一下:
等待状态使用waitStatus字段表示,用来控制线程的阻塞和唤醒,除了上面写的四种,实际还有一种状态是0,这在源码的注释中已经明确。
【CANCELLED = 1】取消状态,因为超时或中断,节点会被设置为取消状态,且一直保持,不会参与到竞争中。若是waitStatus>0则能够认为是该线程取消了等待。
【SIGNAL = -1】后继节点的线程处于等待状态,当前节点的线程若是释放了同步状态或被取消,将通知后继节点,使后继节点的线程得以运行。
【CONDITION = -2】节点在等待队列中,节点线程等待在Condition上,当其余线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中。
【PROPAGATE = -3】表示下一次共享式同步状态获取,将会无条件地传播下去。
0:初始状态,上面几种啥也不是。
关于AQS内部维护的同步队列,这里只是了解一些基本概念,后续对队列操做注意点进行深刻学习。
AQS使用一个int类型的成员变量state来表示同步状态,它用volatile修饰,而且提供了关于state的三个线程安全的方法:
getState()
,读取同步状态。setState(int newState)
,设置同步状态为newState。compareAndSetState(int expect, int update)
,CAS操做更新state值。state为0表示锁空闲,state>0表示锁被持有,能够大于1,表示被重入。不一样的子类实现,对state的含义表示略有差别,举几个例子吧:
对于AQS来讲,线程同步的关键是对state进行操做,根据state是否属于一个线程,操做state的方式分为独占方式和共享方式。
好比独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操做把state状态值从0变为1,而后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另一个线程获取锁时发现本身并非该锁的持有者就会被放入AQS阻塞队列后挂起。
// 独占式获取同步状态,成功后,其余线程须要等待该线程释放同步状态才能获取同步状态 public final void acquire(int arg) { // 首先调用 tryAcquire【须要子类实现】尝试获取资源,本质就是设置state的值,获取成功就直接返回 if (!tryAcquire(arg) && // 获取失败,就将当前线程封装成类型为Node.EXCLUSIVE的Node节点,并插入AQS阻塞队列尾部 // 而后经过自旋获取同步状态 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 与 acquire(int arg) 相同,可是该方法响应中断。 // 若是其余线程调用了当前线程的interrupt()方法,响应中断,抛出异常。 public final void acquireInterruptibly(int arg) throws InterruptedException { // interrupted()方法将会获取当前线程的中断标志并重置 if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } //尝试获取锁,若是获取失败会将当前线程挂起指定时间,时间到了以后当前线程被激活,若是仍是没有获取到锁,就返回false。 //另外,该方法会对中断进行的响应,若是其余线程调用了当前线程的interrupt()方法,响应中断,抛出异常。 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } // 独占式释放同步状态 public final boolean release(int arg) { // 尝试使用tryRelease释放资源,本质也是设置state的值 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // LockSupport.unpark(thread) 激活AQS里面被阻塞的一个线程 // 被激活的线程则使用tryAcquire 尝试,看当前状态变量state的值是否能知足本身的须要, //知足则该线程被激活,而后继续向下运行,不然仍是会被放入AQS队列并被挂起。 unparkSuccessor(h); return true; } return false; }
须要注意:tryRelease和tryAcquire方法并无在AQS中给出实现,实现的任务交给了具体的子类,子类根据具体的场景需求实现,经过CAS算法,设置修改state的值。
好比Semaphore信号量,当一个线程经过acquire()方法获取信号量时,会首先看当前信号量个数是否知足须要,不知足则把当前线程放入阻塞队列,若是知足则经过自旋CAS获取信号量。
//共享式获取同步状态,若是当前线程未获取到同步状态,将会进入同步队列等待, // 与独占式的主要区别是在同一时刻能够有多个线程获取到同步状态; public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 尝试获取资源,若是成功则直接返回 // 若是失败,则将当前线程封装为类型为Node.SHARED的Node节点并插入AQS阻塞队列尾部 // 并使用LockSupport.park(this)挂起本身 doAcquireShared(arg); } // 共享式获取同步状态,响应中断 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //共享式获取同步状态,增长超时限制 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } //共享式释放同步状态 public final boolean releaseShared(int arg) { // 尝试释放资源 if (tryReleaseShared(arg)) { // 调用LockSupport.unpark(thread)激活AQS队列里被阻塞的一个线程。 // 被激活的线程使用tryReleaseShared查看当前状态变量state是否能知足本身的须要。 // 若是知足须要,则线程被激活继续向下运行,不然仍是放入AQS队列并被挂起 doReleaseShared(); return true; } return false; }
Interruptibly的方法表示对中断须要进行响应,线程在调用带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起,其余线程中断了该线程,那么该线程会抛出InterruptedException
异常而返回。
Contition是一种广义上的条件队列,它利用await()和signal()为线程提供了一种更为灵活的等待/通知模式。
Condition必需要配合Lock一块儿使用,由于对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,所以await和signal的调用必须在lock和unlock之间,有锁以后,才能使用condition嘛。以ReentrantLock为例,简单使用以下:
public class ConditionTest { public static void main(String[] args) { final ReentrantLock lock = new ReentrantLock(); final Condition condition = lock.newCondition(); Thread thread1 = new Thread(() -> { String name = Thread.currentThread().getName(); lock.lock(); System.out.println(name + " <==成功获取到锁" + lock); try { System.out.println(name + " <==进入条件队列等待"); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " <==醒了"); lock.unlock(); System.out.println(name + " <==释放锁"); }, "等待线程"); thread1.start(); Thread thread2 = new Thread(() -> { String name = Thread.currentThread().getName(); lock.lock(); System.out.println(name + " ==>成功获取到锁" + lock); try { System.out.println("========== 这里演示await中的线程没有被signal的时候会一直等着 ==========="); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " ==>通知等待队列的线程"); condition.signal(); lock.unlock(); System.out.println(name + " ==>释放锁"); }, "通知线程"); thread2.start(); } }
等待线程 <==成功获取到锁java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 等待线程] 等待线程 <==进入条件队列等待 通知线程 ==>成功获取到锁java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 通知线程] ========== 这里演示await中的线程没有被signal的时候会一直等着 =========== 通知线程 ==>通知等待队列的线程 通知线程 ==>释放锁 等待线程 <==醒了 等待线程 <==释放锁
AQS,Lock,Condition,ConditionObject
之间的关系:
ConditionObject是AQS的内部类,实现了Condition接口,Lock中提供newCondition()方法,委托给内部AQS的实现Sync来建立ConditionObject对象,享受AQS对Condition的支持。
// ReentrantLock#newCondition public Condition newCondition() { return sync.newCondition(); } // Sync#newCondition final ConditionObject newCondition() { // 返回Contition的实现,定义在AQS中 return new ConditionObject(); }
ConditionObject用来结合锁实现线程同步,ConditionObject能够直接访问AQS对象内部的变量,好比state状态值和AQS队列。ConditionObject是条件变量,每一个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,ConditionObject维护了首尾节点:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; }
看到这里咱们须要明确这里的条件队列和上面分析的同步队列是不同的:
AQS维护的是当前在等待资源的队列,Condition维护的是在等待signal信号的队列。
每一个线程会存在上述两个队列中的一个,lock与unlock对应在AQS队列,signal与await对应条件队列,线程节点在他们之间反复横跳。
这里咱们针对上面的demo来分析一下会更好理解一些:
为了简化,接下来我将用D表示等待线程,用T表示通知线程。
lock.lock()
方法,此时无竞争,【D】被加入到AQS等待队列中。condition.await()
方法,此时【D】从AQS等待队列中移除,并加入到condition对应的条件等待队列中。condition.signal()
方法,这时condition对应的条件队列中只有一个节点【D】,因而【D】被取出,并被再次加入AQS的等待队列中。此时【D】并无被唤醒,只是单纯换了个位置。lock.unlock()
,释放锁锁以后,会唤醒AQS队列中的【D】,此时【D】真正被唤醒且执行。你看,【D】线程确实在两个队列中反复横跳吧,有关Condition的内容本文也只是抛砖引玉,以后会做详细学习总结,若是你对这一系列感兴趣,能够关注一下,后续会陆续对并发进行深刻学习。