或许我能够把AQS讲清楚

AQS是JUC包中许多类的实现根基,这篇文章基于我的理解的前提下完成,因此在结构上跟其余AQS文章有些差别。html

1 AQS内脏图

tips:若是只是想看AQS的实现的话能够从第三节开始看,前面只是讲结构和使用java

1.1 总体结构

  在开始了解AQS以前,先看下AQS的内部结构,这样在看实现代码的时候至少有一个总体的概念,重点要记住的是Node类几种状态的做用,其余结构有个概念就行。node

1575639520339

  如上,在AQS中大体有:git

  1. state变量:内部维护了一个volatile修饰的资源变量state,能够简单的理解为锁,拿到资源就是拿到锁。
  2. 同步队列(CLH):全部关于资源的抢夺都是在这个队列中发生的;在AQS中只存了头尾节点,本质上就是一种双向链表,队列为先进先出队列(FIFO),也就是说对于资源state的争夺都是按照队列中的顺序来的,另外能参与资源争夺的队列只有有效的节点(节点状态不为取消或者同步)
  3. 等待队列:跟同步队列相似,只有头尾节点,不一样是的其在一个内部类ConditionObjet中,也就是说一个ConditionObject对象就是一个等待队列,因此容许有多个。处于等待队列中的节点不会参与资源的竞争,其状态为CONDITION,当节点被标记为CONDITION时(await方法)其会从同步队列中移除,加入对应的等待队列,而若是等待队列中的节点被唤醒(例如调用condition.signalAll())时会节点从新被放入同步队列尾部参与资源的竞争(ReentrantLock按组唤醒线程的实现原理就是这个)。

1.2 内部类Node

  在AQS中,内部类有两个:NodeConditionObjectNode是队列的实现根基,里面存放了许多重要的信息,如操做的线程、线程竞争的状态(特别重要)等;而ConditionObject则是Condition接口的实现类,用来实现唤醒指定线程组的(等待队列)。设计模式

  关系以下图(下方的Waiter节点也是Node节点,这里为了便于区分取名不一样):框架

1575905757547

Node内部类AQS两个队列的实现节点。ide

  • waitStatus :节点状态,取值为-3~1(整数)。当状态为1时表示没用了,其余状态表示是有用的。

    0:初始状态或者不表明任何意义时的取值。测试

    SIGNAL(-1):这个状态通常由下一个节点来设置,表明的意思是当前节点在释放了资源后将后续节点的线程唤醒。(大白话就是后续节点拜托前方的大哥东西用完了叫他,他先去睡会儿)ui

    CONDITION(-2):表示节点处于等待队列中,等待队列中的节点不会参与资源竞争,必须从等待队列出来后从新加入同步队列才能参与竞争。this

    PROPAGATE(-3):在共享模式的时候用到。共享模式下,不只只是唤醒下个节点,还可能唤醒下下个节点(根据当前剩余资源state的值可否知足最近节点的需求决定)。

    CANCELLED(1):表示该节点没用了,多是等过久了,也多是其余缘由,总之就是废了,处于该状态的节点不会再改变,因此AQS中常常会判断节点状态是否大于0来检查节点是否还有用。

  • thread:争夺资源的线程,存放在节点当中。
  • prev:同步队列中的上一个节点。
  • next:同步队列的下一个节点。
  • nextWaiter:下一个等待节点,用来实现等待队列。

2 简单的使用AQS

  如今对AQS有了模模糊糊的了解,来看看要如何使用这个框架。其采用模板设计模式实现,定义了许多顶级方法如acquirerelease等,这些方法子类不能重写可是能够调用,而要正确的使用这些方法则要按照其要求重写一些方法如tryAcquire顶级方法内部调用了开放方法)。

  能够重写的方法有tryAcquiretryReleasetryAcquireSharedtryReleaseSharedisHeldExclusively共五种,每一个方法里面没有具体的实现,反而是直接抛出了异常,可是不必定要所有重写,比方说只重写tryAcquiretryRelease则表示要实现的是独占模式的锁。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

  这些方法表示尝试去获取资源或者释放资源,其实现必需要跟state资源状态相关,举个例子,tryAcquire方法表示以独占的方式尝试获取资源,若是获取到了那么其余线程不得操做其资源,其中入参的arg则表示想要获取到的资源数量,例如我tryAcquire(5)成功了,那么状态变量state变量则增长5,若是tryRelease(5)成功则state状态变量减小5,等到state==0的时候则表示资源被释放,便可以理解为锁被释放。

  若是只是使用AQS的话,再加上几个变动状态的方法就能够了,咱们不须要了解更多的东西,如同AQS的文档给出的案例通常,简单的重写几个方法即可以实现一种锁,以下,一个不可重入锁的简单实现。

class Mutex implements Lock, java.io.Serializable {

   // 同步内部类,锁的真正操做都是经过该类的操做 
   private static class Sync extends AbstractQueuedSynchronizer {
     // 检查当前是否已经处于锁定的状态
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }

     // 若是资源变量为0,则获取锁(资源)
     public boolean tryAcquire(int acquires) {
       // acquires的值只能是1,不然的话不进入下面代码
       assert acquires == 1;
       if (compareAndSetState(0, 1)) {
         // 设置持有当前锁的线程
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // 经过将状态变量state设定为0来表示锁的释放
     protected boolean tryRelease(int releases) {
       // 传入的参数只能是1,不然是无效操做
       assert releases == 1; 
       // 若是状态状态等于0,说明不是锁定状态
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // 提供Condition,返回其AQS内部类ConditionObject
     Condition newCondition() { return new ConditionObject(); }

     // 反序列化
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // 内部类已经实现了全部须要的方法,咱们只要封装一层就行
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

进行一个小测试

public static void main(String[] args) {
    Lock lock = new Mutex();
    new Thread(() -> {
        lock.lock();
        try {
            System.err.println("得到锁线程名:" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
            System.err.println("3秒过去....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.err.println(Thread.currentThread().getName() + "释放锁");
        }
    }).start();

    new Thread(() -> {
        lock.lock();
        try {
            System.err.println("得到锁线程名:" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
            System.err.println("3秒过去....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.err.println(Thread.currentThread().getName() + "释放锁");
        }
    }).start();
}

最终的结果图以下

1575643000409

  这样就实现了一个不可重入锁,是否是看起来很简单?

3 AQS的内部实现

  首先要先明白的是AQS分为两种模式——独占模式共享模式。通常来讲只会用到其中一种,两种模式的资源竞争都是在同步队列中发生的,不要跟等待队列混淆。

独占模式:每次只能容许一个节点获取到资源,每次释放资源以后也只会唤醒后驱节点。

共享模式:每次能够容许多个节点按照顺序获取资源,每次释放头节点资源后可能会唤醒后驱的后驱。(下方讲实现的时候有解释)

3.1 独占式释放资源——acquire

来看acquire方法(若是讲的不是容易让人理解,能够结合后方的流程图一块儿),ReentrantLocklock就是这个方法,能够类比理解。

  在看代码须要明确知道的是,tryAcquiretryRelease这些操做才是对资源的获取和释放AQS中的顶级方法如acquire的做用只是对资源获取操做以后的处理。

// 代码逻辑不复杂,首先尝试获取资源,若是成功则直接返回,失败则加入同步队列争夺资源
public final void acquire(int arg) {
    // 尝试得到锁,若是失败了则增长节点放入等待队列中
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  能够看到总体的方法十分简单,就在一个if条件中调用了3个方法,tryAcquire就不说了,先说下addWaiter作了什么,addWaiter方法将当前线程封装成一个节点放入同步队列的尾部,若是失败就不断的尝试直到成功为止,其方法代码以下。

private Node addWaiter(Node mode) {
    // 将当前线程封装入一个节点之中,mode表明共享模式仍是独占模式
    Node node = new Node(Thread.currentThread(), mode);
    
    // 首先尝试一次快速的尾随其后,若是失败的话则采用正常方式入队
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 入队操做
    enq(node);
    return node;
}

再看下正常的入队操做

private Node enq(final Node node) {
    // 自旋
    for (;;) {
        Node t = tail;
        // 若是同步队列是空的话则进行队列的初始化
        if (t == null) { 
            // 这里注意初始化的时候head是一个新增的Node,其waitStatus为0
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不然的话尝试设置尾节点,失败的话从新循环
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

  能够看出正常入队比快速入队也就多出来了自旋和初始化操做,其余的大体逻辑都是类似的。再看看acquire中的另外一个方法acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    // 默认获取失败
    boolean failed = true;
    try {
        /*
         * 线程打断标识,咱们知道使用interrupt()方法只是改变了线程中的打断标识变量,
         * 并不能打断正在运行的线程,而对于这个打断变量的处理通常有两种方式,
         * 一种是记录下来,一种是抛出异常,这里选择前者,而可打断的acquire则是选择后者
         */
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 拿到前驱节点
            final Node p = node.predecessor();
            // 若是前驱节点为头节点则尝试一次获取
            // 再次强调下,获取资源的操做是在tryAcquire中
            if (p == head && tryAcquire(arg)) {
                // 设置当前节点为头节点,而后设置prev节点为null
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // 返回中断标识
                return interrupted;
            }
            // 获取资源失败了,判断当前线程的节点是否应该休息
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 若是是由于中断被唤醒的,要记录下来,以后acquire方法要补上中断
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 看看是否应该去休息这个方法中作了啥
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
    	// 若是前节点状态为SIGNAL,那么表示能够安兴去休息了,到了时候前驱节点会叫醒你的,返回true
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            /*
             * 状态大于0,则表示节点已经取消做废,那么须要一直往前找直到找到有效的节点
             * 这时还不能去休息,要是前驱节点是头结点又刚好头结点释放了资源,那么你不就
             * 不用挂起就能够拿到资源了,因此返回false,再循环一次
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 其余状况则表示前驱节点有效,将前驱节点状态设置尾SIGNAL,表示麻烦他到时候
             * 叫醒你。这里还不能够去休息,由于有可能前驱节点恰好变成了头结点又恰好执行完
             * 释放了资源,这时去休息岂不是亏了,因此返回false
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}

// 若是上面的方法判断须要休息,那么将线程挂起
private final boolean parkAndCheckInterrupt() {
    	// 使用park方法将线程挂起
        LockSupport.park(this);
    	// 在上面咱们提到线程的打断标识,interrupted()方法返回后会重置这个标识
        return Thread.interrupted();
}

  光看代码可能有点绕(整个流程能够看下方的流程图),从新理一下逻辑:

​ 首先明确这个方法是不断自旋不会退出的,除非成功拿到资源,若是拿不到资源就挂起等待。(不考虑特殊状况)

整个流程的逻辑:

  1. 判断前驱节点是否为头节点,若是是头节点则尝试获取资源,成功了返回中断标识,失败了进行2。使用前驱节点判断的缘由是由于头结点 不会进到这个方法来;不是头结点还要去获取资源是由于要是在这个过程当中恰好头结点释放了资源,那么你就不用再去挂起傻傻等待了,节省了系统资源消耗。
  2. 进入shouldParkAfterFailedAcquire方法,这个方法的做用就是判断你当前这个线程能不能去休息(挂起),而能够去休息的标志就是前驱节点的状态为SIGNAL,这个状态表明前驱节点释放资源后会唤醒你。
    • 1 判断前驱节点状态是否为SIGNAL,若是是直接返回true,能够去休息了
    • 2 若是前驱节点状态>0,表示做废,那么将一直往前找直到找到一个有效的节点,而后进行链接,这时还不能去休息,要是前驱节点是头结点呢是吧,因此返回false。也就是在这个阶段中清理了同步队列中那些没用的节点,由于他们引用断了,以后GC会回收它们。
    • 3 将前驱节点的状态设置为SIGNAL,表示你准备去休息了要麻烦他叫醒你,而后先别休息,要是前驱节点这时候变成了头结点又进行了资源释放,那就能够省去挂起的操做直接获取资源了,因此要再循环一次看看,返回false
  3. 根据是否应该休息方法shouldParkAfterFailedAcquire的结果判断是否把线程挂起,若是返回true那么执行parkAndCheckInterrupt方法把线程挂起,若是是false那么则再循环一次。parkAndCheckInterrupt方法的做用是挂起线程,而后醒来的时候返回是否是由于被中断而醒来的,若是是的话,那么将interrupted字段赋值为true,在整个acquire方法结束的时候会根据这个标识来决定是否进行线程的自我中断

再回来看下acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 根据返回的中断标识决定是否执行下方的自我中断
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  整个acquire的流程大体为

img

  独占式获取资源的主要方法差很少就是这样,还有可打断的独占式获取方法acquireInterruptibly,代码以下,其实现基本相同,只是对于咱们方才说的打断标识的处理从记录改为了抛出异常,因此才是可打断的,有兴趣能够本身再看下,基本逻辑相同,看起来也就耗费不了多少时间。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        // 抛出异常处理
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

3.2 独占式释放资源——release

  了解完获取资源天然知道释放资源的过程,相对来讲释放资源要相对容易一些,大体逻辑为尝试释放资源,若是成功了,则改变节点的状态而且唤醒下一个可用节点(通常是下一个,可是可能出现下一个节点已经被取消的状况)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 修改线程的状态,而且唤醒下一个节点进行资源竞争
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void unparkSuccessor(Node node) {
    // 改变节点状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 唤醒下一个可用节点,通常来讲是下一个节点,可是可能出现下个节点被取消
     * 或者为空的状况,这个时候就要从尾结点向前遍历直到找到有效的节点(从尾节点向前遍历
     * 是由于不管下个节点是空仍是取消的节点,正向遍历都不可能走得通了,取消的节点的next
     * 就是其自己,因此只能从后面开始往前遍历)
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到下个节点以后将其唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

  release的流程图以下:

img

3.3 共享模式——acquireShared

  在上面咱们讲的都是独占模式的获取和释放处理,那接下来看看共享模式是怎么实现的。首先理解AQS中共享模式的概念,其表明资源能够被队列中的多个节点按照顺序得到,什么意思呢?

  举个例子,咱们设置资源变量为3(state=3),首先头结点使用tryAcquireShared(1)获取到了一个资源,那么还剩下2个,这两个能够给头结点的后驱节点使用,若是后驱节点的需求是2那么获取成功并将本身设置为头结点同时断开跟原头结点的链接,可是若是需求是3的话则进入等待状态直到可获取的资源量达到其要求为止,这时就算后续的需求量是1也不会给后续节点,这就是按照顺序得到的意思。例子图以下:

img

  okay,那来看下共享模式下的实现,先看acquireShared方法:判断资源是否获取成功,是的话直接结束,不是的话进入队列进行资源竞争。须要注意的是tryAcquireShared返回值的语义:负值表明失败,其余表明成功而且当前还可获取的资源量。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

看看doAcquireShared作了什么

// 仍是强调一次,这些方法只是善后处理,资源的获取仍是在tryAcquireShared方法
private void doAcquireShared(int arg) {
    /*
     * 整个流程跟acquire方法有些相似,不一样点是其获取到资源后
     * 会唤醒后驱线程
     */
    
    // 加入队列尾,再也不赘述
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 一样记录一个打断标识
        boolean interrupted = false;
        for (;;) {
            // 前驱节点
            final Node p = node.predecessor();
            if (p == head) {
                // 若是前驱节点是头结点,那么尝试一次获取资源,根据其返回的值判断执行不一样操做
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 非负值表明资源获取成功,将本身设为头结点后唤醒后驱节点争取资源
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 跟acquire不一样的是,其补打断的地方在方法内层,再也不放外面
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    // 处理结束后就退出了
                    return;
                }
            }
            // 这里跟acquire同样,判断是否能够休息,休息后被唤醒后补充interrupted标识
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

看看获取资源成功后对后续节点的操做

/**
 * @param node 当前节点
 * @param propagate 当前剩余的资源量
 */
private void setHeadAndPropagate(Node node, int propagate) {
    // 记录原头结点
    Node h = head; 
    // 注意这里设置头结点的变化,这里要结合3.3一开始的例子图来理解
    /** setHead方法体:
      * head = node;
      * node.thread = null;
      * node.prev = null;
      */
    setHead(node);
    
    // 此时头结点已经变为当前节点了
    
    /*
     * 存在如下三种状况时唤醒当前节点后驱节点
     * 1.剩余资源量>0
     * 2.node的原前驱节点(即原头节点)释放了资源, == null表示释放完被回收了,<0则表示PROPAGATION
     *   状态,释放以后会将节点状态设置为PROPAGATION
     * 3.头结点可能再次发生了改变而且也释放了资源(竞争激烈的时候发生)
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 叫醒后续节点争夺资源,这个方法是释放方法的主要方法,放在下节讲
            doReleaseShared();
    }
}

  okay,到这里就是共享模式的acquireShared方法,总结一下逻辑:

  1. 尝试获取锁是否成功,是则结束,不然进入2
  2. 同acquire同样先来个自旋,判断前驱节点是否为头结点,不是的话挂起线程等待唤醒,是的话进入3
  3. 尝试获取资源,成功了唤醒后续线程,方法结束;失败了挂起线程等待唤醒

  线程被唤醒后重复2操做,如下是流程图:

img

3.4 共享模式——releaseShared

  直接上代码吧

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 这个方法理解为唤醒,不要理解为释放资源
        doReleaseShared();
        return true;
    }
    return false;
}

看看唤醒方法作了啥子

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 根据节点状态判断执行什么操做
            if (ws == Node.SIGNAL) {
                /*
                 * 若是是SIGNAL那么表示其后驱节点处于挂起的状态
                 * 使用CAS改变状态后唤醒后驱节点,失败则再次循环(说明被其余线程先执行了该方法)
                 */
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                // 唤醒线程,前面已经说过,再也不赘述
                unparkSuccessor(h);
            }
            // 将当前节点设置为PROPAGATE,失败则再次循环
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        // 若是头节点改变了,说明唤醒操做是其余线程作的,此时要再次循环
        if (h == head)                   
            break;
    }
}

  共享模式的release方法在咱们看过以前的方法后就简单得多了,这里就再也不画流程图了,到此AQS的两个模式和实现暂时告一段落。

总结

  整篇文章可能有些长,先讲了AQS内部的一些结构,而后使用AQS实现了简易的不可重入锁,接着接下来将AQS的两个模式和实现。

  两个模式的实现思路大体是相同的,可是方式不一样,独占模式每次只容许一个节点获取到资源,而共享模式则容许多个节点按照顺序获取;双方释放后的善后操做也不一样,独占模式只唤醒后驱节点,而共享模式则可能唤醒后驱的后驱(资源充足的状况)。

冲!冲!冲!

参考:http://www.javashuo.com/article/p-xcevmtwv-gz.html

https://snailclimb.gitee.io/javaguide/#/docs/java/Multithread/AQS

相关文章
相关标签/搜索