AbstractQueuedSynchronizer是基于一个FIFO双向链队列 ==CLH队列==,用于构建锁或者同步装置的类,也称为Java同步器,ReentrantLock的公平锁与非公平锁就是由该同步器构成,链队列结构图以下。node
你能够理解为银行ATM机取钱,一我的先去取,获取到了锁,在这个时间内其余线程处于阻塞状态,只有等他取完钱了,他走了,释放了锁,排在它后面的人才能够获取到释放的锁并进行取钱。多线程
该同步器利用一个int值表示状态,实现方式是==使用内部类继承该同步器的方式==实现它的tryRelease、tryAcquire等方法管理状态,管理状态使用如下三个方法:ide
节点包含的状态有:测试
节点其余信息:ui
Node prev |
前驱节点 |
---|---|
Node next |
后继节点 |
Node nextWaiter | 存储condition队列中的后继节点 |
Thread thread |
入队列时的当前线程 |
锁在一个时间点只能被一个线程锁占有,AQS实现的ReentrantLock,又分为公平锁和非公平锁this
保障了多线程下各线程获取锁的顺序,先到的线程优先获取锁spa
加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待线程
锁在一个时间点能够被多个线程同时获取,AQS实现的CountDownLatch、ReadWriteLockcode
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}复制代码
根据指定状态获取,能获取到 执行compareAndSetState方法设置新状态cdn
public final void acquire(int arg) {
//tryAcquire成功的话 acquire结束;
if (!tryAcquire(arg) &&
//AcquireQueued方法进行阻塞等待,直到获取锁为止
//addWaiter把当前线程添加到队列尾部
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//tryAcquire失败而且acquiredQueued成功的话把当前线程中断
selfInterrupt();
}复制代码
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取状态
int c = getState();
//若是该锁未被任何线程占有,该锁能被当前线程获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//若被获取到 查看是否被当前线程
else if (current == getExclusiveOwnerThread()) {
//是当前线程的话再次获取,计数+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}复制代码
在tryAcquire方法中使用了同步器提供的对state操做的方法,利用CAS原理保证只有一个线程可以对状态进行成功修改,而没有成功修改的线程将进入队列排队。
AcquireQueued方法进行阻塞等待,直到获取锁为止
//node为null,排他方式阻塞等待
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//p为当前节点的前一个节点
final Node p = node.predecessor();
//若是p为头结点而且获取成功就把当前线节点设置为头结点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//若是线程须要被阻塞 则interrputr为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}复制代码
判断线程是否须要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//线程须要运行
if (ws == Node.SIGNAL)
return true;
//ws>0 处于CANCEL状态的线程
if (ws > 0) {
//把这些线程从队列中清除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//把等待的设置为运行状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}复制代码
// park方法让其余线程处于等待状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}复制代码
调用release释放一个锁
public void unlock() {
//释放一个锁
sync.release(1);
}复制代码
释放锁
public final boolean release(int arg) {
//调用tryRelease尝试释放一个锁
if (tryRelease(arg)) {
Node h = head;
//释放成功后
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}复制代码
尝试释放当前锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}复制代码
锁释放后唤醒线程,一同竞争CPU资源
private void unparkSuccessor(Node node) {
//获取当前节点状态
int ws = node.waitStatus;
//把状态设置为等待获取锁状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//若是没有下一个节点或者下一个节点状态为CANCEL,则把它们清除
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//不然调用LockSupport中unpark方法,唤醒后一个节点
if (s != null)
LockSupport.unpark(s.thread);
}复制代码
调用UNSAFE的本地方法unpark唤醒线程
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}复制代码
AQS实现ReentrantLock公平锁与非公平锁最大的区别在下面这段代码:
源码以下:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}复制代码
==判断"当前线程"是否是在CLH队列的队首,来实现公平性==。
public class MyAQSLock implements Lock {
private final Sync sync;
public MyAQSLock() {
sync = new Sync();
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
/**
* 把lock、unlock实现使用AQS构建为内部类
*/
private class Sync extends AbstractQueuedSynchronizer{
Condition newCondition(){
return new ConditionObject();
}
@Override
protected boolean tryAcquire(int arg) {
//第一个线程进来拿到锁
int state = getState();
//用于重入锁判断
Thread current = Thread.currentThread();
if(state==0){
if(compareAndSetState(0,arg)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
//重入锁判断 当前线程和独占锁线程相同,则再次获取
else if(current==getExclusiveOwnerThread()){
int next = state+arg;
if(next<0){
throw new RuntimeException();
}
setState(next);
return true;
}
return false;
}
/**
* 可重入释放锁
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
if(Thread.currentThread()!=getExclusiveOwnerThread()){
throw new RuntimeException();
}
int state = getState()-arg;
if(state==0){
setExclusiveOwnerThread(null);
setState(0);
return true;
}
setState(0);
return false;
}
}
}复制代码
public class TestAQSLock2 {
MyAQSLock myLock = new MyAQSLock();
private int value;
private int value2;
public int a(){
myLock.lock();
try {
b();
return value++;
}finally {
myLock.unlock();
}
}
public void b(){
myLock.lock();
try {
System.out.println(++value2);
}finally {
myLock.unlock();
}
}
public static void main(String[] args) {
TestAQSLock2 myLock = new TestAQSLock2();
for(int i=0;i<50;i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":" + myLock.a());
}).start();
}
}
}复制代码