前言html
这篇文章被归到Java基础分类中,其实真的一点都不基础。网上写ReentrantLock的使用、ReentrantLock和synchronized的区别的文章不少,研究ReentrantLock而且能讲清楚ReentrantLock的原理的文章不多,本文就来研究一下ReentrantLock的实现原理。研究ReentrantLock的实现原理须要比较好的Java基础以及阅读代码的能力,有些朋友看不懂不要紧,能够之后看,相信你必定会有所收获。java
最后说一句,ReentrantLock是基于AQS实现的,这在下面会讲到,AQS的基础又是CAS,若是不是很熟悉CAS的朋友,能够看一下这篇文章Unsafe与CAS。node
AbstractQueuedSynchronizerapp
ReentrantLock实现的前提就是AbstractQueuedSynchronizer,简称AQS,是java.util.concurrent的核心,CountDownLatch、FutureTask、Semaphore、ReentrantLock等都有一个内部类是这个抽象类的子类。先用两张表格介绍一下AQS。第一个讲的是Node,因为AQS是基于FIFO队列的实现,所以必然存在一个个节点,Node就是一个节点,Node里面有:ui
属 性 | 定 义 |
Node SHARED = new Node() | 表示Node处于共享模式 |
Node EXCLUSIVE = null | 表示Node处于独占模式 |
int CANCELLED = 1 | 由于超时或者中断,Node被设置为取消状态,被取消的Node不该该去竞争锁,只能保持取消状态不变,不能转换为其余状态,处于这种状态的Node会被踢出队列,被GC回收 |
int SIGNAL = -1 | 表示这个Node的继任Node被阻塞了,到时须要通知它 |
int CONDITION = -2 | 表示这个Node在条件队列中,由于等待某个条件而被阻塞 |
int PROPAGATE = -3 | 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取能够无条件传播 |
int waitStatus | 0,新Node会处于这种状态 |
Node prev | 队列中某个Node的前驱Node |
Node next | 队列中某个Node的后继Node |
Thread thread | 这个Node持有的线程,表示等待锁的线程 |
Node nextWaiter | 表示下一个等待condition的Node |
看完了Node,下面再看一下AQS中有哪些变量和方法:this
属性/方法 | 含 义 |
Thread exclusiveOwnerThread | 这个是AQS父类AbstractOwnableSynchronizer的属性,表示独占模式同步器的当前拥有者 |
Node | 上面已经介绍过了,FIFO队列的基本单位 |
Node head | FIFO队列中的头Node |
Node tail | FIFO队列中的尾Node |
int state | 同步状态,0表示未锁 |
int getState() | 获取同步状态 |
setState(int newState) | 设置同步状态 |
boolean compareAndSetState(int expect, int update) | 利用CAS进行State的设置 |
long spinForTimeoutThreshold = 1000L | 线程自旋等待的时间 |
Node enq(final Node node) | 插入一个Node到FIFO队列中 |
Node addWaiter(Node mode) | 为当前线程和指定模式建立并扩充一个等待队列 |
void setHead(Node node) | 设置队列的头Node |
void unparkSuccessor(Node node) | 若是存在的话,唤起Node持有的线程 |
void doReleaseShared() | 共享模式下作释放锁的动做 |
void cancelAcquire(Node node) | 取消正在进行的Node获取锁的尝试 |
boolean shouldParkAfterFailedAcquire(Node pred, Node node) | 在尝试获取锁失败后是否应该禁用当前线程并等待 |
void selfInterrupt() | 中断当前线程自己 |
boolean parkAndCheckInterrupt() | 禁用当前线程进入等待状态并中断线程自己 |
boolean acquireQueued(final Node node, int arg) | 队列中的线程获取锁 |
tryAcquire(int arg) | 尝试得到锁(由AQS的子类实现它) |
tryRelease(int arg) | 尝试释放锁(由AQS的子类实现它) |
isHeldExclusively() | 是否独自持有锁 |
acquire(int arg) | 获取锁 |
release(int arg) | 释放锁 |
compareAndSetHead(Node update) | 利用CAS设置头Node |
compareAndSetTail(Node expect, Node update) | 利用CAS设置尾Node |
compareAndSetWaitStatus(Node node, int expect, int update) | 利用CAS设置某个Node中的等待状态 |
上面列出了AQS中最主要的一些方法和属性。整个AQS是典型的模板模式的应用,设计得十分精巧,对于FIFO队列的各类操做在AQS中已经实现了,AQS的子类通常只须要重写tryAcquire(int arg)和tryRelease(int arg)两个方法便可。spa
ReentrantLock的实现线程
ReentrantLock中有一个抽象类Sync:设计
private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { ... }
ReentrantLock根据传入构造方法的布尔型参数实例化出Sync的实现类FairSync和NonfairSync,分别表示公平的Sync和非公平的Sync。因为ReentrantLock咱们用的比较多的是非公平锁,因此看下非公平锁是如何实现的。假设线程1调用了ReentrantLock的lock()方法,那么线程1将会独占锁,整个调用链十分简单:code
第一个获取锁的线程就作了两件事情:
一、设置AbstractQueuedSynchronizer的state为1
二、设置AbstractOwnableSynchronizer的thread为当前线程
这两步作完以后就表示线程1独占了锁。而后线程2也要尝试获取同一个锁,在线程1没有释放锁的状况下必然是行不通的,因此线程2就要阻塞。那么,线程2如何被阻塞?看下线程2的方法调用链,这就比较复杂了:
调用链看到确实很是长,不要紧,结合代码分析一下,其实ReentrantLock没有那么复杂,咱们一点点来扒代码:
1 final void lock() { 2 if (compareAndSetState(0, 1)) 3 setExclusiveOwnerThread(Thread.currentThread()); 4 else 5 acquire(1); 6 }
首先线程2尝试利用CAS去判断state是否是0,是0就设置为1,固然这一步操做确定是失败的,由于线程1已经将state设置成了1,因此第2行一定是false,所以线程2走第5行的acquire方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
从字面上就很好理解这个if的意思,先走第一个判断条件尝试获取一次锁,若是获取的结果为false即失败,走第二个判断条件添加FIFO等待队列。因此先看一下tryAcquire方法作了什么,这个方法最终调用到的是Sync的nonfairTryAcquire方法:
1 final boolean nonfairTryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c == 0) { 5 if (compareAndSetState(0, acquires)) { 6 setExclusiveOwnerThread(current); 7 return true; 8 } 9 } 10 else if (current == getExclusiveOwnerThread()) { 11 int nextc = c + acquires; 12 if (nextc < 0) // overflow 13 throw new Error("Maximum lock count exceeded"); 14 setState(nextc); 15 return true; 16 } 17 return false; 18 }
因为state是volatile的,因此state对线程2具备可见性,线程2拿到最新的state,再次判断一下可否持有锁(可能线程1同步代码执行得比较快,这会儿已经释放了锁),不能够就返回false。
注意一下第10~第16行,这段代码的做用是让某个线程能够屡次调用同一个ReentrantLock,每调用一次给state+1,因为某个线程已经持有了锁,因此这里不会有竞争,所以不须要利用CAS设置state(至关于一个偏向锁)。从这段代码能够看到,nextc每次加1,当nextc<0的时候抛出error,那么同一个锁最多能重入Integer.MAX_VALUE次,也就是2147483647。
而后就走到if的第二个判断里面了,先走AQS的addWaiter方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
先建立一个当前线程的Node,模式为独占模式(由于传入的mode是一个NULL),再判断一下队列上有没有节点,没有就建立一个队列,所以走enq方法:
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 if (t == null) { // Must initialize 5 Node h = new Node(); // Dummy header 6 h.next = node; 7 node.prev = h; 8 if (compareAndSetHead(h)) { 9 tail = node; 10 return h; 11 } 12 } 13 else { 14 node.prev = t; 15 if (compareAndSetTail(t, node)) { 16 t.next = node; 17 return t; 18 } 19 } 20 } 21 }
这个方法其实画一张图应该比较好理解,造成一个队列以后应该是这样的:
每一步都用图表示出来了,因为线程2所在的Node是第一个要等待的Node,所以FIFO队列上确定没有内容,tail为null,走的就是第4行~第10行的代码逻辑。这里用了CAS设置头Node,固然有可能线程2设置头Node的时候CPU切换了,线程3已经把头Node设置好了造成了上图所示的一个队列,这时线程2再循环一次获取tail,因为tail是volatile的,因此对线程2可见,线程2看见tail不为null,就走到了13行的else里面去往尾Node后面添加自身。整个过程下来,造成了一个双向队列。最后走AQS的acquireQueued(node, 1):
1 final boolean acquireQueued(final Node node, int arg) { 2 try { 3 boolean interrupted = false; 4 for (;;) { 5 final Node p = node.predecessor(); 6 if (p == head && tryAcquire(arg)) { 7 setHead(node); 8 p.next = null; // help GC 9 return interrupted; 10 } 11 if (shouldParkAfterFailedAcquire(p, node) && 12 parkAndCheckInterrupt()) 13 interrupted = true; 14 } 15 } catch (RuntimeException ex) { 16 cancelAcquire(node); 17 throw ex; 18 } 19 }
此时再作判断,因为线程2是双向队列的真正的第一个Node(前面还有一个h),因此第5行~第10行再次判断一下线程2能不能获取锁(可能这段时间内线程1已经执行完了把锁释放了,state从1变为了0),若是仍是不行,先调用AQS的shouldParkAfterFailedAcquire(p, node)方法:
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int s = pred.waitStatus; 3 if (s < 0) 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park 7 */ 8 return true; 9 if (s > 0) { 10 /* 11 * Predecessor was cancelled. Skip over predecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = pred = pred.prev; 16 } while (pred.waitStatus > 0); 17 pred.next = node; 18 } 19 else 20 /* 21 * Indicate that we need a signal, but don't park yet. Caller 22 * will need to retry to make sure it cannot acquire before 23 * parking. 24 */ 25 compareAndSetWaitStatus(pred, 0, Node.SIGNAL); 26 return false; 27 }
吐槽一下先,这段代码的代码格式真糟糕(看来JDK的开发大牛们也有写得很差的地方),这个waitStatus是h的waitStatus,很明显是0,因此此时把h的waitStatus设置为Noed.SIGNAL即-1并返回false。既然返回了false,上面的acquireQueued的11行if天然不成立,再走一次for循环,仍是先尝试获取锁,不成功,继续走shouldParkAfterFailedAcquire,此时waitStatus为-1,小于0,走第三行的判断,返回true。而后走acquireQueued的11行if的第二个判断条件parkAndCheckInterrupt:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
最后一步,调用LockSupport的park方法阻塞住了当前的线程。至此,使用ReentrantLock让线程1独占锁、线程2进入FIFO队列并阻塞的完整流程已经整理出来了。
lock()的操做明了以后,就要探究一下unlock()的时候代码又作了什么了,接着看下一部分。
unlock()的时候作了什么
就不画流程图了,直接看一下代码流程,比较简单,调用ReentrantLock的unlock方法:
public void unlock() { sync.release(1); }
走AQS的release:
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); 6 return true; 7 } 8 return false; 9 }
先调用Sync的tryRelease尝试释放锁:
1 protected final boolean tryRelease(int releases) { 2 int c = getState() - releases; 3 if (Thread.currentThread() != getExclusiveOwnerThread()) 4 throw new IllegalMonitorStateException(); 5 boolean free = false; 6 if (c == 0) { 7 free = true; 8 setExclusiveOwnerThread(null); 9 } 10 setState(c); 11 return free; 12 }
首先,只有当c==0的时候才会让free=true,这和上面一个线程屡次调用lock方法累加state是对应的,调用了多少次的lock()方法天然必须调用一样次数的unlock()方法才行,这样才把一个锁给所有解开。
当一条线程对同一个ReentrantLock所有解锁以后,AQS的state天然就是0了,AbstractOwnableSynchronizer的exclusiveOwnerThread将被设置为null,这样就表示没有线程占有锁,方法返回true。代码继续往下走,上面的release方法的第四行,h不为null成立,h的waitStatus为-1,不等于0也成立,因此走第5行的unparkSuccessor方法:
1 private void unparkSuccessor(Node node) { 2 /* 3 * Try to clear status in anticipation of signalling. It is 4 * OK if this fails or if status is changed by waiting thread. 5 */ 6 compareAndSetWaitStatus(node, Node.SIGNAL, 0); 7 8 /* 9 * Thread to unpark is held in successor, which is normally 10 * just the next node. But if cancelled or apparently null, 11 * traverse backwards from tail to find the actual 12 * non-cancelled successor. 13 */ 14 Node s = node.next; 15 if (s == null || s.waitStatus > 0) { 16 s = null; 17 for (Node t = tail; t != null && t != node; t = t.prev) 18 if (t.waitStatus <= 0) 19 s = t; 20 } 21 if (s != null) 22 LockSupport.unpark(s.thread); 23 }
s即h的下一个Node,这个Node里面的线程就是线程2,因为这个Node不等于null,因此走21行,线程2被unPark了,得以运行。有一个很重要的问题是:锁被解了怎样保证整个FIFO队列减小一个Node呢?这是一个很巧妙的设计,又回到了AQS的acquireQueued方法了:
1 final boolean acquireQueued(final Node node, int arg) { 2 try { 3 boolean interrupted = false; 4 for (;;) { 5 final Node p = node.predecessor(); 6 if (p == head && tryAcquire(arg)) { 7 setHead(node); 8 p.next = null; // help GC 9 return interrupted; 10 } 11 if (shouldParkAfterFailedAcquire(p, node) && 12 parkAndCheckInterrupt()) 13 interrupted = true; 14 } 15 } catch (RuntimeException ex) { 16 cancelAcquire(node); 17 throw ex; 18 } 19 }
被阻塞的线程2是被阻塞在第12行,注意这里并无return语句,也就是说,阻塞完成线程2依然会进行for循环。而后,阻塞完成了,线程2所在的Node的前驱Node是p,线程2尝试tryAcquire,成功,而后线程2就成为了head节点了,把p的next设置为null,这样原头Node里面的全部对象都不指向任何块内存空间,h属于栈内存的内容,方法结束被自动回收,这样随着方法的调用完毕,原头Node也没有任何的引用指向它了,这样它就被GC自动回收了。此时,遇到一个return语句,acquireQueued方法结束,后面的Node也是同样的原理。
这里有一个细节,看一下setHead方法:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
setHead方法里面的前驱Node是Null,也没有线程,那么为何不用一个在等待的线程做为Head Node呢?
由于一个线程随时有可能由于中断而取消,而取消的话,Node天然就要被GC了,那GC前必然要把头Node的后继Node变为一个新的头并且要应对多种状况,这样就很麻烦。用一个没有thread的Node做为头,至关于起了一个引导做用,由于head没有线程,天然也不会被取消。
再看一下上面unparkSuccessor的14行~20行,就是为了防止head的下一个node被取消的状况,这样,就从尾到头遍历,找出离head最近的一个node,对这个node进行unPark操做。
ReentrantLock其余方法的实现
若是能理解ReentrantLock的实现方式,那么你会发现ReentrantLock中其他一些方法的实现仍是很简单的,从JDK API关于ReentrantLock方法的介绍这部分,举几个例子:
一、int getHoldCount()
final int getHoldCount() { return isHeldExclusively() ? getState() : 0; }
获取ReentrantLock的lock()方法被调用了几回,就是state的当前值
二、Thread getOwner()
final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); }
获取当前占有锁的线程,就是AbstractOwnableSynchronizer中exclusiveOwnerThread的值
三、Collection<Thread> getQueuedThreads()
public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; }
从尾到头遍历一下,添加进ArrayList中
四、int getQueuedLength()
public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; }
从尾到头遍历一下,累加n。固然这个方法和上面那个方法多是不许确的,由于遍历的时候可能别的线程又往队列尾部添加了Node。
其他方法也都差很少,能够本身去看一下。
遗留问题
ReentrantLock的流程基本已经理清楚了,如今还有一个遗留问题:咱们知道ReentrantLock是能够指定公平锁或是非公平锁,那么究竟是怎么样的代码差异致使公平锁和非公平锁的产生的呢?
说实话,这个问题,我本身到如今尚未彻底想通。以后会持续跟进这个问题,一旦想明白了,会第一时间更新此文或者是新发一篇文章来专门讲述公平ReentrantLock和非公平ReentrantLock在代码上的差异。