前文(深刻JVM锁机制-synchronized)分析了JVM中的synchronized实现,本文继续分析JVM中的另外一种锁Lock的实现。与synchronized不一样的是,Lock彻底用Java写成,在java这个层面是无关JVM实现的。java
在java.util.concurrent.locks包中有不少Lock的实现类,经常使用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),其实现都依赖java.util.concurrent.AbstractQueuedSynchronizer类,实现思路都大同小异,所以咱们以ReentrantLock做为讲解切入点。node
通过观察ReentrantLock把全部Lock接口的操做都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:算法
[java] view plain copy设计模式
static abstract class Sync extends AbstractQueuedSynchronizer 数据结构
Sync又有两个子类:并发
[java] view plain copyapp
final static class NonfairSync extends Sync 函数
[java] view plain copy高并发
final static class FairSync extends Sync 布局
显然是为了支持公平锁和非公平锁而定义,默认状况下为非公平锁。
先理一下Reentrant.lock()方法的调用过程(默认非公平锁):
这些讨厌的Template模式致使很难直观的看到整个调用过程,其实经过上面调用过程及AbstractQueuedSynchronizer的注释能够发现,AbstractQueuedSynchronizer中抽象了绝大多数Lock的功能,而只把tryAcquire方法延迟到子类中实现。tryAcquire方法的语义在于用具体子类判断请求线程是否能够得到锁,不管成功与否AbstractQueuedSynchronizer都将处理后面的流程。
简单说来,AbstractQueuedSynchronizer会把全部的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活本身的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程所有处于阻塞状态,通过调查线程的显式阻塞是经过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中经过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。
该队列如图:
与synchronized相同的是,这也是一个虚拟队列,不存在队列实例,仅存在节点之间的先后关系。使人疑惑的是为何采用CLH队列呢?原生的CLH队列是用于自旋锁,但Doug Lea把其改造为阻塞锁。
当有线程竞争锁时,该线程会首先尝试得到锁,这对于那些已经在队列中排队的线程来讲显得不公平,这也是非公平锁的由来,与synchronized实现相似,这样会极大提升吞吐量。
若是已经存在Running线程,则新的竞争线程会被追加到队尾,具体是采用基于CAS的Lock-Free算法,由于线程并发对Tail调用CAS可能会致使其余线程CAS失败,解决办法是循环CAS直至成功。AbstractQueuedSynchronizer的实现很是精巧,使人叹为观止,不入细节难以彻底领会其精髓,下面详细说明实现过程:
nonfairTryAcquire方法将是lock方法间接调用的第一个方法,每次请求锁时都会首先调用该方法。
[java] view plain copy
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法会首先判断当前状态,若是c==0说明没有线程正在竞争该锁,若是不c !=0 说明有线程正拥有了该锁。
若是发现c==0,则经过CAS设置该状态值为acquires,acquires的初始调用值为1,每次线程重入该锁都会+1,每次unlock都会-1,但为0时释放锁。若是CAS设置成功,则能够预计其余任何线程调用CAS都不会再成功,也就认为当前线程获得了该锁,也做为Running线程,很显然这个Running线程并未进入等待队列。
若是c !=0 但发现本身已经拥有锁,只是简单地++acquires,并修改status值,但由于没有竞争,因此经过setStatus修改,而非CAS,也就是说这段代码实现了偏向锁的功能,而且实现的很是漂亮。
addWaiter方法负责把当前没法得到锁的线程包装为一个Node添加到队尾:
[java] view plain copy
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
其中参数mode是独占锁仍是共享锁,默认为null,独占锁。追加到队尾的动做分两步:
若是当前队尾已经存在(tail!=null),则使用CAS把当前线程更新为Tail
若是当前Tail为null或则线程调用CAS设置队尾失败,则经过enq方法继续设置Tail
下面是enq方法:
[java] view plain copy
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
该方法就是循环调用CAS,即便有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是经过CAS把当前如今追加到队尾,并返回包装后的Node实例。
把线程要包装为Node对象的主要缘由,除了用Node构造供虚拟队列外,还用Node包装了各类线程状态,这些状态被精心设计为一些数字值:
SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要从新这个后继线程(unpark)
CANCELLED(1):由于超时或中断,该线程已经被取消
CONDITION(-2):代表该线程被处于条件队列,就是由于调用了Condition.await而被阻塞
PROPAGATE(-3):传播共享锁
0:0表明无状态
acquireQueued的主要做用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又经过tryAccquire重试是否能得到锁,若是重试成功能则无需阻塞,直接返回
[java] view plain copy
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
仔细看看这个方法是个无限循环,感受若是p == head && tryAcquire(arg)条件不知足循环将永远没法结束,固然不会出现死循环,奥秘在于第12行的parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。
[java] view plain copy
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。固然也不是立刻把请求不到锁的线程进行阻塞,还要检查该线程的状态,好比若是该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中:
[java] view plain copy
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
检查原则在于:
规则1:若是前继的节点状态为SIGNAL,代表当前节点须要unpark,则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将致使线程阻塞
规则2:若是前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,致使线程阻塞
规则3:若是前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同
整体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,若是前继节点处于CANCELLED状态,则顺便删除这些节点从新构造队列。
至此,锁住线程的逻辑已经完成,下面讨论解锁的过程。
请求锁不成功的线程会被挂起在acquireQueued方法的第12行,12行之后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程获得解锁,则执行第13行,即设置interrupted = true,以后又进入无限循环。
从无限循环的代码能够看出,并非获得解锁的线程必定能得到锁,必须在第6行中调用tryAccquire从新竞争,由于锁是非公平的,有可能被新加入的线程得到,从而致使刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。经过以后将要介绍的解锁机制会看到,第一个被解锁的线程就是Head,所以p == head的判断基本都会成功。
至此能够看到,把tryAcquire方法延迟到子类中实现的作法很是精妙并具备极强的可扩展性,使人叹为观止!固然精妙的不是这个Templae设计模式,而是Doug Lea对锁结构的精心布局。
解锁代码相对简单,主要体如今AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:
class AbstractQueuedSynchronizer
[java] view plain copy
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
class Sync
[java] view plain copy
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease与tryAcquire语义相同,把如何释放的逻辑延迟到子类中。tryRelease语义很明确:若是线程屡次锁定,则进行屡次释放,直至status==0则真正释放锁,所谓释放锁即设置status为0,由于无竞争因此没有使用CAS。
release的语义在于:若是能够释放锁,则唤醒队列第一个线程(Head),具体唤醒代码以下:
[java] view plain copy
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这段代码的意思在于找出第一个能够unpark的线程,通常说来head.next == head,Head就是第一个线程,但Head.next可能被取消或被置为null,所以比较稳妥的办法是从后往前找第一个可用线程。貌似回溯会致使性能下降,其实这个发生的概率很小,因此不会有性能影响。以后即是通知系统内核继续该线程,在Linux下是经过pthread_mutex_unlock完成。以后,被解锁的线程进入上面所说的从新竞争状态。
AbstractQueuedSynchronizer经过构造一个基于阻塞的CLH队列容纳全部的阻塞线程,而对该队列的操做均经过Lock-Free(CAS)操做,但对已经得到锁的线程而言,ReentrantLock实现了偏向锁的功能。
synchronized的底层也是一个基于CAS操做的等待队列,但JVM实现的更精细,把等待队列分为ContentionList和EntryList,目的是为了下降线程的出列速度;固然也实现了偏向锁,从数据结构来讲两者设计没有本质区别。但synchronized还实现了自旋锁,并针对不一样的系统和硬件体系进行了优化,而Lock则彻底依靠系统阻塞挂起等待线程。
固然Lock比synchronized更适合在应用层扩展,能够继承AbstractQueuedSynchronizer定义各类实现,好比实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock对应的Condition也比wait/notify要方便的多、灵活的多。