Doug Lea是JDK中concurrent工具包的做者,这位大神是谁能够自行google。java
本文浅析ReentrantLock(可重入锁)的原理node
Lock接口定义了这几个方法:安全
Condition
,Condition之后会分析;ReentrantLock实现了Lock接口,ReentrantLock中有一个重要的成员变量,同步器
sync继承了AbstractQueuedSynchronizer
简称AQS
,咱们先介绍AQS
;多线程
AQS用一个队列(结构是一个FIFO队列)来管理同步状态,当线程获取同步状态失败时,会将当前线程包装成一个Node
放入队列,当前线程进入阻塞状态;当同步状态释放时,会从队列去出线程获取同步状态。app
AQS里定义了head、tail、state,他们都是volatile修饰的,head指向队列的第一个元素,tail指向队列的最后一个元素,state表示了同步状态,这个状态很是重要,在ReentrantLock中,state为0的时候表明锁被释放,state为1时表明锁已经被占用;工具
看下面代码:oop
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }
这一段静态初始化代码初始了state、head、tail等变量的在内存中的偏移量;Unsafe
类是sun.misc下的类,不属于java标准。Unsafe
让java能够像C语言同样操做内存指针,其中就提供了CAS
的一些原子操做和park、unpark
对线程挂起与恢复的操做;关于CAS
是concurrent工具包的基础,之后会单独介绍,其主要做用就是在硬件级别提供了compareAndSwap
的功能,从而实现了比较和交换的原子性操做。ui
AQS还有一个内部类叫Node,它将线程封装,利用prev和next能够将Node串连成双向链表,这就是一开始说的FIFO的结构;this
ReentrantLock提供了公平锁和非公平锁,咱们这里从非公平锁分析AQS的应用;
Lock调用lock()方法时调用了AQS的lock()方法,咱们来看这个非公平锁NonfairSync
的lock方法:google
final void lock() { //首先调用CAS抢占同步状态state,若是成功则将当前线程设置为同步器的独占线程, //这也是非公平的体现,由于新来的线程没有立刻加入队列尾部,而是先尝试抢占同步状态。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //抢占同步状态失败,调用AQS的acquire acquire(1); }
瞄一眼acquire方法:
public final void acquire(int arg) { //在这里仍是先试着抢占一下同步状态 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire调用的是NonfairSync
的实现,而后又调用了Sync
的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //和以前同样,利用CAS抢占同步状态,成功则设置当前线程为独占线程而且返回true if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若是当前线程已是独占线程,即当前线程已经得到了同步状态则将同步状态state加1, //这里是可重入锁的体现 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //没有抢占到同步状态返回false return false; }
再看addWaiter方法:
private Node addWaiter(Node mode) { //新建一个Node,封装了当前线程和模式,这里传入的是独占模式Node.EXCLUSIVE Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //若是tail不为空就不须要初始化node队列了 if (pred != null) { //将node做为队列最后一个元素入列 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; //返回新建的node return node; } } //若是tail为空则表示node队列尚未初始化,此时初始化队列 enq(node); return node; }
瞄一眼enq方法:
private Node enq(final Node node) { //无限loop直到CAS成功,其余地方也大量使用了无限loop for (;;) { Node t = tail; if (t == null) { // Must initialize //队列尾部为空,必须初始化,head初始化为一个空node,不包含线程,tail = head if (compareAndSetHead(new Node())) tail = head; } else { //队列已经初始化,将当前node加在列尾 node.prev = t; //将当前node设置为tail,CAS操做,enqueue安全 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
拿到新建的node后传给acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //标记是否中断状态 boolean interrupted = false; for (;;) { //拿到当前node的前驱 final Node p = node.predecessor(); //若是前驱正好为head,即当前线程在列首,立刻tryAcquire抢占同步状态 if (p == head && tryAcquire(arg)) { //抢占成功后,将当前节点的thread、prev清空做为head setHead(node); p.next = null; // help GC 原来的head等待GC回收 failed = false; return interrupted; } //没有抢占成功后,判断是否要park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
瞄一眼shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若是前驱node的状态为SIGNAL,说明当前node能够park /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //若是前驱的状态大于0说明前驱node的thread已经被取消 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { //从前驱node开始,将取消的node移出队列 //当前节点以前的节点不会变化,因此这里能够更新prev,并且没必要用CAS来更新。 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //前驱node状态等于0或者为PROPAGATE(之后会介绍) //将前驱node状态设置为SIGNAL,返回false,表示当前node暂不须要park, //能够再尝试一下抢占同步状态 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
看一下parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() { //阻塞当前线程 LockSupport.park(this); //返回当前线程是否设置中断标志,并清空中断标志 return Thread.interrupted(); }
这里解释一下为何要保存一下中断标志:中断会唤醒被park的阻塞线程,但被park的阻塞线程不会响应中断,因此这里保存一下中断状态并返回,若是状态为true说明发生过中断,会补发一次中断,即调用interrupt()方法
在acquireQueued中发生异常时执行cancelAcquire:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //清空node的线程 node.thread = null; // Skip cancelled predecessors //移除被取消的前继node,这里只移动了node的prev,没有改变next Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. //获取前继node的后继node Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //设置当前node等待状态为取消,其余线程检测到取消状态会移除它们 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { //若是当前node为tail,将前驱node设置为tail(CAS) //设置前驱node(即如今的tail)的后继为null(CAS) //此时,若是中间有取消的node,将没有引用指向它,将被GC回收 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //若是当前node既不是head也不是tail,设置前继node的后继为当前node后继 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //唤醒当前node后继 unparkSuccessor(node); } //当前node的next设置为本身 //注意如今当前node的后继的prev还指向当前node,因此当前node还未被删除,prev是在移除取消节点时更新的 //这里就是为何在前面要从后往前找可换新的node缘由了,next会致使死循环 node.next = node; // help GC } }
画图描述解析一下cancelAcquire:
首先看如何跳过取消的前驱
这时,前驱被取消的node并无被移出队列,前驱的前驱的next还指向前驱;
若是当前node是tail的状况:
这时,没有任何引用指向当前node;
若是当前node既不是tail也不是head:
这时,当前node的前驱的next指向当前node的后继,当前node的next指向本身,pre都没有更新;
若是当前node是head的后继:
这时,只是简单的将当前node的next指向本身;
到这里,当线程抢占同步状态的时候,会进入FIFO队列等待同步状态被释放。在unlock()方法中调用了同步器的release方法;看一下release方法:
public final boolean release(int arg) { //判断是否释放同步状态成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //若是head不为null,且head的等待状态不为0, //唤醒后继node的线程 unparkSuccessor(h); return true; } return false; }
再来看一下tryRelease方法(在Sync类中实现):
protected final boolean tryRelease(int releases) { int c = getState() - releases; //当前thread不是独占模式的那个线程,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //若是同步状态state为0,释放成功,将独占线程设置为null free = true; setExclusiveOwnerThread(null); } //更新同步状态state setState(c); return free; }
继续看unparkSuccessor(唤醒后继node的tread)方法:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) //head的等待状态为负数,设置head的等待状态为0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { //若是head的后继node不存在或者后继node等待状态大于0(即取消) //从尾部往当前node迭代找到等待状态为负数的node,unpark //由于会有取消的节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
介绍完ReentrantLock后,咱们大致了解了AQS的工做原理。AQS主要就是使用了同步状态和队列实现了锁的功能。有了CAS这个基础,AQS才能发挥做用,使得在enqueue、dequeque、节点取消和异常时可以保证队列在多线程下的完整性。