深刻浅出AQS之条件队列

相比于独占锁跟共享锁,AbstractQueuedSynchronizer中的条件队列可能被关注的并非不少,但它在阻塞队列的实现里起着相当重要的做用,同时若是想全面了解AQS,条件队列也是必需要学习的。node

原文地址:http://www.jianshu.com/p/3f8b...segmentfault

这篇文章会涉及到AQS中独占锁跟共享锁的一些知识,若是你已经对这两块内容很了解了,那就直接往下看。不然在读本文以前仍是建议读者先去看看我以前写的两篇文章温习一下。
深刻浅出AQS之独占锁模式
深刻浅出AQS之共享锁模式并发

1、使用场景介绍

区别于前面两篇文章,可能以前不少人都没有太在乎AQS中的这块内容,因此这篇文章咱们先来看下条件队列的使用场景:源码分析

//首先建立一个可重入锁,它本质是独占锁
private final ReentrantLock takeLock = new ReentrantLock();
//建立该锁上的条件队列
private final Condition notEmpty = takeLock.newCondition();
//使用过程
public E take() throws InterruptedException {
        //首先进行加锁
        takeLock.lockInterruptibly();
        try {
            //若是队列是空的,则进行等待
            notEmpty.await();
            //取元素的操做...
            
            //若是有剩余,则唤醒等待元素的线程
            notEmpty.signal();
        } finally {
            //释放锁
            takeLock.unlock();
        }
        //取完元素之后唤醒等待放入元素的线程
    }

上面的代码片断截取自LinkedBlockingQueue,是Java经常使用的阻塞队列之一。
从上面的代码能够看出,条件队列是创建在锁基础上的,并且必须是独占锁(缘由后面会经过源码分析)。学习

2、执行过程概述

等待条件的过程:ui

  1. 在操做条件队列以前首先须要成功获取独占锁,否则直接在获取独占锁的时候已经被挂起了。
  2. 成功获取独占锁之后,若是当前条件还不知足,则在当前锁的条件队列上挂起,与此同时释放掉当前获取的锁资源。这里能够考虑一下若是不释放锁资源会发生什么?
  3. 若是被唤醒,则检查是否能够获取独占锁,不然继续挂起。

条件知足后的唤醒过程(以唤醒一个节点为例,也能够唤醒多个):this

  1. 把当前等待队列中的第一个有效节点(若是被取消就无效了)加入同步队列等待被前置节点唤醒,若是此时前置节点被取消,则直接唤醒该节点让它从新在同步队列里适当的尝试获取锁或者挂起。

注:说到这里必需要解释一个知识点,整个AQS分为两个队列,一个同步队列,一个条件队列。只有同步队列中的节点才能获取锁。前面两篇独占锁共享锁文章中提到的加入队列就是同步队列。条件队列中所谓的唤醒是把节点从条件队列移到同步队列,让节点有机会去获取锁。线程

2、源码深刻分析

下面的代码稍微复杂一点,由于它考虑了中断的处理状况。我因为想跟文章开头的代码片断保持一致,因此选取了该方法进行说明。若是只想看核心逻辑的话,那推荐读者看看awaitUninterruptibly()方法的源码。指针

//条件队列入口,参考上面的代码片断
        public final void await() throws InterruptedException {
            //若是当前线程被中断则直接抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
            //把当前节点加入条件队列
            Node node = addConditionWaiter();
            //释放掉已经获取的独占锁资源
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //若是不在同步队列中则不断挂起
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //中断处理,另外一种跳出循环的方式
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //走到这里说明节点已经条件知足被加入到了同步队列中或者中断了
            //这个方法很熟悉吧?就跟独占锁调用一样的获取锁方法,从这里能够看出条件队列只能用于独占锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //走到这里说明已经成功获取到了独占锁,接下来就作些收尾工做
            //删除条件队列中被取消的节点
            if (node.nextWaiter != null) 
                unlinkCancelledWaiters();
            //根据不一样模式处理中断
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

流程比较复杂,一步一步来分析,首先看下加入条件队列的代码:code

//注:1.与同步队列不一样,条件队列头尾指针是firstWaiter跟lastWaiter
        //注:2.条件队列是在获取锁以后,也就是临界区进行操做,所以不少地方不用考虑并发
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            //若是最后一个节点被取消,则删除队列中被取消的节点
            //至于为啥是最后一个节点后面会分析
            if (t != null && t.waitStatus != Node.CONDITION) {
                //删除全部被取消的节点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //建立一个类型为CONDITION的节点并加入队列,因为在临界区,因此这里不用并发控制
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        //删除取消节点的逻辑虽然长,但比较简单,就不单独说了,就是链表删除
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

把节点加入到条件队列中之后,接下来要作的就是释放锁资源:

//入参就是新建立的节点,即当前节点
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //这里这个取值要注意,获取当前的state并释放,这从另外一个角度说明必须是独占锁
            //能够考虑下这个逻辑放在共享锁下面会发生什么?
            int savedState = getState();
            //跟独占锁释放锁资源同样,不赘述
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                //若是这里释放失败,则抛出异常
                throw new IllegalMonitorStateException();
            }
        } finally {
            //若是释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中只须要判断最后一个节点是否被取消就能够了
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

走到这一步,节点也加入条件队列中了,锁资源也释放了,接下来就该挂起了(先忽略中断处理,单看挂起逻辑):

//若是不在同步队列就继续挂起(signal操做会把节点加入同步队列)
     while (!isOnSyncQueue(node)) {
           LockSupport.park(this);
           //中断处理后面再分析
           if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
     }
    //判断节点是否在同步队列中
    final boolean isOnSyncQueue(Node node) {
        //快速判断1:节点状态或者节点没有前置节点
        //注:同步队列是有头节点的,而条件队列没有
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
        if (node.next != null) 
            return true;
        //上面若是没法判断则进入复杂判断
        return findNodeFromTail(node);
    }

    //注意这里用的是tail,这是由于条件队列中的节点是被加入到同步队列尾部,这样查找更快
    //从同步队列尾节点开始向前查找当前节点,若是找到则说明在,不然不在
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

若是被唤醒且已经被转移到了同步队列,则会执行与独占锁同样的方法acquireQueued()进行同步队列独占获取。
最后咱们来梳理一下里面的中断逻辑以及收尾工做的代码:

while (!isOnSyncQueue(node)) {
           LockSupport.park(this);
           //这里被唤醒多是正常的signal操做也多是中断
           if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
     }

     //这里的判断逻辑是:
     //1.若是如今不是中断的,即正常被signal唤醒则返回0
     //2.若是节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT
     private int checkInterruptWhileWaiting(Node node) {
           return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
     }

     //修改节点状态并加入同步队列
     //该方法返回true表示节点由中断加入同步队列,返回false表示由signal加入同步队列
     final boolean transferAfterCancelledWait(Node node) {
        //这里设置节点状态为0,若是成功则加入同步队列
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //与独占锁一样的加入队列逻辑,不赘述
            enq(node);
            return true;
        }
        //若是上面设置失败,说明节点已经被signal唤醒,因为signal操做会将节点加入同步队列,咱们只需自旋等待便可
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
     }

在把唤醒后的中断判断作好之后,看await()中最后一段逻辑:

//在处理中断以前首先要作的是从同步队列中成功获取锁资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
//因为当前节点多是因为中断修改了节点状态,因此若是有后继节点则执行删除已取消节点的操做
//若是没有后继节点,根据上面的分析在后继节点加入的时候会进行删除
if (node.nextWaiter != null) 
      unlinkCancelledWaiters();
if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);

//根据中断时机选择抛出异常或者设置线程中断状态
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
      if (interruptMode == THROW_IE)
           throw new InterruptedException();
      else if (interruptMode == REINTERRUPT)
           //实现代码为:Thread.currentThread().interrupt();
           selfInterrupt();
}

至此条件队列await操做所有分析完毕。signal()方法相对容易一些,一块儿看源码分析下:

//条件队列唤醒入口
   public final void signal() {
       //若是不是独占锁则抛出异常,再次说明条件队列只适用于独占锁
       if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
       //若是条件队列不为空,则进行唤醒操做
       Node first = firstWaiter;
       if (first != null)
            doSignal(first);
   }

   //该方法就是把一个有效节点从条件队列中删除并加入同步队列
   //若是失败则会查找条件队列上等待的下一个节点直到队列为空
   private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&(first = firstWaiter) != null);
   }

    //将节点加入同步队列
    final boolean transferForSignal(Node node) {
        //修改节点状态,这里若是修改失败只有一种可能就是该节点被取消,具体看上面await过程分析
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //该方法很熟悉了,跟独占锁入队方法同样,不赘述
        Node p = enq(node);
        //注:这里的p节点是当前节点的前置节点
        int ws = p.waitStatus;
        //若是前置节点被取消或者修改状态失败则直接唤醒当前节点
        //此时当前节点已经处于同步队列中,唤醒会进行锁获取或者正确的挂起操做
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

3、总结

相比于独占锁跟共享锁,条件队列多是最不受关注的了,但因为它是阻塞队列实现的关键组件,仍是有必要了解一下其中的原理。其实我认为关键点有两条,第一是条件队列是创建在某个具体的锁上面的,第二是条件队列跟同步队列是两个队列,前者依赖条件唤醒后者依赖锁释放唤醒,了解了这两点之后搞清楚条件队列就不是什么难事了。


至此,Java同步器AQS中三大锁模式就都分析完了。虽然已经尽力思考,尽可能写的清楚,但鉴于水平有限,若是有纰漏的地方,欢迎广大读者指正。明天就是国庆长假了,我本身也计划出国玩一趟,散散心。提早祝广大朋友国庆快乐。

相关文章
相关标签/搜索