AQS全称AbstractQueuedSynchronizer(抽象队列同步器)java
AQS中维护了一个被volatile修饰的int类型的同步状态state,以及CLH等待队列。node
state同步状态用于维护同步资源被使用的状况,AQS自己并不关心state的值及其含义,彻底由AQS的子类去定义以及维护。设计模式
CLH等待队列是由一个双向链表来实现的,存在head和tail指针分别指向链表中的头节点以及尾节点,同时链表中的节点由AQS中的Node静态内部类来表示。安全
ReentrantLock、ReentrantReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore底层都是基于AQS来实现的。多线程
AQS支持两种模式,一种是独占模式,一种是共享模式。并发
独占模式表示,同步资源在同一时刻只能被一个线程所持有,对应AQS的acquire()以及release()方法。ide
共享模式表示,同步资源在同一时刻能够被多个线程所持有,对应AQS的acquireShared()以及releaseShared()方法。源码分析
acquire()方法:独占模式下获取同步资源。 release()方法:独占模式下释放同步资源。 acquireShared()方法:共享模式下获取同步资源。 releaseShared()方法:共享模式下释放同步资源。
AQS使用了模板方法设计模式,在acquire()、release()、acquireShared()、releaseShared()方法中都会调用其对应的try方法,好比acquire()方法中会调用tryAcquire()方法,release()方法中会调用tryRelease()方法,AQS子类只须要重写AQS提供的tryAcquire()、tryRelease()或tryAcquireShared()、tryReleaseShared()方法便可,同时须要保证方法的实现是线程安全的。ui
tryAcquire()、tryRelease()、tryAcquireShared()、tryReleaseShared()方法都没有使用abstract进行修饰,同时方法中都会直接抛出UnsupportedOperationException异常,好处是不须要强制子类同时实现独占模式和共享模式中的方法,由于大多数AQS的子类都仅支持一种模式,用户只须要根据实际状况进行选择便可。this
tryAcquire(int arg)方法:独占模式下尝试获取同步资源,同时AQS规定,若是获取同步资源成功则返回true,不然返回false。 tryRelease(int arg)方法:独占模式下尝试释放同步资源,同时AQS规定,若是释放同步资源成功则返回true,不然返回false。 tryAcquireShared(int arg)方法:共享模式下尝试获取同步资源,同时AQS规定,若是获取同步资源失败则返回负数,不然返回剩余的资源个数。 tryReleaseShared(int arg)方法:共享模式下尝试释放同步资源,同时AQS规定,若是释放同步资源成功则返回true,不然返回false。
Node类提供的核心属性
// 节点封装的线程 volatile Thread thread; // 指向前驱节点的指针 volatile Node prev; // 指向后继节点的指针 volatile Node next; // 节点的等待状态(默认为0)(默认为0)(默认为0) volatile int waitStatus; // 下一个正在等待的节点 Node nextWaiter; // 共享模式下的标识节点 static final Node SHARED = new Node(); // 独占模式下的标识节点 static final Node EXCLUSIVE = null;
同时Node类中维护了一系列节点的等待状态值
// CANCELLED状态,表示线程已超时等等,处于CANCELLED状态的节点会从等待队列中剔除,不会参与到同步资源的竞争当中 static final int CANCELLED = 1; // SIGNAL状态,若是节点的等待状态为SIGNAL,那么当它释放同步资源时,将会唤醒离它最近的同时等待状态不为CANCELLED的后继节点(同时也能说明节点存在后继节点) static final int SIGNAL = -1; // 表示线程在指定的条件下进行等待 static final int CONDITION = -2; // PROPAGATE状态,表示实际存在可用资源,须要再往下传播(唤醒) static final int PROPAGATE = -3;
所以每一个Node节点中都会包含节点封装的线程、分别指向前驱和后继节点的指针、节点的等待状态、指向下一个正在等待的节点的指针。
/** * 自定义AQS独占模式下的同步器来实现独享锁 */ public class Mutex implements Lock, java.io.Serializable { /** * 自定义AQS独占模式下的同步器 * 使用state为0表示当前锁没有被线程所持有 * 使用state为1表示当前锁已经被线程所持有 */ private static class Sync extends AbstractQueuedSynchronizer { /** * 判断锁是否被当前线程所持有 */ protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /** * 尝试获取锁 * 判断锁是否存在,若是锁不存在则获取锁(经过CAS控制) */ public boolean tryAcquire(int acquires) { assert acquires == 1; // 值必须是1(独享锁只有一把锁嘛) if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); // 将当前线程设置为独占模式下拥有同步资源的线程 return true; } return false; } /** * 尝试释放锁(要求被谁加的锁只能被谁释放) * 判断当前拥有同步资源的线程是否为当前线程,若是不是则抛出异常,不然释放锁 * 这里有三种调用状况,锁空闲的状态下调用、锁已经被线程所持有但被并不是拥有锁的线程调用、锁已经被线程所持有并被拥有锁的线程调用,只有第三种状况才可以解锁成功 */ protected boolean tryRelease(int releases) { assert releases == 1; // 值必须是1(独享锁只有一把锁嘛) if (Thread.currentThread() != getExclusiveOwnerThread()) // 要求被谁加的锁只能被谁释放 throw new IllegalMonitorStateException(); if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); // 将独占模式中拥有同步资源的线程置为NULL setState(0); return true; } /** * 提供一个Condition实例 */ Condition newCondition() { return new ConditionObject(); } /** * 判断锁是否被线程所持有 */ final boolean isLocked() { return getState() == 1; } } /** * 同步器 */ private final Sync sync = new Sync(); /** * 加锁 */ public void lock() { sync.acquire(1); } /** * 尝试获取锁 */ public boolean tryLock() { return sync.tryAcquire(1); } /** * 解锁 * 解锁只能调用同步器的release(),不能调用tryRelease()方法,由于tryRelease()方法只是简单的修改一下同步状态的值而已,并无去唤醒等待队列中的线程,正常是须要唤醒等待队列中离头节点最近的同时等待状态不为CANCELLED的节点 */ public void unlock() { sync.release(1); } /** * 返回与此Mutex绑定的Condition实例 */ public Condition newCondition() { return sync.newCondition(); } /** * 判断锁是否被线程所持有 */ public boolean isLocked() { return sync.isLocked(); } /** * 判断是否有线程在等待获取锁 */ public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 可能抛出InterruptedException的加锁(若是线程被设置了中断标识那么直接抛出异常) */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 在指定的时间内尝试获取锁 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
AQS子类(同步器)通常都是经过内部类实现,而后做为内部组件来使用。
public class Main { static class MyRunnable implements Runnable { private Mutex mutex = new Mutex(); @Override public void run() { System.out.println(String.format("%s Running", Thread.currentThread().getName())); mutex.lock(); System.out.println(String.format("%s加锁", Thread.currentThread().getName())); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } mutex.unlock(); System.out.println(String.format("%s解锁", Thread.currentThread().getName())); } } public static void main(String[] args) { Runnable runnable = new MyRunnable(); Thread threadA = new Thread(runnable, "线程A"); Thread threadB = new Thread(runnable, "线程B"); Thread threadC = new Thread(runnable, "线程C"); threadA.start(); threadB.start(); threadC.start(); } }
能够看到该独享锁是公平锁,多线程按照申请锁的顺序获取锁。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
总结:当线程要获取同步资源时,能够调用acquire()或者tryAcquire()方法,acquire()方法中会调用AQS子类的tryAcquire()方法,尝试获取同步资源,若是获取同步资源成功,则直接返回,作本身的事情,不然将会执行addWaiter()方法,将当前线程封装成Node节点而后加入到等待队列当中,而后执行acquireQueued()方法,用于自旋获取同步资源,若是全部条件都知足那么最后将会执行selfInterrupt()方法。
private Node addWaiter(Node mode) { // 将当前线程封装成Node节点,而且指定为独占模式,独占模式Node.EXCLUSIVE为NULL,也就是说节点的nextWaiter为NULL Node node = new Node(Thread.currentThread(), mode); // 将节点加入到队尾当中 Node pred = tail; if (pred != null) { // 将当前节点的前驱指针指向尾节点 node.prev = pred; // 经过CAS设置尾节点(若是pred指针所指向的尾节点就是当前的尾节点,也就是在这个过程中没有其余节点插入到队尾,则将tail指针指向当前节点) if (compareAndSetTail(pred, node)) { // 将以前尾节点的后继指针指向当前节点 pred.next = node; return node; } } // 若是不存在尾节点,也就是队列为空,或者经过CAS设置尾节点失败(也就是在这个过程中有其余节点插入到队尾),那么将会经过enq()方法死循环进行设置。 enq(node); // 不管怎么样该方法最终都会返回封装了当前线程的节点。 return node; }
总结:addWaiter()方法用于将当前线程封装成Node节点而后加入到等待队列当中,若是在这个过程当中,等待队列为空或者经过CAS设置尾节点失败,那么将会经过enq()方法死循环进行设置。
private Node enq(final Node node) { // 死循环 for (;;) { Node t = tail; // 若是尾节点为空则初始化队列,建立一个空的节点,而且将head和tail指针都指向这个节点 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { // 将当前节点的前驱指针指向尾节点 node.prev = t; // 经过CAS设置尾节点(若是t指针所指向的节点就是当前的尾节点,也就是在这个过程中没有其余节点插入到队尾,则将tail指针指向当前节点) if (compareAndSetTail(t, node)) { // 将以前的尾节点的后继指针指向当前节点 t.next = node; return t; } } } }
总结:enq()方法中使用死循环初始化队列以及经过CAS设置尾节点,直到尾节点被设置成功,同时须要注意的是当队列初始化后会有一个空的头节点,该节点不包含任何的线程,而后再将当前节点加入到队列当中。
final boolean acquireQueued(final Node node, int arg) { // 失败标识 boolean failed = true; try { // 中断标识 boolean interrupted = false; // 自旋 for (;;) { // 获取节点的前驱节点 final Node p = node.predecessor(); // 若是节点的前驱节点是头节点那么尝试获取同步资源 // 强制要求队列中的节点获取同步资源的顺序必须是从队头到队尾,不然将会形成节点丢失,丢失了的节点中的线程将会永远处于阻塞状态,同时只有当线程获取了同步资源后,它才能成为头节点(队列初始化后的头节点除外),所以头节点确定是已经获取过同步资源的(队列初始化后的头节点除外),所以为了遵循队列中的节点获取同步资源的顺序必须是从队头到队尾,因此永远只有头节点的后继节点拥有尝试获取同步资源的权利,所以当在尝试获取同步资源以前,须要先判断一下当前节点的前驱节点是不是头节点,若是不是就不用获取了 if (p == head && tryAcquire(arg)) { // 当获取同步资源成功,则将当前节点设置为头节点 setHead(node); // 将以前头节点的后继指针设置为null,帮助GC p.next = null; failed = false; // 返回中断标识 return interrupted; } // 若是节点的前驱节点不是头节点,或者尝试获取同步资源失败,那么将会调用shouldParkAfterFailedAcquire()方法,判断线程可否进行阻塞,当线程可以被阻塞时,将会调用parkAndCheckInterrupt()方法阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 若是在执行该方法的过程当中,抛出了异常(线程超时等等),则failed标识为true,那么将会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,同时从等待队列中剔除。 if (failed) cancelAcquire(node); } }
总结:acquireQueued()方法用于自旋获取同步资源,同时该方法的方法出口只有一个,也就是当节点的前驱节点是头节点,同时尝试获取同步资源成功,那么就会将当前节点设置为头节点,不然就会调用shouldParkAfterFailedAcquire()方法,判断线程可否进行阻塞,当线程可以被阻塞时,将会调用parkAndCheckInterrupt()方法阻塞线程,等待被唤醒,同时在执行acquireQueued()方法的过程当中,若是抛出了异常,则failed标识为true,那么将会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,同时从等待队列中剔除。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取节点的前驱节点的等待状态 int ws = pred.waitStatus; // 若是前驱节点的等待状态为SIGNAL,那么当它释放同步资源时,将会自动唤醒离它最近的同时等待状态不为CANCELLED的后继节点,所以当前节点就能够直接阻塞了,等待被唤醒时再去尝试获取同步资源 if (ws == Node.SIGNAL) return true; // 若是前驱节点的等待状态为CANCELLED,那么经过循环找到前一个不为CANCELLED状态的节点,而且将当前节点的前驱指针指向该节点,将该节点的后继指针指向当前节点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 经过CAS将前驱节点的等待状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
总结:shouldParkAfterFailedAcquire()方法用于判断线程可否进行阻塞,以及剔除被设置为CANCELLED状态的节点。
正常状况下,线程第一次进来shouldParkAfterFailedAcquire()方法时,会将前驱节点的等待状态设置为SIGNAL,而后再次自旋进来该方法,判断到前驱节点的等待状态为SIGNAL,直接返回,而后就进入待阻塞状态。
当该节点的前驱节点被CANCELLED时,若是前驱节点的前驱节点是头节点,那么将会唤醒当前节点,那么它会再次自旋进来该方法,判断到前驱节点的等待状态为CANCELLED,就会将当前节点的前驱指针指向前一个不为CANCELLED状态的节点,也就是头节点,而后再将头节点的后继指针指向当前节点,而后再次自旋进来该方法,判断到前驱节点的等待状态为SIGNAL,直接返回,再次进入待阻塞状态。
不管怎么样经过shouldParkAfterFailedAcquire()方法的全部节点最终都会进入待阻塞状态,也就是说等待队列中除了头节点之外的全部线程都会处于阻塞状态。
private final boolean parkAndCheckInterrupt() { // 阻塞当前线程,blocker对象使用当前对象 LockSupport.park(this); // 当被唤醒时返回线程的中断标识 return Thread.interrupted(); }
总结:parkAndCheckInterrupt()方法用于阻塞线程,同时当线程被唤醒时会返回线程的中断标识,尽管若是线程被设置了中断标识,但也不会影响线程继续往下执行,只不过当它成功获取到同步资源时,会调用一次selfInterrupt()方法,再次为线程设置中断标识。
static void selfInterrupt() { // 为线程设置中断标识 Thread.currentThread().interrupt(); }
总结:当获取了同步资源的线程被设置了中断标识,才会调用selfInterrupt()方法,再次为线程设置中断标识,由于在parkAndCheckInterrupt()方法中已经调用过一次Thread.interrupted()方法,避免外部又再次调用Thread.interrupted()方法致使线程的中断标识被清除。
private void cancelAcquire(Node node) { if (node == null) return; // 将当前节点封装的线程设置为NULL node.thread = null; // 经过循环获取当前节点不为CANCELLED状态的前驱节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取前驱节点的后继节点(若是节点的前驱节点不是CANCELLED状态,那么前驱节点的后继节点就是它本身) Node predNext = pred.next; // 将节点的等待状态设置为CANCELLED node.waitStatus = Node.CANCELLED; // 若是当前节点是尾节点,则直接经过CAS将tail指针指向当前节点不为CANCELLED状态的前驱节点,同时经过CAS将前驱节点的后继指针设置为NULL if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 若是当前节点的前驱节点不是头节点 同时 前驱节点的等待状态为SIGNAL(若是不是SIGNAL那就设置为SIGNAL) 且 前驱节点封装的线程不为NULL if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 获取节点的后继节点 Node next = node.next; // 若是后继节点的等待状态不为CANCELLED,则经过CAS将前驱节点的后继指针指向当前节点的后继节点 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); // 这里并无将当前节点的后继节点的前驱指针指向前驱节点(不用设置,unparkSuccessor()方法会自动跳过) } else { // 若是当前节点的前驱节点是头节点,则直接唤醒当前节点的后继节点,让它来剔除当前节点 unparkSuccessor(node); } node.next = node; } }
总结:若是线程在阻塞的过程中抛出了异常,也就是直接中断acquireQueued()方法,而后执行finally语句块,因为failed标识为true,所以会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,若是当前节点是尾节点,则直接经过CAS将tail指针指向当前节点不为CANCELLED状态的前驱节点,同时将该前驱节点的后继指针设置为NULL,若是当前节点的前驱节点不是头节点,则经过CAS将前驱节点的后继指针指向当前节点的后继节点,若是当前节点的前驱节点是头节点,那么唤醒当前节点的后继节点,让它来剔除当前节点。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 若是队列不等于空,同时头节点的等待状态不为0,也就是头节点存在后继节点,那么调用unparkSuccessor()方法,唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点。 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
总结:当获取了同步资源的线程释放同步资源时(外部线程或者头节点中的线程),将会调用release()方法,release()方法中会调用AQS子类的tryRelease()方法,尝试释放同步资源,若是释放同步资源成功,同时队列不为空以及头节点的等待状态不为0,也就是头节点存在后继节点,那么就会调用unparkSuccessor()方法,唤醒离头节点最近的(也就是头节点的后继节点)同时等待状态不为CANCELLED的后继节点,那么该节点将会经过自旋尝试获取同步资源。
private void unparkSuccessor(Node node) { // 获取节点的等待状态 int ws = node.waitStatus; // 若是节点的等待状态不为CANCELLED,则经过CAS将节点的等待状态设置为0(恢复成队列初始化后的状态) if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取节点的后继节点 Node s = node.next; // 若是节点的后继指针为NULL(不能说明节点就没有后继节点)或者后继节点为CANCELLED状态,那么就从后往前寻找离当前节点最近的同时等待状态不为CANCELLED的后继节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒该后继节点中的线程 LockSupport.unpark(s.thread); }
总结:unparkSuccessor()方法用于唤醒离节点最近的同时等待状态不为CANCELLED的后继节点,若是节点的后继指针为NULL,不能说明节点就没有后继节点,或者后继节点的等待状态为CANCELLED,则从后往前,寻找离节点最近的同时等待状态不为CANCELLED的节点,最终唤醒该节点中的线程。
1.当线程要获取同步资源时,能够调用acquire()或者tryAcquire()方法,acquire()方法中会调用AQS子类的tryAcquire()方法,尝试获取同步资源,若是获取同步资源成功,则直接返回,作本身的事情,不然将会执行addWaiter()方法,将当前线程封装成Node节点而后加入到等待队列当中,而后执行acquireQueued()方法,用于自旋获取同步资源,若是全部条件都知足那么最终将会执行selfInterrupt()方法。
2.addWaiter()方法用于将当前线程封装成Node节点而后加入到等待队列当中,若是在这个过程当中,等待队列为空或者经过CAS设置尾节点失败(也就是当前指针所指向的尾节点并非真正的尾节点,也就是在这个过程中有其余节点插入到队尾),那么将会经过enq()方法死循环进行设置。
3.enq()方法中使用死循环初始化队列以及经过CAS设置尾节点,直到尾节点被设置成功,同时须要注意的是当队列初始化后会有一个空的头节点,该节点不包含任何的线程,而后再将当前节点加入到队列当中。
4.acquireQueued()方法用于自旋获取同步资源,同时该方法的方法出口只有一个,也就是当节点的前驱节点是头节点,同时尝试获取同步资源成功,那么就会将当前节点设置为头节点,不然就会调用shouldParkAfterFailedAcquire()方法,判断线程可否进行阻塞,当线程可以被阻塞时,将会调用parkAndCheckInterrupt()方法阻塞线程,等待被唤醒,同时在执行acquireQueued()方法的过程当中,若是抛出了异常,则failed标识为true,那么将会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,同时从等待队列中剔除。
5.shouldParkAfterFailedAcquire()方法用于判断线程可否进行阻塞,以及剔除被设置为CANCELLED状态的节点,正常状况下,线程第一次进来shouldParkAfterFailedAcquire()方法时,会将前驱节点的等待状态设置为SIGNAL,而后再次自旋进来该方法,判断到前驱节点的等待状态为SIGNAL,直接返回,而后就进入待阻塞状态,当该节点的前驱节点被CANCELLED时,若是前驱节点的前驱节点是头节点,那么将会唤醒当前节点,那么它会再次自旋进来该方法,判断到前驱节点的等待状态为CANCELLED,就会将当前节点的前驱指针指向前一个不为CANCELLED状态的节点,也就是头节点,而后再将头节点的后继指针指向当前节点,而后再次自旋进来该方法,判断到前驱节点的等待状态为SIGNAL,直接返回,再次进入待阻塞状态,不管怎么样经过shouldParkAfterFailedAcquire()方法的全部节点最终都会进入待阻塞状态,也就是说等待队列中除了头节点之外的全部线程都会处于阻塞状态。
6.parkAndCheckInterrupt()方法用于阻塞线程,同时当线程被唤醒时会返回线程的中断标识,尽管若是线程被设置了中断标识,但也不会影响线程继续往下执行,只不过当它成功获取到同步资源时,会调用一次selfInterrupt()方法,再次为线程设置中断标识,由于在parkAndCheckInterrupt()方法中已经调用过一次Thread.interrupted()方法,避免外部又再次调用Thread.interrupted()方法致使线程的中断标识被清除。
此时等待队列中除了头节点之外的全部线程都会处于阻塞状态
1.若是线程在阻塞的过程中抛出了异常,也就是直接中断acquireQueued()方法,而后执行finally语句块,因为failed标识为true,所以会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,若是当前节点是尾节点,则直接经过CAS将tail指针指向当前节点不为CANCELLED状态的前驱节点,同时将该前驱节点的后继指针设置为NULL,若是当前节点的前驱节点不是头节点,则经过CAS将前驱节点的后继指针指向当前节点的后继节点,若是当前节点的前驱节点是头节点,那么唤醒当前节点的后继节点,让它来剔除当前节点。
2.当获取了同步资源的线程释放同步资源时(外部线程或者头节点中的线程),将会调用release()方法,release()方法中会调用AQS子类的tryRelease()方法,尝试释放同步资源,若是释放同步资源成功,同时队列不为空以及头节点的等待状态不为0,也就是头节点存在后继节点,那么就会调用unparkSuccessor()方法,唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点,那么该节点将会经过自旋尝试获取同步资源。
3.在调用unparkSuccessor()方法唤醒离节点最近的同时等待状态不为CANCELLED的后继节点时,若是节点的后继指针为NULL,不能说明节点就没有后继节点,或者后继节点的等待状态为CANCELLED,则从后往前,寻找离节点最近的同时等待状态不为CANCELLED的节点,最终唤醒该节点中的线程。
为何要用CAS设置尾节点?
若是在设置尾节点的这个过程中,有其余节点插入到队尾,而后将tail指针指向当前节点,当前节点的前驱指针指向以前的尾节点,以前的尾节点的后继指针指向当前节点,那么中间插入的节点就会丢失。
在acquireQueued()方法中,为何尝试获取同步资源以前,须要先判断一下当前节点的前驱节点是不是头节点?
强制要求等待队列中的节点获取同步资源的顺序必须是从队头到队尾,不然将会形成节点丢失,丢失了的节点中的线程将会永远处于阻塞状态(当同步资源被释放时,还没来得及唤醒离头节点最近同时等待状态不为CANCELLED的后继节点时,等待队列中一个排在很后的节点被唤醒,而后它将会经过自旋尝试获取同步资源,一旦它获取了同步资源,那么它将成为头节点,最终它与以前头节点之间的全部节点中的线程将会永远处于阻塞状态),同时只有当线程获取了同步资源后,它才能成为头节点(队列初始化后的头节点除外),所以头节点确定是已经获取过同步资源的(队列初始化后的头节点除外),所以为了遵循队列中的节点获取同步资源的顺序必须是从队头到队尾,因此永远只有头节点的后继节点拥有尝试获取同步资源的权利,所以当在尝试获取同步资源以前,须要先判断一下当前节点的前驱节点是不是头节点,若是不是就不用获取了,至于头节点释放同步资源后,可否被后继节点获取到同步资源另说,由于当同步资源被释放时,被唤醒的后继节点可能还没来得获取同步资源,此时就被外部线程直接获取了,所以被唤醒的这个线程又只能再次进入阻塞状态。
为何在unparkSuccessor()方法中,若是节点的后继指针为NULL,须要从后往前寻找离节点最近的同时等待状态不为CANCELLED的后继节点,而不从前日后进行寻找?
若是节点的后继指针为NULL,不能说明节点就没有后继节点,由于不管是在addWaiter()方法仍是enq()方法将节点加入到队列,它老是先将当前节点的前驱指针指向尾节点,而后再经过CAS将tail指针指向当前节点,若是在将以前尾节点的后继指针指向当前节点以前,须要唤醒尾节点的后继节点,因为此时尾节点的后继指针仍然为NULL,所以没法经过next指针从前日后寻找,只能经过pred指针从后往前寻找。
线程在什么状况会被唤醒?
线程被唤醒只有两种状况
一种是外部线程或者头节点释放同步资源时,须要唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点,那么该节点就会经过自旋尝试获取同步资源。
一种是当节点的前驱节点被CANCELLED时,若是前驱节点的前驱节点是头节点,那么将会唤醒当前节点,将当前节点的前驱指针指向前一个不为CANCELLED状态的节点,也就是头节点,而后再将头节点的后继指针指向当前节点。
等待队列中处于CANCELLED状态的节点何时被剔除?
cancelAcquire()和shouldParkAfterFailedAcquire()方法均可以剔除等待队列中处于CANCELLED状态的节点。
*在unparkSuccessor()中须要剔除处于CANCELLED状态的节点是为了不同步问题,可能存在一个处于CANCELLED状态的节点将来得及被剔除,而后它又做为要唤醒的节点的后继节点。
/** * 自定义AQS共享模式下的同步器来实现共享锁 */ public class Share { /** * 自定义AQS共享模式下的同步器 */ private static class Sync extends AbstractQueuedSynchronizer { /** * 存储线程获取同步资源的状况 */ private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>(); /** * 初始化同步资源 */ public Sync(int state) { setState(state); } /** * 尝试获取同步资源(须要保证是线程安全的) */ @Override protected int tryAcquireShared(int arg) { int state = getState(); int available = state - arg; if (available >= 0 && compareAndSetState(state, available)) { // 经过CAS保证原子性 threadLocal.set(arg); return available; } return -1; } /** * 释放同步资源(线程释放同步资源的个数必须等于它获取同步资源的个数) */ @Override protected boolean tryReleaseShared(int arg) { if (threadLocal.get() != arg) throw new UnsupportedOperationException(); if (compareAndSetState(getState(), getState() + arg)) { // 经过CAS保证原子性 threadLocal.set(null); return true; } return false; } } /** * 初始化同步器的同步资源 */ public Share(int permits) { sync = new Sync(permits); } public Sync sync; /** * 获取许可 */ public void acquire(int permits) { sync.acquireShared(permits); } /** * 尝试获取许可 */ public boolean tryAcquire(int permits) { return sync.tryAcquireShared(permits) >= 0; } /** * 释放许可 */ public boolean release(int permits) { return sync.releaseShared(permits); } }
public class Main { static class MyRunnable implements Runnable { private Share share; private int permits; @Override public void run() { System.out.println(String.format("%s Running", Thread.currentThread().getName())); share.acquire(permits); System.out.println(String.format("%s获取了%s个许可", Thread.currentThread().getName(), permits)); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } share.release(permits); System.out.println(String.format("%s释放了%s个许可", Thread.currentThread().getName(), permits)); } public MyRunnable(Share share, int permits) { this.share = share; this.permits = permits; } } public static void main(String[] args) { Share share = new Share(10); Thread threadA = new Thread(new MyRunnable(share,5),"线程A"); Thread threadB = new Thread(new MyRunnable(share,4),"线程B"); Thread threadC = new Thread(new MyRunnable(share,3),"线程C"); threadA.start(); threadB.start(); threadC.start(); } }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
总结:当线程要获取同步资源时,能够调用acquireShared()或tryAcquireShared()方法,acquireShared()方法中会调用AQS子类的tryAcquireShared()方法,尝试获取同步资源,若是获取同步资源成功,则直接返回,作本身的事情,不然将会调用doAcquireShared()方法。
private void doAcquireShared(int arg) { // 将当前线程封装成Node节点,而后加入到等待队列当中 // 当前节点会被指定为共享模式,共享模式Node.SHARED为一个空的节点,也就是说节点的nextWaiter不为NULL(isShared()方法返回true) // 在调用addWaiter()方法的过程当中,若是等待队列为空或者经过CAS设置尾节点失败,那么将会经过enq()方法死循环进行设置 final Node node = addWaiter(Node.SHARED); // 失败标识 boolean failed = true; try { // 中断标识 boolean interrupted = false; // 自旋 for (;;) { // 获取节点的前驱节点 final Node p = node.predecessor(); // 若是节点的前驱节点是头节点,则尝试获取同步资源 if (p == head) { int r = tryAcquireShared(arg); // 若是获取同步资源成功,则调用setHeadAndPropagate()方法 if (r >= 0) { setHeadAndPropagate(node, r); // 将以前的头节点的后继指针设置为NULL,help gc p.next = null; // 若是获取了同步资源的线程被设置了中断标识,那么调用selfInterrupt()方法,再次为线程设置一个中断标识 if (interrupted) selfInterrupt(); failed = false; return; } } // 若是节点的前驱节点不是头节点,或者尝试获取同步资源失败,那么将会调用shouldParkAfterFailedAcquire()方法,判断线程可否进行阻塞,当线程可以被阻塞时,将会调用parkAndCheckInterrupt()方法阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 若是在执行该方法的过程当中,抛出了异常(线程超时等等),则failed标识为true,那么将会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,同时从等待队列中剔除。 if (failed) cancelAcquire(node); } }
总结:doAcquireShared()方法用于将当前线程封装成Node节点而后加入到等待队列当中,而后经过自旋获取同步资源,同时该方法的方法出口只有一个,也就是当节点的前驱节点是头节点,同时尝试获取同步资源成功,那么就会调用setHeadAndPropagate()方法,不然将会调用shouldParkAfterFailedAcquire()方法,判断线程可否进行阻塞,当线程可以被阻塞时,将会调用parkAndCheckInterrupt()方法阻塞线程,等待被唤醒,同时在执行doAcquireShared()方法的过程当中,若是抛出了异常,则failed标识为true,那么将会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,同时从等待队列中剔除。
private void setHeadAndPropagate(Node node, int propagate) { // 获取头节点 Node h = head; // 将当前节点设置为头节点 setHead(node); //若是线程获取了同步资源后,仍然有剩余的可用资源(正常状况),或没有剩余的可用资源但旧的和新的头节点的等待状态为PROPAGATE时(说明实际存在可用资源),那么将会调用doReleaseShared()方法 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 排除等待队列中不为共享模式的节点 doReleaseShared(); } }
总结:setHeadAndPropagate()方法用于将当前节点设置为头节点,同时若是当线程获取了同步资源后,仍然有剩余的可用资源(正常状况),或没有剩余的可用资源但旧的和新的头节点的等待状态为PROPAGATE时(说明实际存在可用资源),那么将会调用doReleaseShared()方法。
private void doReleaseShared() { // 使用死循环来保证CAS操做最终确定成功 for (;;) { // 获取头节点 Node h = head; // 若是head指针和tail指针不是指向同一个节点,说明头节点确定存在后继节点(使用head != tail能够避免头节点存在后继节点可是头节点的后继指针又为NULL的状况) if (h != null && h != tail) { // 获取头节点的等待状态,若是等待状态为SIGNAL,则经过CAS将头节点的等待状态设置为0(重置),而后唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } // 若是头节点的等待状态为0,则经过CAS将头节点的等待状态设置为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) // 若是执行完以上步骤后,h指针指向的头节点仍然为当前的头节点,则退出循环,完成释放过程,而后作本身的事情 break; } }
总结:当等待队列中的线程获取了同步资源后,仍然有剩余的可用资源,或没有剩余的可用资源但旧的和新的头节点的等待状态为PROPAGATE,或者当线程释放同步资源这两种状况,都会调用doReleaseShared()方法,该方法使用死循环来保证CAS操做最终确定成功,若是头节点存在后继节点,同时头节点的等待状态为SIGNAL时,那么将会经过CAS将头节点的等待状态设置为0(重置),而后唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点,若是判断到头节点的等待状态为0,那么将会经过CAS将节点的等待状态设置为PROPAGATE,表示须要传播下去。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
总结:当获取了同步资源的线程释放同步资源时,将会调用releaseShared()方法,releaseShared()方法中会调用AQS子类的tryReleaseShared()方法,尝试释放同步资源,若是释放同步资源成功,则会调用doReleaseShared()方法,唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点。
1.当线程要获取同步资源时,能够调用acquireShared()或tryAcquireShared()方法,acquireShared()方法中会调用AQS子类的tryAcquireShared()方法,尝试获取同步资源,若是获取同步资源成功,则直接返回,作本身的事情,不然将会调用doAcquireShared()方法。
2.doAcquireShared()方法用于将当前线程封装成Node节点而后加入到等待队列当中,而后经过自旋获取同步资源,同时该方法的方法出口只有一个,也就是当节点的前驱节点是头节点,同时尝试获取同步资源成功,那么就会调用setHeadAndPropagate()方法,不然将会调用shouldParkAfterFailedAcquire()方法,判断线程可否进行阻塞,当线程可以被阻塞时,将会调用parkAndCheckInterrupt()方法阻塞线程,等待被唤醒,同时在执行doAcquireShared()方法的过程当中,若是抛出了异常,则failed标识为true,那么将会执行cancelAcquire()方法,将当前节点的等待状态设置为CANCELLED,同时从等待队列中剔除。
3.setHeadAndPropagate()方法用于将当前节点设置为头节点,同时若是当线程获取了同步资源后,仍然有剩余的可用资源(正常状况),或没有剩余的可用资源但旧的和新的头节点的等待状态为PROPAGATE时(说明实际存在可用资源),那么将会调用doReleaseShared()方法。
4.当等待队列中的线程获取了同步资源后,仍然有剩余的可用资源,或没有剩余的可用资源但旧的和新的头节点的等待状态为PROPAGATE,或者当线程释放同步资源这两种状况,都会调用doReleaseShared()方法,该方法使用死循环来保证CAS操做最终确定成功,若是头节点存在后继节点,同时头节点的等待状态为SIGNAL时,那么将会经过CAS将头节点的等待状态设置为0(重置),而后唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点,若是判断到头节点的等待状态为0(表示并发释放同步资源),那么将会经过CAS将节点的等待状态设置为PROPAGATE,表示须要传播下去。
5.当获取了同步资源的线程释放同步资源时,将会调用releaseShared()方法,releaseShared()方法中会调用AQS子类的tryReleaseShared()方法,尝试释放同步资源,若是释放同步资源成功,则会调用doReleaseShared()方法,唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点。
有哪些场景会将节点的等待状态设置为PROPAGATE,以及它的做用是什么?
1.当线程A释放同步资源时,将当前的头节点的等待状态设置为0,而后唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点,若是被唤醒的节点获取了同步资源,而后在调用setHeadAndPropagate()方法以前,线程B释放了同步资源,此时判断到头节点的等待状态为0,那么就会将头节点的等待状态设置为PROPAGATE,表示并发释放了同步资源,目前还有可用的同步资源,而后被唤醒的节点在执行setHeadAndPropagate()方法时,若是没有剩余的可用资源,可是判断到旧的头节点的等待状态为PROPAGATE,说明实际存在可用资源,那么会再次调用doReleaseShared()方法,去唤醒后继节点,尝试获取同步资源。
2.若是被唤醒的节点获取了同步资源,在将当前节点设置为头节点以后,线程A和B释放了同步资源,那么就跟场景1同样,线程B会将头节点的等待状态设置为PROPAGATE,而后被唤醒的节点在执行setHeadAndPropagate()方法时,若是没有剩余的可用资源,除了判断旧的头节点的等待状态是否为PROPAGATE之外,还须要判断新的头节点的等待状态是否为PROPAGATE。
场景一和场景二的区别是获取同步资源的线程在设置头节点以前仍是头节点以后。