学完用 AQS 自定义一个锁之后,咱们能够来看一下刚刚使用过的方法的实现。java
分析源码的时候会省略一些不重要的代码。node
AQS 的实现是基于一个 FIFO 队列的,每个等待的线程被封装成 Node
存放在等待队列中,头结点是空的,不存储信息,等待队列中的节点都是阻塞的,而且在每次被唤醒后都会检测本身的前一个节点是否为头结点,若是是头节点证实在这个线程以前没有在等待的线程,就尝试着去获取共享资源。架构
AQS 继承了 AbstractOwnableSynchronizer
,咱们先分析一下这个父类。源码分析
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractOwnableSynchronizer() { } /** * 独占模式下的线程 */ private transient Thread exclusiveOwnerThread; /** * 设置线程,只是对线程的 set 方法 */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /** * 设置线程,对线程的 get 方法 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
父类很是简单,持有一个独占模式下的线程,而后就只剩下对这个线程的 get 和 set 方法。学习
AQS 是用链表队列来实现线程等待的,那么队列确定要有节点,咱们先从节点讲起。ui
Node 类,每个等待的线程都会被封装成 Node 类this
Node 的域spa
public class Node { int waitStatus; Node prev; Node next; Thread thread; Node nextWaiter; }
waitStatus:等待状态线程
prev:前驱节点code
next:后继节点
thread:持有的线程
nextWaiter:condiction 队列中的后继节点
Node 的 status:
Node 的状态有四种:
取消状态的值是惟一的正数,也是惟一当排队排到它了也不要资源而是直接轮到下个线程来获取资源的
这个方法执行了:
tryAcquirepublic final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
看到上面的 tryAcquire 返回 false 后就会调用 addWaiter
新建节点加入等待队列中。参数 EXCLUSIVE 是独占模式。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 拿到尾节点,若是尾节点是空则说明是第一个节点,就直接入队就行了 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是尾节点不是空的,则须要特殊方法入队 enq(node); return node; }
在 addWaiter
方法建立完节点后,调用 enq 方法,在循环中用 CAS 操做将新的节点入队。
由于可能会有多个线程同时设置尾节点,因此须要放在循环中不断的设置尾节点。
private Node enq(final Node node) { for (;;) { Node t = tail; // 查看尾节点 if (t == null) { // Must initialize // 尾节点为空则设置为刚刚建立的节点 if (compareAndSetHead(new Node())) tail = head; } else { // 尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在这里,节点入队就结束了。
那么咱们回来前面分析的方法,
public final void acquire(long arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
刚刚分析完了 addWaiter
方法,这个方法返回了刚刚建立而且加入的队列。如今开始分析 acquireQueued
方法。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 在循环中去获取锁 for (;;) { // 拿前一个节点 final Node p = node.predecessor(); // 若是前一个节点是头结点则尝试着去获取锁 if (p == head && tryAcquire(arg)) { // 拿到锁后把这个节点设为头结点,这里 setHead 会把除了 next 之外的数据清除 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 这个方法查看在获取锁失败之后是否中断,若是否的话就调用 // parkAndCheckInterrupt 阻塞方法线程,等待被唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
由于很像因此顺便来看一下 acquireInterruptibly
所调用的方法:在此我向你们推荐一个架构学习交流裙。交流学习裙号:821169538,里面会分享一些资深架构师录制的视频录像
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 只有这一句有差异,获取失败了而且检测到中断位被设为 true 直接抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
再来看一下有限时间的,当获取超时之后会将节点 Node 的状态设为 cancel,设置为取消的用处在后面的 release 方法中会有体现。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
总结一下过程
这个方法首先去调用了咱们实现的 tryRelease,当结果返回成功的时候,拿到头结点,调用 unparkSuccessor 方法来唤醒头结点的下一个节点。在此我向你们推荐一个架构学习交流裙。交流学习裙号:821169538,里面会分享一些资深架构师录制的视频录像
public final boolean release(long arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }private void unparkSuccessor(Node node) { int ws = node.waitSatus; // 由于已经获取过锁,因此将状态设设为 0。失败也没所谓,说明有其余的线程把它设为0了 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 通常来讲头结点的下一个节点是在等待着被唤醒的,可是若是是取消的或者意外的是空的, * 则向后遍历直到找到没有被取消的节点 * */ Node s = node.next; // 为空或者大于 0,只有 cancel 状态是大于 0 的 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }