AQS
即AbstractQueuedSynchronizer
类称做队列同步器,是构建其余同步器的一个重要的基础框架,同步器自身是没有实现任何同步接口。它是经过控制一个int
类型的state
变量来表示同步状态,使用一个内置的FIFO
(先进先出)队列来构建工做队列操做。java
同步器定义有两种资源共享方式:Exclusive
(独占式)和Share
(共享式)的获取同步状态。node
独占式:一个时间点只能执行一个线程。 共享式:一个时间点可多个线程同时执行。设计模式
使用方式
同步器的设计采用模板模式,要实现一个同步组件得先继承AbstractQueuedSynchronizer
类,经过调用同步器提供的方法和重写同步器的方法来实现。安全
调用同步器中的方法就是调用前面提到的经过state
变量值的操做来表示同步操做,state
是被volatile
修饰来保证线程可见性。并发
方法名 | 描述 |
---|---|
getState() | 获取当前线程同步状态值。 |
setState(int newState) | 设置当前同步状态值。 |
compareAndSetState(int expect, int update) | 经过CAS 设置state 的值。 |
为了不被重写,以上方法都被final
修饰了。app
实现同步组件,须要本身根据本身定制化的需求进行处理,因此须要本身重写同步器提供的方法,要重写的方法主要是独占式获取与释放同步状态、共享式获取与释放同步状态。框架
tryAcquire(int arg) 独占式获取同步状态,返回值为boolean
类型,获取成返回true
,获取失败返回false
。elasticsearch
tryRelease(int arg) 独占式释放同步状态,返回值为boolean
类型,释放成返回true
,释放失败返回false
。oop
tryAcquireShared(int arg) 共享式获取同步状态,返回值为int
类型,获取成功返回大于 0 的值。ui
tryReleaseShared(int arg) 共享式释放同步状态,返回值为boolean
类型,释放成返回true
,释放失败返回false
。
isHeldExclusively() 独占模式下是否被当前前程独占,返回值为boolean
类型,已被当前线程所独占返回true
,反之为false
。
同步器队列
一个同步器里面拥有一个同步队列和多个等待队列。
同步队列
在AbstractQueuedSynchronizer
类中,有一个内部类Node
,经过该类构造一个内部的同步队列,这是一个FIFO 双向队列。 当前运行线程回去同步状态时,若是获取失败,则将当前线程信息建立一个Node
追加到同步队列尾部,而后阻塞当前线程,直到队列的上一个节点的同步状态释放,再唤醒当前线程尝试从新获取同步状态。这个从新获取同步状态操做的节点,必定要是同步队列中第一节点。
Node 源码以下:
static final class Node { // 共享模式下等待的标记 static final Node SHARED = new Node(); // 独占模式下等待的标记 static final Node EXCLUSIVE = null; /* * 等待状态常量值,如下四个常量都是 */ static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 等待状态 volatile int waitStatus; // 当前节点的前驱节点 volatile Node prev; // 当前节点的后继节点 volatile Node next; // 获取同步状态的线程(引用) volatile Thread thread; // 等待队列中的后继节点 Node nextWaiter; // 是否共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
经过以上代码,能够看到节点中保存了节点模式、等待状态、线程引用、前驱和后继节点,构造节点。
同步队列中被阻塞的线程的等待状态包含有四个常量值:CANCELLED、SIGNAL、CONDITION、PROPAGATE ,它们对应的被阻塞缘由以下:
CANCELLED
同步队列中当前节点的线程等待超时或被中断,须要从同步队列中取消等待。SIGNAL
当前节点释放同步状态或被取消后,通知后继节点的线程运行。CONDITION
当前节点在 Condition 上等待,当其余线程对 Condition 调用了 signal() 方法后,该节点将添加到同步队列中。PROPAGATE
该状态存在共享模式的首节点中,当前节点唤醒后将传播唤醒其余节点。
同步器中持有同步队列的首节点和尾节点的引用,在AbstractQueuedSynchronizer
中分别对应head
和tail
字段。
因此同步队列的基本结构如图:
等待队列
AbstractQueuedSynchronizer
类中包含一个内部类ConditionObject
,该类实现了Condition
的接口。一个Condition
对象包含一个等待队列,同时Condition
对象能够实现等待/通知功能。
Condition
持有等待队列中的首节点(firstWaiter)和尾节点(lastWaiter),以下图代码所示:
若是当前线程调用Condition.await()
时,会将当前线程信息构建一个 Node 节点,由于Condition
持有等待队列中的首尾节点,因此将当前等待队列中的尾节点的nextWaiter
指向当前线程构建的节点,同时更新lastWaiter
的引用节点。
上述过程当中的节点、队列的操做,是获取到锁的线程来调用Condition.await()
的,因此整个执行过程在没有基于 CAS 的状况下,也是线程安全的。
经过以上的描述,能够知道一个同步器中同步队列、等待队列构成的示意图:
当调用Condition.await()
时,同步队列中的首节点,也就是当前线程所建立的节点,会加入到等待队列中的尾部,释放当前线程的同步状态而且唤醒同步队列的后继节点,当前线程也就进入等待状态,这个前后顺序不能颠倒。这个过程至关于同步队列的首节点的线程构造新的节点加入到等待队列的尾部。
当调用Condition.signal()
方法时,会先将等待队列中首节点转移到同步队列尾部,而后唤醒该同步队列中的线程,该线程从Condition.await()
中自旋退出,接着在在同步器的acquireQueued()
中自旋获取同步状态。
当调用Condition.wait()
方法,同步队列首节点转移到等待队列方法:
public final void await() throws InterruptedException { // 若是线程已中断,则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 添加节点到等待队列中 Node node = addConditionWaiter(); // 修改 state 来达到释放同步状态,避免死锁 int savedState = fullyRelease(node); int interruptMode = 0; // 判断当前节点是否在同步队列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 继续获取同步状态竞争 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // 清除已取消的节点 unlinkCancelledWaiters(); if (interruptMode != 0) // 被中断时的处理 reportInterruptAfterWait(interruptMode); }
上面addc
方法是向等待队列中添加一个新的节点。
private Node addConditionWaiter() { // 获取等待队列中尾节点 Node t = lastWaiter; // 若是最后一个节点已取消,则清除取消节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 利用当前线程信息建立等待队列的节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) // 若是最后尾节点为空,当前节点则为等待队列的首节点 firstWaiter = node; else // 不然将当前尾节点的下一个节点指向当前线程信息所构造的节点 t.nextWaiter = node; lastWaiter = node; // 更新 Condition 的尾节点引用 return node; }
当调用Condition.signal()
方法,等待队列首节点转移到同步队列方法:
public final void signal() { // 是否被当前线程所独占 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取等待队列中首节点 Node first = firstWaiter; if (first != null) // 转移到同步队列,而后唤醒该节点 doSignal(first); }
转移同步队列首节点到同步队列,并唤醒该节点方法doSignal()
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 去除首节点 first.nextWaiter = null; } while (!transferForSignal(first) && // 从等待队列中转移到同步队列 (first = firstWaiter) != null); }
转移等待队列到同步队列方法transferForSignal(Node node)
final boolean transferForSignal(Node node) { // 验证节点是否被取消 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 转移节点至同步队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
等待队列中的头结点线程安全移动到同步队列方法enq(final Node node)
private Node enq(final Node node) { for (;;) { Node t = tail; // 同步队列中若是为空,则初始化同步器 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { // 不然新节点的前驱节点为当前同步队列的尾节点 node.prev = t; // 设置当前新节点为同步队列的尾节点,并更新先前同步队列的尾节点的后继节点指向当前新节点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
独占式同步状态
独占式同步状态获取和释放是线程安全的操做,一个时间点确保只有一个线程获取到同步状态。
独占式同步状态获取
acquire(int arg)
方法是获取独占式同步状态的方法,当线程获取同步失败时,会加入到同步队列中。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述代码中,当执行tryAcquire(int arg)
方法获取同步状态失败时,接着经过addWaiter(Node.EXCLUSIVE)
构造当前线程信息的节点,随后将新构造的节点经过acquireQueued(final Node node, int arg)
方法加入到同步队列中,节点在同步队列中自旋等待获取同步状态。
tryAcquire(int arg)
是自定义同步器实现的,实现该方法须要保证线程安全获取同步状态,前面讲到AQS
提供的compareAndSetState(int expect, int update)
方法经过CAS
设置state
值来保证线程安全。
上面获取独占式同步状态时,主要分析acquireQueued(final Node node, int arg)
方法,节点加入队列并自旋等待。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 是否中断标识 boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 若是前驱节点是首节点,而且当前节点获取到同步状态 if (p == head && tryAcquire(arg)) { // 将当前节点设置为首节点 setHead(node); // 将原首节点(即当前节点的前驱节点)引用清空,利于 GC 回收 p.next = null; // 成功获取到同步状态标志 failed = false; return interrupted; } // 判断前驱节点是否超时或取消,以及当前线程是否被中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 若是被中断,则节点出队 if (failed) cancelAcquire(node); } }
在首节点释放同步状态后,同时唤醒后继节点。后继节点经过自旋的方式(这里利用死循环方式)也会检查本身的前驱节点是否为首节点,若是是前驱节点则会尝试获取同步状态。获取成功则返回,不然判断是否被中断或者继续自旋上述获取同步状态操做。
独占式同步状态释放
release(int arg)
方法是释放同步状态,当释放同步状态后会唤醒后继节点。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease(int arg)
方法一样也是自定义同步器实现。当首节点不为空且处于等待状态时,那么调用unparkSuccessor(Node node)
方法唤醒后继节点。
private void unparkSuccessor(Node node) { // CAS 设置等待状态为初始状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 若是当前释放同步状态的节点不存在后继节点或后继节点超时/被中断 if (s == null || s.waitStatus > 0) { s = null; // 从尾节点中开始寻找等待状态的节点做为新首节点,这里已排除当前节点(t != node) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
释放同步状态的整个过程就是:释放同步状态,唤醒后继节点。这个后继节点必须知足,非空、非当前节点、等待状态小于或等于 0 ,即SIGNAL
、CONDITION
、PROPAGATE
和初始化状态。
独占式资源共享方式除了上面的同步状态获取,还有独占式超时获取使用的方法是doAcquireNanos(int arg, long nanosTimeout)
、独占式可中断获取使用的方法是acquireInterruptibly(int arg)
。
共享式同步状态
共享式同步状态同一时间点能够有多个线程获取到同步状态。
共享式同步状态获取
acquireShared(int arg)
方法是共享式同步状态获取的方法。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 获取同步状态失败后调用的方法 doAcquireShared(arg); }
tryAcquireShared(int arg)
方法是自定义同步器实现的,返回大于或等于 0 时,表示获取成功。若是小于 0 时,获取同步状态失败后会调用doAcquireShared(int arg)
方法进行再次尝试获取。
private void doAcquireShared(int arg) {、 // 构造一个当前线程信息的新节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 自旋式获取同步状态 for (;;) { final Node p = node.predecessor(); // 判断新节点的前驱节点是否为首节点 if (p == head) { // 再次尝试获取同步状态 int r = tryAcquireShared(arg); // 获取成功后退出自旋 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上面代码中,当获取同步状态失败后,则建立一个共享模式类型的节点,而后自旋式获取同步状态,若是前驱节点为首节点时则尝试再次获取同步状态,获取同步状态成功后退出当前自旋。
共享式释放同步状态
releaseShared(int arg)
方法来释放共享式同步状态。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 同步状态释放成功后,唤醒后面等待状态的节点 doReleaseShared(); return true; } return false; }
上面tryReleaseShared(int arg)
释放同步状态方法必须保证线程安全,由于它多个线程获取到同步状态时会引起并发操做,能够经过循环操做和 CAS 来确保安前行。
doReleaseShared()
方法唤醒后续等待状态的节点。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 验证后继节点的线程处于等待状态 if (ws == Node.SIGNAL) { // 再次检查后继节点的线程是否处于等待状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒后继节点,这时每唤醒一次就更新一次首节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
共享同步状态释放后,自旋式依次唤醒队列中节点。
总结
从 AQS 中能够借鉴它利用循环和 CAS 来确保并发的安全性的思路,同时它采用模板设计模式定义一个处理逻辑,将具体的特定处理逻辑交由子类自定义实现。在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 以及 Tomncat 的 LimitLatch 都有用其做为同步器。
推荐阅读