咱们前面几张提到过,JUC 这个包里面的工具类的底层就是使用 CAS 和 volatile 来保证线程安全的,整个 JUC 包里面的类都是基于它们构建的。今天咱们介绍一个很是重要的同步器,这个类是 JDK 在 CAS 和 volatile 的基础上为咱们提供的一个同步工具类。java
AbstractQueuedSynchronizer,JDK 1.5 引入了 JUC 包,这个包提供了一些列支持并发的组件,这些组件是一些列同步器,他们主要完成如下功能:node
AQS 是一个小框架,基于这个框架咱们能够实现不少的同步器,ReentrantLock,CountDownLatch,Semaphore 等都是基于 AQS 实现的。算法
同步器的核心方法是 acquire 和 release 操做。安全
acquire数据结构
while(当前同步器的状态不容许获取操做){多线程
若是当前线程再也不队列中,将其加入队列并发
阻塞当前线程app
}框架
线程若是位于队列中,将其移出队列异步
release
更新同步器的状态
if(新的状态容许某个被阻塞的线程获取成功)
解除队列中一个或多个线程的阻塞状态。
从上面的操做思想中咱们能够提出三大关键操做:同步器状态变动,线程阻塞和释放,插入和移出队列。由此能够引伸出三个基本组件:
同步状态
AQS 类使用 int 值来保存同步状态,而且暴露出 getState,setState 和 compareAndSet 操做来读取和更新这个同步状态。线程经过修改(加/减指定的数量)码是否成功来决定当前线程是否成功获取到同步状态。
State 被声明成了 volatile,保证了可见性和有序性。又经过 CAS 指令来实现 compareAndSet ,使得当且仅当同步状态拥有一个一致的指望值的时候,才会被原子地设置成新值,这样就保证了同步状态的原子性。
阻塞
直到 JSR166,阻塞线程和解除线程阻塞都是基于 Java 的内置管程。
JUC 包使用 LockSupport 类来解决这个问题。LockSupport.park 阻塞当前线程直到有 LockSupport.unpark 方法被调用。
队列
整个框架的核心就是如何管理线程阻塞队列,该队列是严格的 FIFO 队列,所以不支持线程优先级的同步。同步队列的最佳选择是自身没有使用底层锁来构造的非阻塞数据结构。这里采用了 CLH 锁。
CLH队列实际并不那么像队列,它的入队和出队与实际的业务密切相关。它是一个链表队列。用过 AQS 的两个字段 head(头节点) 和 tail(尾节点)来存取,这两个字段初始化的时候都指向了一个空节点。
入队操做:
CLH 队列是 FIFO 队列,因此新的节点来到的时候,是要插入到当前队列的尾节点以后。当一个线程获取到同步状态以后,其余线程没法获取,转而被构形成节点加入到同步队列中,并且这个加入队列的过程必需要保证线程安全,所以使用了 CAS方法,它须要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才正式与以前的尾节点创建关联。
出队操做:
由于是 FIFO 队列,因此能成功获取到 AQS 同步状态的一定是首节点,首节点的线程在释放同步状态时,会唤醒后续节点,然后续节点会在获取 AQS 同步状态成功的时候将本身设置为首届点。设置首节点是由获取同步成功的线程来完成的,因此不须要像入队这样的 CAS 操做。
条件队列
上一节是 AQS 的同步队列,这一节是条件队列。AQS 只有一个同步队列,可是能够有多个条件队列。AQS 框架提供了一个 ConditionObject 类,给维护独占同步的类以及实现 Lock 接口的类使用。
ConditionObject 类 和 AQS 共用了内部节点,有本身单独的条件队列。Singal 操做是经过将节点从条件队列转移到同步队列来实现的。
singal:
await:
方法结构
组件 | 数据结构 |
---|---|
同步状态 | volatile int state |
阻塞 | LockSupport类 |
队列 | Node节点 |
条件队列 | ConditionObject |
咱们经过独占式同步状态的释放和获取,以及共享式同步状态的释放和获取来看看 AQS 是如何实现的。
独占式
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述代码主要完成了同步状态的获取,节点构造,加入同步队列以及在同步队列中自旋等待等相关工做。
来看看节点构造和加入队列的实现:
private Node addWaiter(Node mode) { // 当前线程构形成Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 尝试快速在尾节点后新增节点 提高算法效率 先将尾节点指向pred Node pred = tail; if (pred != null) { //尾节点不为空 当前线程节点的前驱节点指向尾节点 node.prev = pred; //并发处理 尾节点有可能已经不是以前的节点 因此须要CAS更新 if (compareAndSetTail(pred, node)) { //CAS更新成功 当前线程为尾节点 原先尾节点的后续节点就是当前节点 pred.next = node; return node; } } //第一个入队的节点或者是尾节点后续节点新增失败时进入enq enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //尾节点为空 第一次入队 设置头尾节点一致 同步队列的初始化 if (compareAndSetHead(new Node())) tail = head; } else { //全部的线程节点在构造完成第一个节点后 依次加入到同步队列中 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
节点进入同步队列后,就进入了一个自旋的过程,每一个线程节点都在自旋地观察,当条件知足,获取到了同步状态,就能够从自旋过程当中退出,不然依旧自旋。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取当前线程节点的前驱节点 final Node p = node.predecessor(); //前驱节点为头节点且成功获取同步状态 if (p == head && tryAcquire(arg)) { //设置当前节点为头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //是否阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 阻塞线程的过程。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点的状态决定后续节点的行为 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /*前驱节点为-1 后续节点能够被阻塞 * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /*前驱节点是初始或者共享状态就设置为-1 使后续节点阻塞 * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { //阻塞线程 LockSupport.park(this); return Thread.interrupted(); }
当获取同步状态成功以后,对于锁这种并发组件而言,就意味着当前线程获取到了锁。
再看 release 方法:
head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,若是后继节点得到锁成功,会把本身设置为头结点,节点的变化过程以下。修改head节点指向下一个得到锁的节点,新的得到锁的节点,将prev的指针指向null。
public final boolean release(int arg) { if (tryRelease(arg)) {//同步状态释放成功 Node h = head; if (h != null && h.waitStatus != 0) //直接释放头节点 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /*寻找符合条件的后续节点 * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒后续节点 LockSupport.unpark(s.thread); }
总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中进行自旋。移除的条件是前驱节点是头节点而且成功获取了同步状态。释放时,会唤醒头节点的后继节点。
ReentrantLock:ReentrantLock 类使用 AQS 同步状态来保存锁重复持有的次数。当锁被一个线程获取时,ReentrantLock 也会记录下当前得到锁的线程表示,以便检查是否重复获取。
ReentrantReadWriteLock:ReentrantReadWriteLock 使用 AQS 同步状态中的 16 为来保存写锁的持有次数,剩下的 16 为来保存读锁的持有次数。WriteLock 的构建方式和 ReentrantLock 同样。ReadLock 则经过使用 acquireShared 方法来支持同时容许多个读线程。
Semaphore:信号量使用 AQS 同步状态来保存信号量当前计数。它里面定义的 acquireShared 方法会减小计数,当计数为非正值时阻塞线程。tryRelease 会增长技术,在计数为正值时还要解除线程的阻塞。
CountDownLatch:使用 AQS 同步状态来表示计数。当该计数为 0 时,全部的 acquire 方法才能经过。
FutureTask:使用 AQS 的同步状态来表示某个异步计算任务的运行状态(初始化,运行中,被取消和完成)。设置(FutureTask 的 set 方法)或取消(FutureTask 的 cancel 方法)一个 FutureTask 时会调用 AQS 的 release 操做。等待计算结果的线程阻塞解除是经过 AQS 的 acquire 实现的。
SynchronousQueues:SynchronousQueues类使用了内部的等待节点,这些节点能够用于协调生产者和消费者。同时,它使用AQS同步状态来控制当某个消费者消费当前一项时,容许一个生产者继续生产,反之亦然。
多线程并发修改同步状态,修改为功的线程标记为拥有同步状态。
获取失败的线程,加入到同步队列的队尾;加入到队列中后,若是当前节点的前驱节点为头节点再次尝试获取同步状态(下文代码:p == head && tryAcquire(arg))。
若是头节点的下一个节点尝试获取同步状态失败后,会进入等待状态;其余节点则继续自旋。
当线程执行完相应逻辑后,须要释放同步状态,使后继节点有机会同步状态(让出资源,让排队的线程使用)。这时就须要调用release(int arg)方法。调用该方法后,会唤醒后继节点。
后继节点获取同步状态成功,头节点出队。须要注意的事,出队操做是间接的,有节点获取到同步状态时,会将当前节点设置为head,而本来的head设置为null。
当同步队列中头节点唤醒后继节点时,此时可能有其余线程尝试获取同步状态。
假设获取成功,将会被设置为头节点。
头节点后续节点获取同步状态失败。
共享模式和独占模式最主要的区别是在支持同一时刻有多个线程同时获取同步状态。为了不带来额外的负担,在上文中提到的同步队列中都是用独占模式进行讲述,其实同步队列中的节点应该是独占和共享节点并存的。
共享节点尝试获取同步状态。
当一个同享节点获取到同步状态,并唤醒后面等待的共享状态的结果以下图所示:
最后,获取到同步状态的线程执行完毕,同步队列中只有一个独占节点: