AQS是什么?

AQS介绍
AQS,即AbstractQueuedSynchronizer, 队列同步器,它是Java并发用来构建锁和其余同步组件的基础框架。来看下同步组件对AQS的使用:

 

AQS是一个抽象类,主是是以继承的方式使用。AQS自己是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。从图中能够看出,在java的同步组件中,AQS的子类(Sync等)通常是同步组件的静态内部类,即经过组合的方式使用。java

抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如经常使用的ReentrantLock/Semaphore/CountDownLatchnode

它维护了一个volatile int state(表明共享资源)和一个FIFO(双向队列)线程等待队列(多线程争用资源被阻塞时会进入此队列)segmentfault

AQS原理简介

 

AQS的实现依赖内部的同步队列(FIFO双向队列),若是当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构形成一个Node,将其加入同步队列的尾部,同时阻塞当前线程,当同步状态释放时,唤醒队列的头节点。安全

上面说的有点抽象,来具体看下,首先来看AQS最主要的三个成员变量:多线程

private transient volatile Node head; private transient volatile Node tail; private volatile int state;

上面提到的同步状态就是这个int型的变量state. head和tail分别是同步队列的头结点和尾结点。假设state=0表示同步状态可用(若是用于锁,则表示锁可用),state=1表示同步状态已被占用(锁被占用)。并发

下面举例说下获取和释放同步状态的过程:app

获取同步状态

假设线程A要获取同步状态(这里想象成锁,方便理解),初始状态下state=0,因此线程A能够顺利获取锁,A获取锁后将state置为1。在A没有释放锁期间,线程B也来获取锁,此时由于state=1,表示锁被占用,因此将B的线程信息和等待状态等信息构成出一个Node节点对象,放入同步队列,head和tail分别指向队列的头部和尾部(此时队列中有一个空的Node节点做为头点,head指向这个空节点,空Node的后继节点是B对应的Node节点,tail指向它),同时阻塞线程B(这里的阻塞使用的是LockSupport.park()方法)。后续若是再有线程要获取锁,都会加入队列尾部并阻塞。框架

释放同步状态

当线程A释放锁时,即将state置为0,此时A会唤醒头节点的后继节点(所谓唤醒,实际上是调用LockSupport.unpark(B)方法),即B线程从LockSupport.park()方法返回,此时B发现state已经为0,因此B线程能够顺利获取锁,B获取锁后B的Node节点随之出队。源码分析

上面只是简单介绍了AQS获取和释放的大体过程,下面结合AQS和ReentrantLock源码来具体看下JDK是如何实现的,特别要注意JDK是如何保证同步和并发操做的。ui

AQS源码分析

接下来以ReentrantLock的源码入手来深刻理解下AQS的实现。
上面说过AQS通常是以继承的方式被使用,同步组件内部组合一个继承了AQS的子类。
在ReentrantLock类中,有一个Sync成员变量,便是继承了AQS的子类,源码以下:

public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { ... } }

这里的Sync也是一个抽象类,其实现类为FairSync和NonfairSync,分别对应公平锁和非公平锁。ReentrantLock的提供一个入参为boolean值的构造方法,来肯定使用公平锁仍是非公平锁:

public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

获取锁

这里以NonfairSync类为例,看下它的Lock()的实现:

final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

lock方法先经过CAS尝试将同步状态(AQS的state属性)从0修改成1。若直接修改为功了,则将占用锁的线程设置为当前线程。看下compareAndSetState()和setExclusiveOwnerThread()实现:

protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

能够看到compareAndSetState底层实际上是调用的unsafe的CAS系列方法。

protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }

exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用同步状态的线程。

若是CAS操做未能成功,说明state已经不为0,此时继续acquire(1)操做,这个acquire()由AQS实现提供:

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

代码很短,不太好了理解,转换下写法(代码1):

public final void acquire(int arg) { boolean hasAcquired = tryAcquire(arg); if (!hasAcquired) { Node currentThreadNode = addWaiter(Node.EXCLUSIVE); boolean interrupted = acquireQueued(currentThreadNode, arg); if (interrupted) { selfInterrupt(); } } }

简单解释下:
tryAcquire方法尝试获取锁,若是成功就返回,若是不成功,则把当前线程和等待状态信息构适成一个Node节点,并将结点放入同步队列的尾部。而后为同步队列中的当前节点循环等待获取锁,直到成功。

首先看tryAcquire(arg)在NonfairSync中的实现(这里arg=1):

protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

首先获取AQS的同步状态(state),在锁中就是锁的状态,若是状态为0,则尝试设置状态为arg(这里为1), 若设置成功则表示当前线程获取锁,返回true。这个操做外部方法lock()就作过一次,这里再作只是为了再尝试一次,尽可能以最简单的方式获取锁。

若是状态不为0,再判断当前线程是不是锁的owner(即当前线程在以前已经获取锁,这里又来获取),若是是owner, 则尝试将状态值增长acquires,若是这个状态值越界,抛出异常;若是没有越界,则设置后返回true。这里能够看非公平锁的涵义,即获取锁并不会严格根据争用锁的前后顺序决定。这里的实现逻辑相似synchroized关键字的偏向锁的作法,便可重入而不用进一步进行锁的竞争,也解释了ReentrantLock中Reentrant的意义。

若是状态不为0,且当前线程不是owner,则返回false。
回到上面的代码1,tryAcquire返回false,接着执行addWaiter(Node.EXCLUSIVE),这个方法建立结点并入队,来看下源码:

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } 

首先建立一个Node对象,Node中包含了当前线程和Node模式(这时是排他模式)。tail是AQS的中表示同步队列队尾的属性,刚开始为null,因此进行enq(node)方法,从字面能够看出这是一个入队操做,来看下具体入队细节:

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 

方法体是一个死循环,自己没有锁,能够多个线程并发访问,假如某个线程进入方法,此时head, tail都为null, 进入if(t==null)区域,从方法名能够看出这里是用CAS的方式建立一个空的Node做为头结点,由于此时队列中只一个头结点,因此tail也指向它,第一次循环执行结束。注意这里使用CAS是防止多个线程并发执行到这儿时,只有一个线程可以执行成功,防止建立多个同步队列。

进行第二次循环时(或者是其余线程enq时),tail不为null,进入else区域。将当前线程的Node结点(简称CNode)的prev指向tail,而后使用CAS将tail指向CNode。看下这里的实现:

private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } 

expect为t, t此时指向tail,因此能够CAS成功,将tail从新指向CNode。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向CNode,返回头结点。通过上面的操做,头结点和CNode的关系如图:

 

其余线程再插入节点以此类推,都是在追加到链表尾部,而且经过CAS操做保证线程安全。

经过上面分析可知,AQS的写入是一种双向链表的插入操做,至此addWaiter分析完毕。

addWaiter返回了插入的节点,做为acquireQueued方法的入参,看下源码:

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

能够看到,acquireQueued方法也是一个死循环,直到进入 if (p == head && tryAcquire(arg))条件方法块。仍是接着刚才的操做来分析。acquireQueued接收的参数是addWaiter方法的返回值,也就是刚才的CNode节点,arg=1。node.predecessor()返回CNode的前置节点,在这里也就是head节点,因此p==head成立,进而进行tryAcquire操做,即争用锁, 若是获取成功,则进入if方法体,看下接下来的操做:

1) 将CNode设置为头节点。
2) 将CNode的前置节点设置的next设置为null。

此时队列如图:

 

上面操做即完成了FIFO的出队操做。
从上面的分析能够看出,只有队列的第二个节点能够有机会争用锁,若是成功获取锁,则此节点晋升为头节点。对于第三个及之后的节点,if (p == head)条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)操做(争用锁失败的第二个节点也如此), 来看下源码:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

shouldParkAfterFailedAcquire方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为Node.SIGNAL,若是是,是说明此节点已经将状态设置若是锁释放,则应当通知它,因此它能够安全的阻塞了,返回true。

若是前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操做实际是把队列中CANCELLED的节点剔除掉。

前节点状态小于0的状况是对应ReentrantLock的Condition条件等待的,这里不进行展开。

若是shouldParkAfterFailedAcquire返回了true,则会执行:“parkAndCheckInterrupt()”方法,它是经过LockSupport.park(this)将当前线程挂起到WATING状态,它须要等待一个中断、unpark方法来唤醒它,经过这样一种FIFO的机制的等待,来实现了Lock的操做。

释放锁

经过ReentrantLock的unlock方法来看下AQS的锁释放过程。来看下源码:

public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 

unlock调用AQS的release()来完成, AQS的若是tryRelease方法由具体子类实现。tryRelease返回true,则会将head传入到unparkSuccessor(Node)方法中并返回true,不然返回false。首先来看看Sync中tryRelease(int)方法实现,以下所示:

protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free }

这个动做能够认为就是一个设置锁状态的操做,并且是将状态减掉传入的参数值(参数是1),若是结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。
在排它锁中,加锁的时候状态会增长1(固然能够本身修改这个值),在解锁的时候减掉1,同一个锁,在能够重入后,可能会被叠加为二、三、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,并且也只有这种状况下才会返回true。

在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),看下源码:

private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

内部首先会发生的动做是获取head节点的next节点,若是获取到的节点不为空,则直接经过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁。

相关文章
相关标签/搜索