JDK 1.5 的 java.util.concurrent.locks 包中都是锁,其中有一个抽象类 AbstractQueuedSynchronizer (抽象队列同步器),也就是 AQS, 咱们今天就来看看该类。java
咱们看看该类的结构,该类被 CountDown,ThreadPoolExecutor,ReentrantLock,ReentrantReadWriteLock,Semaphore 的内部类所继承,而这些内部类都是这些锁的真正实现,不管是公平锁仍是非公平锁。node
也就是说,这些锁的真正实现都是该类来实现的。那么,咱们就从这些锁开始看看是如何实现从锁到解锁的。less
咱们先看看重入锁 ReentranLock 的 lock 方法。工具
public void lock() {
sync.lock();
}
复制代码
该方法调用了内部类的 sync 抽象类的 lock 方法,该方法的实现有公平锁和非公平锁。咱们看看公平锁是如何实现的:ui
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
复制代码
调用了 acquire 方法,该方法就是 AQS 的的方法,由于 sync 继承了 AQS,而公平锁继承了 Sync,等于间接继承了 AQS,咱们看看该方法。this
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
该方法JDK注释 :spa
以独占模式获取对象,若是被中断则停止。经过先检查中断状态,而后至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。不然在成功以前,或者线程被中断以前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。可使用此方法来实现 Lock.lockInterruptibly() 方法。操作系统
楼主来简单说一下该方法的做用:该方法会试图获取锁,若是获取不到,就会被加入等待队列等待被唤醒,这个其实和咱们以前分析的 synchronized 是差很少的。线程
咱们仔细看看该方法,首先是 tryAcquire 方法,也就是尝试获取锁,该方法是须要被写的,父类默认的方法是抛出异常。如何重写呢?抽象类定义一个标准:若是返回 true,表示获取锁成功,反之失败。设计
咱们回到 acquire 方法,若是获取锁成功,就直接返回了,若是失败了,则继续后面的操做,也就是将线程放入等待队列中:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
咱们先看看 addWaiter(Node.EXCLUSIVE) 方法:
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
该方法注释:将当前线程放入到队列节点。参数呢?参数有2种,Node.EXCLUSIVE 是独占锁,Node.SHARED 是分享锁。
在 Node 类种定义了这两个常量:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
复制代码
独占锁是null,共享锁是空对象。
咱们看看该方法的步骤:
咱们看看 enq 方法的实现:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
该方法步骤以下:
该方法主要就是初始化头节点和末端节点,并将新的节点追加到末端节点并更新末端节点。
咱们会到 addWaiter 方法中,该方法主要做用就是根据当前线程建立一个 node 对象,并追加到队列的末端。
咱们再回到 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
addWaiter 方法会返回刚刚建立的node 对象,而后调用 acquireQueued 方法,咱们进入该方法查看:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
该方法步骤以下:
咱们看看 shouldParkAfterFailedAcquire 方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
该方法步骤以下:
再看 parkAndCheckInterrupt 方法(挂起并检查是否中断):
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
该方法很是的简单,就是将当前线程挂起,等到有别的线程唤醒(一般是 head 节点中线程),而后返回当前线程是不是被中断了,注意,该方法会清除中断状态。
回到 acquireQueued 方法,总结一下该方法,该方法就是将刚刚建立的线程节点挂起,而后等待唤醒,若是被唤醒了,则将本身设置为 head 节点。最后,返回是否被中断。
再回到 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
在该方法中,若是获取锁失败并被唤醒,且被中断了,那么就执行 selfInterrupt 方法:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
复制代码
将当前线程设置中断状态位。
好了,到这里,整个lock 方法,咱们基本就分析完了,能够说,整个方法就是将线程放入到等待队列并挂起而后等待 head 节点唤醒。其中,tryAcquire 方法高频出现,该方法具体实现由子类实现,好比 重入锁,读写锁,线程池的 worker,其中 CountDown 和 Semaphore 实现的是共享模式的 tryAcquire 方法,但原理相同。AQS 如何定义的?就是返回 true 表示拿到锁了,返回 false 表示拿锁失败,具体如何实现AQS管不了。但他们都依赖一个极其重要的字段 ------- state。
楼主有必要说说这个字段,该字段定义了当前同步器的状态,若是你们知道 pv 原语的话,应该很好理解这个字段,该字段在 AQS 中是如何定义的:
/** * The synchronization state. */
private volatile int state;
复制代码
volatile。该字段可能会被多个线程修改,所以,须要设置为 volatile ,保证变量的可见性。
咱们能够看看 重入锁中的公平锁是如何使用该字段的。
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
该方法重写了 tryAcquire 方法,步骤以下:
从这里,咱们能够看到, statei 字段很是的重要,判断锁是否被持有彻底根据这个字段来的。这点必定要注意,而这个设计和操做系统的 pv 由殊途同归之妙。
那么看完了拿锁,再看看解锁,咱们能够先猜测一下如何设计,首先确定是要将 state 字段设置为 0,才能让下个线程拿锁,而后呢?唤醒等待队列中的下个线程。让他尝试拿锁。那到底 doug lea 是否是这么设计的呢?咱们来看看。
该方法调用了AQS 的 release 方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
首先尝试释放,若是成功,则唤醒下一个线程。
咱们先看看 tryRelease 方法 (须要重写):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
复制代码
该方法步骤以下:
能够看到,若是 state 不是 0 的话,就会返回 false ,后面的步骤就没有了,也就是说,重入锁解锁的时候不会唤醒下一个线程。
若是解锁成功,执行下面的步骤,若是 head 头节点不是 null 而且他的状态不是0,说明有线程能够唤醒,执行 unparkSuccessor 方法。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
该方法步骤以下:
这个时候,等待在 acquireQueued 方法中,准确的说是 parkAndCheckInterrupt 方法中的 线程被唤醒,开始继续循环,尝试拿锁(须要修改 state 变量),并设置本身为 head。
这里还有一个漏掉的地方,就是 waitStatus 变量,何时会大于等于0? 该变量默认是 0,大于 0 的状态是被取消的状态。何时会被取消呢? 在acquireQueued 方法中,若是方法没有正常结束,则会执行 finally 中的 cancelAcquire 方法,该方法会将状态变成 1,也就是取消状态。
此次咱们分析 AQS,也就是锁的的真正实现,只分析了 lock 方法和 unlock 方法,这两个方法是重入锁的基础。CountDown 和 Semaphore 是共享锁,可是基本原理相同,只是将 state 的数字加大即可以实现。而和重入锁等锁相关联的 Condition 则是经过 LockSupport 工具类直接挂起当前线程,并将当前线程添加到等待队列中,当调用 Condition 的 signal 方法时,则唤醒队列中的第一个线程。具体源码咱们有机会再分析。
总之,java 重入锁的实现基于 AQS,而 AQS 主要基于 state 变量和队列来实现。实现原理和 pv原语 相似。
good luck!!!!!