AQS(AbstractQueuedSynchronizer)是JAVA中众多锁以及并发工具的基础,其底层采用乐观锁,大量使用了CAS操做, 而且在冲突时,采用自旋方式重试,以实现轻量级和高效地获取锁。java
提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(semaphore等)。 此类旨在为大多数依赖单个原子int值表示状态的同步器提供有用的基础。 子类必须定义更改此状态的受保护方法,并定义该状态对于获取或释放此对象而言意味着什么。 鉴于这些,此类中的其余方法将执行全部排队和阻塞机制。 子类能够维护其余状态字段,可是相对于同步,仅跟踪使用方法getState,setState和compareAndSetState操做的原子更新的int值。node
此类支持默认独占模式和共享模式之一或二者。 当以独占方式进行获取时,其余线程尝试进行的获取将没法成功。 由多个线程获取的共享模式可能(但不必定)成功。 该类不“理解”这些差别,当共享模式获取成功时,下一个等待线程(若是存在)还必须肯定它是否也能够获取。 在不一样模式下等待的线程共享相同的FIFO队列。 一般,实现子类仅支持这些模式之一,但例如能够在ReadWriteLock发挥做用。 仅支持独占模式或仅支持共享模式的子类无需定义支持未使用模式的方法。安全
private volatile int state; // 同步状态
state是整个工具的核心,一般整个工具都是在设置和修改状态,不少方法的操做都依赖于当前状态是什么。因为状态是全局共享的,通常会被设置成volatile类型,为了保证其修改的可见性;数据结构
AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是经过将每条请求共享资源的线程封装成一个节点来实现锁的分配。队列采用的是悲观锁的思想,表示当前所等待的资源,状态或者条件短期内可能没法知足。所以,它会将当前线程包装成某种类型的数据结构,扔到一个等待队列中,当必定条件知足后,再从等待队列中取出。多线程
CAS操做是最轻量的并发处理,一般咱们对于状态的修改都会用到CAS操做,由于状态可能被多个线程同时修改,CAS操做保证了同一个时刻,只有一个线程能修改为功,从而保证了线程安全。CAS采用的是乐观锁的思想,所以经常伴随着自旋,若是发现当前没法成功地执行CAS,则不断重试,直到成功为止。并发
要将此类用做同步器的基础,请使用getState,setState或compareAndSetState检查或修改同步状态,以从新定义如下方法(如适用):框架
独占方式,arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。函数
独占方式,arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。工具
共享方式,arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。ui
共享方式,arg为释放锁的次数,尝试释放资源,若是释放后容许唤醒后续等待结点返回True,不然返回False。
该线程是否正在独占资源。只有用到Condition才须要去实现它。
默认状况下,这些方法中的每个都会引起UnsupportedOperationException 。 这些方法的实现必须在内部是线程安全的,而且一般应简短且不阻塞。 定义这些方法是使用此类的惟一受支持的方法。 全部其余方法都被声明为final方法,由于它们不能独立变化。
AQS中维护了一个名为state的字段,意为同步状态,是由volatile修饰的,用于展现当前临界资源的获锁状况。
private volatile int state;
下面提供了几个访问这个state字段的方法:
返回同步状态的当前值。 此操做具备volatile读取的内存语义
protected final int getState() { return state; }
设置同步状态的值。 此操做具备volatile写操做的内存语义。
protected final void setState(int newState) { state = newState; }
若是当前状态值等于指望值,则以原子方式将同步状态设置为给定的更新值。 此操做具备volatile读写的内存语义
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
这几个方法都是Final修饰的,说明子类中没法重写它们。咱们能够经过修改State字段表示的同步状态来实现多线程的独占模式和共享模式
state的值即表示了锁的状态,state为0表示锁没有被占用,state大于0表示当前已经有线程持有该锁,这里之因此说大于0而不说等于1是由于可能存在可重入的状况。你能够把state变量当作是当前持有该锁的线程数量。
public abstract class AbstractOwnableSynchronizer protected AbstractOwnableSynchronizer() { private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
exclusiveOwnerThread 属性的值即为当前持有锁的线程独占模式获取锁流程:
共享模式获取锁流程:
static final class Node { // 表示线程以共享的模式等待锁 static final Node SHARED = new Node(); // 表示线程正在以独占的方式等待锁 static final Node EXCLUSIVE = null; // 为1,表示线程获取锁的请求已经取消了 static final int CANCELLED = 1; // 为-1,表示线程已经准备好了,就等资源释放了 static final int SIGNAL = -1; // 为-2,表示节点在等待队列中,节点线程等待唤醒 static final int CONDITION = -2; // 为-3,当前线程处在SHARED状况下,该字段才会使用 static final int PROPAGATE = -3; // 当前节点在队列中的状态 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后续节点 volatile Node next; // 当前节点的线程 volatile Thread thread; // 指向下一个处于CONDITION状态的节点 Node nextWaiter; ... }
// 队列头节点 private transient volatile Node head; // 队列尾节点 private transient volatile Node tail;
在AQS中的队列是一个FIFO队列,它的head节点永远是一个虚拟结点(dummy node), 它不表明任何线程,所以head所指向的Node的thread属性永远是null。可是咱们不会在构建过程当中建立它们,由于若是没有争用,这将是浪费时间。 而是构造节点,并在第一次争用时设置头和尾指针。只有从次头节点日后的全部节点才表明了全部等待锁的线程。也就是说,在当前线程没有抢到锁被包装成Node扔到队列中时,即便队列是空的,它也会排在第二个,咱们会在它的前面新建一个虚拟节点。
构造函数源代码
// 默认建立非公平锁 public ReentrantLock() { sync = new NonfairSync(); } // 经过传值为true来进行建立公平锁 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock 里面有三个内部类:
ReentrantLock 种获取锁的方法
public void lock() { sync.lock(); }
ReentrantLock 的非公平锁实现
static final class NonfairSync extends Sync { final void lock() { // 若是设置state的值从0变为1成功 if (compareAndSetState(0, 1)) // 则将当前线程设置为独占线程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
compareAndSetState(0,1)
protected final boolean compareAndSetState(int expect, int update) { // 经过unsafe.compareAndSwapInt方法来进行设置值 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
stateOffset 为AQS种维护的state属性的偏移量
setExclusiveOwnerThread(Thread.currentThread());
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
acquire(1); 调用的是AQS 中的acquire(int arg) 方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg) 该方法是protected的由子类去具体实现的
咱们须要看的是NonfairSync中实现的tryAcquire方法,里面又调用了nonfairTryAcquire方法,再进去看看
static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
nonfairTryAcquire(int acquires) 方法实现
final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取当前state的值 int c = getState(); if (c == 0) { // 看看设置值是否能成功 if (compareAndSetState(0, acquires)) { // 则将当前线程设置为独占线程 setExclusiveOwnerThread(current); return true; } } // 返回由setExclusiveOwnerThread设置的最后一个线程;若是从不设置,则返回null else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 设置state的值 setState(nextc); return true; } return false; }
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法实现,先看里addWaiter(Node.EXCLUSIVE)方法注意:Node.EXCLUSIVE 此时是空值,因此mode 就是空的,因此此时建立的Node节点中的nextWaiter是空值。
static final class Node { Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } } private Node addWaiter(Node mode) { // 建立一个新的节点 Node node = new Node(Thread.currentThread(), mode); // 将当前CLH队列的尾部节点赋予给 pred Node pred = tail; if (pred != null) { // 若是尾节点不为空 node.prev = pred; // 将当前node节点的前驱节点指向CLH队列的尾部节点 if (compareAndSetTail(pred, node)) { // CAS设置值 pred.next = node; // CLH队列的尾部节点的后继节点指向新的node节点 return node; } } enq(node); return node; }
若是CLH队列的尾部节点为空值的话,执行enq(node)方法
// 经过CAS方式设置队列的头节点 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } // 经过CAS方式设置队列的尾部节点 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } // 节点入队操做 private Node enq(final Node node) { for (;;) { Node t = tail; // 若是尾部节点为空值 if (t == null) { // 则进行初始化一个节点,将其设置为头节点 if (compareAndSetHead(new Node())) tail = head; //头尾指向同一个空节点 } else { // 若是尾部节点不为空,那么将传进来的入队节点的前驱节点指向当前队列的尾部节点 node.prev = t; // 将当前传进来的入队节点设置为当前队列的尾部节点 if (compareAndSetTail(t, node)) { // 将当前队列的尾部节点的后继节点指向传进来的新节点 t.next = node; return t; } } } }
查看 acquireQueued 方法实现
// 获取当前节点的前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 检查并更新没法获取的节点的状态。 若是线程应阻塞,则返回true private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //SIGNAL这个状态就有点意思了,它不是表征当前节点的状态,而是当前节点的下一个节点 //的状态。当一个节点的waitStatus被置为SIGNAL,就说明它的下一个节点(即它的后继 // 节点)已经被挂起了(或者立刻就要被挂起了),所以在当前节点释放了锁或者放弃获取 // 锁时,若是它的waitStatus属性为SIGNAL,它还要完成一个额外的操做——唤醒它的后继节点。 int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { // 当前节点的 ws > 0, 则为 Node.CANCELLED 说明前驱节点 // 已经取消了等待锁(因为超时或者中断等缘由) // 既然前驱节点不等了, 那就继续往前找, 直到找到一个还在等待锁的节点 // 而后咱们跨过这些不等待锁的节点, 直接排在等待锁的节点的后面 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 前驱节点的状态既不是SIGNAL,也不是CANCELLED // 用CAS设置前驱节点的ws为 Node.SIGNAL,给本身定一个闹钟 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 中止的便捷方法,而后检查是否中断 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } // 取消正在进行的获取尝试 private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } } // 能执行到该方法, 说明addWaiter 方法已经成功将包装了当前Thread的节点添加到了等待队列的队尾 // 该方法中将再次尝试去获取锁 // 在再次尝试获取锁失败后, 判断是否须要把当前线程挂起 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 在前驱节点就是head节点的时候,继续尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 将当前线程挂起,使CPU再也不调度它 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
为何前面获取锁失败了, 这里还要再次尝试获取锁呢?
首先, 这里再次尝试获取锁是基于必定的条件的,即:当前节点的前驱节点就是HEAD节点,由于咱们知道,head节点就是个虚拟节点,它不表明任何线程,或者表明了持有锁的线程,若是当前节点的前驱节点就是head节点,那就说明当前节点已是排在整个等待队列最前面的了。
setHead(node); 方法
// 这个方法将head指向传进来的node,而且将node的thread和prev属性置为null private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
能够看出,这个方法的本质是丢弃原来的head,将head指向已经得到了锁的node。可是接着又将该node的thread属性置为null了,这某种意义上致使了这个新的head节点又成为了一个虚拟节点,它不表明任何线程。为何要这样作呢,由于在tryAcquire调用成功后,exclusiveOwnerThread属性就已经记录了当前获取锁的线程了,此处没有必要再记录。这某种程度上就是将当前线程从等待队列里面拿出来了,是一个变相的出队操做。
shouldParkAfterFailedAcquire(Node pred, Node node)方法
acquireQueued方法中的Finally代码
private void cancelAcquire(Node node) { // 将无效节点过滤 if (node == null) return; // 设置该节点不关联任何线程,也就是虚节点 node.thread = null; Node pred = node.prev; // 经过前驱节点,跳过取消状态的node while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取过滤后的前驱节点的后继节点 Node predNext = pred.next; // 把当前node的状态设置为CANCELLED node.waitStatus = Node.CANCELLED; // 若是当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点 // 更新失败的话,则进入else,若是更新成功,将tail的后继节点设置为null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 若是当前节点不是head的后继节点, // 1.判断当前节点前驱节点的是否为SIGNAL // 2.若是不是,则把前驱节点设置为SINGAL看是否成功 // 若是1和2中有一个为true,再判断当前节点的线程是否为null // 若是上述条件都知足,把当前节点的前驱节点的后继指针指向当前节点的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 若是当前节点是head的后继节点,或者上述条件不知足,那就唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }
非公平锁获取锁成功的流程图
非公平锁获取锁失败的流程图
尝试释放此锁。若是当前线程是此锁的持有者,则保留计数将减小。 若是保持计数如今为零,则释放锁定。 若是当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。
## ReentrantLock public void unlock() { sync.release(1); }
sync.release(1) 调用的是AbstractQueuedSynchronizer中的release方法
## AbstractQueuedSynchronizer public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
分析tryRelease(arg)方法
tryRelease(arg)该方法调用的是ReentrantLock中
protected final boolean tryRelease(int releases) { // 获取当前锁持有的线程数量和须要释放的值进行相减 int c = getState() - releases; // 若是当前线程不是锁占有的线程抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 若是此时c = 0就意味着state = 0,当前锁没有被任意线程占有 // 将当前所的占有线程设置为空 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 设置state的值为 0 setState(c); return free; }
若是头节点不为空,而且waitStatus != 0,唤醒后续节点若是存在的话。
这里的判断条件为何是h != null && h.waitStatus != 0?
由于h == null的话,Head还没初始化。初始状况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。因此说,这里若是还没来得及入队,就会出现head == null 的状况。
private void unparkSuccessor(Node node) { // 获取头结点waitStatus int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取当前节点的下一个节点 Node s = node.next; //若是下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点 if (s == null || s.waitStatus > 0) { s = null; // 就从尾部节点开始找往前遍历,找到队列中第一个waitStatus<0的节点。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 若是当前节点的下个节点不为空,并且状态<=0,就把当前节点唤醒 if (s != null) LockSupport.unpark(s.thread); }
为何要从后往前找第一个非Cancelled的节点呢?
看一下addWaiter方法
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
咱们从这里能够看到,节点入队并非原子操做,也就是说,node.prev = pred, compareAndSetTail(pred, node) 这两个地方能够看做Tail入队的原子操做,可是此时pred.next = node;还没执行,若是这个时候执行了unparkSuccessor方法,就没办法从前日后找了,因此须要从后往前找。还有一点缘由,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,所以也是必需要从后往前遍历才可以遍历彻底部的Node。
因此,若是是从前日后找,因为极端状况下入队的非原子操做和CANCELLED节点产生过程当中断开Next指针的操做,可能会致使没法遍历全部的节点。因此,唤醒对应的线程后,对应的线程就会继续往下执行。
因为篇幅较长公平锁的实如今下一篇的博客中讲述,谢谢你们的关注和支持!有问题但愿你们指出,共同进步!!!