咱们在以前介绍了并发编程的锁机制:synchronized和lock,lock接口的重要实现类是可重入锁ReentrantLock
。而上一篇并发Lock之AQS(AbstractQueuedSynchronizer)详解介绍了AQS,谈到ReentrantLock,不得不谈抽象类AbstractQueuedSynchronizer(AQS)。AQS定义了一套多线程访问共享资源的同步器框架,ReentrantLock的实现依赖于该同步器。本文在介绍过AQS,结合其具体的实现类ReentrantLock
分析实现原理。java
ReentrantLock实现了Lock接口,内部有三个内部类,Sync、NonfairSync、FairSync,Sync是一个抽象类型,它继承AbstractQueuedSynchronizer,这个AbstractQueuedSynchronizer是一个模板类,它实现了许多和锁相关的功能,并提供了钩子方法供用户实现,好比tryAcquire,tryRelease等。Sync实现了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync两个类继承自Sync,实现了lock方法,公平抢占和非公平抢占针对tryAcquire有不一样的实现。本文重点介绍ReentrantLock默认的实现,即非公平锁的获取锁和释放锁的实现。node
public ReentrantLock() {
sync = new NonfairSync();
}
复制代码
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
复制代码
acquire(1)
实际上使用的是AbstractQueuedSynchronizer
的acquire方法,它是一套锁抢占的模板,整体原理是先去尝试获取锁,若是没有获取成功,就在CLH队列中增长一个当前线程的节点,表示等待抢占。而后进入CLH队列的抢占模式,进入的时候也会去执行一次获取锁的操做,若是仍是获取不到,就调用LockSupport.park将当前线程挂起。那么当前线程何时会被唤醒呢?当持有锁的那个线程调用unlock的时候,会将CLH队列的头节点的下一个节点上的线程唤醒,调用的是LockSupport.unpark方法。acquire代码比较简单,具体以下:public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
首先,在CLH锁队列尾部增长一个等待节点,这个节点保存了当前线程,经过调用addWaiter实现,这里须要考虑初始化的状况,在第一个等待节点进入的时候,须要初始化一个头节点而后把当前节点加入到尾部,后续则直接在尾部加入节点就好了。编程
private Node addWaiter(Node mode) {
// 初始化一个节点,这个节点保存当前线程
Node node = new Node(Thread.currentThread(), mode);
// 当CLH队列不为空的视乎,直接在队列尾部插入一个节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 当CLH队列为空的时候,调用enq方法初始化队列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 初始化节点,头尾都指向一个空节点
if (compareAndSetHead(new Node()))
tail = head;
} else {// 考虑并发初始化
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
首先,外层是一个无限for循环,若是当前节点是头节点的下个节点,而且经过tryAcquire获取到了锁,说明头节点已经释放了锁,当前线程是被头节点那个线程唤醒的,这时候就能够将当前节点设置成头节点,而且将failed标记设置成false,而后返回。至于上一个节点,它的next变量被设置为null,在下次GC的时候会清理掉。安全
若是本次循环没有获取到锁,就进入线程挂起阶段,也就是shouldParkAfterFailedAcquire这个方法。微信
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//
return true;
if (ws > 0) {
//
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
public void unlock() {
sync.release(1);
}
复制代码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
复制代码
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
在ReetrantLock的tryLock(long timeout, TimeUnit unit)
提供了超时获取锁的功能。它的语义是在指定的时间内若是获取到锁就返回true,获取不到则返回false。这种机制避免了线程无限期的等待锁释放。多线程
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
复制代码
具体看一下内部类里面的方法tryAcquireNanos
:并发
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
复制代码
若是线程被中断了,那么直接抛出InterruptedException。若是未中断,先尝试获取锁,获取成功就直接返回,获取失败则进入doAcquireNanos。tryAcquire咱们已经看过,这里重点看一下doAcquireNanos作了什么。框架
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 起始时间
long lastTime = System.nanoTime();
// 线程入队
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 又是自旋!
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 若是前驱是头节点而且占用锁成功,则将当前节点变成头结点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 若是已经超时,返回false
if (nanosTimeout <= 0)
return false;
// 超时时间未到,且须要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 阻塞当前线程直到超时时间到期
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
// 更新nanosTimeout
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
//相应中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
doAcquireNanos的流程简述为:线程先入等待队列,而后开始自旋,尝试获取锁,获取成功就返回,失败则在队列里找一个安全点把本身挂起直到超时时间过时。这里为何还须要循环呢?由于当前线程节点的前驱状态可能不是SIGNAL,那么在当前这一轮循环中线程不会被挂起,而后更新超时时间,开始新一轮的尝试。ui
ReentrantLock是可重入的锁,其内部使用的就是独占模式的AQS。公平锁和非公平锁不一样之处在于,公平锁在获取锁的时候,不会先去检查state状态,而是直接执行aqcuire(1)。公平锁多了hasQueuePredecessors这个方法,这个方法用于判断CHL队列中是否有节点,对于公平锁,若是CHL队列有节点,则新进入竞争的线程必定要在CHL上排队,而非公平锁则是无视CHL队列中的节点,直接进行竞争抢占,这就有可能致使CHL队列上的节点永远获取不到锁,这就是非公平锁之因此不公平的缘由,这里再也不赘述。this