初步了解AQS是什么(三)

前言

  1. 第一篇文章经过ReentranctLock的公平锁介绍了AQS的核心部分,第二篇文章经过Condition分析了独占锁是的具体的工做机制,还有公平锁和非公平锁的区别,也分析了中断机制在AQS的实现
  2. 本篇文章主要了解的是AQS的共享模式,有些内容须要前面两篇文章的内容,因此阅读以前能够先阅读以前的文章,这样效果更佳!
  3. 文章连接:<初步了解什么是AQS(一)><初步了解什么是AQS(二)>
  4. 本文章仅供我的学习用,但愿你们能够互帮互助!

共享锁和独占锁的区别

独占锁

  1. 独占的,排他的
  2. 当某个线程拥有独占锁,其余线程只能等待这个线程释放这个独占锁才能去争取,而且同一时刻只能有一个线程能够争取独占锁成功

共享锁

  1. 锁是共享的,能够被多个线程同时拥有
  2. 若是一个线程拥有了这个锁,那么其余等待这个共享锁的线程能够去尝试获取锁,而且有很大概率获取成功

方法区别

独占锁 共享锁
tryAcquire(int arg) tryAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
acquire(int arg) acquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

由表格咱们知道,除了最后一个doReleased()是共享锁独有以外,其余的方法独占锁和共享锁基本都是一一对应的。html

在独占锁中,release方法中尝试释放锁,若是成功就unparkSuccessor(h)java

在共享锁中,releaseShared方法中尝试释放锁,若是成功就doReleaseShared()node

因此通常来讲unparkSuccessor(h)doReleaseShared()通常是互相对应的,可是doReleaseShare()要执行的逻辑比前者多。这点咱们到后面再看!app

这里提一下:ide

咱们以前说过函数

  • 在独占模式中,当一个线程获取锁以后,只有释放锁以后才会去唤醒下一个线程。oop

  • 在共享模式中,若是一个线程成功获取了共享锁,他就会立刻唤醒下一个等待的线程,并不须要该拥有锁的线程释放锁才唤醒下一个线程,由于共享锁的设定就是为了让多个线程同时拥有所资源的!这里有两种状况会唤醒后面等待的线程 1. 当前线程成功拥有锁 , 2. 拥有锁的线程释放锁 . 但愿读者记住这两点,这对于后面咱们分析源码的时候有指导性的做用。源码分析

前面咱们先说了独占锁和共享锁的区别,是为了让读者更好地去区分它们。下面咱们就经过分析CountDownLatch的源码,去体会AQS中共享模式是如何工做的。post

CountDownLatch

说明

CountDownLatch类是典型的AQS的共享模式使用,而且是一个高频使用的类。学习

关于CountDownLatch的用法,咱们就不一一在这里赘述。若是有不了解的读者,能够先去百度一下再往下看源码分析,这样效果会更佳。

这里推荐一篇关于怎么用的博客,我的看的时候以为比较通俗易懂

www.cnblogs.com/cuglkb/p/85…


源码分析

构造方法

须要传入一个不小于0的数,其实也就是设置AQS的共享资源

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }


//----------Sync是CountDownLatch的内部类
/** 咱们这里说一下,咱们看到Sync就是继承了AQS这个抽象类,而后重写了tryAcquireShared和 tryReleaseShared的方法,也就是本身加了个getCount()方法,因此后面看到Sync类调用的方法,除了重写那两个,其余的基本都是父类的方法,读者要时刻记住这一点! **/
  private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count); //AQS
        }

        int getCount() {
            return getState();
        }

      	//重写AQS的方法
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
		//重写AQS的方法
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
          
复制代码

咱们能够看到,在构造方法里面咱们就设置了AQS的state值。

全部调用了await()方法的线程就会等待挂起,而后另外的线程就会执行state = state -1 的操做。

当state的值为0的时候,那么将state减为0的线程就会立刻唤醒以前调用了await()的线程,而后这些线程就会去获取共享锁啦!

在咱们知道CountDownLatch的用法以后,咱们须要注意它的两个方法,一个是CountDown(),另一个是await()方法

countDown()每次会把state的值减1,直到state的值变为0,唤醒调用await()的线程

await()顾名思义,调用这个方法的线程会阻塞,直到被唤醒。

上面说的但愿读者能够认真理解一下,理解到位了看下面的源码才不会半知半懂

示例代码

咱们经过下面的源码,来分析countDownawait的源码

public static void main(String[] args) {
       CountDownLatch latch = new CountDownLatch(2);
    //t1和t2负责countDown,也就是将state减1
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(5000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               latch.countDown();
           }
       },"t1");

       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(7000);

               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               latch.countDown();
           }
       },"t2");

        t1.start();
        t2.start();

    // t3 和 t4负责await,等待state为0而后被唤醒
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                    System.out.println(Thread.currentThread().getName() + "从await回来了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }
        },"t3");

        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "从await回来了");
            }
        },"t4");

        t3.start();
        t4.start();
    }
复制代码

结果(t3和t4不是按照绝对的顺序输出的,后面的分析咱们按照t3先入队)


下面咱们就按照步骤来

latch先await()阻塞,等待被唤醒(等待state变为0),而后await()返回去作其余事情!

await源码

public void await() throws InterruptedException {
     	//调用sync的方法,关于Sync咱们前面已经说过,这里就很少赘述了
        sync.acquireSharedInterruptibly(1);
    }


public final void acquireSharedInterruptibly(int arg) //AQS throws InterruptedException {
    	//这个方法是响应中断的有中断就抛出异常呗
        if (Thread.interrupted())
            throw new InterruptedException();
    	//t3和t4到这里的时候,确定是小于0的(此时state = 2)
    	//因此先看看tryAcquireShared方法
    	if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }


protected int tryAcquireShared(int acquires) { //重写AQS的方法
        //只有state>0的时候才会返回1
    	return (getState() == 0) ? 1 : -1;
        }

//从这个方法名咱们知道,这个方法是获取共享锁而且是响应中断的
private void doAcquireSharedInterruptibly(int arg) //AQS throws InterruptedException {
    
    	//步骤1. 将当前节点设置为共享模式,而后加入阻塞队列!
    	//关于这些知识,在第一篇和第二篇文章就说过啦,不懂的回去看看吧!
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //这样也是同样,只要state > 0,就返回-1,(此时state = 2)
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //步骤2. 由于是刚开始,因此不会进到前面那个if语句,因此会直接来到这
                //这里咱们都很熟悉了,就是在阻塞队列找到一个地方让这个线程能够挂起!
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

咱们把上面的步骤用图来描述一下你们就清楚啦!

t3通过步骤1以后,也就是入队以后,会变成以下图所示

而后tryAcquiredShared会返回-1,那么就执行shouldParkAfterFailedAcquire就会将t3的pre指向的节点的WaitStatus(下面统称为ws)置为-1,以下图所示。(t3的ws为-1)

执行完shouldParkAfterFailedAcquire以后执行parkAndCheckInterrupt就会把t3挂起啦!

而后后面t4进来的时候和t3是同样的步骤,到后面就是t4加入阻塞队列,而后把t3的ws置为-1,结果以下图所示。(t4的ws为-1)

接着,t4就就被挂起啦,如今t3和t4就已经全挂起了,就等待着被唤醒啦!


这里说明一下,由于能够是由多个线程调用countDown的,那么有些线程调用的时候state还不是0,因此这些线程不会唤醒await的线程,只有当某个线程调用了countDown方法,而后state从1变为0了,那么这个线程就唤醒其余await的线程!明白这点很重要。

接下来咱们继续看countDown方法

countDown源码

public void countDown() {
        sync.releaseShared(1);
    }

public final boolean releaseShared(int arg) { // AQS
    	//只有将state设置为0的时候才会返回true,不然只是每次将state-1而已
        if (tryReleaseShared(arg)) {
            //到这里说明已经成功把state置为0了,那么就开始唤醒如今还在等待的线程啦!
            doReleaseShared();
            return true;
        }
    	//state原本就是0的话,确定释放不了,就返回false啦
        return false;
    }

protected boolean tryReleaseShared(int releases) {// 重写AQS
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) 
                    return false;
                int nextc = c-1;
                //每次都是CAS将state-1
                if (compareAndSetState(c, nextc))
                    //若是state减1成功变为0的话,确定就返回true啦!
                    return nextc == 0;
            }
        }


复制代码

下面咱们来着重分析下doReleaseShared()这个方法

//此时state为0,先把流程走一遍先,先看注释便可,其余先不看
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //t3入队的时候,已经把head节点设置为-1啦,因此会到if语句里面
                if (ws == Node.SIGNAL) {
                    //将head设置为0,也就是恢复到初始状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //到这里就是唤醒head节点的下一个节点,这时候也就是唤醒t3
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

在t3被唤醒以后,咱们回到t3被唤醒的地方和接下来要作的事情

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//此时head就是t3的前缀啦,因此能够进来这里
                    int r = tryAcquireShared(arg);
                    //这里r > 0啦,由于以前说过state == 0的时候才会返回1
                    if (r >= 0) {
                        //那么此时t3就走到这一步了,下面咱们看下这个方法
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //步骤1:t3被唤醒以后就会从这里继续执行啦,假设没有中断的状况,那么是不会进到if语句里面的啦
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

此时t3会进入setHeadAndPropagate这个方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node); //步骤1:t3先把本身设置为头结点先
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                //基本都会执行这个函数,那么这里也就说明在共享模式中,当节点被唤醒
                //的时候都会执行这个函数,也就是会唤醒下一个节点
                //那么这里就是说t3把本身设置为头部以后,就会立刻去唤醒t4去抢锁啦!
                doReleaseShared(); 
        }
    }
复制代码

那么在这里,咱们就好好分析一下doReleaseShared()这个方法了,因此在这里咱们知道,t3已是head节点了

//调用这个方法的时候咱们已经知道此事state为0了
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            /** h == null 说明阻塞队列已经为空 h == tail 说明阻塞队列刚被初始化的头结点 还有一种状况就是 以前是普通的节点,可是此节点已是头结点,那么就是说该节点是已经被唤醒了, 后面阻塞队列已经没有节点了 因此上面两种状况都不须要被唤醒 */
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //ws == -1,由于以前t4已经把t3设置为-1(SIGNAL)啦
                if (ws == Node.SIGNAL) {
                    //问题1:这里若是将节点设置为初始化状态失败的缘由待会解释!
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //到达这里,就说明t3的ws已经设置为0,而后t3就去唤醒t4啦!
                    //那么t4接下来被唤醒,就像t3被唤醒后作的事情差很少是如出一辙,
                    //因此想分析t4接下来要干什么,看上面t3被唤醒以后作了什么就行了
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                   	//问题2:
                    //在咱们举的例子中,t3是确定不会来到这里的,这里主要判断的是
                    //若是有节点加入进来,可是这个节点准备到这里的时候把它的前驱节点
                    //就设置为-1了,关于这里,咱们下面说明解释。
                    continue;                // loop on failed CAS
            }
            //此时h指向的仍是t3那个节点,可是head节点可能已经变了
            //若是是同一个节点,退出循环,让被唤醒的线程去唤醒后面的线程。
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

下面咱们来看一张图,就能够解释注释的内容了(但愿你们能够好好看看这张图,方便理解后面的解释)

接着咱们来解释一些内容

问题1

就是第一个CAS为何会失败的,咱们看到'唤醒cur的next节点'的流程,好比说t3唤醒t4以后,t3和t4此时就同时进行执行各自的代码了(这里咱们把t3和cur对应,t4和next对应),因此在next被唤醒以后,就会把next节点设置为head节点,咱们以前也说过,在setHeadAndPropagate里面有doReleased方法,因此会有下面的状况

  • cur节点唤醒next节点以后,cur因为某种缘由阻塞了,next节点成功把本身设置为head节点,执行后面的操做,可是还没到第一个CAS.此时cur又继续往下走,判断此时的h已经不是执行head节点了,那么就会指向新的head,也就是此时的next节点.而后cur也继续往下走.因此如今cur和next这两个节点都是同时执行doReleased方法的,因此也会有可能同时到达第一个CAS,因此此时确定只有一个线程CAS操做成功啦,另一个不成功的就根据状况往下走了!这里举几个例子。

    好比说有t3->t4->t5->t6>......tn

    (1)把t3和t4分别对应前面说的那个cur和next,那么此时假设是t3唤醒t5,但t5尚未把本身设置为head节点,那么head节点仍然t4。又由于t4以前CAS是失败的,因此会自旋一次,而后执行最后一个if语句的时候,会直接退出啦!t3执行最后一个if的时候,因为t5尚未把本身设置为head,因此此时t3也退出啦。那么就说明t5之后的节点没人唤醒了吗?非也,由于t5成功设置本身为头结点的话,也会执行doReleased()方法啦!就由t5和它的后继去唤醒吧!

    在(1)的中,若是t3唤醒t5以后,t5也成功设置本身为头结点了,那么t5此时会可能会唤醒后面的,这里先不考虑。而后t3和t4又指向新的head节点啦。这种状况又开始循环啦。

    这里说明一下,t3和t4之中只能有一个CAS成功,另一个就continue再次自旋啦,因此是t4唤醒t5,那么t3就像(1)说的t4同样啦。

咱们看到在,在setHeadAndPropagate里面有doReleased方法,让后面的节点能够更快地唤醒,增大唤醒的吞吐量,不得不说设计者的厉害之处啊!

问题2:

在解释问题2以前,咱们经过下面的步骤慢慢分析

假设这是某一状况阻塞队列的状况

  1. 而后t3被唤醒以后,此时阻塞队列可能没有节点了(阻塞队列不包含head节点)

  1. 咱们在流程图里面说过,可能以前await的节点可能在addWaiter以前阻塞了,这时候可能又不阻塞了,而后执行了addWaiter方法,可是没有执行shouldParkAfterFailedAcquire,也就是t3没有把t2的ws设置为SIGNAL,因此就可能在执行if (h != null && h != tail)以前会有以下图所示

  1. doReleased函数中,咱们已经把ws赋值为t3的ws,也就是ws为0,那么在2的前提下,咱们来到这段代码else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),咱们很容易知道,ws为0,那么此次CAS有可能失败,为何呢?在2 咱们说过shouldParkAfterFailedAcquire可能还没执行,那么也有可能CAS以前,就已经执行了这个函数,也就是此时t3的ws为-1了,那么此时确定CAS失败啦!若是失败的话,就让t3再回去检查下状态,看看是否能够唤醒后继节点啦,因此在else if以后就是让它continue啦!

  2. 若是CAS成功,那么就是说shouldParkAfterFailedAcquire这个函数尚未执行,因此在执行最后一个if语句的时候t3可能退出啦,那么t4继续执行shouldParkAfterFailedAcquire,CAS把t3设置为-1以后,就去执行到setHeadAndPropagate这个方法,那么也会执行doRelease,t4的使命就结束了!

通过上面4点的说明,咱们对问题2的分析就已经很清楚啦!

扩展

在解决问题2的时候,咱们若是把else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))代码去掉的话,会有什么影响么?分析一下!

假如没有这段代码,假设此时阻塞队列以下所示

假设这是t1唤醒t2,但t2还不是head节点,那么在t4入队的时候,由于咱们假设没有那个else if语句嘛,因此会执行shouldParkAfterFailedAcquire,那么将t2的wsCAS为-1,此时head可能已经指向t2了,前面咱们也说过,在共享模式中,只要一个节点成为头结点,就会执行doReleased方法,因此在t2设置为head节点的时候,可能在t4休眠以前就被t2park了,可是这个操做时能够容许的,当咱们unpark一个并无被park的线程时,该线程在下一次调用park方法时就不会被挂起,而这一行为是符合咱们的场景的——由于当前的共享锁处于可获取的状态,后继的线程应该直接来获取锁,不该该被挂起。

CyclicBarrier

说明

下面咱们来讲下CyclicBarrier,和CountDownLatch差很少,CyclicBarrier 基于 Condition 来实现。因此若是没了解过Condition的话,建议你们看下前一篇文章初步了解AQS是什么(二)

啥都不说,开局一张图,(引用别人大佬画的图)

源码分析

下面咱们开始分析咯,依然await是最重要的方法

咱们先看下一些细节

public class CyclicBarrier {
    //CyclicBarrier 是能够重复使用的,每次从开始使用到穿过栅栏当作"一代",或者"一个周期"
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
   
    //这里看出是基于Condition的,因此等待线程的条件就是全部线程都在栅栏上await啦
    private final Condition trip = lock.newCondition();
    
    //参与跨过线程的线程数
    private final int parties;
    
    /* 若是设置这个,那么跨过栅栏以前须要执行的操做*/
    private final Runnable barrierCommand;
    
    /** 当前所处的代 */
    private Generation generation = new Generation();
复制代码

下面咱们看看如可开启新的一代吧

private void nextGeneration() {
        // 须要唤醒全部等待在栅栏上的线程
        trip.signalAll();
        
        count = parties;
       	//开启新的一代!
        generation = new Generation();
    }
复制代码

基本能够看出,咱们开启新的一点,差很少和重写生成一个CyclicBarrier差很少

咱们接下来看下如何打破一个栅栏

private void breakBarrier() {
    	//设置标志位
        generation.broken = true;
     	//从新设置count的值
        count = parties;
     	//唤醒全部在等待的线程
        trip.signalAll();
    }
复制代码

如今已经有了铺垫了,那么咱们来看下await方法吧

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        //这里咱们先得到锁呗,在finally后面再释放锁
        //由于在Condition中,await的线程须要先得到锁的啦!
        lock.lock();
        try {
            final Generation g = generation;
			//先检查栅栏是否有被打破,被打破就抛出异常呗
            if (g.broken)
                throw new BrokenBarrierException();
			//检查中断,有则抛出
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			//index 就是这个返回值
            //index 存储当前还有多少count
            int index = --count;
            //若是index 为 0,那么就说明全部线程在栅栏啦,那么就准备打破而后下一代啦
            if (index == 0) {  // tripped
                //用来标记barrierCommand在执行的时候有没有发生异常
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        //执行barrierCommand的操做
                        command.run();
                    ranAction = true;
                    //开启下一代啦,这里建议看下源码!
                    nextGeneration();
                    return 0;
                } finally {
                    //若是barrierCommand执行的时候发生过异常,那就打破栅栏呗!
                    //这里也建议看下源码!
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //若是到达这里,说明上面的index不为0,救是说不是最后一个线程到达栅栏的
            for (;;) {
                try {
                    //这里是没有超时机制的
                    //到这里也就是说到达栅栏了,那么就等待呗,因此就调用Condition的
                    //await,等待最后一个线程,而后被唤醒(Condition的signal)
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //若是到达这里,就说明线程在await的时候被中断了
                    if (g == generation && ! g.broken) {
                        //到这里就打破栅栏呗
                        breakBarrier();
                        //打破栅栏以后,抛出异常信息,让外层本身去处理
                        throw ie;
                    } else {
                        //这里就是说g != genneration,就是说前面已经开启一个新的
                        //时代啦,说明最后一个线程await完成,全部线程就被唤醒啦!
                        //可是这个中断已经没有意义啦,因此记录下就好!
                        Thread.currentThread().interrupt();
                    }
                }
				//若是栅栏被打破,这里的被打破就是以前执行barrierCommand的时候
                //发生异常,那么被就抛出异常啦
                if (g.broken)
                    throw new BrokenBarrierException();

                //到这里基本都要退出啦
                //由于以前barrierCommand在执行完任务以后,就会 nextGeneration
                //开启一个新的时代,而后释放锁,等待的线程都是await到这里的,因此肯
                //定await以前的时代和被唤醒后的时代不同啦!
                //若是以前的栅栏破了或者await的时候被唤醒了,都是在前面就抛出异常
                //都会直接返回啦!
                if (g != generation)
                    return index;
				//这里是超时的操做了,就不作分析了
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
复制代码

上面的await方法咱们已经说得已经很清楚啦,咱们接下来看看如何获取在栅栏等待的线程数

public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //也就是运算一下就好啦!
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
复制代码

检查栅栏有没有被打破,其实也就是看看那个标志位有没有被设置就好啦!

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}
复制代码

下面咱们来总结一下何时栅栏会被打破吧!

  1. 若是在await的线程被中断,那么就会打破栅栏,而后抛出InterruptedException的异常啦
  2. 若是在指定的操做被抛出异常的时候,也会打破栅栏!

咱们再来看下重置栅栏的源码

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
复制代码

跟简单,就是打破栅栏,而后重建栅栏!也就是不破不立!

咱们来假设一个状况,加入parties设置为4,此时有3个线程await了,那么在第4个await以前重置

那么首先打破栅栏嘛,那么就会跑出BrokenBarrierException 异常,而后开启新的一代,再重置CyclicBarrier 的count和generation,一切从0开始!

关于AQS的核心功能的源码分析就到此结束啦!继续征战下一个知识!

总结

  1. 本片博客重点分析了CountDownLatch的源码,在分析的过程当中,有两个问题迷惑了本身好久,可是本身在尝试跳出局部思惟以后,发现有些问题能够迎刃而解,也对AQS共享模式的工做机制有了必定的了解因此但愿之后能够保持这种思惟。
  2. 在分析CountDownLatch的源码过程当中,本身起码能够耐下心去琢磨,但愿之后能够更加锻炼阅读优秀源码的能力吧
  3. 在写本博客的时候参考了别人的想法和内容,但愿本身能够早日独立写出优秀的文章!!!!!
相关文章
相关标签/搜索