从ReentrantLock来理解AbstractQueuedSynchronizer

 

目录

1.基本概念

2.AbstractQueuedSynchronizer在ReenTrantLock实现可重入锁

2.1 基本特性

2.1.1 重入

2.1.2 同步

2.1.3 实际场景

2.2 AQS重要方法与ReentrantLock的关联

2.3 ReentrantLock和AQS交互过程

2.3.1 交互图

2.3.2 流程图-加锁

2.3.3 解析

2.4 AQS的框架作用如何体现

2.4.1 加锁

2.4.2 解锁

2.4.3 总结

3.通过AbstractQueuedSynchronizer创建简单的同步锁

4.AQS其他的应用场景


1.基本概念

AbstractQueuedSynchronizer

AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

简单来说,作用就是维护锁的当前状态和线程等待列表,维护的关键则是对字段state以及双端双向队列的使用

ReenTrantLock

ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁,即当前线程获取该锁再次获取不会被阻塞。

2.AbstractQueuedSynchronizer在ReenTrantLock实现可重入锁

2.1 基本特性

2.1.1 重入

// 可重入测试
public class ReentrantTest {
    public static void main(String[] args) {
        // main --> firstAction --> secondAction --> lastAction
        SyncAction.doAction(ReentrantTest::firstAction);
    }

    private static void firstAction() {
        SyncAction.doAction(ReentrantTest::secondAction);
    }

    private static void secondAction() {
        SyncAction.doAction(ReentrantTest::lastAction);
    }

    private static void lastAction() {
        System.out.println("Hello World, last Action.");
    }
}
// 同步方法
public class SyncAction {
     private static ReentrantLock lock = new ReentrantLock();
//    自定义锁
//    private static CustomLock lock = new CustomLock();
    
    /**
     * 功能描述:同步方法
     *
     * @param runnable
     * @author lvchengyi
     * @date 10:43 下午 2020/9/1
     */
    public static void doAction(Runnable runnable) {
        lock.lock();
        try {
            runnable.run();
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            lock.unlock();
        }
    }
}

在同一个线程中,代码中三次lock操作,都进入了锁,体现锁的重入性。

2.1.2 同步

// 同步测试
public class SyncTest {
    public static void main(String[] args) {
        // main --> firstAction 终止,死锁发生
        // main线程拥有ReentrantLock的锁,firstAction线程等待main线程的锁释放
        // main线程等待firstAction方法执行完毕,也在等待,因此发生死锁
        ReentrantTest.syncAction(SyncTest::firstAction);
    }

    public static void firstAction() {
        System.out.println(Thread.currentThread().getName());
        CompletableFuture.runAsync(() -> ReentrantTest.syncAction(SyncTest::secondAction)).join();
    }

    public static void secondAction() {
        System.out.println(Thread.currentThread().getName());
        CompletableFuture.runAsync(() -> ReentrantTest.syncAction(SyncTest::lastAction)).join();
    }

    private static void lastAction() {
        System.out.println(Thread.currentThread().getName());
        System.out.println("Hello World, last Action.");
    }
}

在不同线程中,第二次lock发生了阻塞,拿不到锁,体现锁的同步性。

2.1.3 实际场景

public class NormalLockTest {
    private static int sum = 0;

    public static void main(String[] args) {
        List<CompletableFuture<Void>> completableFutureList = new ArrayList<>();
        // 开启10个线程都做递增方法
        for (int i = 0; i < 10; i++) {
            // 加锁后进行递增
//             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> SyncAction.doAction(NormalLockTest::addSum));

            // 未加锁进行递增
            CompletableFuture<Void> future = CompletableFuture.runAsync(NormalLockTest::addSum);

            completableFutureList.add(future);
        }
        // 等待10个线程执行完毕
        CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();
        System.out.println(sum);
    }

    private static void addSum() {
        // 给sum变量加1000
        for (int i = 0; i < 1000; i++) {
            sum++;
        }
    }
}

在并发场景下,假设有T1、T2、T3竞争同一个ReentrantLock锁

若T1拿到锁后,T1(lock),T2(park),T3(park)

Waited Queue → Head → T2 next → T3

T1(unlock) → T2.unpark

T2(free),T3(park)    可能会存在竞争锁

Waited Queue → Head(T2) → T3

假设T2(lock),T3(park)

......

具体如何实现见下文

2.2 AQS重要方法与ReentrantLock的关联

方法名 描述
protected boolean tryAcquire(int arg) 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg) 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

ReentrantLock是独占锁,所以实现了tryAcquire和tryRelease。

ReentrantLock使用state字段实现同步和可重入:

  1. State初始化的时候为0,表示没有任何线程持有锁。

  2. 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。

  3. 解锁也是对这个字段-1,一直到0,此线程对锁释放。

2.3 ReentrantLock和AQS交互过程

2.3.1 交互图

2.3.2 流程图-加锁

 

2.3.3 解析

加锁:

  • 通过ReentrantLock的加锁方法Lock进行加锁操作。

  • 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。

  • AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。

  • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。

解锁:

  • 通过ReentrantLock的解锁方法Unlock进行解锁。

  • Unlock会调用内部类Sync的Release方法,该方法继承于AQS。

  • Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。

  • 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。

2.4 AQS的框架作用如何体现

这里以ReentrantLock的非公平锁为例进行分析

2.4.1 加锁

在非公平锁中,有一段这样的代码:

static final class NonfairSync extends Sync {
	...
	final void lock() {
		// 如果当前state为0,则拿到锁
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
		// 尝试获取锁
			acquire(1);
	}
  ...
}

看一下AQS的Acquire是怎么写的:

public final void acquire(int arg) {
	// 尝试获取锁,如果获取锁失败,则将当前线程信息放入到内部队列中
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		// 手动中断线程
		selfInterrupt();
}

再看一下tryAcquire方法:

// AQS的tryAcquire,具体由下层实现,实现主要是对AQS中state变量以及OwnerThread一些操作
protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}

可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。

2.4.2 解锁

在ReentrantLock中,解锁的代码:

public class ReentrantLock implements Lock, java.io.Serializable {
	...
	public void unlock() {
   	 	sync.release(1);
	}
    ...
}

非公平锁直接调AQS的release(int arg)方法

public final boolean release(int arg) {
	// 尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
		// 判断头部节点是否处于被挂起状态
        if (h != null && h.waitStatus != 0)
			// 唤醒线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

再看一下tryRelease方法:

// AQS的tryRelease,具体由下层实现,实现主要是对AQS中state变量以及OwnerThread一些操作
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

2.4.3 总结

由上可得,继承AQS实现同步锁,只需要在下层实现方法中维护好锁的状态,锁的双端双向队列、唤醒线程等逻辑AQS都已经在框架中实现了。

3.通过AbstractQueuedSynchronizer创建简单的同步锁

public class CustomLock implements Lock {

    private Sync sync = new Sync();

    @Override
    public void lock() {
        System.out.println(Thread.currentThread().getName() + "===尝试获取锁===");
        sync.acquire(1);
        System.out.println(Thread.currentThread().getName() + "===已经获取锁===");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName() + "===尝试释放锁===");
        sync.release(1);
        System.out.println(Thread.currentThread().getName() + "====已经释放锁===");
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
}

4.AQS其他的应用场景

除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:

同步工具 同步工具与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryReleaseShared会增加计数,acquireShared会减少计数
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)