Java的内置锁一直都是备受争议的,在JDK 1.6以前,synchronized这个重量级锁其性能一直都是较为低下,虽然在1.6后,进行大量的锁优化策略,可是与Lock相比synchronized仍是存在一些缺陷的:虽然synchronized提供了便捷性的隐式获取锁释放锁机制(基于JVM机制),可是它却缺乏了获取锁与释放锁的可操做性,可中断、超时获取锁,且它为独占式在高并发场景下性能大打折扣。node
AQS,AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其余同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并发包的做者(Doug Lea)指望它可以成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。安全
AQS解决了子类实现同步器时涉及到的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器能够带来不少好处。它不只可以极大地减小实现工做,并且也没必要处理在多个位置上发生的竞争问题。多线程
AQS的主要使用方式是继承,子类经过继承同步器并实现它的抽象方法来管理同步状态。并发
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操做,固然AQS能够确保对state的操做是安全的。框架
AQS经过内置的FIFO同步队列来完成资源获取线程的排队工做,若是当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构形成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。高并发
AQS能够实现独占锁和共享锁,RenntrantLock实现的是独占锁,ReentrantReadWriteLock实现的是独占锁和共享锁,CountDownLatch实现的是共享锁。oop
下面咱们经过源码来分析下AQS的实现原理性能
经过AQS的类结构咱们能够看到它内部有一个队列和一个state的int变量。
队列:经过一个双向链表实现的队列来存储等待获取锁的线程。
state:锁的状态。
head、tail和state 都是volatile类型的变量,volatile能够保证多线程的内存可见性。优化
同步队列的基本结构以下:ui
同步队列
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; //表示当前的线程被取消; static final int CANCELLED = 1; //表示当前节点的后继节点包含的线程须要运行,也就是unpark; static final int SIGNAL = -1; //表示当前节点在等待condition,也就是在condition队列中; static final int CONDITION = -2; //表示当前场景下后续的acquireShared可以得以执行; static final int PROPAGATE = -3; //表示节点的状态。默认为0,表示当前节点在sync队列中,等待着获取锁。 //其它几个状态为:CANCELLED、SIGNAL、CONDITION、PROPAGATE volatile int waitStatus; //前驱节点 volatile Node prev; //后继节点 volatile Node next; //获取锁的线程 volatile Thread thread; //存储condition队列中的后继节点。 Node nextWaiter; ...... }
从Node结构prev和next节点能够看出它是一个双向链表,waitStatus存储了当前线程的状态信息
waitStatus
下面咱们经过如下五个方面来介绍AQS是怎么实现的锁的获取和释放的
public final void acquire(int arg) { //尝试得到锁,获取不到则加入到队列中等待获取 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; //将该节点添加到队列尾部 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //若是前驱节点为null,则进入enq方法经过自旋方式入队列 enq(node); return node; }
将构造的同步节点加入到同步队列中
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //若是队列为空,则经过CAS把当前Node设置成头节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //若是队列不为空,则向队列尾部添加Node if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
该方法使用CAS自旋的方式来保证向队列中添加Node(同步节点简写Node)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //找到当前节点的前驱节点 final Node p = node.predecessor(); //检测p是否为头节点,若是是,再次调用tryAcquire方法 if (p == head && tryAcquire(arg)) { //若是p节点是头节点且tryAcquire方法返回true。那么将当前节点设置为头节点。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //若是p节点不是头节点,或者tryAcquire返回false,说明请求失败。 //那么首先须要判断请求失败后node节点是否应该被阻塞,若是应该 //被阻塞,那么阻塞node节点,并检测中断状态。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //若是有中断,设置中断状态。 interrupted = true; } } finally { if (failed) //最后检测一下若是请求失败(异常退出),取消请求。 cancelAcquire(node); } }
在acquireQueued方法中,当前线程经过自旋的方式来尝试获取同步状态,
经过上面的代码咱们能够发现AQS内部的同步队列是FIFO的方式存取的。节点自旋获取同步状态的行为以下图所示
节点自旋获取同步状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //得到前驱节点状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若是前驱节点状态为SIGNAL,当前线程则能够阻塞。 return true; if (ws > 0) { do { //判断若是前驱节点状态为CANCELLED,那就一直往前找,直到找到最近一个正常等待的状态 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //并将当前Node排在它的后边。 pred.next = node; } else { //若是前驱节点正常,则修改前驱节点状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
状态 | 值 | 说明 |
---|---|---|
CANCELLED | 1 | 等待超时或者中断,须要从同步队列中取消 |
SIGNAL | -1 | 后继节点出于等待状态,当前节点释放锁后将会唤醒后继节点 |
CONDITION | -2 | 节点在等待队列中,节点线程等待在Condition上,其它线程对Condition调用signal()方法后,该节点将会从等待同步队列中移到同步队列中,而后等待获取锁。 |
PROPAGATE | -3 | 表示下一次共享式同步状态获取将会无条件地传播下去 |
INITIAL | 0 | 初始状态 |
private final boolean parkAndCheckInterrupt() { //阻塞当前线程 LockSupport.park(this); //判断是否中断来唤醒的 return Thread.interrupted(); }
public final boolean release(int arg) { //尝试释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }
tryRelease(int arg) 方法应该由实现AQS的子类来实现具体的逻辑。
public final void acquireShared(int arg) { //尝试获取的锁,若是获取失败执行doAcquireShared方法。 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()尝试获取锁,若是获取失败则经过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
这里tryAcquireShared()须要自定义同步器去实现。
AQS中规定:负值表明获取失败,非负数标识获取成功。
private void doAcquireShared(int arg) { //构建共享Node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //获取前驱节点 final Node p = node.predecessor(); //若是是头节点进行尝试得到锁 if (p == head) { //若是返回值大于等于0,则说明得到锁 int r = tryAcquireShared(arg); if (r >= 0) { //当前节点设置为队列头,并 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在acquireQueued方法中,当前线程也经过自旋的方式来尝试获取同步状态,同独享式得到锁同样
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); //若是propagate >0,说明共享锁还有能够进行得到锁,继续唤醒下一个节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
设置当前节点为头结点,并调用了doReleaseShared()方法,acquireShared方法最终调用了release方法,得看下为何。缘由其实也很简单,shared模式下是容许多个线程持有一把锁的,其中tryAcquire的返回值标志了是否容许其余线程继续进入。若是容许的话,须要唤醒队列中等待的线程。其中doReleaseShared方法的逻辑很简单,就是唤醒后继线程。
所以acquireShared的主要逻辑就是尝试加锁,若是容许其余线程继续加锁,那么唤醒后继线程,若是失败,那么入队阻塞等待。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared(int arg) 方法应该由实现AQS的子类来实现具体的逻辑。
private void doReleaseShared() { for (;;) { // 获取队列的头节点 Node h = head; // 若是头节点不为null,而且头节点不等于tail节点。 if (h != null && h != tail) { // 获取头节点对应的线程的状态 int ws = h.waitStatus; // 若是头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”须要被unpark唤醒。 if (ws == Node.SIGNAL) { // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒“头节点的下一个节点所对应的线程”。 unparkSuccessor(h); } // 若是头节点对应的线程是空状态,则设置“尾节点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是头节点发生变化,则继续循环。不然,退出循环。 if (h == head) // loop if head changed break; } }
该方法主要是唤醒后继节点。对于可以支持多个线程同时访问的并发组件(好比Semaphore),它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,通常是经过循环和CAS来保证的,由于释放同步状态的操做会同时来自多个线程。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; //计算出超时时间点 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } //计算剩余超时时间,超时时间点deadline减去当前时间点System.nanoTime()获得还应该睡眠的时间 nanosTimeout = deadline - System.nanoTime(); //若是超时,返回false,获取锁失败 if (nanosTimeout <= 0L) return false; //判断是否须要阻塞当前线程 //若是须要,在判断当前剩余纳秒数是否大于1000 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //阻塞 nanosTimeout纳秒数 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
该方法在自旋过程当中,当节点的前驱节点为头节点时尝试获取同步状态,若是获取成功则从该方法返回,这个过程和独占式同步获取的过程相似,可是在同步状态获取失败的处理上有所不一样。若是当前线程获取同步状态失败,则首先从新计算超时间隔nanosTimeout,则判断是否超时(nanosTimeout小于等于0表示已经超时),若是没有超时,则使当前线程等待nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Object blocker,long nanos)方法返回)。
若是nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行 超时等待,而是进入快速的自旋过程。缘由在于,很是短的超时等待没法作到十分精确,若是 这时再进行超时等待,相反会让nanosTimeout的超时从总体上表现得反而不精确。所以,在超 时很是短的场景下,同步器会进入无条件的快速自旋。