接上篇,这篇从Lock.lock/unlock开始。特别说明在没有特殊状况下全部程序、API、文档都是基于JDK 6.0的。html
public void java.util.concurrent.locks.ReentrantLock.lock()java
获取锁。node
若是该锁没有被另外一个线程保持,则获取该锁并当即返回,将锁的保持计数设置为 1。算法
若是当前线程已经保持该锁,则将保持计数加 1,而且该方法当即返回。并发
若是该锁被另外一个线程保持,则出于线程调度的目的,禁用当前线程,而且在得到锁以前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。高并发
从上面的文档能够看出ReentrantLock是可重入锁的实现。而内部是委托java.util.concurrent.locks.ReentrantLock.Sync.lock()实现的。java.util.concurrent.locks.ReentrantLock.Sync是抽象类,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync两个实现,也就是常说的公平锁和不公平锁。性能
公平锁和非公平锁ui
若是获取一个锁是按照请求的顺序获得的,那么就是公平锁,不然就是非公平锁。.net
在没有深刻了解内部机制及实现以前,先了解下为何会存在公平锁和非公平锁。公平锁保证一个阻塞的线程最终可以得到锁,由于是有序的,因此老是能够按照请求的顺序得到锁。不公平锁意味着后请求锁的线程可能在其前面排列的休眠线程恢复前拿到锁,这样就有可能提升并发的性能。这是由于一般状况下挂起的线程从新开始与它真正开始运行,两者之间会产生严重的延时。所以非公平锁就能够利用这段时间完成操做。这是非公平锁在某些时候比公平锁性能要好的缘由之一。线程
两者在实现上的区别会在后面介绍,咱们先从公平锁(FairSync)开始。
前面说过java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基础,对于一个FairSync而言,lock()就直接调用AQS的acquire(int arg);
public final void acquire(int arg) 以独占模式获取对象,忽略中断。经过至少调用一次
tryAcquire(int)
来实现此方法,并在成功时返回。不然在成功以前,一直调用tryAcquire(int)
将线程加入队列,线程可能重复被阻塞或不被阻塞。
在介绍实现以前先要补充上一节的知识,对于一个AQS的实现而言,一般状况下须要实现如下方法来描述如何锁定线程。
tryAcquire(int)
试图在独占模式下获取对象状态。此方法应该查询是否容许它在独占模式下获取对象状态,若是容许,则获取它。此方法老是由执行 acquire 的线程来调用。若是此方法报告失败,则 acquire 方法能够将线程加入队列(若是尚未将它加入队列),直到得到其余某个线程释放了该线程的信号。也就是说此方法是一种尝试性方法,若是成功获取锁那最好,若是没有成功也没有关系,直接返回false。
tryRelease(int)
试图设置状态来反映独占模式下的一个释放。 此方法老是由正在执行释放的线程调用。释放锁可能失败或者抛出异常,这个在后面会具体分析。tryAcquireShared(int) 试图在共享模式下获取对象状态。
tryReleaseShared(int) 试图设置状态来反映共享模式下的一个释放。
isHeldExclusively() 若是对于当前(正调用的)线程,同步是以独占方式进行的,则返回
true
。
除了tryAcquire(int)外,其它方法会在后面具体介绍。首先对于ReentrantLock而言,不论是公平锁仍是非公平锁,都是独占锁,也就是说同时可以有一个线程持有锁。所以对于acquire(int arg)而言,arg==1。在AQS中acquire的实现以下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个看起来比较复杂,咱们分解如下4个步骤。
这是一个比较复杂的过程,咱们循序渐进一个一个分析。
tryAcquire(acquires)
对于公平锁而言,它的实现方式以下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
在这段代码中,前面说明对于AQS存在一个state来描述当前有多少线程持有锁。因为AQS支持共享锁(例如读写锁,后面会继续讲),因此这里state>=0,可是因为ReentrantLock是独占锁,因此这里不妨理解为0<=state,acquires=1。isFirst(current)是一个很复杂的逻辑,包括踢出无用的节点等复杂过程,这里暂且不提,大致上的意思是说判断AQS是否为空或者当前线程是否在队列头(为了区分公平与非公平锁)。
比较非公平锁的tryAcquire实现java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平锁多了一个判断当前节点是否在队列头,这个就保证了是否按照请求锁的顺序来决定获取锁的顺序(同一个线程的屡次获取锁除外)。
如今再回头看公平锁和非公平锁的lock()方法。公平锁只有一句acquire(1);而非公平锁的调用以下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
很显然,非公平锁在第一次获取锁,或者其它线程释放锁后(可能等待),优先采用compareAndSetState(0,1)而后设置AQS独占线程而持有锁,这样有时候比acquire(1)顺序检查锁持有而要高效。即便在重入锁上,也就是compareAndSetState(0,1)失败,可是是当前线程持有锁上,非公平锁也没有问题。
addWaiter(mode)
tryAcquire失败就意味着入队列了。此时AQS的队列中节点Node就开始发挥做用了。通常状况下AQS支持独占锁和共享锁,而独占锁在Node中就意味着条件(Condition)队列为空(上一篇中介绍过相关概念)。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有两个常量,
static final Node EXCLUSIVE = null; //独占节点模式
static final Node SHARED = new Node(); //共享节点模式
addWaiter(mode)中的mode就是节点模式,也就是共享锁仍是独占锁模式。
前面一再强调ReentrantLock是独占锁模式。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
上面是节点如队列的一部分。当前仅当队列不为空而且将新节点插入尾部成功后直接返回新节点。不然进入enq(Node)进行操做。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq(Node)去队列操做实现了CHL队列的算法,若是为空就建立头结点,而后同时比较节点尾部是不是改变来决定CAS操做是否成功,当且仅当成功后才将为不节点的下一个节点指向为新节点。能够看到这里仍然是CAS操做。
acquireQueued(node,arg)
自旋请求锁,若是可能的话挂起线程,直到获得锁,返回当前线程是否中断过(若是park()过而且中断过的话有一个interrupted中断位)。
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
下面的分析就须要用到上节节点的状态描述了。acquireQueued过程是这样的:
一个节点是否该park()是关键,这是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)实现的。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0) return true;
if (s > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}
selfInterrupt()
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
若是线程曾经中断过(或者阻塞过)(好比手动interrupt()或者超时等等,那么就再中断一次,中断两次的意思就是清除中断位)。
大致上整个Lock.lock()就这样一个流程。除了lock()方法外,还有lockInterruptibly()/tryLock()/unlock()/newCondition()等,在接下来的章节中会一一介绍。