AQS
- AbstractQueuedSynchronizer队列同步器,Lock接口实现的核心,可自定义同步器。
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
- CLH锁实现图,将每一个线程构形成Node节点,加入链表,新加入线程在列表队列最后,每次头结点获取到所,后续节点继续获取锁。
- AQS暴露操做方法,隐藏实现细节,集成AQS,重写获取锁方法,可自定义同步锁。
- 内部经过voliate修饰的int变量,
unsafe
方法提供的compareAndSet方法提供同步,逐步包装成同步队列,同步锁。
- 原理总结,用AQS的volatile的同步特性,设置int锁数量,unsafe对象的CAS同步设置方法,构造同步队列,最后unsafe的park,unpark直接操做线程挂起释放。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
//实现AQS重写锁
//Lock的全部操做,经过代理内部类调用AQS,先看非公平锁
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//2. sync的非公平实现,快速调用父类AQS的CAS设置state是,若是失败,进入acquire构造Node节点,加入CLK
//
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//3. AQS的acruird,tryAcquire交给子类自定实现,addWaiter加入CLK节点,acquireQueued,全部节点自旋,获取同步状态
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//1. 外部调用Lock
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public Condition newCondition() {
return sync.newCondition();
}
}
//4. AQS中的实现,tryAcquire交给子类
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//5. ReentranLock里Sync非公平锁实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取AQS中state
int c = getState();
if (c == 0) {
//AQS的CAS同步设置state,并设置持有线程
if (compareAndSetState(0, acquires)) {
//设置持有线程
setExclusiveOwnerThread(current);
return true;
}
}
//可重入关键,同一线程可重复获取锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//6. 加入CLK链表尾部,经过AQS,CAS设置tail节点,构造链表见自定容器Stack
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//快速设置tail,若是失败进入enq
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//死循环设置尾节点,直到成功,以死循环的方式,同步设置tail节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//7. 最后,每一个构造的节点,都以自旋的方式,获取前一节点,若是前一节点是头节点,而且获取到锁,把当前本身设置成头结点,获的锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//parkAndCheckInterrupt 调用LockSupport挂起线程,进入阻塞状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//unlock,调用AQS释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//LockSupport的unparking,释放线程挂起状态
unparkSuccessor(h);
return true;
}
return false;
}
//1. 交给子类实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}