早些时候(jdk 1.5以前),并发环境下作同步控制,你的选择很少,多半是使用synchronized
关键字。不论是同步方法仍是同步块,总之遇到这个关键字,未获取锁线程就会乖乖等候,直到已获取锁的线程释放掉锁。java
而jdk 1.5推出ReenntrantLock
以后,此工具一度很风靡,当时人们更喜欢用Lock而不是synchronized,主要是由于它用起来灵活吧。(本人到如今为止,用synchronized的场景仍是Lock的时候多)直到后来,愈来愈多的文章,从性能、是否公平、实现原理各个方面对两者比较,你们才对他们有了更直观的认识。node
本文旨在分析ReenntrantLock的主要实现逻辑,并初步窥探AQS结构。若是不犯懒的话,但愿后续能将AQS作成系列,真正理解Doug Lea大神的这个经典实现。segmentfault
研究工具的原理以前,要先会使用工具。并发
public class ReentrantLockTest { Lock lock = new ReentrantLock(); //建立锁 public void doSomething(){ //### 1-尝试获取锁,成功 if(lock.tryLock()){ System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName())); try { //模拟逻辑执行 TimeUnit.MILLISECONDS.sleep(1100L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName())); lock.unlock(); //### 1.1-逻辑执行完,释放锁 } //### 2-尝试获取锁,失败 else { System.out.println(String.format("%s线程,获取锁失败",Thread.currentThread().getName())); } } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 3; while (total>0){ Thread t = new Thread(()->{ test.doSomething(); },"T-"+total); t.start(); total--; TimeUnit.MILLISECONDS.sleep(1000L); } } }
tryLock()
方法会尝试获取锁,若是获取不到,直接return false
(不会阻断);若是获取到锁,return true
。工具
上面的例子,执行结果为:源码分析
T-3线程,获取到锁了 T-2线程,获取锁失败 T-3线程,业务执行完毕 T-1线程,获取到锁了 T-1线程,业务执行完毕
修改下上例中的加锁方式:性能
Lock lock = new ReentrantLock(); public void doSomething2(){ lock.lock(); System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName())); try { TimeUnit.MILLISECONDS.sleep(1000L); //模拟业务逻辑 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName())); lock.unlock(); } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 3; while (total>0){ Thread t = new Thread(()->{ test.doSomething2(); },"T-"+total); t.start(); total--; } }
与tryLock()不通,lock()
方式尝试获取锁,若是获取不到会持续等待。ui
执行结果会变为:编码
T-3线程,获取到锁了 T-3线程,业务执行完毕 T-2线程,获取到锁了 T-2线程,业务执行完毕 T-1线程,获取到锁了 T-1线程,业务执行完毕
ReenntrantLock 加 / 解锁的使用方式就这些,而它是靠编码实现的。下图给出了ReenntrantLock类部分结构:spa
ReenntrantLock默认实现的是非公平锁(本文也只分析非公平实现)。
final Sync sync; public ReentrantLock() { sync = new NonfairSync(); //成员变量sync,赋值成NonfairSync的对象 }
先从实现较简单的tryLock()
研究:
## ReentrantLock类 public boolean tryLock() { return sync.nonfairTryAcquire(1); } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync类 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 1- 获取AQS类中的state状态值 if (c == 0) { // 2- 若是state是0(默认值),将state原子形修改为1 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); // 2.1- 原子修改为功,标记AOS中的exclusiveOwnerThread为当前线程 return true; } } // 3- 此时state不是0,当前线程 == AOS中的exclusiveOwnerThread,将state修改成1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
tryLock()方法,核心逻辑就是原子修改AQS中的state
值,volatile
+CAS
(jdk9 VarHandle实现)。
具体一些:
实现过程当中,只在首次修改state
值,即将其从0改为1的时候,采用了原子的CAS
方式。
以后只判断当前线程和owner线程
(AOS中的exclusiveOwnerThread)是否一致,若是一致state++;不一致,直接return false
。
unLock()实现一样简单
## ReentrantLock类 public void unlock() { sync.release(1); } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync类 public final boolean release(int arg) { ... tryRelease(arg) //尝试释放 ... } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync类 protected final boolean tryRelease(int releases) { int c = getState() - releases; // state-- // 1-验证线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //2-若是state==0时,将结果赋值为true,清空owner线程 free = true; setExclusiveOwnerThread(null); } setState(c); //state赋值 return free; }
若是操做线程是owner
线程(首次tryLock()时会记录owner):tryLock()
每次调用,state++
;unLock()
每次调用,state--
(state=0时,清空owner线程)。
Tip: 注释1处,若是当前线程非owner线程,会直接抛出异常!
对于 tryLock() 而言,它在实现上,彻底没用到AQS的精华。既然叫Abstract Queued Synchronizer——抽象队列同步器,队列、同步什么的才是重点。别急,lock()
方法会用到这些。
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //中断interrupt }
对于默认的非公平锁实现,acquire(int arg)
彻底可替换
成以下写法:
public final void acquire(int arg) { ##### tryAcquire(arg) 改为了 tryLock(arg) if (!tryLock(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //线程 interrupt }
如此替换后,逻辑就很好理解了:在用tryLock()
获取锁失败的状况下,会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
而 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 显然也分红了两个方法addWaiter
和acquireQueued
addWaiter(Node.EXCLUSIVE)
部分:private Node addWaiter(Node mode) { Node node = new Node(mode); //建立node,建立的同时绑定线程 for (;;) { Node oldTail = tail; if (oldTail != null) { //循环2-将node节点和首次循环中初始化的队列关联 node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); //循环1-初始化同步队列 } } }
这里需关注 AQS.Node 类 的一些关键属性(已文字标明各属性用途):
## 表示Node节点的状态,有CANCELLED(待取消)、SIGNAL(待唤醒)、CONDITION或默认的0几个状态 volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; volatile Node prev; //prev指向前节点 volatile Node next; //next指向后节点 ## 节点绑定线程 volatile Thread thread;
经过下图,可更清楚的看出addWaiter方法的执行过程(此时线程T-3
在执行中):
结论1:
`addWaiter`会建立队列,并返回尾节点,即图中的`Node2`
acquireQueued(final Node node, int arg)
方法:final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); //获取pre节点,就是Node1 if (p == head && tryAcquire(arg)) { //### 注释1-再次尝试获取锁 setHead(node); //获取到锁了,去掉Node1,Node2变成新的head节点 p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } ... } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //2次循环,将waitStatus==Node.SIGNAL,renturn true return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { pred.compareAndSetWaitStatus(ws, Node.SIGNAL); //首次循环,将pre节点Node1的waitStatus修改为SIGNAL } return false; }
这里依照上图,详细解释下:acquireQueued
方法的入参前面提到了,就是 addWaiter方法 新增的尾节点,即入参node
= Node2,那么node.predecessor()天然是Node1了——p
= Node1。
注释1位置,先判断p
是否是 头结点:
若是p
是头节点(上图中,p就是头结点),tryAcquire(arg)
会再次尝试获取锁。此时也有两种状况:
线程T-3
已经执行完并释放了锁,那么当前线程T-2
能够获取到锁;以后去掉当前头结点Node1,将Node2设置成头结点。线程T-3
未执行完,那么当前线程T-2
没法获取锁,以后会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法p
不是头结点,一样会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法而因为shouldParkAfterFailedAcquire(Node pred, Node node)方法在循环中,可能会执行两次:
waitStatus
修改为SIGNAL
(注意,因为循环的原故,还会再次执行到注释1
处,也就会再次尝试获取锁——上次线程T-3未结束,此次就有可能结束了);waitStatus
已是SIGNAL
,直接return true
。后面的parkAndCheckInterrupt()方法会将当前线程T-2
阻塞。给出线程T-2
未获取锁状况下的队列状况:
列出线程T-1
也参与其中的完整队列图。可看到尾节点以前的节点,绑定的线程都是阻塞
状态(park),而waitStatus
都是待唤醒
状态(waitStatus = SIGNAL = -1):
总结以上内容,做为结论2:
`acquireQueued`方法,若是当前线程是第1个获取锁失败的线程(例子中“线程T-3”正在执行,“线程T-2”就是第一个获取锁失败的线程),会再尝试2次获取锁; 获取锁失败 或 当前线程非第1个获取锁失败的线程(例子中T-1就不是第一个获取锁失败的线程),将前置节点状态修改为待唤醒,并阻塞关联线程。
为了便于理解,画出整个acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法的逻辑图:
阻塞并不是终点,还要再次看下unlock()
时作了什么。
## ReentrantLock类 public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { //尝试释放,前面的已经分析过了 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // ### 重点看unparkSuccessor(h)方法,入参是`头节点` return true; } return false; } ## AQS类 private void unparkSuccessor(Node node) { // 获取Node节点的waitStatus,若是<0(好比待唤醒SIGNA = -1),原子形还原成0 int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0); // 获取头结点的下一个节点,若是是空(CANCELLED可能产生空),链表尾部遍历,取最前面一个waitStatus<0的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) LockSupport.unpark(s.thread); // 唤醒 }
先不考虑CANCELLED状况,那么第二个节点对应的线程会被唤醒。第二个节点是什么来路?前面已经分析了,第1个获取锁失败的线程会和第二个节点绑定(例子中的Node2,对应的线程天然是T-2,下图):
线程T-2
被唤醒后,会作什么?
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { //循环 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); //### 线程T-2本来被阻塞于此 } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }
很显然,若是线程T-2
被唤醒后,因为循环的原故,会再次进入以下逻辑:
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); //head易主 p.next = null; return interrupted; } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
tryAcquire(arg)
再次尝试获取锁,显然此时线程T-3
已经执行完了(否则也不会执行unlock),那么线程T-2
极可能会获取到锁——
那么,head易主,队列发生以下变化:
最后给出加 / 解锁过程当中的队列变化,帮助理解。
以上,终于分析完了 ReentrantLock的主要方法的实现。(有点细碎哈)
本系列的下一篇文章 AQS系列二:源码分析“公平”ReentrantLock和Condition 会继续探索ReentrantLock
的公平锁实现,敬请期待!