本文经过ReentrantLock来窥探AbstractQueuedSynchronizer(AQS)的实现原理,在看此文以前。你须要了解一下park、unpark的功能,请移步至上一篇《深刻剖析park、unpark》;html
根据AbstractQueuedSynchronizer的官方文档,若是想实现一把锁的,须要继承AbstractQueuedSynchronizer,并须要重写tryAcquire、tryRelease、可选择重写isHeldExclusively提供locked state、由于支持序列化,因此须要重写readObject以便反序列化时恢复原始值、newCondition提供条件;官方提供的java代码以下(官方文档见参考链接);java
public class MyLock implements Lock, java.io.Serializable { private static class Sync extends AbstractQueuedSynchronizer { // Acquires the lock if state is zero @Override public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero @Override protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // Provides a Condition Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } // Reports whether in locked state @Override protected boolean isHeldExclusively() { return getState() == 1; } } /** * The sync object does all the hard work. We just forward to it. */ private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } private static volatile Integer value = 0; public static void main(String[] args) { MyLock myLock = new MyLock(); for (int i = 0; i < 1000; i++) { new Thread(()->{ myLock.lock(); value ++; myLock.unlock(); }).start(); } System.out.println(value); } }
上面是一个不可重入的锁,它实现了一个锁基础功能,目的是为了跟ReentrantLock的实现作对比;node
ReentrantLock意思为可重入锁,指的是一个线程可以对一个临界资源重复加锁。ReentrantLock跟经常使用的Synchronized进行比较;c#
Synchronized的分析能够参考《深刻剖析synchronized关键词》,ReentrantLock能够建立公平锁、也能够建立非公平锁,接下来看一下ReentrantLock的简单用法,非公平锁实现比较简单,今天重点是公平锁;api
public class ReentrantLockTest { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(true); reentrantLock.lock(); try { log.info("lock"); } catch (Exception e) { log.error(e); } finally { reentrantLock.unlock(); log.info("unlock"); } } }
先看一下加锁方法lock安全
非公平锁lock方法数据结构
compareAndSetState很好理解,经过CAS加锁,若是加锁失败调用acquire;架构
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
final void lock() { acquire(1); }
线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续,分析实现原理oracle
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
总结:公平锁的上锁是必须判断本身是否是须要排队;而非公平锁是直接进行CAS修改计数器看能不能加锁成功;若是加锁不成功则乖乖排队(调用acquire);因此无论公平仍是不公平;只要进到了AQS队列当中那么他就会排队;app
美团画的AQS的架构图,很详细,当有自定义同步器接入时,只需重写第一层所须要的部分方法便可,不须要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操做时,先通过第一层的API进入AQS内部方法,而后通过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
AQS核心思想是,若是被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工做线程,将共享资源设置为锁定状态;若是共享资源被占用,就须要必定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是经过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
加锁:
解锁:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } }
acquire方法首先会调tryAcquire方法,须要注意的是tryAcquire的结果作取反;根据前面分析,tryAcquire会调用子类的实现,ReentrantLock有两个内部类,FairSync,NonfairSync,都继承自Sync,Sync继承AbstractQueuedSynchronizer;
实现方式差异在是否有hasQueuedPredecessors() 的判断条件
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取lock对象的上锁状态,若是锁是自由状态则=0,若是被上锁则为1,大于1表示重入 int c = getState(); if (c == 0) { // hasQueuedPredecessors,判断本身是否须要排队 // 下面我会单独介绍,若是不须要排队则进行cas尝试加锁 // 若是加锁成功则把当前线程设置为拥有锁的线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 若是C不等于0,可是当前线程等于拥有锁的线程则表示这是一次重入,那么直接把状态+1表示重入次数+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
非公平锁
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。
static final class Node { static final Node SHARED = new Node(); // 表示线程以共享的模式等待锁 static final Node EXCLUSIVE = null; // 表示线程正在以独占的方式等待锁 static final int CANCELLED = 1; // 表示线程获取锁的请求已经取消了 static final int SIGNAL = -1; // 表示线程已经准备好了,就等资源释放了 static final int CONDITION = -2; // 表示节点在等待队列中,节点线程等待唤醒 static final int PROPAGATE = -3; // 当前线程处在SHARED状况下,该字段才会使用 volatile int waitStatus; // 当前节点在队列中的状态 volatile Node prev; // 前驱指针 volatile Node next; // 后继指针 volatile Thread thread; // 表示处于该节点的线程 Node nextWaiter; // 指向下一个处于CONDITION状态的节点 final boolean isShared() { return nextWaiter == SHARED; } // 返回前驱节点,没有的话抛出npe final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
再看hasQueuedPredecessors,整个方法若是最后返回false,则去加锁,若是返回true则不加锁,由于这个方法被取反操做;hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。若是返回False,说明当前线程能够争取共享资源;若是返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。
若是这上面没有看懂,没有关系,先来分析一下构建整个队列的过程;
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail为对尾,赋值给pred Node pred = tail; // 判断pred是否为空,其实就是判断对尾是否有节点,其实只要队列被初始化了对尾确定不为空 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
用一张图来分析一下,整个队列构建过程;
(1)经过Node(Thread thread, Node mode) 方法构建一个node节点(node2),此时的nextWaiter为空,线程不为空,是当前线程;
(2)若是队尾为空,则说明队列未创建,调用enq构建第一个虚拟节点(node1),经过compareAndSetHead方法构建一个头节点,须要注意的是该头节点thread是null,后续不少都是用线程是否为null来判读是否为第一个虚拟节点;
(3)将node1 cas设置为head
(4)将头节点赋值为tail = head
(5)进入下一次for循环时,会走到else分支,会将传入的node的指向头部节点的next,此时node2的prev指向node1(tail)
(6)将node2 cas设置为tail;
(7)将node2指向node1的next;
通过上面的步骤,就构建了一个长度为2的队列;
添加第二个队列时,走的是这段代码,流程就简单多了,代码以下
if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());由于整个构建过程并非原子操做,因此这个条件判断,如今再是否是就看明白了?
addWaiter方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会做为参数,进入到acquireQueued方法中。acquireQueued方法能够对排队中的线程进行“获锁”操做。总的来讲,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者再也不须要获取(中断)。
下面经过代码从“什么时候出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:
final boolean acquireQueued(final Node node, int arg) { // 标记是否成功拿到资源 boolean failed = true; try { // 标记等待过程当中是否中断过 boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点,有两种状况;一、上一个节点为头部;2上一个节点不为头部 final Node p = node.predecessor(); // 若是p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(头结点是虚节点) // 由于第一次tryAcquire判断是否须要排队,若是须要排队,那么我就入队,此处再重试一次 if (p == head && tryAcquire(arg)) { // 获取锁成功,头指针移动到当前node setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 说明p为头节点且当前没有获取到锁(多是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 成功拿到资源,准备释放 if (failed) cancelAcquire(node); } }
设置当前节点为头节点,而且将node.thread为空(刚才提到判断是否为头部虚拟节点的条件就是node.thread == null。waitStatus状态并为修改,等下咱们再分析;
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
接下来看shouldParkAfterFailedAcquire代码,须要注意的是,每个新建立Node的节点是被下一个排队的node设置为等待状态为SIGNAL, 这里比较难以理解为何须要去改变上一个节点的park状态?
每一个node都有一个状态,默认为0,表示无状态,-1表示在park;当时不能本身把本身改为-1状态?由于你得肯定你本身park了才是能改成-1;因此只能先park;在改状态;可是问题你本身都park了;彻底释放CPU资源了,故而没有办法执行任何代码了,因此只能别人来改;故而能够看到每次都是本身的后一个节点把本身改为-1状态;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱节点的状态 int ws = pred.waitStatus; // 说明头结点处于唤醒状态 if (ws == Node.SIGNAL) return true; // static final int CANCELLED = 1; // 表示线程获取锁的请求已经取消了 // static final int SIGNAL = -1; // 表示线程已经准备好了,就等资源释放了 // static final int CONDITION = -2; // 表示节点在等待队列中,节点线程等待唤醒 // static final int PROPAGATE = -3; // 当前线程处在SHARED状况下,该字段才会使用 if (ws > 0) { do { // 把取消节点从队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 设置前任节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
调用LockSupport.park挂起当前线程,本身已经park,没法再修改状态了!
private final boolean parkAndCheckInterrupt() { // 调⽤用park()使线程进⼊入waiting状态 LockSupport.park(this); // 若是被唤醒,查看⾃自⼰己是不不是被中断的,这⾥里里先清除⼀下标记位 return Thread.interrupted(); }
shouldParkAfterFailedAcquire的整个流程仍是比较清晰的,若是不清楚,能够参考美团画的流程图;
经过上面的分析,当failed为true时,也就意味着park结束,线程被唤醒了,for循环已经跳出,开始执行cancelAcquire,经过cancelAcquire方法,将Node的状态标记为CANCELLED;代码以下:
private void cancelAcquire(Node node) { // 将无效节点过滤 if (node == null) return; // 设置该节点不关联任何线程,也就是虚节点(上面已经提到,node.thread = null是判读是不是头节点的条件) node.thread = null; Node pred = node.prev; // 经过前驱节点,处理waitStatus > 0的node while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 把当前node的状态设置为CANCELLED,当下一个node排队结束时,本身就会被上一行代码处理掉; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; // 若是当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点,更新失败的话,则进入else,若是更新成功,将tail的后继节点设置为null if (node == tail && compareAndSetTail(node, pred)) { // 把本身设置为null compareAndSetNext(pred, predNext, null); } else { int ws; // 若是当前节点不是head的后继节点 // 1:判断当前节点前驱节点的是否为SIGNAL // 2:若是不是,则把前驱节点设置为SINGAL看是否成功 // 若是1和2中有一个为true,再判断当前节点的线程是否为null // 若是上述条件都知足,把当前节点的前驱节点的后继指针指向当前节点的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 若是当前节点是head的后继节点,或者上述条件不知足,那就唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }
当前的流程:
获取当前节点的前驱节点,若是前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
根据当前节点的位置,考虑如下三种状况:
(1) 当前节点是尾节点。
(2) 当前节点是Head的后继节点。
(3) 当前节点不是Head的后继节点,也不是尾节点。
(1)当前节点时尾节点
(2)当前节点是Head的后继节点。
这张图描述的是这段代码:unparkSuccessor
Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
(3)当前节点不是Head的后继节点,也不是尾节点。
这张图描述的是这段代码跟(2)同样;
经过上面的图,你会发现全部的变化都是对Next指针进行了操做,而没有对Prev指针进行操做,缘由是执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),也就是下图中代码1和代码2直接的间隙就会出现这种状况,此时修改Prev指针,有可能会致使Prev指向另外一个已经移除队列的Node,所以这块变化Prev指针不安全。
解锁时并不区分公平和不公平,由于ReentrantLock实现了锁的可重入,能够进一步的看一下时如何处理的,上代码:
public void unlock() { sync.release(1); }
public final boolean release(int arg) { // 自定义的tryRelease若是返回true,说明该锁没有被任何线程持有 if (tryRelease(arg)) { // 获取头结点 Node h = head; if (h != null && h.waitStatus != 0) // 头结点不为空而且头结点的waitStatus不是初始化节点状况,解除线程挂起状态 unparkSuccessor(h); return true; } return false; }
这里的判断条件为何是h != null && h.waitStatus != 0
protected final boolean tryRelease(int releases) { // 减小可重入次数,setState(c); int c = getState() - releases; // 当前线程不是持有锁的线程,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 若是持有线程所有释放,将当前独占锁全部线程设置为null,并更新state if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
这个方法在cancelAcquire其实也用到了,简单分析一下
// 若是当前节点是head的后继节点,或者上述条件不知足,就唤醒当前节点的后继节点unparkSuccessor(node);
private void unparkSuccessor(Node node) { // 获取结点waitStatus,CAS设置状态state=0 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取当前节点的下一个节点 Node s = node.next; // 若是下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点 if (s == null || s.waitStatus > 0) { s = null; // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 若是当前节点的下个节点不为空,并且状态<=0,就把当前节点unpark if (s != null) LockSupport.unpark(s.thread); }
为何要从后往前找第一个非Cancelled的节点呢?
缘由1:addWaiter方法并不是原子,构建链表结构时以下图中 一、2间隙执行unparkSuccessor,此时链表是不完整的,没办法从前日后找了;
缘由2:还有一点缘由,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,所以也是必需要从后往前遍历才可以遍历彻底部的Node;
唤醒后,会执行return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不一样,但都会执行下次循环。若是这个时候获取锁成功,就会把当前interrupted返回。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
若是acquireQueued为True,就会执行selfInterrupt方法。
该方法实际上是为了中断线程。但为何获取了锁之后还要中断线程呢?这部分属于Java提供的协做式中断知识内容,感兴趣同窗能够查阅一下。这里简单介绍一下:
这里的处理方式主要是运用线程池中基本运做单元Worder中的runWorker,经过Thread.interrupted()进行额外的判断处理,能够看下ThreadPoolExecutor源码的判断条件;
AQS在JUC中有⽐比较⼴普遍的使⽤用,如下是主要使⽤用的地⽅方:
至此,经过ReentrantLock分析AQS的实现原理一家完毕,须要说明的是,此文深度参考了美团分析的ReentrantLock,是参考连接的第三个,有兴趣能够对比差别,感谢!