AQS,全称:AbstractQueuedSynchronizer,是JDK提供的一个同步框架,内部维护着FIFO双向队列,即CLH同步队列。java
AQS依赖它来完成同步状态的管理(voliate修饰的state,用于标志是否持有锁)。若是获取同步状态state失败时,会将当前线程及等待信息等构建成一个Node,将Node放到FIFO队列里,同时阻塞当前线程,当线程将同步状态state释放时,会把FIFO队列中的首节的唤醒,使其获取同步状态state。node
不少JUC包下的锁都是基于AQS实现的设计模式
以下脑图:微信
Node
static final class Node { /** 共享节点 */ static final Node SHARED = new Node();
/** 独占节点 */ static final Node EXCLUSIVE = null;
/** 由于超时或者中断,节点会被设置成取消状态,被取消的节点不会参与到竞争中,会一直是取消 状态不会改变 */ static final int CANCELLED = 1;
/** 后继节点处于等待状态,若是当前节点释放了同步状态或者被取消,会通知后继节点,使其得以 运行 */ static final int SIGNAL = -1;
/** 节点在等待条件队列中,节点线程等待在condition上,当其余线程对condition调用了signal 后,该节点将会从等待队列中进入同步队列中,获取同步状态 */ static final int CONDITION = -2;
/** * 下一次共享式同步状态获取会无条件的传播下去 */ static final int PROPAGATE = -3;
/** 等待状态 */ volatile int waitStatus;
/** 前驱节点 */ volatile Node prev;
/** 后继节点 */ volatile Node next;
/** 获取同步状态的线程 */ volatile Thread thread;
/** * 下一个条件队列等待节点 */ Node nextWaiter;
final boolean isShared() { return nextWaiter == SHARED; }
final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
FIFO结构图
独占式同步状态过程
/** * 独占式获取同步状态 */public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); }}
tryAcquire
尝试去获取锁,获取成功返回true,不然返回false。该方法由继承AQS的子类本身实现。采用了模板方法设计模式。框架
如:ReentrantLock的Sync内部类,Sync的子类:NonfairSync和编辑器
FairSyncoop
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
addWaiter
private Node addWaiter(Node mode) { // 新建Node节点 Node node = new Node(Thread.currentThread(), mode); // 尝试快速添加尾结点 Node pred = tail; if (pred != null) { node.prev = pred; // CAS方式设置尾结点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是上面添加失败,这里循环尝试添加,直到添加成功为止 enq(node); return node; }
enq
private Node enq(final Node node) { // 一直for循环,直到插入Node成功为止 for (;;) { Node t = tail; if (t == null) { // CAS设置首节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // CAS设置尾结点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued
final boolean acquireQueued(final Node node, int arg) { // 操做是否成功标志 boolean failed = true; try { // 线程中断标志 boolean interrupted = false; // 不断的自旋循环 for (;;) { // 当前节点的prev节点 final Node p = node.predecessor(); // 判断prev是不是头结点 && 是否获取到同步状态 if (p == head && tryAcquire(arg)) { // 以上条件成立,将当前节点设置成头结点 setHead(node); // 将prev节点移除队列中 p.next = null; // help GC failed = false; return interrupted; } // 自旋过程当中,判断当前线程是否须要阻塞 && 阻塞当前线程而且检验线程中断状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 取消获取同步状态 cancelAcquire(node); } }
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 拿到当前节点的prev节点的等待状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 若是prev的status是signal,表示当prev释放了同步状态或者取消了,会通知当前节 * 点,因此当前节点能够安心的阻塞了(至关睡觉会有人叫醒他) */ return true; if (ws > 0) { /* * status > 0,表示为取消状态,须要将取消状态的节点从队列中移除 * 直到找到一个状态不是取消的节点为止 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 除了以上状况,经过CAS将prev的status设置成signal */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { // 阻塞当前线程 LockSupport.park(this); // 返回当前线程的中断状态 return Thread.interrupted(); }
selfInterrupt
static void selfInterrupt() { // 未获取到同步状态 && 线程中断状态是true,中断当前线程 Thread.currentThread().interrupt(); }
释放独占式同步状态
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 若是头结点不为空 && 而且 头结点状态不为0 // 唤醒头结点的后继节点 unparkSuccessor(h); return true; } return false; }
tryRelease
尝试去释放同步状态,释放成功返回true,不然返回false。该方法由继承AQS的子类本身实现。采用了模板方法设计模式。flex
如:ReentrantLock的Sync内部类的tryRelease方法。ui
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
unparkSuccessor
private void unparkSuccessor(Node node) { /* * 获取当前节点状态 */ int ws = node.waitStatus;
// 若是当前节点的状态小于0,那么用CAS设置成0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * 获取当前节点的后继节点 */ Node s = node.next; // 若是后继节点为空 || 或者后继节点的状态 > 0 (为取消状态) if (s == null || s.waitStatus > 0) { s = null; // 从尾结点查找状态不为取消的可用节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒后继节点 LockSupport.unpark(s.thread); }
总结
在AQS中维护着一个FIFO的同步队列,当线程获取同步状态失败后,则会加入到这个CLH同步队列的对尾并一直保持着自旋。在CLH同步队列中的线程在自旋时会判断其前驱节点是否为首节点,若是为首节点则不断尝试获取同步状态,获取成功则退出CLH同步队列。当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。
this
共享式同步状态过程
共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻能够有多个线程获取同步状态。例如读操做能够有多个线程同时进行,而写操做同一时刻只能有一个线程进行写操做,其余操做都会被阻塞。
acquireShared获取同步状态
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 获取失败,自旋获取同步状态 doAcquireShared(arg); }
tryAcquireShared
尝试去获取共享锁,获取成功返回true,不然返回false。该方法由继承AQS的子类本身实现。采用了模板方法设计模式。
如:ReentrantReadWriteLock的Sync内部类
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
doAcquireShared
private void doAcquireShared(int arg) { // 添加共享模式节点到队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 自旋获取同步状态 for (;;) { // 当前节点的前驱 final Node p = node.predecessor(); // 若是前驱节点是head节点 if (p == head) { // 尝试去获取共享同步状态 int r = tryAcquireShared(arg); if (r >= 0) { // 将当前节点设置为头结点,而且释放也是共享模式的后继节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 真正的释放共享同步状态,并唤醒下一个节点 doReleaseShared(); } }
doReleaseShared
private void doReleaseShared() { // 自旋释放共享同步状态 for (;;) { Node h = head; // 若是头结点不为空 && 头结点不等于尾结点,说明存在有效的node节点 if (h != null && h != tail) { int ws = h.waitStatus; // 若是头结点的状态为signal,说明存在须要唤醒的后继节点 if (ws == Node.SIGNAL) { // 将头结点状态更新为0(初始值状态),由于此时头结点已经没用了 // continue为了保证替换成功 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } // 若是状态为初始值状态0,那么设置成PROPAGATE状态 // 确保在释放同步状态时能通知后继节点 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
unparkSuccessor
private void unparkSuccessor(Node node) { /* * 获取当前节点状态 */ int ws = node.waitStatus;
// 若是当前节点的状态小于0,那么用CAS设置成0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * 获取当前节点的后继节点 */ Node s = node.next; // 若是后继节点为空 || 或者后继节点的状态 > 0 (为取消状态) if (s == null || s.waitStatus > 0) { s = null; // 从尾结点查找状态不为取消的可用节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒后继节点 LockSupport.unpark(s.thread); }
熬夜到凌晨,终于搞定了这篇文章,晚安,老铁们
![]()
给个[在看],是对IT老哥最大的支持
本文分享自微信公众号 - IT老哥(dys_family)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。