JUC锁框架_AbstractQueuedSynchronizer详细分析

 

 

AQS是JUC锁框架中最重要的类,经过它来实现独占锁和共享锁的。本章是对AbstractQueuedSynchronizer源码的彻底解析,分为四个部分介绍:java

  1. CLH队列即同步队列:储存着全部等待锁的线程
  2. 独占锁
  3. 共享锁
  4. Condition条件

注: 还有一个AbstractQueuedLongSynchronizer类,它与AQS功能和实现几乎同样,惟一不一样的是AQLS中表明锁被获取次数的成员变量state类型是long长整类型,而AQS中该成员变量是int类型。node

一. CLH队列(线程同步队列)

由于获取锁是有条件的,没有获取锁的线程就要阻塞等待,那么就要存储这些等待的线程。安全

在AQS中咱们使用CLH队列储存这些等待的线程,但它并非直接储存线程,而是储存拥有线程的node节点。因此先介绍重要内部类Node。多线程

1.1 内部类Node

static final class Node { // 共享模式的标记 static final Node SHARED = new Node(); // 独占模式的标记 static final Node EXCLUSIVE = null; // waitStatus变量的值,标志着线程被取消 static final int CANCELLED = 1; // waitStatus变量的值,标志着后继线程(即队列中此节点以后的节点)须要被阻塞.(用于独占锁) static final int SIGNAL = -1; // waitStatus变量的值,标志着线程在Condition条件上等待阻塞.(用于Condition的await等待) static final int CONDITION = -2; // waitStatus变量的值,标志着下一个acquireShared方法线程应该被容许。(用于共享锁) static final int PROPAGATE = -3; // 标记着当前节点的状态,默认状态是0, 小于0的状态都是有特殊做用,大于0的状态表示已取消 volatile int waitStatus; // prev和next实现一个双向链表 volatile Node prev; volatile Node next; // 该节点拥有的线程 volatile Thread thread; // 可能有两种做用:1. 表示下一个在Condition条件上等待的节点 // 2. 表示是共享模式或者独占模式,注意第一种状况节点必定是共享模式 Node nextWaiter; // 是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 返回前一个节点prev,若是为null,则抛出NullPointerException异常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 用于建立链表头head,或者共享模式SHARED Node() { } // 使用在addWaiter方法中 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 使用在Condition条件中 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } 

重要的成员属性:app

  1. waitStatus: 表示当前节点的状态,默认状态是0。总共有五个值CANCELLED、SIGNAL、CONDITION、PROPAGATE以及0。
  2. prev和next:记录着当前节点前一个节点和后一个节点的引用。
  3. thread:当前节点拥有的线程。当拥有锁的线程释放锁的时候,可能会调用LockSupport.unpark(thread),唤醒这个被阻塞的线程。
  4. nextWaiter:若是是SHARED,表示当前节点是共享模式,若是是null,当前节点是独占模式,若是是其余值,当前节点也是独占模式,不过这个值也是Condition队列的下一个节点。

注意:经过Node咱们能够实现两个队列,一是经过prev和next实现CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待线程队列(单向队列),后一个咱们在Condition中介绍。框架

1.2 操做CLH队列

1.2.1 存储CLH队列

// CLH队列头 private transient volatile Node head; // CLH队列尾 private transient volatile Node tail; 

1.2.2 设置CLH队列头head

/** * 经过CAS函数设置head值,仅仅在enq方法中调用 */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } 

这个方法只在enq方法中调用,经过CAS函数设置head值,保证多线程安全ide

// 从新设置队列头head,它只在acquire系列的方法中调用 private void setHead(Node node) { head = node; // 线程也没有意义了,由于该线程已经获取到锁了 node.thread = null; // 前一个节点已经没有意义了 node.prev = null; } 

这个方法只在acquire系列的方法中调用,从新设置head,表示移除一些等待线程节点。函数

1.2.3 设置CLH队列尾tail

/** * 经过CAS函数设置tail值,仅仅在enq方法中调用 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } 

这个方法只在enq方法中调用,经过CAS函数设置tail值,保证多线程安全oop

1.2.4 将一个节点插入到CLH队列尾

// 向队列尾插入新节点,若是队列没有初始化,就先初始化。返回原先的队列尾节点 private Node enq(final Node node) { for (;;) { Node t = tail; // t为null,表示队列为空,先初始化队列 if (t == null) { // 采用CAS函数即原子操做方式,设置队列头head值。 // 若是成功,再将head值赋值给链表尾tail。若是失败,表示head值已经被其余线程,那么就进入循环下一次 if (compareAndSetHead(new Node())) tail = head; } else { // 新添加的node节点的前一个节点prev指向原来的队列尾tail node.prev = t; // 采用CAS函数即原子操做方式,设置新队列尾tail值。 if (compareAndSetTail(t, node)) { // 设置老的队列尾tail的下一个节点next指向新添加的节点node t.next = node; return t; } } } } 

这个方法向CLH队列尾插入一个新节点,若是队列为空,就先建立队列再插入新节点,返回老的队列尾节点。ui

1.2.5 将当前线程添加到CLH队列尾

// 经过给定的模式mode(独占或者共享)为当前线程建立新节点,并插入队列中 private Node addWaiter(Node mode) { // 为当前线程建立新的节点 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若是队列已经建立,就将新节点插入队列尾。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是队列没有建立,经过enq方法建立队列,并插入新的节点。 enq(node); return node; } 

为当前线程建立一个新节点,再插入到CLH队列尾,返回新建立的节点。

二. 独占锁

咱们先想想独占锁的功能是什么?

独占锁至少有两个功能:

  1. 获取锁的功能。 当多个线程一块儿获取锁的时候,只有一个线程能获取到锁,其余线程必须在当前位置阻塞等待。
  2. 释放锁的功能。获取锁的线程释放锁资源,并且还必须能唤醒正在等待锁资源的一个线程。
    带着这些疑惑,咱们来看AQS中是怎么实现的。

2.1 获取独占锁的方法

2.1.1 acquire方法

/** * 获取独占锁。若是没有获取到,线程就会阻塞等待,直到获取锁。不会响应中断异常 * @param arg */ public final void acquire(int arg) { // 1. 先调用tryAcquire方法,尝试获取独占锁,返回true,表示获取到锁,不须要执行acquireQueued方法。 // 2. 调用acquireQueued方法,先调用addWaiter方法为当前线程建立一个节点node,并插入队列中, // 而后调用acquireQueued方法去获取锁,若是不成功,就会让当前线程阻塞,当锁释放时才会被唤醒。 // acquireQueued方法返回值表示在线程等待过程当中,是否有另外一个线程调用该线程的interrupt方法,发起中断。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 

将这个方法一下格式,你们就很好理解了

public final void acquire(int arg) { // 1.先调用tryAcquire方法,尝试获取独占锁,返回true则直接返回 if (tryAcquire(arg)) return; // 2. 调用addWaiter方法为当前线程建立一个节点node,并插入队列中 Node node = addWaiter(Node.EXCLUSIVE); // 调用acquireQueued方法去获取锁, // acquireQueued方法返回值表示在线程等待过程当中,是否有另外一个线程调用该线程的interrupt方法,发起中断。 boolean interrupted = acquireQueued(node, arg); // 若是interrupted为true,则当前线程要发起中断请求 if (interrupted) { selfInterrupt(); } } 

2.1.2 tryAcquire方法

// 尝试去获取独占锁,当即返回。若是返回true表示获取锁成功。 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 

若是子类想实现独占锁,则必须重写这个方法,不然抛出异常。这个方法的做用是当前线程尝试获取锁,若是获取到锁,就会返回true,并更改锁资源。没有获取到锁返回false。

注:这个方法是当即返回的,不会阻塞当前线程

下面是ReentrantLock中FairSync的tryAcquire方法实现

// 尝试获取锁,与非公平锁最大的不一样就是调用hasQueuedPredecessors()方法 // hasQueuedPredecessors方法返回true,表示等待线程队列中有一个线程在当前线程以前, // 根据公平锁的规则,当前线程不能获取锁。 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取锁的记录状态 int c = getState(); // 若是c==0表示当前锁是空闲的 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 判断当前线程是否是独占锁的线程 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 更改锁的记录状态 setState(nextc); return true; } return false; } 

2.1.3 acquireQueued方法

addWaiter方法已经在上面讲解了。acquireQueued方法做用就是获取锁,若是没有获取到,就让当前线程阻塞等待。

/** * 想要获取锁的 acquire系列方法,都会这个方法来获取锁 * 循环经过tryAcquire方法不断去获取锁,若是没有获取成功, * 就有可能调用parkAndCheckInterrupt方法,让当前线程阻塞 * @param node 想要获取锁的节点 * @param arg * @return 返回true,表示在线程等待的过程当中,线程被中断了 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 表示线程在等待过程当中,是否被中断了 boolean interrupted = false; // 经过死循环,直到node节点的线程获取到锁,才返回 for (;;) { // 获取node的前一个节点 final Node p = node.predecessor(); // 若是前一个节点是队列头head,而且尝试获取锁成功 // 那么当前线程就不须要阻塞等待,继续执行 if (p == head && tryAcquire(arg)) { // 将节点node设置为新的队列头 setHead(node); // help GC p.next = null; // 不须要调用cancelAcquire方法 failed = false; return interrupted; } // 当p节点的状态是Node.SIGNAL时,就会调用parkAndCheckInterrupt方法,阻塞node线程 // node线程被阻塞,有两种方式唤醒, // 1.是在unparkSuccessor(Node node)方法,会唤醒被阻塞的node线程,返回false // 2.node线程被调用了interrupt方法,线程被唤醒,返回true // 在这里只是简单地将interrupted = true,没有跳出for的死循环,继续尝试获取锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed为true,表示发生异常,非正常退出 // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了。 if (failed) cancelAcquire(node); } } 

主要流程:

  1. 经过for (;;)死循环,直到node节点的线程获取到锁,才跳出循环。
  2. 获取node节点的前一个节点p。
  3. 当前一个节点p时CLH队列头节点时,调用tryAcquire方法尝试去获取锁,若是获取成功,就将节点node设置成CLH队列头节点(至关于移除节点node和以前的节点)而后return返回。
    注意:只有当node节点的前一个节点是队列头节点时,才会尝试获取锁,因此获取锁是有顺序的,按照添加到CLH队列时的顺序。
  4. 调用shouldParkAfterFailedAcquire方法,来决定是否要阻塞当前线程。
  5. 调用parkAndCheckInterrupt方法,阻塞当前线程。
  6. 若是当前线程发生异常,非正常退出,那么会在finally模块中调用cancelAcquire(node)方法,取消当前节点状态。

注意:这里当尝试获取锁失败时,并无当即阻塞当前线程,可是由于在for (;;)死循环里,会继续循环,方法不会返回。

2.1.4 shouldParkAfterFailedAcquire方法

这个方法的返回值决定是否要阻塞当前线程

/** * 根据前一个节点pred的状态,来判断当前线程是否应该被阻塞 * @param pred : node节点的前一个节点 * @param node * @return 返回true 表示当前线程应该被阻塞,以后应该会调用parkAndCheckInterrupt方法来阻塞当前线程 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 若是前一个pred的状态是Node.SIGNAL,那么直接返回true,当前线程应该被阻塞 return true; if (ws > 0) { // 若是前一个节点状态是Node.CANCELLED(大于0就是CANCELLED), // 表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。 // 因此从pred节点一直向前查找直到找到不是CANCELLED状态的节点,并把它赋值给node.prev, // 表示node节点的前一个节点已经改变。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 此时前一个节点pred的状态只能是0或者PROPAGATE,不多是CONDITION状态 // CONDITION(这个是特殊状态,只在condition列表中节点中存在,CLH队列中不存在这个状态的节点) // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 

咱们发现是根据前一个节点的状态,来决定是否阻塞当前线程。而前一个节点状态是在哪里改变的呢?惊奇地发现也是在这个方法中改变的。

  1. 若是前一个节点状态是Node.SIGNAL,那么直接返回true,阻塞当前线程
  2. 若是前一个节点状态是Node.CANCELLED(大于0就是CANCELLED),表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。因此从pred节点一直向前查找直到找到不是CANCELLED状态的节点。
    并把它赋值给node.prev,表示node节点的前一个节点已经改变。在acquireQueued方法中进行下一次循环。
  3. 不是前面两种状态,那么就将前一个节点状态设置成Node.SIGNAL,表示须要阻塞当前线程,这样再下一次循环时,就会直接阻塞当前线程。

2.1.5 parkAndCheckInterrupt 方法

阻塞当前线程,线程被唤醒后返回当前线程中断状态

/** * 阻塞当前线程,线程被唤醒后返回当前线程中断状态 */ private final boolean parkAndCheckInterrupt() { // 经过LockSupport.park方法,阻塞当前线程 LockSupport.park(this); // 当前线程被唤醒后,返回当前线程中断状态 return Thread.interrupted(); } 

经过LockSupport.park(this)阻塞当前线程。

2.1.6 cancelAcquire方法

将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了。

// 将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了。 private void cancelAcquire(Node node) { // 若是node为null,就直接返回 if (node == null) return; // node.thread = null; // 跳过那些已取消的节点,在队列中找到在node节点前面的第一次状态不是已取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 记录pred原来的下一个节点,用于CAS函数更新时使用 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 将node节点状态设置为已取消Node.CANCELLED; node.waitStatus = Node.CANCELLED; // 若是node节点是队列尾节点,那么就将pred节点设置为新的队列尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 而且设置pred节点的下一个节点next为null compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } 

2.1.7 小结

  1. tryAcquire方法尝试获取独占锁,由子类实现
  2. acquireQueued方法,方法内采用for死循环,先调用tryAcquire方法,尝试获取锁,若是成功,则跳出循环方法返回。 若是失败,就可能阻塞当前线程。当别的线程锁释放的时候,可能会唤醒这个线程,而后再次进行循环判断,调用tryAcquire方法,尝试获取锁。
  3. 若是发生异常,该线程被唤醒,因此要取消节点node的状态,由于节点node所在线程不是在阻塞状态了。

注:还有其余获取独占锁的方法,例如doAcquireInterruptibly、doAcquireNanos都在文章最后的源码解析中,这里就不作解析了,具体原理都差很少。

2.2 释放独占锁的方法

2.2.1 release方法

// 在独占锁模式下,释放锁的操做 public final boolean release(int arg) { // 调用tryRelease方法,尝试去释放锁,由子类具体实现 if (tryRelease(arg)) { Node h = head; // 若是队列头节点的状态不是0,那么队列中就可能存在须要唤醒的等待节点。 // 还记得咱们在acquireQueued(final Node node, int arg)获取锁的方法中,若是节点node没有获取到锁, // 那么咱们会将节点node的前一个节点状态设置为Node.SIGNAL,而后调用parkAndCheckInterrupt方法 // 将节点node所在线程阻塞。 // 在这里就是经过unparkSuccessor方法,进而调用LockSupport.unpark(s.thread)方法,唤醒被阻塞的线程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 
  1. 调用tryRelease方法释放锁资源,返回true表示锁资源彻底释放了,返回false表示还持有锁资源。
  2. 若是锁资源彻底被释放了,就要唤醒等待锁资源的线程。调用unparkSuccessor方法唤醒一个等待线程
    注:CLH队列头节点h为null,表示队列为空,没有节点。节点h的状态是0,表示没有CLH队列中没有被阻塞的线程。

2.2.2 tryRelease方法

// 尝试去释放当前线程持有的独占锁,当即返回。若是返回true表示释放锁成功 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } 

若是子类想实现独占锁,则必须重写这个方法,不然抛出异常。做用是释放当前线程持有的锁,返回true表示已经彻底释放锁资源,返回false,表示还持有锁资源。

注:对于独占锁来讲,同一时间只能有一个线程持有这个锁,可是这个线程能够重复地获取锁,由于被锁住的模块,再次进入另外一个被这个锁锁住的模块,是容许的。这个就作可重入性,因此对于可重入的锁释放操做,也须要屡次。

下面是ReentrantLock中Sync的tryRelease方法实现

protected final boolean tryRelease(int releases) { // c表示新的锁的记录状态 int c = getState() - releases; // 若是当前线程不是独占锁的线程,就抛出IllegalMonitorStateException异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 标志是否能够释放锁 boolean free = false; // 当新的锁的记录状态为0时,表示能够释放锁 if (c == 0) { free = true; // 设置独占锁的线程为null setExclusiveOwnerThread(null); } setState(c); return free; } 

2.2.3 unparkSuccessor方法

// 唤醒node节点的下一个非取消状态的节点所在线程(即waitStatus<=0) private void unparkSuccessor(Node node) { // 获取node节点的状态 int ws = node.waitStatus; // 若是小于0,就将状态从新设置为0,表示这个node节点已经完成了 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 下一个节点 Node s = node.next; // 若是下一个节点为null,或者状态是已取消,那么就要寻找下一个非取消状态的节点 if (s == null || s.waitStatus > 0) { // 先将s设置为null,s不是非取消状态的节点 s = null; // 从队列尾向前遍历,直到遍历到node节点 for (Node t = tail; t != null && t != node; t = t.prev) // 由于是从后向前遍历,因此不断覆盖找到的值,这样才能获得node节点后下一个非取消状态的节点 if (t.waitStatus <= 0) s = t; } // 若是s不为null,表示存在非取消状态的节点。那么调用LockSupport.unpark方法,唤醒这个节点的线程 if (s != null) LockSupport.unpark(s.thread); } 

这个方法的做用是唤醒node节点的下一个非取消状态的节点所在线程。

  1. 将node节点的状态设置为0
  2. 寻找到下一个非取消状态的节点s
  3. 若是节点s不为null,则调用LockSupport.unpark(s.thread)方法唤醒s所在线程。
    注:唤醒线程也是有顺序的,就是添加到CLH队列线程的顺序。

2.2.4 小结

  1. 调用tryRelease方法去释放当前持有的锁资源。
  2. 若是彻底释放了锁资源,那么就调用unparkSuccessor方法,去唤醒一个等待锁的线程。

三. 共享锁

共享锁与独占锁相比,共享锁可能被多个线程共同持有

3.1 获取共享锁的方法

3.1.1 acquireShared方法

// 获取共享锁 public final void acquireShared(int arg) { // 尝试去获取共享锁,若是返回值小于0表示获取共享锁失败 if (tryAcquireShared(arg) < 0) // 调用doAcquireShared方法去获取共享锁 doAcquireShared(arg); } 

调用tryAcquireShared方法尝试获取共享锁,若是返回值小于0表示获取共享锁失败.则继续调用doAcquireShared方法获取共享锁。

3.1.2 tryAcquireShared方法

// 尝试去获取共享锁,当即返回。返回值大于等于0,表示获取共享锁成功 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } 

若是子类想实现共享锁,则必须重写这个方法,不然抛出异常。做用是尝试获取共享锁,返回值大于等于0,表示获取共享锁成功。

下面是ReentrantReadWriteLock中Sync的tryReleaseShared方法实现,这个咱们会在ReentrantReadWriteLock章节中重点介绍的。

protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 当前线程是第一个获取读锁(共享锁)的线程 if (firstReader == current) { // 将firstReaderHoldCount减一,若是就是1,那么表示该线程须要释放读锁(共享锁), // 将firstReader设置为null if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; // 获取当前线程的HoldCounter变量 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); // 将rh变量的count减一, int count = rh.count; if (count <= 1) { readHolds.remove(); // count <= 0表示当前线程就没有获取到读锁(共享锁),这里释放就抛出异常。 if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // 由于读锁是利用高16位储存的,低16位的数据是要屏蔽的, // 因此这里减去SHARED_UNIT(65536),至关于减一 // 表示一个读锁已经释放 int nextc = c - SHARED_UNIT; // 利用CAS函数从新设置state值 if (compareAndSetState(c, nextc)) return nextc == 0; } } 

3.1.3 doAcquireShared方法

/** * 获取共享锁,获取失败,则会阻塞当前线程,直到获取共享锁返回 * @param arg the acquire argument */ private void doAcquireShared(int arg) { // 为当前线程建立共享锁节点node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 若是节点node前一个节点是同步队列头节点。就会调用tryAcquireShared方法尝试获取共享锁 if (p == head) { int r = tryAcquireShared(arg); // 若是返回值大于0,表示获取共享锁成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 若是节点p的状态是Node.SIGNAL,就是调用parkAndCheckInterrupt方法阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed为true,表示发生异常, // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了 if (failed) cancelAcquire(node); } } 

这个方法与独占锁的acquireQueued方法相比较,不一样的有三点:

  1. doAcquireShared方法,调用addWaiter(Node.SHARED)方法,为当前线程建立一个共享模式的节点node。而acquireQueued方法是由外部传递来的。
  2. doAcquireShared方法没有返回值,acquireQueued方法会返回布尔类型的值,是当前线程中断标志位值
  3. 最大的区别是从新设置CLH队列头的方法不同。doAcquireShared方法调用setHeadAndPropagate方法,而acquireQueued方法调用setHead方法。

3.1.4 setHeadAndPropagate方法

// 从新设置CLH队列头,若是CLH队列头的下一个节点为null或者共享模式, // 那么就要唤醒共享锁上等待的线程 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 设置新的同步队列头head setHead(node); // 若是propagate大于0, if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取新的CLH队列头的下一个节点s Node s = node.next; // 若是节点s是空或者共享模式节点,那么就要唤醒共享锁上等待的线程 if (s == null || s.isShared()) doReleaseShared(); } } 

3.2 释放共享锁的方法

3.2.1 releaseShared方法

// 释放共享锁 public final boolean releaseShared(int arg) { // 尝试释放共享锁 if (tryReleaseShared(arg)) { // 唤醒等待共享锁的线程 doReleaseShared(); return true; } return false; } 

3.2.2 tryReleaseShared方法

// 尝试去释放共享锁 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } 

若是子类想实现共享锁,则必须重写这个方法,不然抛出异常。做用是释放当前线程持有的锁,返回true表示已经彻底释放锁资源,返回false,表示还持有锁资源。

3.2.3 doReleaseShared方法

// 会唤醒等待共享锁的线程 private void doReleaseShared() { for (;;) { // 将同步队列头赋值给节点h Node h = head; // 若是节点h不为null,且不等于同步队列尾 if (h != null && h != tail) { // 获得节点h的状态 int ws = h.waitStatus; // 若是状态是Node.SIGNAL,就要唤醒节点h后继节点的线程 if (ws == Node.SIGNAL) { // 将节点h的状态设置成0,若是设置失败,就继续循环,再试一次。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒节点h后继节点的线程 unparkSuccessor(h); } // 若是节点h的状态是0,就设置ws的状态是PROPAGATE。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是同步队列头head节点发生改变,继续循环, // 若是没有改变,就跳出循环 if (h == head) break; } } 

四. Condition条件

Condition是为了实现线程之间相互等待的问题。注意Condition对象只能在独占锁中才能使用。

考虑一下状况,有两个线程,生产者线程,消费者线程。当消费者线程消费东西时,发现没有东西,这时它就要等待,让生产者线程生产东西后,在通知它消费。
由于操做的是同一个资源,因此要加锁,防止多线程冲突。而锁在同一时间只能有一个线程持有,因此消费者在让线程等待前,必须释放锁,且唤醒另外一个等待锁的线程。
那么在AQS中Condition条件又是如何实现的呢?

  1. 首先内部存在一个Condition队列,存储着全部在此Condition条件等待的线程。
  2. await系列方法:让当前持有锁的线程释放锁,并唤醒一个在CLH队列上等待锁的线程,再为当前线程建立一个node节点,插入到Condition队列(注意不是插入到CLH队列中)
  3. signal系列方法:其实这里没有唤醒任何线程,而是将Condition队列上的等待节点插入到CLH队列中,因此当持有锁的线程执行完毕释放锁时,就会唤醒CLH队列中的一个线程,这个时候才会唤醒线程。

4.1 await系列方法

4.1.1 await方法

/** * 让当前持有锁的线程阻塞等待,并释放锁。若是有中断请求,则抛出InterruptedException异常 * @throws InterruptedException */ public final void await() throws InterruptedException { // 若是当前线程中断标志位是true,就抛出InterruptedException异常 if (Thread.interrupted()) throw new InterruptedException(); // 为当前线程建立新的Node节点,而且将这个节点插入到Condition队列中了 Node node = addConditionWaiter(); // 释放当前线程占有的锁,并唤醒CLH队列一个等待线程 int savedState = fullyRelease(node); int interruptMode = 0; // 若是节点node不在同步队列中(注意不是Condition队列) while (!isOnSyncQueue(node)) { // 阻塞当前线程,那么怎么唤醒这个线程呢? // 首先咱们必须调用signal或者signalAll将这个节点node加入到同步队列。 // 只有这样unparkSuccessor(Node node)方法,才有可能唤醒被阻塞的线程 LockSupport.park(this); // 若是当前线程产生中断请求,就跳出循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 若是节点node已经在同步队列中了,获取同步锁,只有获得锁才能继续执行,不然线程继续阻塞等待 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除Condition队列中状态不是Node.CONDITION的节点 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 是否要抛出异常,或者发出中断请求 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 

方法流程:

  1. addConditionWaiter方法:为当前线程建立新的Node节点,而且将这个节点插入到Condition队列中了
  2. fullyRelease方法:释放当前线程占有的锁,并唤醒CLH队列一个等待线程
  3. isOnSyncQueue 方法:若是返回false,表示节点node不在CLH队列中,即没有调用过 signal系列方法,因此调用LockSupport.park(this)方法阻塞当前线程。
  4. 若是跳出while循环,表示节点node已经在CLH队列中,那么调用acquireQueued方法去获取锁。
  5. 清除Condition队列中状态不是Node.CONDITION的节点

4.1.2 addConditionWaiter方法

为当前线程建立新的Node节点,而且将这个节点插入到Condition队列中了

private Node addConditionWaiter() { Node t = lastWaiter; // 若是Condition队列尾节点的状态不是Node.CONDITION if (t != null && t.waitStatus != Node.CONDITION) { // 清除Condition队列中,状态不是Node.CONDITION的节点, // 而且可能会从新设置firstWaiter和lastWaiter unlinkCancelledWaiters(); // 从新将Condition队列尾赋值给t t = lastWaiter; } // 为当前线程建立一个状态为Node.CONDITION的节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 若是t为null,表示Condition队列为空,将node节点赋值给链表头 if (t == null) firstWaiter = node; else // 将新节点node插入到Condition队列尾 t.nextWaiter = node; // 将新节点node设置为新的Condition队列尾 lastWaiter = node; return node; } 

4.1.3 fullyRelease方法

释放当前线程占有的锁,并唤醒CLH队列一个等待线程

/** * 释放当前线程占有的锁,并唤醒CLH队列一个等待线程 * 若是失败就抛出异常,设置node节点的状态是Node.CANCELLED * @return */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 释放当前线程占有的锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } 

4.1.4 isOnSyncQueue

节点node是否是在CLH队列中

// 节点node是否是在CLH队列中 final boolean isOnSyncQueue(Node node) { // 若是node的状态是Node.CONDITION,或者node没有前一个节点prev, // 那么返回false,节点node不在同步队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是node有下一个节点next,那么它必定在同步队列中 if (node.next != null) // If has successor, it must be on queue return true; // 从同步队列中查找节点node return findNodeFromTail(node); } // 在同步队列中从后向前查找节点node,若是找到返回true,不然返回false private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } 

4.1.5 acquireQueued方法

获取独占锁,这个在独占锁章节已经说过

4.1.6 unlinkCancelledWaiters 方法

清除Condition队列中状态不是Node.CONDITION的节点

private void unlinkCancelledWaiters() { // condition队列头赋值给t Node t = firstWaiter; // 这个trail节点,只是起辅助做用 Node trail = null; while (t != null) { //获得下一个节点next。当节点是condition时候,nextWaiter表示condition队列的下一个节点 Node next = t.nextWaiter; // 若是节点t的状态不是CONDITION,那么该节点就要从condition队列中移除 if (t.waitStatus != Node.CONDITION) { // 将节点t的nextWaiter设置为null t.nextWaiter = null; // 若是trail为null,表示原先的condition队列头节点实效,须要设置新的condition队列头 if (trail == null) firstWaiter = next; else // 将节点t从condition队列中移除,由于改变了引用的指向,从condition队列中已经找不到节点t了 trail.nextWaiter = next; // 若是next为null,表示原先的condition队列尾节点也实效,从新设置队列尾节点 if (next == null) lastWaiter = trail; } else // 遍历到的有效节点 trail = t; // 将next赋值给t,遍历完整个condition队列 t = next; } } 

4.1.7 reportInterruptAfterWait方法

/** * 若是interruptMode是THROW_IE,就抛出InterruptedException异常 * 若是interruptMode是REINTERRUPT,则当前线程再发出中断请求 * 不然就什么都不作 */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } 

4.2 signal系列方法

4.2.1 signal方法

// 若是condition队列不为空,将condition队列头节点插入到同步队列中 public final void signal() { // 若是当前线程不是独占锁线程,就抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 将Condition队列头赋值给节点first Node first = firstWaiter; if (first != null) // 将Condition队列中的first节点插入到CLH队列中 doSignal(first); } 

若是condition队列不为空,就调用doSignal方法将condition队列头节点插入到CLH队列中。

4.2.2 doSignal方法

// 将Condition队列中的first节点插入到CLH队列中 private void doSignal(Node first) { do { // 原先的Condition队列头节点取消,因此从新赋值Condition队列头节点 // 若是新的Condition队列头节点为null,表示Condition队列为空了 // ,因此也要设置Condition队列尾lastWaiter为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 取消first节点nextWaiter引用 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } 

为何使用while循环,由于只有是Node.CONDITION状态的节点才能插入CLH队列,若是不是这个状态,那么循环Condition队列下一个节点。

4.2.3 transferForSignal方法

// 返回true表示节点node插入到同步队列中,返回false表示节点node没有插入到同步队列中 final boolean transferForSignal(Node node) { // 若是节点node的状态不是Node.CONDITION,或者更新状态失败, // 说明该node节点已经插入到同步队列中,因此直接返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将节点node插入到同步队列中,p是原先同步队列尾节点,也是node节点的前一个节点 Node p = enq(node); int ws = p.waitStatus; // 若是前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。 // 就说明节点p以后也不会发起唤醒下一个node节点线程的操做, // 因此这里直接调用 LockSupport.unpark(node.thread)方法,唤醒节点node所在线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } 
  1. 状态不是 Node.CONDITION的节点,是不能从Condition队列中插入到CLH队列中。直接返回false
  2. 调用enq方法,将节点node插入到同步队列中,p是原先同步队列尾节点,也是node节点的前一个节点
  3. 若是前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。那么就要LockSupport.unpark(node.thread)方法唤醒node节点所在线程。

4.2.4 signalAll 方法

// 将condition队列中全部的节点都插入到同步队列中 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } 

4.2.5 doSignalAll方法

/** * 将condition队列中全部的节点都插入到同步队列中 * @param first condition队列头节点 */ private void doSignalAll(Node first) { // 表示将condition队列设置为空 lastWaiter = firstWaiter = null; do { // 获得condition队列的下一个节点 Node next = first.nextWaiter; first.nextWaiter = null; // 将节点first插入到同步队列中 transferForSignal(first); first = next; // 循环遍历condition队列中全部的节点 } while (first != null); } 

循环遍历整个condition队列,调用transferForSignal方法,将节点插入到CLH队列中。

4.3 小结

Condition只能使用在独占锁中。它内部有一个Condition队列记录全部在Condition条件等待的线程(即就是调用await系列方法后等待的线程).
await系列方法:会让当前线程释放持有的锁,并唤醒在CLH队列上的一个等待锁的线程,再将当前线程插入到Condition队列中(注意不是CLH队列)
signal系列方法:并非唤醒线程,而是将Condition队列中的节点插入到CLH队列中。

总结

使用AQS类来实现独占锁和共享锁:

  1. 内部有一个CLH队列,用来记录全部等待锁的线程
  2. 经过 acquire系列方法用来获取独占锁,获取失败,则阻塞当前线程
  3. 经过release方法用来释放独占锁,释放成功,则会唤醒一个等待独占锁的线程。
  4. 经过acquireShared系列方法用来获取共享锁。
  5. 经过releaseShared方法用来释放共享锁。
  6. 经过Condition来实现线程之间相互等待的。

示例

import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; // 实现一个可重入的独占锁, 必须复写tryAcquire和tryRelease方法 class Sync extends AbstractQueuedSynchronizer { // 尝试获取独占锁, 利用锁的获取次数state属性 @Override protected boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取锁的记录状态state int c = getState(); // 若是c==0表示当前锁是空闲的 if (c == 0) { // 经过CAS原子操做方式设置锁的状态,若是为true,表示当前线程获取的锁, // 为false,锁的状态被其余线程更改,当前线程获取的锁失败 if (compareAndSetState(0, acquires)) { // 设置当前线程为独占锁的线程 setExclusiveOwnerThread(current); return true; } } // 判断当前线程是否是独占锁的线程,由于是可重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 更改锁的记录状态 setState(nextc); return true; } return false; } // 释放持有的独占锁, 由于是可重入锁,因此只有当c等于0的时候,表示当前持有锁的彻底释放了锁。 @Override protected boolean tryRelease(int releases) { // c表示新的锁的记录状态 int c = getState() - releases; // 若是当前线程不是独占锁的线程,就抛出IllegalMonitorStateException异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 标志是否能够释放锁 boolean free = false; // 当新的锁的记录状态为0时,表示能够释放锁 if (c == 0) { free = true; // 设置独占锁的线程为null setExclusiveOwnerThread(null); } setState(c); return free; } } public class AQSTest { public static void newThread(Sync sync, String name, int time) { new Thread(new Runnable() { @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+" 开始运行,准备获取锁"); // 经过acquire方法,获取锁,若是没有获取到,就等待 sync.acquire(1); try { System.out.println("====线程"+Thread.currentThread().getName()+" 在run方法中获取了锁"); lockAgain(); try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } finally { System.out.println("----线程"+Thread.currentThread().getName()+" 在run方法中释放了锁"); sync.release(1); } } private void lockAgain() { sync.acquire(1); try { System.out.println("====线程"+Thread.currentThread().getName()+" 在lockAgain方法中再次获取了锁"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } finally { System.out.println("----线程"+Thread.currentThread().getName()+" 在lockAgain方法中释放了锁"); sync.release(1); } } },name).start(); } public static void main(String[] args) { Sync sync = new Sync(); newThread(sync, "t1111", 1000); newThread(sync, "t2222", 1000); newThread(sync, "t3333", 1000); } } 

利用AbstractQueuedSynchronizer简单实现一个可重入的独占锁。

  1. 要实现独占锁,必须重写tryAcquire和tryRelease方法。不然在获取锁和释放锁的时候,会抛出异常。
  2. 直接的调用AQS类的acquire(1)和release(1)方法获取锁和释放锁。

输出结果是

线程t1111 开始运行,准备获取锁
====线程t1111 在run方法中获取了锁
====线程t1111  在lockAgain方法中再次获取了锁
线程t2222 开始运行,准备获取锁
线程t3333 开始运行,准备获取锁
----线程t1111 在lockAgain方法中释放了锁
----线程t1111 在run方法中释放了锁
====线程t2222 在run方法中获取了锁
====线程t2222  在lockAgain方法中再次获取了锁
----线程t2222 在lockAgain方法中释放了锁
----线程t2222 在run方法中释放了锁
====线程t3333 在run方法中获取了锁
====线程t3333  在lockAgain方法中再次获取了锁
----线程t3333 在lockAgain方法中释放了锁
----线程t3333 在run方法中释放了锁

附录

package java.util.concurrent.locks; import sun.misc.Unsafe; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; protected AbstractQueuedSynchronizer() { } static final class Node { // 共享模式的标记 static final Node SHARED = new Node(); // 独占模式的标记 static final Node EXCLUSIVE = null; // waitStatus变量的值,标志着线程被取消 static final int CANCELLED = 1; // waitStatus变量的值,标志着后继线程(即队列中此节点以后的节点)须要被阻塞.(用于独占锁) static final int SIGNAL = -1; // waitStatus变量的值,标志着线程在Condition条件上等待阻塞.(用于Condition的await等待) static final int CONDITION = -2; // waitStatus变量的值,标志着下一个acquireShared方法线程应该被容许。(用于共享锁) static final int PROPAGATE = -3; // 标记着当前节点的状态,默认状态是0, 小于0的状态都是有特殊做用,大于0的状态表示已取消 volatile int waitStatus; // prev和next实现一个双向链表 volatile Node prev; volatile Node next; // 该节点拥有的线程 volatile Thread thread; // 可能有两种做用:1. 表示下一个在Condition条件上等待的节点 // 2. 表示是共享模式或者独占模式,注意第一种状况节点必定是共享模式 Node nextWaiter; // 是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 返回前一个节点prev,若是为null,则抛出NullPointerException异常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 用于建立链表头head,或者共享模式SHARED Node() { } // 使用在addWaiter方法中 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 使用在Condition条件中 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } // CLH队列头 private transient volatile Node head; // CLH队列尾 private transient volatile Node tail; // 用来记录当前锁被获取的次数,当state==0,表示尚未被任何线程获取 private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } // 采用CAS函数。比较并交换函数,它是原子操做函数;即,经过CAS操做的数据都是以原子方式进行的 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // Queuing utilities static final long spinForTimeoutThreshold = 1000L; // 向队列尾插入新节点,若是队列没有初始化,就先初始化。返回原先的队列尾节点 private Node enq(final Node node) { for (;;) { Node t = tail; // t为null,表示队列为空,先初始化队列 if (t == null) { // 采用CAS函数即原子操做方式,设置队列头head值。 // 若是成功,再将head值赋值给链表尾tail。若是失败,表示head值已经被其余线程,那么就进入循环下一次 if (compareAndSetHead(new Node())) tail = head; } else { // 新添加的node节点的前一个节点prev指向原来的队列尾tail node.prev = t; // 采用CAS函数即原子操做方式,设置新队列尾tail值。 if (compareAndSetTail(t, node)) { // 设置老的队列尾tail的下一个节点next指向新添加的节点node t.next = node; return t; } } } } // 经过给定的模式mode(独占或者共享)为当前线程建立新节点,并插入队列中 private Node addWaiter(Node mode) { // 为当前线程建立新的节点 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若是队列已经建立,就将新节点插入队列尾。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是队列没有建立,经过enq方法建立队列,并插入新的节点。 enq(node); return node; } // 从新设置队列头head,它只在acquire系列的方法中调用 private void setHead(Node node) { head = node; // 线程也没有意义了,由于该线程已经获取到锁了 node.thread = null; // 前一个节点已经没有意义了 node.prev = null; } // 唤醒node节点的下一个非取消状态的节点所在线程(即waitStatus<=0) private void unparkSuccessor(Node node) { // 获取node节点的状态 int ws = node.waitStatus; // 若是小于0,就将状态从新设置为0,表示这个node节点已经完成了 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 下一个节点 Node s = node.next; // 若是下一个节点为null,或者状态是已取消,那么就要寻找下一个非取消状态的节点 if (s == null || s.waitStatus > 0) { // 先将s设置为null,s不是非取消状态的节点 s = null; // 从队列尾向前遍历,直到遍历到node节点 for (Node t = tail; t != null && t != node; t = t.prev) // 由于是从后向前遍历,因此不断覆盖找到的值,这样才能获得node节点后下一个非取消状态的节点 if (t.waitStatus <= 0) s = t; } // 若是s不为null,表示存在非取消状态的节点。那么调用LockSupport.unpark方法,唤醒这个节点的线程 if (s != null) LockSupport.unpark(s.thread); } // 会唤醒等待共享锁的线程 private void doReleaseShared() { for (;;) { // 将同步队列头赋值给节点h Node h = head; // 若是节点h不为null,且不等于同步队列尾 if (h != null && h != tail) { // 获得节点h的状态 int ws = h.waitStatus; // 若是状态是Node.SIGNAL,就要唤醒节点h后继节点的线程 if (ws == Node.SIGNAL) { // 将节点h的状态设置成0,若是设置失败,就继续循环,再试一次。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒节点h后继节点的线程 unparkSuccessor(h); } // 若是节点h的状态是0,就设置ws的状态是PROPAGATE。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是同步队列头head节点发生改变,继续循环, // 若是没有改变,就跳出循环 if (h == head) break; } } // 从新设置CLH队列头,若是CLH队列头的下一个节点为null或者共享模式, // 那么就要唤醒共享锁上等待的线程 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 设置新的同步队列头head setHead(node); // 若是propagate大于0, if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取新的CLH队列头的下一个节点s Node s = node.next; // 若是节点s是空或者共享模式节点,那么就要唤醒共享锁上等待的线程 if (s == null || s.isShared()) doReleaseShared(); } } // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了。 private void cancelAcquire(Node node) { // 若是node为null,就直接返回 if (node == null) return; // node.thread = null; // 跳过那些已取消的节点,在队列中找到在node节点前面的第一次状态不是已取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 记录pred原来的下一个节点,用于CAS函数更新时使用 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 将node节点状态设置为已取消Node.CANCELLED; node.waitStatus = Node.CANCELLED; // 若是node节点是队列尾节点,那么就将pred节点设置为新的队列尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 而且设置pred节点的下一个节点next为null compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * 根据前一个节点pred的状态,来判断当前线程是否应该被阻塞 * @param pred : node节点的前一个节点 * @param node * @return 返回true 表示当前线程应该被阻塞,以后应该会调用parkAndCheckInterrupt方法来阻塞当前线程 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 若是前一个pred的状态是Node.SIGNAL,那么直接返回true,当前线程应该被阻塞 return true; if (ws > 0) { // 若是前一个节点状态是Node.CANCELLED(大于0就是CANCELLED), // 表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。 // 因此从pred节点一直向前查找直到找到不是CANCELLED状态的节点,并把它赋值给node.prev, // 表示node节点的前一个节点已经改变。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 此时前一个节点pred的状态只能是0或者PROPAGATE,不多是CONDITION状态 // CONDITION(这个是特殊状态,只在condition列表中节点中存在,CLH队列中不存在这个状态的节点) // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 当前线程发出中断通知 */ static void selfInterrupt() { Thread.currentThread().interrupt(); } /** * 阻塞当前线程,线程被唤醒后返回当前线程中断状态 */ private final boolean parkAndCheckInterrupt() { // 经过LockSupport.park方法,阻塞当前线程 LockSupport.park(this); // 当前线程被唤醒后,返回当前线程中断状态 return Thread.interrupted(); } /** * 想要获取锁的 acquire系列方法,都会这个方法来获取锁 * 循环经过tryAcquire方法不断去获取锁,若是没有获取成功,就有可能调用parkAndCheckInterrupt方法,让当前线程阻塞 * @param node 想要获取锁的节点 * @param arg * @return 返回true,表示在线程等待的过程当中,线程被中断了 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 表示线程在等待过程当中,是否被中断了 boolean interrupted = false; // 经过死循环,直到node节点的线程获取到锁,才返回 for (;;) { // 获取node的前一个节点 final Node p = node.predecessor(); // 若是前一个节点是队列头head,而且尝试获取锁成功 // 那么当前线程就不须要阻塞等待,继续执行 if (p == head && tryAcquire(arg)) { // 将节点node设置为新的队列头 setHead(node); // help GC p.next = null; // 不须要调用cancelAcquire方法 failed = false; return interrupted; } // 当p节点的状态是Node.SIGNAL时,就会调用parkAndCheckInterrupt方法,阻塞node线程 // node线程被阻塞,有两种方式唤醒, // 1.是在unparkSuccessor(Node node)方法,会唤醒被阻塞的node线程,返回false // 2.node线程被调用了interrupt方法,线程被唤醒,返回true // 在这里只是简单地将interrupted = true,没有跳出for的死循环,继续尝试获取锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed为true,表示发生异常, // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了。 if (failed) cancelAcquire(node); } } /** * 这个方法与acquireQueued(final Node node, int arg)方法流程几乎同样 * 只不过当parkAndCheckInterrupt返回true时,直接抛出异常。 * @param arg * @throws InterruptedException */ private void doAcquireInterruptibly(int arg) throws InterruptedException { // 为当前线程建立节点node,并插入到队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 经过死循环,直到node节点的线程获取到锁,或者当前线程有中断请求会抛出中断异常 for (;;) { final Node p = node.predecessor(); // 若是前一个节点是队列头head,而且尝试获取锁成功 // 将该节点node设置成队列头head if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 当p节点的状态是Node.SIGNAL时,就会调用parkAndCheckInterrupt方法,阻塞node线程 // node线程被阻塞,有两种方式唤醒, // 1.是在unparkSuccessor(Node node)方法,会唤醒被阻塞的node线程,返回false // 2.node线程被调用了interrupt方法,线程被唤醒,返回true // 在这里若是parkAndCheckInterrupt返回true,就会抛出InterruptedException异常 // 跳出死循环,方法返回 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * 尝试在必定的时间nanosTimeout内获取锁,超时了就返回false * */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 计算截止时间 final long deadline = System.nanoTime() + nanosTimeout; // 为当前线程建立节点node,并插入到队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 若是前一个节点是队列头head,而且尝试获取锁成功 // 将该节点node设置成队列头head if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 计算剩余时间 nanosTimeout = deadline - System.nanoTime(); // 剩余时间小于等于0,就直接返回false,获取锁失败 if (nanosTimeout <= 0L) return false; // 当p节点的状态是Node.SIGNAL时,调用LockSupport.parkNanos阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 当前线程阻塞nanosTimeout时间 LockSupport.parkNanos(this, nanosTimeout); // 若是当前线程中断标志位是true,抛出InterruptedException异常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { // failed为true,表示发生异常, // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了 if (failed) cancelAcquire(node); } } /** * 获取共享锁,获取失败,则会阻塞当前线程,直到获取共享锁返回 * @param arg the acquire argument */ private void doAcquireShared(int arg) { // 为当前线程建立共享锁节点node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 若是节点node前一个节点是同步队列头节点。就会调用tryAcquireShared方法尝试获取共享锁 if (p == head) { int r = tryAcquireShared(arg); // 若是返回值大于0,表示获取共享锁成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 若是节点p的状态是Node.SIGNAL,就是调用parkAndCheckInterrupt方法阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed为true,表示发生异常, // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了 if (failed) cancelAcquire(node); } } /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // failed为true,表示发生异常, // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不须要唤醒了 if (failed) cancelAcquire(node); } } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // Main exported methods // 尝试去获取独占锁,当即返回。若是返回true表示获取锁成功。 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 尝试去释放当前线程持有的独占锁,当即返回。若是返回true表示释放锁成功 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 尝试去获取共享锁,当即返回。返回值大于等于0,表示获取共享锁成功 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 尝试去释放共享锁 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } /** * 获取独占锁。若是没有获取到,线程就会阻塞等待,直到获取锁。不会响应中断异常 * @param arg */ public final void acquire(int arg) { // 1. 先调用tryAcquire方法,尝试获取独占锁,返回true,表示获取到锁,不须要执行acquireQueued方法。 // 2. 调用acquireQueued方法,先调用addWaiter方法为当前线程建立一个节点node,并插入队列中, // 而后调用acquireQueued方法去获取锁,若是不成功,就会让当前线程阻塞,当锁释放时才会被唤醒。 // acquireQueued方法返回值表示在线程等待过程当中,是否有另外一个线程调用该线程的interrupt方法,发起中断。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // public final void acquire(int arg) { // // 1.先调用tryAcquire方法,尝试获取独占锁,返回true则直接返回 // if (tryAcquire(arg)) return; // // 2. 调用addWaiter方法为当前线程建立一个节点node,并插入队列中 // Node node = addWaiter(Node.EXCLUSIVE); // // 调用acquireQueued方法去获取锁, // // acquireQueued方法返回值表示在线程等待过程当中,是否有另外一个线程调用该线程的interrupt方法,发起中断。 // boolean interrupted = acquireQueued(node, arg); // // 若是interrupted为true,则当前线程要发起中断请求 // if (interrupted) { // selfInterrupt(); // } // } /** * 获取独占锁。若是没有获取到,线程就会阻塞等待,直到获取锁。若是有中断请求就会产生中断异常 * @param arg */ public final void acquireInterruptibly(int arg) throws InterruptedException { // 若是当前线程中断标志位是true,那么直接抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 先调用tryAcquire方法,尝试获取独占锁,返回true,表示获取到锁,不须要执行doAcquireInterruptibly方法。 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } /** * 尝试在必定时间内获取独占锁。若是有中断请求就会产生中断异常。 * 返回true,表示获取锁成功 */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是当前线程中断标志位是true,那么直接抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 先调用tryAcquire方法,尝试获取独占锁,返回true,表示获取到锁。 // 不然调用doAcquireNanos方法获取锁 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } // 在独占锁模式下,释放锁的操做 public final boolean release(int arg) { // 调用tryRelease方法,尝试去释放锁,由子类具体实现 if (tryRelease(arg)) { Node h = head; // 若是队列头节点的状态不是0,那么队列中就可能存在须要唤醒的等待节点。 // 还记得咱们在acquireQueued(final Node node, int arg)获取锁的方法中,若是节点node没有获取到锁, // 那么咱们会将节点node的前一个节点状态设置为Node.SIGNAL,而后调用parkAndCheckInterrupt方法 // 将节点node所在线程阻塞。 // 在这里就是经过unparkSuccessor方法,进而调用LockSupport.unpark(s.thread)方法,唤醒被阻塞的线程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 获取共享锁 public final void acquireShared(int arg) { // 尝试去获取共享锁,若是返回值小于0表示获取共享锁失败 if (tryAcquireShared(arg) < 0) // 调用doAcquireShared方法去获取共享锁 doAcquireShared(arg); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } // 释放共享锁 public final boolean releaseShared(int arg) { // 尝试释放共享锁 if (tryReleaseShared(arg)) { // 唤醒等待共享锁的线程 doReleaseShared(); return true; } return false; } // Queue inspection methods public final boolean hasQueuedThreads() { return head != tail; } public final boolean hasContended() { return head != null; } public final Thread getFirstQueuedThread() { // handle only fast path, else relay return (head == tail) ? null : fullGetFirstQueuedThread(); } /** * Version of getFirstQueuedThread called when fastpath fails */ private Thread fullGetFirstQueuedThread() { Node h, s; Thread st; if (((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null) || ((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null)) return st; Node t = tail; Thread firstThread = null; while (t != null && t != head) { Thread tt = t.thread; if (tt != null) firstThread = tt; t = t.prev; } return firstThread; } public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) if (p.thread == thread) return true; return false; } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } // 返回false表示队列为null,或者当前线程节点在是队列头的下一个节点。 // 返回true表示有一个线程节点在当前线程节点以前 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } // Instrumentation and monitoring methods public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; } public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; } public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (!p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; } public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; } public String toString() { int s = getState(); String q = hasQueuedThreads() ? "non" : ""; return super.toString() + "[State = " + s + ", " + q + "empty queue]"; } // Internal support methods for Conditions // 节点node是否是在CLH队列中 final boolean isOnSyncQueue(Node node) { // 若是node的状态是Node.CONDITION,或者node没有前一个节点prev, // 那么返回false,节点node不在同步队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是node有下一个节点next,那么它必定在同步队列中 if (node.next != null) // If has successor, it must be on queue return true; // 从同步队列中查找节点node return findNodeFromTail(node); } // 在同步队列中从后向前查找节点node,若是找到返回true,不然返回false private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } // 返回true表示节点node插入到同步队列中,返回false表示节点node没有插入到同步队列中 final boolean transferForSignal(Node node) { // 若是节点node的状态不是Node.CONDITION,或者更新状态失败, // 说明该node节点已经插入到同步队列中,因此直接返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将节点node插入到同步队列中,p是原先同步队列尾节点,也是node节点的前一个节点 Node p = enq(node); int ws = p.waitStatus; // 若是前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。 // 就说明节点p以后也不会发起唤醒下一个node节点线程的操做, // 因此这里直接调用 LockSupport.unpark(node.thread)方法,唤醒节点node所在线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } // 等待节点node存放到同步队列中,若是是当前线程插入的,返回true,若是是另外一个线程插入的,返回false。 final boolean transferAfterCancelledWait(Node node) { // 当节点node状态仍是Node.CONDITION,改变它的状态是0,而后插入到同步队列中。 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } // 若是节点node不在同步队列中,当前线程让出执行权 while (!isOnSyncQueue(node)) Thread.yield(); return false; } /** * 释放当前线程占有的锁,并唤醒CLH队列一个等待线程 * 若是失败就抛出异常,设置node节点的状态是Node.CANCELLED * @return */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 释放当前线程占有的锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } // Instrumentation methods for conditions public final boolean owns(ConditionObject condition) { return condition.isOwnedBy(this); } public final boolean hasWaiters(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.hasWaiters(); } public final int getWaitQueueLength(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitQueueLength(); } public final Collection<Thread> getWaitingThreads(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitingThreads(); } public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** Condition队列头. */ private transient Node firstWaiter; /** Condition队列尾. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } // Internal methods /** * 为当前线程建立新的Node节点,而且将这个节点插入到Condition队列中了 * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // 若是Condition队列尾节点的状态不是Node.CONDITION if (t != null && t.waitStatus != Node.CONDITION) { // 清除Condition队列中,状态不是Node.CONDITION的节点, // 而且可能会从新设置firstWaiter和lastWaiter unlinkCancelledWaiters(); // 从新将Condition队列尾赋值给t t = lastWaiter; } // 为当前线程建立一个状态为Node.CONDITION的节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 若是t为null,表示Condition队列为空,将node节点赋值给链表头 if (t == null) firstWaiter = node; else // 将新节点node插入到Condition队列尾 t.nextWaiter = node; // 将新节点node设置为新的Condition队列尾 lastWaiter = node; return node; } // 将Condition队列中的first节点插入到CLH队列中 private void doSignal(Node first) { do { // 原先的Condition队列头节点取消,因此从新赋值Condition队列头节点 // 若是新的Condition队列头节点为null,表示Condition队列为空了 // ,因此也要设置Condition队列尾lastWaiter为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 取消first节点nextWaiter引用 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * 将condition队列中全部的节点都插入到同步队列中 * @param first condition队列头节点 */ private void doSignalAll(Node first) { // 表示将condition队列设置为空 lastWaiter = firstWaiter = null; do { // 获得condition队列的下一个节点 Node next = first.nextWaiter; first.nextWaiter = null; // 将节点first插入到同步队列中 transferForSignal(first); first = next; // 循环遍历condition队列中全部的节点 } while (first != null); } // 清除Condition队列中状态不是Node.CONDITION的节点 private void unlinkCancelledWaiters() { // condition队列头赋值给t Node t = firstWaiter; // 这个trail节点,只是起辅助做用 Node trail = null; while (t != null) { //获得下一个节点next。当节点是condition时候,nextWaiter表示condition队列的下一个节点 Node next = t.nextWaiter; // 若是节点t的状态不是CONDITION,那么该节点就要从condition队列中移除 if (t.waitStatus != Node.CONDITION) { // 将节点t的nextWaiter设置为null t.nextWaiter = null; // 若是trail为null,表示原先的condition队列头节点实效,须要设置新的condition队列头 if (trail == null) firstWaiter = next; else // 将节点t从condition队列中移除,由于改变了引用的指向,从condition队列中已经找不到节点t了 trail.nextWaiter = next; // 若是next为null,表示原先的condition队列尾节点也实效,从新设置队列尾节点 if (next == null) lastWaiter = trail; } else // 遍历到的有效节点 trail = t; // 将next赋值给t,遍历完整个condition队列 t = next; } } // public methods // 若是condition队列不为空,将condition队列头节点插入到同步队列中 public final void signal() { // 若是当前线程不是独占锁线程,就抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 将Condition队列头赋值给节点first Node first = firstWaiter; if (first != null) // 将Condition队列中的first节点插入到CLH队列中 doSignal(first); } // 将condition队列中全部的节点都插入到同步队列中 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } // 让当前持有锁的线程阻塞等待,并释放锁。若是线程等待期间发出中断请求,不会产生中断异常 public final void awaitUninterruptibly() { // 为当前线程建立新的Node节点,而且将这个节点插入到Condition队列中了 Node node = addConditionWaiter(); // 释放当前线程占有的锁,并唤醒其余线程 int savedState = fullyRelease(node); boolean interrupted = false; // 若是节点node不在同步队列中(注意不是Condition队列),就阻塞当前线程 while (!isOnSyncQueue(node)) { // 阻塞当前线程 LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } // 若是节点node已经在同步队列中了,获取同步锁,只有获得锁才能继续执行,不然线程继续阻塞等待 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; /** * 若是线程没有发起了中断请求,返回0. * 若是线程发起了中断请求,且中断请求在signalled(即调用signal或signalAll)以前返回THROW_IE * 中断请求在signalled以后返回REINTERRUPT */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * 若是interruptMode是THROW_IE,就抛出InterruptedException异常 * 若是interruptMode是REINTERRUPT,则当前线程再发出中断请求 * 不然就什么都不作 */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * 让当前持有锁的线程阻塞等待,并释放锁。若是有中断请求,则抛出InterruptedException异常 * @throws InterruptedException */ public final void await() throws InterruptedException { // 若是当前线程中断标志位是true,就抛出InterruptedException异常 if (Thread.interrupted()) throw new InterruptedException(); // 为当前线程建立新的Node节点,而且将这个节点插入到Condition队列中了 Node node = addConditionWaiter(); // 释放当前线程占有的锁,并唤醒CLH队列一个等待线程 int savedState = fullyRelease(node); int interruptMode = 0; // 若是节点node不在同步队列中(注意不是Condition队列) while (!isOnSyncQueue(node)) { // 阻塞当前线程,那么怎么唤醒这个线程呢? // 首先咱们必须调用signal或者signalAll将这个节点node加入到同步队列。 // 只有这样unparkSuccessor(Node node)方法,才有可能唤醒被阻塞的线程 LockSupport.park(this); // 若是当前线程产生中断请求,就跳出循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 若是节点node已经在同步队列中了,获取同步锁,只有获得锁才能继续执行,不然线程继续阻塞等待 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除Condition队列中状态不是Node.CONDITION的节点 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 是否要抛出异常,或者发出中断请求 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } // 获取 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除Condition队列中状态不是Node.CONDITION的节点 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 是否要抛出异常,或者发出中断请求 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * 返回true:表示这个condition对象是由sync对象建立的。 */ final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } /** * 这个condition对象上是否有等待线程,即condition队列不为空,且有一个节点的状态是 Node.CONDITION */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 遍历condition队列 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } /** * 返回condition对象上等待线程的个数,即遍历condition队列,计算节点的状态是Node.CONDITION的个数 */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; // 遍历condition队列 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { // 计算节点的状态是Node.CONDITION的个数 if (w.waitStatus == Node.CONDITION) ++n; } return n; } /** * 返回condition对象上等待线程的集合。 */ protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); // 遍历condition队列 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { // 当节点的状态是Node.CONDITION,将节点的线程添加到list集合中 if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } } // 支持compareAndSet操做 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * 经过CAS函数设置head值,仅仅在enq方法中调用 */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * 经过CAS函数设置tail值,仅仅在enq方法中调用 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * 经过CAS函数设置node对象的waitStatus值 */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * 经过CAS函数设置node对象的next值 */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } } 
相关文章
相关标签/搜索