源码分析JDK8之AbstractQueuedSynchronizer

前言

源码分析我认为主要有两个做用:知足好奇心,我想每个有追求的人都不会知足于仅仅作一个API Caller实现功能就好,咱们也想知道它究竟是怎么实现的;借鉴与升华,当咱们明白了一个类的设计原理,在必定的情境下咱们能够借鉴其设计哲学,甚至针对咱们本身特殊的业务场景对其进行改良与优化。html

下面我就以这篇文章开启个人源码阅读之旅。整体而言,我会从这个类基本结构入手,而后分析原理,再看看已有的应用,并进行分析与理解。java

我以前一篇文章里提到过java的显示锁ReentrantLock。此外,若是你编写过并发程序那你通常也应该用过CountDownLatch,Semaphore等等,这些都是同步器,而它们都基于AbstractQueuedSynchronizer(简称AQS)实现的,那么咱们今天就来看看这个牛逼的AQS是怎么实现这么多功能的。node

首先打开IDEA,随便新建一个类,而后输入CountDownLatch,在它上面敲下Ctrl+B,就打开了CountDownLatch的源码,而后发现有一个很是重要的静态内部类Sync继承了AbstractQueuedSynchronizer,再次Ctrl+B,咱们就打开了AQS的源码,立刻就能够解开它的神秘面纱了,哼哼。segmentfault

映入眼帘的首先就是大段大段的文档,大意就是这个类 提供了一个基于FIFO队列的实现了阻塞锁和相关同步器(信号量,事件等)的框架...... 读完了大概就了解这个类究竟是怎么工做的了。下面咱们开始分类型研究源码,固然不可能所有分析一遍,这里只把重点的列出来。
实际代码分析中,我通常先看看这个结构图:api

而后读一读开始的综述文档,而后从实例开始,像方法调用那样依次深刻查看,就能依次看到相关的方法、内部类和属性,仍是Ctrl+B大法好啊,这属于自底向上的源码分析方法。若是直接从上面那张图开始,对属性、方法、内部类挨个分析就属于自顶向下的分析法了。我以为对一个陌生的东西要想有清晰的认知最好先自底向上捋一遍,便于搞清楚一个个具体功能的实现机制,而后再自顶向下看一遍,便于把控总体架构,宏观把握。这样走两遍再来总结一下就能比较透彻的掌握该技术了。架构

1、方法与属性

方法中,protected类型的通常要求具体的同步器子类来实现可是有些也能够直接用,public类型通常都是能够直接使用的固然也能够本身实现,private就是AQS本身的内部实现了,与具体子类无关。并发

state相关

一个private volatile int state;属性表明了线程之间争用的资源。与之相关的方法有三个oracle

protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)//CAS原子性地修改state

都是protected类型,可见咱们能够进行Override,来定义state的获取与释放从而实现咱们自定义的同步器。很是简单就不把所有源码摆出来了。框架

同步队列queue相关

这个queue是一个FIFO的队列,每一个节点都是下面的内部类Node类型,等待着state这个资源,主要由两个属性决定private transient volatile Node head;private transient volatile Node tail; 与之相关的方法有:ide

// 节点node进入队列,采用CAS的方式,返回其前驱
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,先初始化
            if (compareAndSetHead(new Node()))//设置头结点
                tail = head;
        } else {// 队列不为空
            node.prev = t;// 插入节点至队列尾部
            if (compareAndSetTail(t, node)) {//CAS修改队尾为node,之因此CAS是由于可能有多个线程争相入队
                t.next = node;
                return t;
            }
        }
    }
}
// 将当前线程以mode的方式(EXCLUSIVE或者SHARED)构成新节点并入队,返回这个新节点
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 更快的入队方式,若是失败再采用较慢的标准入队方式enq
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// 把node设置为新的头,老的头出队
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

资源获取与释放相关

资源获取分为EXCLUSIVESHARED两种模式,对应acquirereleaseacquireSharedreleaseShared

首先是EXCLUSIVE资源获取:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里tryAcquire须要继承类本身实现(成功true,失败false),若是tryAcquire成功则直接返回,不然addWaiter将当前线程以独占节点的方式置于同步队列尾部等待。acquireQueued使得该节点等待获取资源,一直获取到资源才返回,整个等待过程当中若是有中断是不响应的,可是获取资源后会用selfInterrupt补上。

// 节点得到资源才能返回不然一直自旋,中断该线程不会实时响应,可是若是被中断过会返回true,不然返回false
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {// node前驱是头结点,那么即可以尝试去获取资源了
                setHead(node);// 获取成功,能够把node设为头结点,也就是说头结点是独占资源的惟一拥有者
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 走到这里说明获取失败,检查是否应该阻塞和中断
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);//若是失败了,就把waitStatus置为CANCELLED表示取消了
    }
}

// 获取资源失败后,当前节点是否应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)// 前驱pred得到资源后会通知当前节点node,因此能够放心的阻塞了(waitStatus会在下面内部类解释)
        return true;
    if (ws > 0) {// 前驱取消了资源获取,那么当前节点就要找到前面最近一个正在等待的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;//此处 pred.waitStatus < 0,亦即pred 还在等待尝试获取资源
    } else {// 前驱正在等待,则设置其状态为SIGNAL,让他获取资源后通知本节点,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        // 可是本节点不能立刻阻塞,由于设置不必定能成功,须要下次再次检查
    }
    return false;
}

// 阻塞本线程。被唤醒后要返回本线程是否被中断过。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

而后是EXCLUSIVE资源释放:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

与上面相对应,这里tryRelease也须要继承类本身实现(成功true,失败false),若是释放成功,则调用unparkSuccessor唤醒后继节点返回true,不然返回false。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)// 可能须要释放通知信号,把状态置零,容许失败
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;// 找到后继节点
    if (s == null || s.waitStatus > 0) {// 若是后继节点为空或者已经取消
        s = null;// 确保该节点的释放
        for (Node t = tail; t != null && t != node; t = t.prev)// 从队尾开始找到须要通知的最近的后继节点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)// 若是需唤醒的后继节点存在则唤醒之
        LockSupport.unpark(s.thread);
}

再看SHARED资源获取:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

这里tryAcquireShared也须要本身实现(负值说明失败,非负值表示获取成功后剩下的可用资源数),若是获取失败就调用doAcquireShared进入同步队列等待。

// 等待获取共享资源时不响应中断,可是获取资源成功后会用selfInterrupt补上
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);// 入队尾
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {// 处于队列第二个位置,能够尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {// 获取成功
                    setHeadAndPropagate(node, r);// 将本身设为队列头,并唤醒可能获取资源的后面几个节点
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 同acquireQueued的分析
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 旧的头
    setHead(node); // 设置新的头
    // 若是还有资源,则唤醒下一个,采用保守策略,多唤醒几回即便没获取到资源也无所谓,尽可能作到不漏掉资源
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {        
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

最后SHARED资源释放:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

这里tryReleaseShared依然要本身实现(若是能够容许下一个节点得到资源则返回true,不然false),若是释放成功则调用doReleaseShared唤醒后继节点。须要注意的是tryReleaseShared因为可能多个线程并发操做因此通常须要CAS而tryRelease不须要。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {// 须要唤醒
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//设置WaitStatus失败
                    continue;            
                unparkSuccessor(h);// 必定要设置成功才唤醒
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;// CAS设置失败则继续循环
        }
        if (h == head)// 头变了,不须要继续唤醒
            break;
    }
}

此外,资源获取除了一直等待的方式以外还有对应的限制等待时间的方法如tryAcquiretryAcquireNanos,没必要多言,释放就只有一直等而没有限制等待时间的了。也有响应中断与不响应的对应,如acquireInterruptiblyacquire,差异不大,没必要多言。

2、内部类

Node

等待队列的节点类,等待队列是CLH(Craig,Landin,Hagersten)锁队列的一种变体,CLH锁一般用来做为自旋锁。

每一个节点主要维护了下面一些状态

  • 对应的线程thread
  • 等待状态waitStatus 含,0:初始状态;CANCELLED 1:被取消;SIGNAL -1:当前线程释放资源或取消后须要唤醒后继节点;CONDITION -2:条件等待;PROPAGATE -3:下一个acquireShared操做应该被无条件传播。实际使用中,通常只关注正负,非负数就意味着节点不须要释放信号
  • 资源获取模式有SHARED(默认)和EXCLUSIVE两个
  • 同步队列中的前驱后继节点prevnext
  • 做为同步队列节点时,nextWaiter有:EXCLUSIVESHARED标识当前节点是独占模式仍是共享模式;与ConditionObject搭配使用做为条件等待队列节点时,nextWaiter保存后继节点。因此实际上这个Node类是被复用了,既用于同步队列,也用于条件等待队列

ConditionObject

这个类实现了Condition接口,主要用来完成常见的条件等待、唤醒等操做。一个ConditionObject 包含一个等待队列,由firstWaiterlastWaiter决定。当前线程调用Condition.await()方法时,会被构形成为节点,而后置于条件等待队列队尾。
咱们看最经常使用的条件等待方法

// 条件等待,响应中断
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();// 响应中断
        Node node = addConditionWaiter();// 当前线程加入条件等待队列
        int savedState = fullyRelease(node);// 释放资源,并得到本节点须要的资源数,以便再次获取
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {// 若是当前节点不是在同步队列,也就是还在等待队列等待着条件发生
            LockSupport.park(this);// 就会一直阻塞
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//加入同步队列等待获取资源,成功后要检查中断响应
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 若是最后一个条件等待节点是取消的状态
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();// 清理整个链路的无效节点
            t = lastWaiter;
        }
        //以条件等待的方式将当前线程封装成节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)//条件等待队列为空就初始化
            firstWaiter = node;
        else// 队列不空,插入队尾
            t.nextWaiter = node;
        lastWaiter = node;
        return node;// 返回新插入的节点
    }

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();// 本节点须要的资源数
            if (release(savedState)) {// 释放掉这么多资源
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

而后是信号方法:

public final void signal() {
        if (!isHeldExclusively())// 要使用该方法必须先是独占线程
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);//找到第一个条件等待节点,并发出信号
    }
    
    // 去掉条件等待队列的节点,直到赶上没取消的或者空节点
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))// 节点已被取消
            return false;
        Node p = enq(node);// 条件等待队列的第一个节点被加入同步队列的队尾
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);// 唤醒节点对应线程
        return true;
    }

3、已有应用分析

下面用两个例子来看看AQS的具体使用场景,分别是使用独占模式的ReentrantLock和共享模式的CountDownLatch
通常使用AQS的类,都会用一个内部类Sync来继承AQS,并实现那几个protected的方法。

ReentrantLock

ReentrantLockFairSyncNonfairSync两个类来实现公平锁和非公平锁,咱们看非公平锁,主要几个方法是

  • lock(),使得NonfairSync调用compareAndSetState把state从0设为1并用setExclusiveOwnerThread把当前线程设为独占线程(亦即首次得到锁),若是失败则使用acquire(1)调用nonfairTryAcquire。整体流程就是若是state为0,那么就是本线程首次得到锁,把state置为1,不然若是当前线程是独占线程则将state+1(这也是锁可重入的关键),若是都不是就进入acquireQueued流程等待得到锁了了
  • unlock(),调用AQS的release(1)方法,其实是调用了Sync的tryRelease(1)方法,若是state-1为0,那么返回true,不然返回false。也就是说,重入锁必须释放够重入次数才算真正释放成功,可是unlock()方法自己不会管这个最终结果,只管释放
  • tryLock(),与lock()区别是不等待,当即返回,只有唤醒时就是独占线程才能返回true,实现方法是nonfairTryAcquire
  • newCondition()直接返回了了AQS的内部类ConditionObject
  • isLocked() 若是state为0则表示未加锁返回false,不然返回true

CountDownLatch

CountDownLatch 主要几个方法是

  • CountDownLatch(int count),构造方法,设置 AQS 的 state 为 count
  • await(),调用 AQS 的 acquireSharedInterruptibly(int arg) 方法,而后调用本身覆盖的tryAcquireShared(int acquires)来得到state的值是否为0,若是是0就结束等待直接返回了,若是不是0就调用 AQS 的 doAcquireSharedInterruptibly(int arg)方法,该方法会循环等待,直到state为0才返回或者被中断。
  • countDown(),调用 AQS 的 releaseShared(int arg) 方法,其实是调用了本身覆盖的 tryReleaseShared(int releases) 方法,把 state 减了1,若是此时state为0,则调用 AQS 的doReleaseShared()方法

分析

整体而言,AQS提供了一个模板方法模式,将得到锁释放锁一些必要的流程操做都规定好了,咱们只须要填充一些具体的得到与释放方法

  • getState(),setState(int newState),compareAndSetState(int expect,int update):是资源相关操做,保证原子性
  • tryAcquire(int arg):尝试独占获取资源。成功返回true,失败返回false。
  • tryRelease(int arg):尝试独占释放资源。成功返回true,失败返回false。
  • tryAcquireShared(int arg):尝试共享获取资源。负数表示失败,非负数表示成功表明剩余可用资源
  • tryReleaseShared(int arg):尝试共享释放资源。若是释放后能够唤醒后续等待结点返回true,不然返回false。
  • isHeldExclusively():表明当前线程是否独占资源,只有用到Condition之时才须要去实现它。

自定义同步器时,通常都是本身写一个 static class Sync extends AbstractQueuedSynchronizer 静态内部类来实现具体的方法。

阅读原文:MageekChiu

相关文章
相关标签/搜索