AQS实现的原理及一个实例分析(ReentrantLock)

1、 AQS是什么?

1.1 介绍:

AQS(Abstracting Queue Sychronizer),望文生义,即一个抽象队列 + 一个须要同步的状态,所谓抽象队列即这个队列并非真是存在的(通俗的讲,不是一个LinkedList对象),而是像HashMap中的链表同样,只存在Node之间的关系中,每一个Node负责维护前置与后置节点,以及持有一个请求线程(能够理解为将一个请求线程封装成Node);node

1.2 为何经过上述的方式实现:

  1. 为何不能够经过LinkedList去维护呢?缘由是在多线程尾插List时线程不安全,咱们都知道LinkedList并非一个线程安全的类,因此AQS采用了CAS+死循环的方式实现了插入的串行化,不知道在看的你看没看过 《高性能Mysql》 ,其中就提到一个邮箱系统,其实与这个场景相似,在事务隔离级别为第四级串行化时是能够保证线程安全的;
  2. 共享的状态是什么?是一个Volatile值,用Volatile保证每一个请求的线程均可以看见当前最新的状态,以避免产生线程冲突;
/**
     * The synchronization state.
     */
    private volatile int state;
复制代码

这个值当你本身去实现锁的时候你能够本身定义规则, 《Java并发编程的艺术》 一书中本身定义了一个能够同时被两个线程持有的锁(共享式),而且将state值设置为2,每当有一个线程获取到锁后,将该值减1,当state值再减去一便小于零时,这个线程便只能加入同步队列而且开始自旋等待锁。sql

2、为何须要AQS?

  1. 由于安全的在多线程下访问共享资源的需求在JAVA1.5的时候愈演愈烈,因此架包的实现者就想提供一个能够实现同步的基础框架; AQS(Abstracting Queue Sychronizer)面向的是锁的实现者,它简化了实现锁的方式,屏蔽了同步状态管理,FIFO队列管理,线程的等待与唤醒等底层操做,让锁的实现者更多的去关注锁须要实现的功能,并经过模板设计模式提供了比较好的灵活性;而锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现的细节。

3、AQS是如何实现的?

3.1 AQS是如何将请求的线程封装成Node的呢?又是如何将Node链接成队列的呢?

  1. 既然是封装,那Node中便会持有一个请求Thread对象,而且为了创建Node之间的联系Node中会维护前置与后置节点指针,而AQS中会维护头尾节点指针,此时注意这里维护的是同步队列(在这个队列上的线程都在不断尝试是否能够获取到锁,由于在同步队列上即可以调用AQS的acquireQueue方法,而这个方法使得为获取到锁的线程检查本身是否有资格获取锁,若是没有,则调用LockSupport().park()方法将Node中的线程状态改成WAITING,等待被唤醒或被中断);

强调同步队列是由于,还有多个等待队列(与synchronized中Monitor对象的WaitSet一个意思,不过Monitor对象只有一个,而AQS能够有多个等待队列,视Conditon的数量为定),而且在Node节点中经过Condition指针维护,由于Node是同步队列与等待队列复用的,因此不可避免的产生了一些冗余;

2. 注意:此时最好要将synchronized的monitor机制与这里的AQS机制联系起来看: 在monitor机制中得到锁的线程若是调用 wait()方法,该线程所持有的锁会被释放并将该线程加入等待队列中,而Condition是调用 await()方法将该线程放入对应的Condition所持有的等待队列中去(我以为能够把Condition理解成操做系统中定义的线程唤醒条件),因此有几个Condition就会有几个对应的等待队列;

3.2 AQS是如何维护共享变量的可访问性呢?

  1. 在独占锁中,只有在同步队列的首节点的next节点能够尝试获取共享变量,由于在acquireQueue()方法中是这样定义判断条件的
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前节点的前置节点
                final Node p = node.predecessor();
                //注意这个与判断条件,第一个就是当前节点的前置节点是不是头节点,而做为第一个判断条件是由于与判断有一个为非即为非,就不会进行第二个条件的判断,这个在之后的编程中也是值得学习的
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

这样也就保证了获取共享资源的顺序性(即按照插入到队列的时间来定)编程

  1. 那AQS只能用来实现独占且公平锁吗?显然不是,AQS又是如何实现非公平锁和共享锁的呢?其实AQS不管用来实现什么锁,这些锁本质的区别就是在于获取共享资源访问权的方式不一样,而独占且公平的锁很明显获取访问权的方式是经过FIFO队列的顺序(即请求访问共享资源的顺序),而共享锁也是同样,只是能够获取访问权的线程数多了些;那么非公平锁是如何实现的呢?其实也很简单,就是舍弃队列的FIFO特性,只要持有共享资源的线程释放了锁,全部的在同步队列中的线程都会经过CAS操做去竞争锁;

4、AQS提供给锁实现者的API:

4.1 用于获取与设置共享资源的API:

  1. getState():获取当前同步状态
  2. setState():设置当前同步状态
  3. compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法可以保证状态设置的原子性

4.2 同步器可重写的方法:

这里AQS的设计者采用了模板设计模式将对同步状态的操做定义好过程,而将其中能够改变的过程交由每一个具体的同步器(即锁)来实现,保证了每一个同步器的特殊性; 上面讲的可能有点笼统,那咱们不妨分析一下AQS定义的模板是什么?但在此以前,咱们必定要牢记于心的是AQS是一个同步框架,即它全部的操做都是为了保证共享变量的安全!设计模式

  1. (以独占锁为例)在多线程的场景下,可能会有多个线程想要去访问共享变量,那么它们首先要作的是去看看本身有没有资格,即调用AQS的acquire()方法
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

能够看到这个方法会调用tryAcquire()方法 和咱们以前提到的 acquireQueued()方法,前者就是一个须要子类实现的模板方法,为何必定要子类去实现,由于每种锁都应该本身去定义当前共享变量处在一个什么状态下时,请求线程能够得到共享资源的访问权(举个例子,独占锁时,只要当前共享资源有线程在访问,那么以后全部请求线程都不能够再获取到锁;而若是是共享锁,那么这个方法就要再共享资源状态可访问数容许的状况下让该请求线程获取到锁);而若是子类定义的tryAcquire() 认为当前线程获取不到锁,就应该调用acquireQueued() 方法去死循环+CAS尝试获取锁安全

  1. 而后线程对共享资源操做完了,那它就会去释放共享资源,就会调用AQS的release(int arg)方法
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码

同acquire()同样,如何去判断是否能够释放对共享资源的访问权也是须要不一样的锁本身去经过覆盖AQS中的tryRelease()方法去本身定义;bash

  1. 总结:因此模板方法即将框架搭好,但具备特殊性但又具备一致抽象的方法须要在子类中进行特殊化的实现;

5、 一个实例ReentrantLock(这里只看了公平锁的实现):

  1. 咱们看一下ReentrantLock()的具体实现,通常来说建议是将继承自AQS的实现类作为锁类的静态内部类;
  2. FairSync.lock()方法
//直接调用AQS的acquire()方法,这个输入参数在此处没有意义)
        final void lock() {
            acquire(1);
        }
复制代码

而这个acquire()方法利用Java多态实则是调用了FairSync的tryAcquires()方法多线程

/**
         * Fair version of tryAcquire.  Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires){ //省略了具体逻辑 } } 复制代码

6、 线程打断时队列的维护(我的以为比较难的一个点)

  1. 咱们都知道ReentrantLock() 可让线程在等待时间过长时放弃等待转而去作其余事情,那若是此时这个线程在同步队列上这么办?因此咱们须要将线程的当前状态信息同步到Node节点中去;
  2. 简单介绍一下线程的打断机制: 写在前面,必定要明白interrupt()方法只是改变了线程的中断标志位为True,并不能让线程直接死掉!而要等待线程自身自我kill** 当线程处于WAITING/TIMED_WAITING(无限期等待/限期等待)或者BLOCKED(等待获取一个排他锁)状态时,若是此时线程对象调用了interrupt()方法,就会抛出一个受检异常InterruptedException,并设置线程的中断标志位;
public class InterruptRunnableDemo extends Thread{

    @Override
    public void run() {
        //while循环的条件是当前线程的中断标志位是否为True
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("running");
        }
        System.out.println("done ");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new InterruptRunnableDemo();
        thread.start();
        Thread.sleep(1000);
        //打断子线程,并设置线程的中断标志位
        thread.interrupt();
    }
}
复制代码
输出结果为:
running
running
running
done 
复制代码

参考文章: zhuanlan.zhihu.com/p/27857336并发

  1. Node的属性:waitStatus : (1) CANCELLED :因为在同步队列中等待的线程等待超时或被中断,须要从同步队列中取消等待,节点进入该状态将不会变化;(2)SIGNAL:当前持有同步状态的节点释放或被取消时,在这个状态下,会通知后继处于等待状态的节点,使得后继节点的线程得以运行 (3)CONDITION:节点在等待队列中时节点处于此状态 (4)PROPAGATE :表示下一次共享式同步状态将会无条件被传播下去 (5)INITAL: 初始化状态 此时咱们须要关注的即是CANCELLED状态,节点是如何从其余状态变为CANCELLED状态的,而且进入这个状态对于维护等待队列有什么帮助?
  2. 当线程被另外一个线程改变了中断标志位时,AQS是如何改变Node的waitStatus状态的呢? AQS的acquireInterruptibly(int arg)方法,这个方法与acquire()方法相同,可是该方法响应中断,而且当前线程未获取到同步状态而进入同步队列中时,若是这个线程被中断,那么该方法会抛出InterruptedException并返回; 而acquire()方法是当节点经过tryAcquire()方法成功拿到访问共享资源的权力时,再去校验当前线程的中断标志位,若是为True则将Node的waitStatus状态改成CANCELLED,而且seltInterrupt()
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        //调用时也会判断此时这个线程的中断标志位是否以及为True,是则直接抛出异常,并让调用者进行处理
        if (Thread.interrupted())
            throw new InterruptedException();
        //若是此时该节点已经获取到锁,但若是这个节点中的线程的中断标志位为True则也会抛出异常,而后调用doAcquireInterruptibly()中的finally代码块中的cancelAcquire()方法,将waitStatus状态改成CANCELLED
        if (!tryAcquire(arg)
            doAcquireInterruptibly(arg);
    }
复制代码
  1. 那改完这个状态有什么用呢? 关键在于unparkSuccessor(Node)方法,这个方法会将全部状态位CANCELLED的Node设置位null释放掉,不会再影响其后活跃线程竞争共享资源的访问权!

7、参考

  1. 参考的博客比较多,有用到图的都已经标注
  2. 《Java并发编程之美》
相关文章
相关标签/搜索