在synchronized
未优化以前,咱们在编码中使用最多的同步工具类应该是ReentrantLock
类,ReentrantLock
拥有优化后synchronized
关键字的性能,又提供了更多的灵活性。相比synchronized
,他在功能上更增强大,具备等待可中断,公平锁以及绑定多个条件等synchronized
不具有的功能,是咱们开发过程当中必需要重点掌握的一个关键并发类。java
ReentrantLock
在JDK并发包中举足轻重,不只是由于他自己的使用频度,同时他也为大量JDK并发包中的并发类提供底层支持,包括CopyOnWriteArrayLit
、CyclicBarrier
和LinkedBlockingDeque
等等。既然ReentrantLock
如此重要,那么了解他的底层实现原理对咱们在不一样场景下灵活使用ReentrantLock
以及查找各类并发问题就很关键。这篇文章就带领你们一步步剖析ReentrantLock
底层的实现逻辑,了解实现逻辑以后又应该怎么更好的使用ReentrantLock
。node
在使用ReentrantLock
类时,第一步就是对他进行实例化,也就是使用new ReentrantLock()
,来看看他的实例化的源码:面试
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
复制代码
在代码中能够看到,ReentrantLock
提供了2个实例化方法,未带参数的实例化方法默认用NonfairSync()
初始化了sync
字段,带参数的实例化方法经过参数区用NonfairSync()
或FairSync()
初始化sync
字段。安全
经过名字看出也就是咱们经常使用的非公平锁与公平锁的实现,公平锁须要经过排队FIFO的方式来获取锁,非公平锁也就是说能够插队,默认状况下ReentrantLock
会使用非公平锁的实现。那么是sync
字段的实现逻辑是什么呢?看下sync
的代码:多线程
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {......}
static final class NonfairSync extends Sync {......}
static final class FairSync extends Sync {......}
复制代码
到这里就发现了AbstractQueuedSynchronizer
类,公平锁和非公平锁其实都是在AbstractQueuedSynchronizer
的基础上实现的,也就是AQS。AQS提供了ReentrantLock
实现的基础。并发
分析了ReentrantLock
的实例化以后,来看看他是怎么实现锁这个功能的:app
//ReentrantLock的lock方法
public void lock() {
sync.lock();
}
//调用了Sync中的lock抽象方法
abstract static class Sync extends AbstractQueuedSynchronizer {
......
/** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */
abstract void lock();
......
}
复制代码
调用了sync
的lock()
方法,Sync
类的lock()
方法是一个抽象方法,NonfairSync()
和FairSync()
分别对lock()
方法进行了实现。工具
//非公平锁的lock实现
static final class NonfairSync extends Sync {
......
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */
final void lock() {
if (compareAndSetState(0, 1)) //插队操做,首先尝试CAS获取锁,0为锁空闲
setExclusiveOwnerThread(Thread.currentThread()); //获取锁成功后设置当前线程为占有锁线程
else
acquire(1);
}
......
}
//公平锁的lock实现
static final class FairSync extends Sync {
......
final void lock() {
acquire(1);
}
......
}
复制代码
注意看他们的区别,NonfairSync()
会先进行一个CAS操做,将一个state状态从0设置到1,这个也就是上面所说的非公平锁的“插队”操做,前面讲过CAS操做默认是原子性的,这样就保证了设置的线程安全性。这是非公平锁和公平锁的第一点区别。性能
那么这个state状态是作什么用的呢?从0设置到1又表明了什么呢?再来看看跟state有关的源码:优化
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/** * The synchronization state. */
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
复制代码
首先state变量是一个volatile
修饰的int
类型变量,这样就保证了这个变量在多线程环境下的可见性。从变量的注释“The synchronization state”能够看出state表明了一个同步状态。再回到上面的lock()
方法,在设置成功以后,调用了setExclusiveOwnerThread
方法将当前线程设置给了一个私有的变量,这个变量表明了当前获取锁的线程,放到了AQS的父类AbstractOwnableSynchronizer
类中实现。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
......
/** * The current owner of exclusive mode synchronization. */
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
复制代码
若是设置state成功,lock()
方法执行完毕,表明获取了锁。能够看出state状态就是用来管理是否获取到锁的一个同步状态,0表明锁空闲,1表明获取到了锁。那么若是设置state状态不成功呢?接下来会调用acquire(1)
方法,公平锁则直接调用acquire(1)
方法,不会用CAS操做进行插队。acquire(1)
方法是实如今AQS中的一个方法,看下他的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
这个方法很重要也很简单理解,有几步操做,首先调用tryAcquire
尝试获取锁,若是成功,则执行完毕,若是获取失败,则调用addWaiter
方法添加当前线程到等待队列,同时添加后执行acquireQueued
方法挂起线程。若是挂起等待中须要中断则执行selfInterrupt
将线程中断。下面来具体看看这个流程执行的细节,首先看看tryAcquire
方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//NonfairSync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //锁空闲
if (compareAndSetState(0, acquires)) { //再次cas操做获取锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //当前线程重复获取锁,也就是锁重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //判断队列中是否已经存在等待线程,若是存在则获取锁失败,须要排队
compareAndSetState(0, acquires)) { //不存在等待线程,再次cas操做获取锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //当前线程重复获取锁,也就是锁重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//AQS中实现,判断队列中是否已经存在等待线程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
复制代码
AQS没有提供具体的实现,ReentrantLock
中公平锁和非公平锁分别有本身的实现。非公平锁在锁空闲的状态下再次CAS操做尝试获取锁,保证线程安全。若是当前锁非空闲,也就是state状态不为0,则判断是不是重入锁,也就是同一个线程屡次获取锁,是重入锁则将state状态+1,这也是
ReentrantLock`支持锁重入的逻辑。
公平锁和非公平锁在这上面有第二点区别,公平锁在锁空闲时首先会调用hasQueuedPredecessors
方法判断锁等待队列中是否存在等待线程,若是存在,则不会去尝试获取锁,而是走接下来的排队流程。至此非公平锁和公平锁的区别你们应该清楚了。若是面试时问道公平锁和非公平锁的区别,相信你们能够很容易答出来了。
经过tryAcquire
获取锁失败以后,会调用acquireQueued(addWaiter)
,先来看看addWaiter
方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //用EXCLUSIVE模式初始化一个Node节点,表明是一个独占锁节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { //若是尾节点不为空,表明等待队列中已经有线程节点在等待
node.prev = pred; //将当前节点的前置节点指向尾节点
if (compareAndSetTail(pred, node)) { //cas设置尾节点为当前节点,将当前线程加入到队列末尾,避免多线程设置致使数据丢失
pred.next = node;
return node;
}
}
enq(node); //若是队列中无等待线程,或者设置尾节点不成功,则循环设置尾节点
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) //空队列,初始化头尾节点都为一个空节点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //重复addWaiter中的设置尾节点,也是cas的经典操做--自旋,避免使用Synchronized关键字致使的线程挂起
t.next = node;
return t;
}
}
}
}
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node(); //共享模式
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null; //独占模式
......
}
复制代码
addWaiter
方法首先初始化了一个EXCLUSIVE模式的Node节点。Node节点你们应该很熟悉,我写的集合系列文章里面介绍了不少链式结构都是经过这种方式来实现的。AQS中的Node也不例外,他的队列结构也是经过实现一个Node内部类来实现的,这里实现的是一个双向队列。Node节点分两种模式,一种SHARED共享锁模式,一种EXCLUSIVE独占锁模式,ReentrantLock
使用的是EXCLUSIVE独占锁模式,所用用EXCLUSIVE来初始化。共享锁模式后面的文章咱们再详细讲解。
初始化Node节点以后就是将节点加入到队列之中,这里有一点要注意的是多线程环境下,若是CAS设置尾节点不成功,须要自旋进行CAS操做来设置尾节点,这样即保证了线程安全,又保证了设置成功,这是一种乐观的锁模式,固然你能够经过synchronized关键字锁住这个方法,但这样效率就会降低,是一种悲观锁模式。
设置节点的过程我经过下面几张图来描述下,让你们有更形象的理解:
将当前线程加入等待队列以后,须要调用acquireQueued
挂起当前线程:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //获取当前节点的前置节点
if (p == head && tryAcquire(arg)) { //若是前置节点是头节点,说明当前节点是第一个挂起的线程节点,再次cas尝试获取锁
setHead(node); //获取锁成功设置当前节点为头节点,当前节点占有锁
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败,检查节点状态,查看是否须要挂起线程
parkAndCheckInterrupt()) //挂起线程,当前线程阻塞在这里!
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
能够看到这个方法是一个自旋的过程,首先获取当前节点的前置节点,若是前置节点为头结点则再次尝试获取锁,失败则挂起阻塞,阻塞被取消后自旋这一过程。是否能够阻塞经过shouldParkAfterFailedAcquire
方法来判断,阻塞经过parkAndCheckInterrupt
方法来执行。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //表明继任节点须要挂起
/* * This node has already set status asking a release * to signal it, so it can safely park. */
return true;
if (ws > 0) { //表明前置节点已经退出(超时或中断等状况)
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //前置节点退出,循环设置到最近的一个未退出节点
pred.next = node;
} else { //非可挂起状态或退出状态则尝试设置为Node.SIGNAL状态
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//挂起当前线程
return Thread.interrupted();
}
复制代码
只有当节点处于SIGNAL状态时才能够挂起线程,Node的waitStatus有4个状态分别是:
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */
static final int PROPAGATE = -3;
复制代码
注释写的很清楚,这里就不详细解释着四种状态了。到这里整个Lock的过程咱们就所有说完了,公平锁和非公平锁的区别从Lock的过程当中咱们也很容易发现,非公平锁同样要进行排队,只不过在排队以前会CAS尝试直接获取锁。说完了获取锁,下面来看下释放锁的过程。
unLock()
方法比较好理解,由于他不须要考虑多线程的问题,若是unLock()
的不是以前lock
的线程,直接退出就能够了。看看unLock()
的源码:
public class ReentrantLock implements Lock, java.io.Serializable {
......
public void unlock() {
sync.release(1);
}
......
}
public abstract class AbstractQueuedSynchronizer {
......
public final boolean release(int arg) {
if (tryRelease(arg)) { //尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //释放锁成功后启动后继线程
return true;
}
return false;
}
......
}
abstract static class Sync extends AbstractQueuedSynchronizer {
......
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) //释放锁必需要是获取锁的线程,不然退出,保证了这个方法只能单线程访问
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //独占锁为0后表明锁释放,不然为重入锁,不释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
......
}
abstract static class Sync extends AbstractQueuedSynchronizer {
......
private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //挂起当前线程
}
......
}
复制代码
同lock()
方法同样,会调用AQS的release
方法,首先调用tryRelease
尝试释放,首先必需要是当前获取锁的线程,以后判断是否为重入锁,非重入锁则释放当前线程的锁。锁释放以后调用unparkSuccessor
方法启动后继线程。
ReentrantLock
的获取锁和释放锁到这里就讲完了,总的来讲仍是比较清晰的一个流程,经过AQS的state状态来控制锁获取和释放状态,AQS内部用一个双向链表来维护挂起的线程。在AQS和ReentrantLock之间经过状态和行为来分离,AQS用管理各类状态,并内部经过链表管理线程队列,ReentrantLock则对外提供锁获取和释放的功能,具体实现则在AQS中。下面我经过两张流程图总结了公平锁和非公平锁的流程。
非公平锁: