AbstractQueuedSynchronizer(如下简称AQS)是Java中用于构建锁和同步器的框架,许多同步器均可以经过AQS很容易而且高效地构造出来。不少文章已经基于论文和源码对实现进行了解读,本文试着从另外的角度入手:先不考虑AQS的实现,假设让咱们本身实现锁,咱们能够怎么作?最后再来看AQS的实现,才能更好地理解为何要这么实现。html
咱们能够形象地把锁理解成门票,只有当线程拿到了门票,才能进入临界区。所以咱们能够用一个状态变量state
表示锁,state = true
表示能够获取到锁,反之就是表示锁已经被占用。那么当锁被占用时,应该怎么处理?这里有两种思路:node
基于第一种思路实现的锁叫作自旋锁(SpinLock)。下面咱们先看自选锁中最简单的实现,这个实现叫作Test-And-Set-LOCK,简称TAS Lock。linux
public class TASLock {
AtomicBoolean state = new AtomicBoolean(false);
public void lock() {
while (!state.getAndSet(true)) {} // 循环检测直到状态可用
}
public void unlock() {
state.set(false);
}
}
复制代码
从实现上咱们能够看出,获取锁的线程一直处于活跃状态,可是并无执行任何有效的任务,所以使用自旋锁会形成busy-waiting
。git
在对TAS锁提出优化思路前,先介绍一下缓存一致性。下面这张图描述的是每一个处理器都有本身的缓存,但共享一个内存,缓存的内容来自内存。一旦处理器更新了本身的缓存,若是这个更新须要被其余处理器感知,就须要经过总线来通知。所以频繁更新会占用大量总线流量。github
目前咱们是用一个状态变量来标识锁的状态。TAS锁每次循环都会调用getAndSet()
,这是一个更新指令,会致使其余线程的缓存都失效,从而都会去内存中获取值,所以占用总线流量资源。数组
TAS锁
的问题在于每次循环都修改状态,实际上只有状态是可用的状况下,才有必要去修改。TTAS
(Test-Test-And-Set
)改进就是在加锁前先检查状态变量是否为false,只有条件知足才去修改。缓存
public class TTASLock {
AtomicBoolean state = new AtomicBoolean(false);
public void lock() {
while (true) {
while (state.get()) {} // 循环读取state状态
if (!state.getAndSet(true)) { // 只有当state为false,才会修改
return;
}
}
}
public void unlock() {
state.set(false);
}
}
复制代码
可是当释放锁时,其余线程检测到state
都是false
,这时都会调用state.getAndSet(true)
,又退化到TAS
的情形。bash
TTAS
的问题关键在于全部线程都同时去获取锁,所以引入延迟能够解决问题:当获取锁失败时,在重试前先睡眠一段时间,再次失败则延迟时间翻倍——指数退避。架构
public class BackoffLock {
AtomicBoolean state = new AtomicBoolean(false);
private int minDelay;
private int maxDelay;
public BackoffLock(int minDelay, int maxDelay) {
this.minDelay = minDelay;
this.maxDelay = maxDelay;
}
public void lock() throws InterruptedException {
int delay = minDelay;
while (true) {
while (state.get()) {}
if (!state.getAndSet(true)) {
return;
}
Thread.sleep((int) (Math.random() * delay));
if (delay < maxDelay) {
delay = 2 * delay;
}
}
}
public void unlock() {
state.set(false);
}
}
复制代码
指数退避自旋的不足在于须要设置好延迟参数,极可能就在线程睡眠过程当中,获取锁的线程恰好就释放了锁。框架
一开始由于咱们都是基于一个状态变量来标识锁,才会致使频繁占用总线流量,那么若是每一个线程都有一个状态,就能够大幅减小占用。
基于数组的队列锁lock()
时从数组中按顺序找到一个可用的位置,用来表明当前线程。unlock()
时通知下一个线程。
public class ArrayLock {
private int n;
private volatile boolean[] flags;
private AtomicInteger next = new AtomicInteger(0);
private ThreadLocal<Integer> slot = new ThreadLocal<>();
public ArrayLock(int n) {
this.n = n;
flags = new boolean[n];
flags[0] = true;
}
public void lock() {
int index = next.getAndIncrement();
slot.set(index);
while (!flags[index % n]) {}
}
public void unlock() {
int index = slot.get();
flags[index % n] = false; // 为复用作好准备
flags[(index + 1) % n] = true; // 通知下一个线程
}
}
复制代码
显然,基于数组的队列锁的不足之处就是锁的数量受限于数组长度。所以,可用考虑经过链表来改进。
CLH锁
内部就维护了一个隐式的链表。CLH
是Craig, Landin, and Hagersten的缩写。
public class CLHSpinLock {
private final ThreadLocal<QNode> node;
private final ThreadLocal<QNode> prev;
AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
public CLHSpinLock() {
node = new ThreadLocal<QNode>() {
@Override
protected QNode initialValue() {
return new QNode();
}
};
prev = new ThreadLocal<QNode>() {
@Override
protected QNode initialValue() {
return null;
}
};
}
public void lock() {
QNode myNode = node.get();
myNode.locked = true;
QNode pred = tail.getAndSet(myNode);
prev.set(pred);
// 在前继节点自旋
while (pred.locked) {};
}
public void unlock() {
QNode myNode = node.get();
myNode.locked = false;
node.set(prev.get());
}
class QNode {
volatile boolean locked = false;
}
}
复制代码
因为CLH是在前继节点上自旋,在NUMA架构下,可能须要频繁访问远端内存,影响性能。那么能不能直接在本地节点自旋呢?
MCS锁
就是在本地节点自旋,把CLH的屡次对远端内存的监听 + 一次对本地内存的更新,简化成了屡次对本地内存的监听 + 一次对远端内存的更新。
public class MCSSpinLock {
ThreadLocal<QNode> node = new ThreadLocal<QNode>() {
@Override
protected QNode initialValue() {
return new QNode();
}
};
AtomicReference<QNode> tail = new AtomicReference<>(null);
public void lock() {
QNode qNode = node.get();
QNode pred = tail.getAndSet(qNode);
if (pred != null) {
qNode.locked = true;
pred.next = qNode; // QNode.next是volatile,保证了线程可见性
while (qNode.locked) {};
}
}
public void unlock() {
QNode qNode = node.get();
if (qNode.next == null) { // 当前节点没有发现后继节点
if (tail.compareAndSet(qNode, null)) { // 确实没有后继节点
return;
}
while (qNode.next == null) {}; // 有后继节点,可是尚未关联上,须要等待
}
qNode.next.locked = false;
}
class QNode {
volatile boolean locked = false;
volatile QNode next = null;
}
}
复制代码