AQS源码分析看这一篇就够了

好了,咱们来开始今天的内容,首先咱们来看下AQS是什么,全称是
AbstractQueuedSynchronizer翻译过来就是【抽象队列同步】对吧。经过名字咱们也能看出这是个抽象类

并且里面定义了不少的方法

里面这么多方法,我们固然不是一个个去翻。里面还有不少的抽象方法,我们还得找它的实现多麻烦对不对。因此咱们换个方式来探索。java

场景模拟

  咱们先来看下这样一个场景

在这里咱们有一个能被多个线程共享操做的资源,在这个场景中应该能看出咱们的数据是不安全的,由于咱们并不能保证咱们的操做是原子操做对吧。基于这个场景咱们经过代码来看看效果node

package com.example.demo;

public class AtomicDemo {

    // 共享变量
    private static int count = 0;

    // 操做共享变量的方法
    public static void incr(){
        // 为了演示效果  休眠一会儿
        try {
            Thread.sleep(1);
            count ++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}

经过执行发现,执行的结果是一个不肯定的值,但老是会小于等于1000,至于缘由,是由于incr() 方法不是一个原子操做。为何不是原子操做这个我们今天就不深究此处了.
迎合今天的主题,咱们经过Lock来解决安全

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicDemo {

    // 共享变量
    private static int count = 0;

    private static Lock lock = new ReentrantLock();

    // 操做共享变量的方法
    public static void incr(){
        // 为了演示效果  休眠一会儿
        try {
            lock.lock();
            Thread.sleep(1);
            count ++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}

而后咱们运行发现结果都是 1000了,这也就是1000个线程都去操做这个 count 变量,结果符合咱们的预期了。那lock究竟是怎么实现的呢?函数

需求分析

  咱们先来分析分析

这样的图片看着比较复杂,我们简化下。

咱们本身假设下,若是要你去设计这样的方法,你应该要怎么设计,他们须要实现哪些功能,
  首先是lock方法,它是否是要知足这几个功能。

需求清楚了,那咱们怎么设计呢?
第一个互斥怎么作,也就是多个线程只有一个线程能抢占到资源,这个时候咱们能够这样设置源码分析

// 给一个共享资源
Int state = 0 ; // 0表示资源没有被占用,能够抢占
if(state == 0 ){
   // 表示能够获取锁
}else{
   // 表示锁被抢占 须要阻塞等待
}


而后就是没有抢占到锁的线程的存储,咱们能够经过一个队列,利用FIFO来实现存储。
最后就是线程的阻塞和唤醒。你们说说有哪些阻塞线程的方式呀?ui

1.wait/notify: 不合适,不能唤醒指定的线程
2.Sleep:休眠,相似于定时器
3.Condition:能够唤醒特定线程
4.LockSupport:
LockSupport.park():阻塞当前线程
LockSupport.unpark(Thread t):唤醒特定线程
结合今天的主题,咱们选择LockSupport来实现阻塞和唤醒。

好了,到这儿咱们已经猜测到了Lock中的实现逻辑,可是在探究源码以前咱们还有个概念须要先和你们讲下,由于这个是咱们源码中会接触到的一个,先讲了,看的时候就比较轻松了对吧。this

什么是重入锁?

  咱们先来看看重入锁的场景代码spa

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicDemo {

    // 共享变量
    private static int count = 0;

    private static Lock lock = new ReentrantLock();

    // 操做共享变量的方法
    public static void incr(){
        // 为了演示效果  休眠一会儿
        try {
            lock.lock();
            Thread.sleep(1);
            count ++;
            // 调用了另一个方法。
            decr();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void decr(){
        try {
            // 重入锁
            lock.lock();
            count--;
        }catch(Exception e){

        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}

首先你们考虑这段代码会死锁吗? 你们给我个回复,我看看你们的理解的怎么样
好了,有说会死锁的,有说不会,其实这儿是不会死锁的,并且结果就是0.为何呢?
  这个实际上是锁的一个嵌套,由于这两把锁都是同一个 线程对象,咱们讲共享变量的设计是
  当state=0;线程能够抢占到资源 state =1; 若是进去嵌套访问 共享资源,这时 state = 2 若是有多个嵌套 state会一直累加,释放资源的时候, state–,直到全部重入的锁都释放掉 state=0,那么其余线程才能继续抢占资源,说白了重入锁的设计目的就是为了防止 死锁!线程

AQS类图


经过类图咱们能够发现右车的业务应用其实内在都有相识的设计,这里咱们只须要搞清楚其中的一个,其余的你本身应该就能够看懂~,好了咱们就具体结合前面的案例代码,以ReentrantLock为例来介绍AQS的代码实现。翻译

源码分析

  在看源码以前先回顾下这个图,带着问题去看,会更轻松

Lock.lock()

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

这个方法逻辑比较简单,if条件成立说明 抢占锁成功并设置 当前线程为独占锁
else 表示抢占失败,acquire(1) 方法咱们后面具体介绍

compareAndSetState(0, 1):用到了CAS 是一个原子操做方法,底层是UnSafe.做用就是设置 共享操做的 state 由0到1. 若是state的值是0就修改成1

setExclusiveOwnerThread:代码很简单,进去看一眼便可

acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1.tryAcquire()尝试直接去获取资源,若是成功则直接返回(这里体现了非公平锁,每一个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
2.addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
3.acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。若是在整个等待过程当中被中断过,则返回true,不然返回false。若是线程在等待过程当中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
  固然这里代码的做用我是提早研究过的,对于你们确定不是很清楚,咱们继续里面去看,最后你们能够回到这儿再论证。

tryAcquire(int)

  再次尝试抢占锁

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
//再次尝试抢占锁
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
// 重入锁的状况
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
// false 表示抢占失败
    return false;
}

addWaiter

  将阻塞的线程添加到双向链表的结尾

private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //上一步失败则经过enq入队。
    enq(node);
    return node;
}

enq(Node)

private Node enq(final Node node) {
    //CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,建立一个空的标志结点做为head结点,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第一个if语句

else语句

线程3进来会执行以下代码

那么效果图

acquireQueued(Node, int)
  OK,经过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你马上应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其余线程完全释放资源后唤醒本身,本身再拿到资源,而后就能够去干本身想干的事了。没错,就是这样!是否是跟医院排队拿号有点类似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干能够休息),直到拿到号后再返回。这个函数很是关键,仍是上源码吧:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标记是否成功拿到资源
    try {
        boolean interrupted = false;//标记等待过程当中是否被中断过

        //又是一个“自旋”!
        for (;;) {
            final Node p = node.predecessor();//拿到前驱
            //若是前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(多是老大释放完资源唤醒本身的,固然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到资源后,将head指向该结点。因此head所指的标杆结点,就是当前获取到资源的那个结点或null。
                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收之前的head结点。也就意味着以前拿完资源的结点出队了!
                failed = false; // 成功获取资源
                return interrupted;//返回等待过程当中是否被中断过
            }

            //若是本身能够休息了,就经过park()进入waiting状态,直到被unpark()。若是不可中断的状况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//若是等待过程当中被中断过,哪怕只有那么一次,就将interrupted标记为true
        }
    } finally {
        if (failed) // 若是等待过程当中没有成功获取资源(如timeout,或者可中断的状况下被中断了),那么取消结点在队列中的等待。
            cancelAcquire(node);
    }
}

到这里了,咱们先不急着总结acquireQueued()的函数流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具体干些什么。

shouldParkAfterFailedAcquire(Node, Node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //若是已经告诉前驱拿完号后通知本身一下,那就能够安心休息了
        return true;
    if (ws > 0) {
        /*
         * 若是前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
         * 注意:那些放弃的结点,因为被本身“加塞”到它们前边,它们至关于造成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //若是前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知本身一下。有可能失败,人家说不定刚刚释放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

整个流程中,若是前驱结点的状态不是SIGNAL,那么本身就不能安心去休息,须要去找个安心的休息点,同时能够再尝试下看有没有机会轮到本身拿号。

parkAndCheckInterrupt()

  若是线程找好安全休息点后,那就能够安心去休息了。此方法就是让线程去休息,真正进入等待状态。

private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);//调用park()使线程进入waiting状态
     return Thread.interrupted();//若是被唤醒,查看本身是否是被中断的。
 }

好了,咱们能够小结下了。

看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),如今让咱们再回到acquireQueued(),总结下该函数的具体流程:

1.结点进入队尾后,检查状态,找到安全休息点;
2.调用park()进入waiting状态,等待unpark()或interrupt()唤醒本身;
3.被唤醒后,看本身是否是有资格能拿到号。若是拿到,head指向当前结点,并返回从入队到拿到号的整个过程当中是否被中断过;若是没拿到,继续流程1。
最后咱们再回到前面的acquire方法来总结下

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

总结下它的流程吧

1.调用自定义同步器的tryAcquire()尝试直接去获取资源,若是成功则直接返回;
2.没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
3.acquireQueued()使线程在等待队列中休息,有机会时(轮到本身,会被unpark())会去尝试获取资源。获取到资源后才返回。若是在整个等待过程当中被中断过,则返回true,不然返回false。
4.若是线程在等待过程当中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

Lock.unlock()

  好了,lock方法看完后,咱们再来看下unlock方法

release(int)

  它会释放指定量的资源,若是完全释放了(即state=0),它会唤醒等待队列里的其余线程来获取资源。这也正是unlock()的语义,固然不只仅只限于unlock()

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

tryRelease(int)

  此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

public final boolean release(int arg) {
        if (tryRelease(arg)) {//这里是先尝试释放一下资源,通常均可以释放成功,除了屡次重入但只释放一次的状况。
            Node h = head;
            //这里判断的是 阻塞队列是否还存在和head节点是不是tail节点,由于以前说过,队列的尾节点的waitStatus是为0的
            if (h != null && h.waitStatus != 0)
                //到这里就说明head节点已经释放成功啦,就先去叫醒后面的直接节点去抢资源吧
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
    //这里,node通常为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,容许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个须要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//若是为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
            if (t.waitStatus <= 0)//从这里能够看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

这个函数并不复杂。一句话归纳:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里咱们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即便p!=head也不要紧,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已是等待队列中最前边的那个未放弃线程了,那么经过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),而后s把本身设置成head标杆结点,表示本身已经获取到资源了,acquire()也返回了

  好了,到这咱们就由于把源码看完了,再回头来看下这张图

是否是就清楚了AQS究竟是怎么实现的咱们上面的猜测的了吧。那么对应的下课后让你本身去看
这几个的源码,你是否是就应该能看懂了,好了本文就介绍到此,本文对你有帮助的欢迎关注点赞,谢谢

相关文章
相关标签/搜索