CyclicBarrier源码探究 (JDK 1.8)

CyclicBarrier也叫回环栅栏,可以实现让一组线程运行到栅栏处并阻塞,等到全部线程都到达栅栏时再一块儿执行的功能。“回环”意味着CyclicBarrier能够屡次重复使用,相比于CountDownLatch只能使用一次,CyclicBarrier能够节省许多资源,而且还能够在构造器中传入任务,当栅栏条件知足时执行这个任务。CyclicBarrier是使用了ReentrantLock,主要方法在执行时都会加锁,所以并发性能不是很高。安全

1.相关字段

//重入锁,CyclicBarrier内部经过重入锁实现线程安全
    private final ReentrantLock lock = new ReentrantLock();
    //线程阻塞时的等待条件
    private final Condition trip = lock.newCondition();
    //须要等待的线程数
    private final int parties;
    //栅栏打开以后首先执行的任务
    private final Runnable barrierCommand;
    //记录当前的分代标记
    private Generation generation = new Generation();
    //当前还须要等待多少个线程运行到栅栏位置
    private int count;

须要注意的是generation字段,用于标记栅栏当前处在哪一代。当知足必定的条件时(例如调用了reset方法,或者栅栏打开等),栅栏状态会切换到下一代,实际就是new一个新的Generation对象,这是CyclicBarrier的内部类,代码很是简单,以下:并发

private static class Generation {
        boolean broken = false;   //标记栅栏是否被破坏
    }

实际使用的过程当中,会利用generation字段判断当前是否在同一个分代,而使用broker字段判断栅栏是否被破坏。app

2.构造函数

CyclicBarrier有两个重载的构造函数,构造函数只是对上述的相关字段进行初始化,以下:函数

public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

3.核心方法

  • await
    await是开发时最经常使用到的方法了,同CountDownLatch同样,CyclicBarrier也提供了两个await方法,一个不带参数,一个带有超时参数,其内部只是简单调用了一下dowait方法:
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

接下来看看相当重要的dowait方法:oop

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //加剧入锁
        lock.lock();
        try {
            //首先获取年龄代信息
            final Generation g = generation;
            //若是栅栏状态被破坏,抛出异常,例如先启动的线程调用了breakBarrier方法,后启动的线程就可以看到g.broker=true
            if (g.broken)
                throw new BrokenBarrierException();
            //检测线程的中断状态,若是线程设置了中断状态,则经过breakBarrier设置栅栏为已破坏状态,并唤醒其余线程
            //若是这里可以检测到中断状态,那只多是在await方法外部设置的
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //每调用一次await,就将须要等待的线程数减1
            int index = --count;
            //index=0表示这是最后一个到达的线程,由该线程执行下面的逻辑
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //若是在构造器中传入了第二个任务参数,就在放开栅栏前先执行这个任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //正常结束,须要唤醒阻塞的线程,并换代
                    nextGeneration();
                    return 0;
                } finally {
                    //try代码块若是正常执行,ranAction就必定等于true,而try代码块惟一可能发生异常的地方就是command.run(),
                    //所以这里为了保证在任务执行失败时,将栅栏标记为已破坏,唤醒阻塞线程
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    //没有设置超时标记,就加入等待队列
                    if (!timed)
                        trip.await();
                    //设置了超时标记,但目前尚未超时,则继续等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //若是线程等待的过程当中被中断,会执行到这里
                    //g == generation表示当前还在同一个年龄分代中,!g.broker表示当前栅栏状态没有被破坏
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        //上面的条件不知足,说明:1)g!=generation,说明线程执行到这里时已经换代了
                        //2)没有换代,可是栅栏被破坏了
                        //不管哪一种状况,都只是简单地设置一下当前线程的中断状态
                        Thread.currentThread().interrupt();
                    }
                }
                //栅栏被破坏,抛出异常
                //注意,在breakBarrier方法中会唤醒全部等待条件的线程,这些线程会执行到这里,判断栅栏已经被破坏,都会抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                //距离上一次设置g变量的值已通过去很长时间了,在执行过程当中generation可能已经发生改变,
                //当前线程仍是前几代的,不须要再循环阻塞了,直接返回上一代剩余须要等待的线程数
                //注意:代码中breakBarrier方法和nextGeneration方法都会唤醒阻塞的线程,可是breakBarrier在上一个判断就被拦截了,
                //所以走到这里的有三种状况:
                //a)最后一个线程正常执行,栅栏打开致使其余线程被唤醒;不属于当前代的线程直接返回,
                //属于当前代的则可能由于没到栅栏开放条件要继续循环阻塞
                //b)栅栏被重置(调用了reset方法),此时g!=negeration,全都直接返回
                //c)线程等待超时了,不属于当前代的返回就能够了,属于当前代的则要设置generation.broken = true
                if (g != generation)
                    return index;
                //若是线程等待超时,标记栅栏为破坏状态并抛出异常,若是还没超时,则自旋后又从新阻塞
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //别忘了解锁
            lock.unlock();
        }
    }

dowait的方法逻辑是:每个调用await方法的线程都会将计数count1,最后一个线程将count减为0时,顺带还要执行barrierCommand指定的任务,并将generation切换到下一代,固然,最重要的仍是要唤醒以前在栅栏处阻塞的线程。因为trip对应的Condition对象没有任何地方会修改,所以trip.signalAll()会唤醒全部在该条件上等待的线程,若是线程在等待的过程当中,其余线程将generation更新到下一代,就会出现被唤醒的线程中有部分还属于以前那一代的状况。
接下来将会对dowait用到的一些方法进行简单介绍。性能

  • breakBarrier
    dowait方法有四个地方调用了breakBarrier,从名字能够看出,该方法会将generation.broken设置为true,除此以外,还会还原count的值,而且唤醒全部被阻塞的线程:
private void breakBarrier() {
        generation.broken = true;
        count = parties;
        //唤醒全部的阻塞线程
        trip.signalAll();
    }

纵观CyclicBarrier源码,generation.broken统一在breakBarrier方法中被设置为true,而一旦将generation.broken设置为true以后,代码中检查到这个状态以后都会抛出异常,栅栏就没办法再使用了(能够手动调用reset进行重置),而源码中会在如下几种状况调用breakBarrier方法:
1) 当前线程被中断
2)经过构造器传入的任务执行失败
3) 条件等待时被中断
4) 线程等待超时
5) 显式调用reset方法this

  • nextGeneration
private void nextGeneration() {
        // 唤醒全部的阻塞线程
        trip.signalAll();
        // 开启下一代
        count = parties;
        generation = new Generation();
    }
  • reset
    reset方法主要是结束这一代,并切换到下一代
public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

介绍到这里,整个CyclicBarrier已经差很少介绍完了,可是内部的流程远远没有这么简单,由于很大一部分逻辑封装在AbstractQueuedSynchronizer中,这个类定义了阻塞的线程如何加入等待队列,又如何被唤醒,所以若是想要深刻了解线程等待的逻辑,还须要仔细研究AbstractQueuedSynchronizer才行。本文不会对这部份内容进行介绍,后面有时间的话将会专门对其进行介绍。线程

相关文章
相关标签/搜索