CyclicBarrier是如何成为一个"栅栏"的

CyclicBarrier是一种相似于栅栏的存在,意思就是在栅栏开放以前你都只能被挡在栅栏的一侧,当栅栏移除以后,以前被挡在一侧的多个对象则同时开始动起来。html

1. 如何使用CyclicBarrier

  在介绍其原理以前,先了解一下CyclicBarrier应该如何使用。java

  假设如今有这样的场景,咱们须要开一个会议,须要张一、张二、张3三我的参加,
会议须要三我的都到齐以后才能开始,不然只能干等着;这个场景用CyclicBarrier能够很契合的模拟出来。代码以下:app

public static void main(String[] args) {
    // 线程池,每一个线程表明一我的
    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    // 会议所需的人数为3
    CyclicBarrier barrier = new CyclicBarrier(3);

    executor.execute(() -> {
        try {
            System.err.println("张1到达会议室");
            barrier.await();
            System.err.println("会议开始,张1开始发言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张2到达会议室");
            barrier.await();
            System.err.println("会议开始,张2开始发言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张3先去个厕所,内急解决再去开会");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("张3到达会议室");
            barrier.await();
            System.err.println("会议开始,张3开始发言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

结果图:
例子图
  经过上方代码能够知道CyclicBarrier的几点:ide

  1. 使用await()来表示完成了某些事情。(上方例子的表现为到达了会议室)
  2. 使用await()以后当前线程就进入阻塞状态,须要等待彻底知足CyclicBarrier的条件后唤醒才能继续接下来的操做。(上方例子中 为3我的都到达会议室)
  3. 在最后一个线程达到条件以后,以前阻塞的线程所有放开,继续接下来的操做。(上方例子为张3到达会议室)

  这个简单的例子也让咱们了解CyclicBarrier的使用方法,那来看看其内部到底是如何实现栅栏的效果的。函数

2. CyclicBarrier是如何成为"栅栏"的

  从第一节的代码中咱们也能看到,须要关注的就两个地方this

  1. 构造函数
  2. await()方法

只要了解这两个方法的内部,至关于了解了CyclicBarrier的内部。
那在深刻了解以前,先来看下CyclicBarrier的几个变量,不用刻意去记,看代码的时候知道这个东西作什么用的就好了:线程

lock:CyclicBarrier类建立的ReentrantLock实例,关于ReentrantLock不清楚的能够->传送。code

trip:lock中的conditionCyclicBarrier使用该变量来实现各线程之间的阻塞和同时唤醒。一样,不明白condition做用的=>传送门htm

parties:须要知足条件(调用await方法)的总数,就是说当有parties个线程await()以后就会唤醒所有线程。对象

barrierCommand:一个Runnable变量,在await方法的调用次数到达总数parties以后,在唤醒所有线程以前执行其run()方法

generation:其内部类,能够理解为周期,周期内须要完成n个任务,只要一个任务失败,当前周期的全部任务就算失败,结束当前周期,再开启下个周期。

count:当前周期剩余须要完成的任务数(剩余调用await方法的次数)

如下为源码:

public class CyclicBarrier {
    // 内部类,可理解为周期
    private static class Generation {
        // 当前周期是否失败
        boolean broken = false;
    }

    // 锁的实例
    private final ReentrantLock lock = new ReentrantLock();
    // ReentrantLock的condition变量,用来控制线程唤醒和阻塞
    private final Condition trip = lock.newCondition();
    // 须要知足条件的次数,即须要调用await方法的次数
    private final int parties;
    // 知足条件次数达到parties以后,唤醒全部线程以前执行其 run()方法
    private final Runnable barrierCommand;
    // 当前周期
    private Generation generation = new Generation();
    // 剩余知足条件次数
    private int count;
    
    // ...
}

  看完CyclicBarrier的几个变量后,来看其具体的内部实现。

  首先来看构造函数,其构造函数有两个,一个在达到条件总数(parties)后直接叫醒全部线程;另外一个指定一个Runnable在达到条件总数后先执行其run()方法再叫醒。

  • 不指定Runnable,参数只有一个:须要达成的任务数
public CyclicBarrier(int parties) {
    // 直接调用另外一个构造方法,Runnable传null,表示不执行
    this(parties, null);
}
  • 指定Runnable的构造方法,赋值任务总数、剩余任务数、唤醒操做以前的Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 任务总数
    this.parties = parties;
    // 剩余须要完成的任务数
    this.count = parties;
    // 唤醒以前执行的Runnable
    this.barrierCommand = barrierAction;
}

  在第一节咱们使用的是第一个构造方法,来试试第二个

public static void main(String[] args) throws InterruptedException {

    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    /** =======增长Runnable,其余地方保持一致=============*/
    CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在会议开始以前,先给你们发下开会资料"));

    executor.execute(() -> {
        try {
            System.err.println("张1到达会议室");
            barrier.await();
            System.err.println("会议开始,张1开始发言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张2到达会议室");
            barrier.await();
            System.err.println("会议开始,张2开始发言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张3先去个厕所,内急解决再去开会");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("张3到达会议室");
            barrier.await();
            System.err.println("会议开始,张3开始发言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

结果图:

pic2

 看完构造函数,就算理解了一半CyclicBarrier了,接下来来看另外一半——await();跟踪代码,看到是这样的

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

直接调用dowait方法,传参为false0,意思就是不限时等待,除非线程被打断或者唤醒。再进入dowait方法,这个方法就是CyclicBarrier的另外一半,在下方的代码中很清楚的写了整个执行流程

/** 参数说明, timed:是否限时, nanos:限时时间*/
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 锁
    final ReentrantLock lock = this.lock;
    // 获取锁,若是失败的话线程睡眠,进入同步队列(AQS中的知识)
    lock.lock();
    try {
        /* 拿到锁以后进入代码处理逻辑*/
        
        // 当前周期
        final Generation g = generation;

        // 若是当前周期是失败的,那么直接抛错
        if (g.broken)
            throw new BrokenBarrierException();

        // 若是当前线程被打断了,那么这次周期失败,设置相关参数,而后抛错
        if (Thread.interrupted()) {
            // 实现代码在下行的注释中,设置相关参数来提醒其余线程周期失败了
            breakBarrier();
            /*
             * private void breakBarrier() {
             *     generation.broken = true;
             *     count = parties;
             *     // 唤醒condition中的全部线程
             *     trip.signalAll();
             * }
             */
            throw new InterruptedException();
        }

        // 若是成功了,那么剩余任务数(count)减1
        int index = --count;
        // 若是为0则表示达到剩余的任务数没有了,达到CyclicBarrier的条件总数了,须要唤醒其余线程
        if (index == 0) {  
            boolean ranAction = false;
            try {
                // 唤醒以前的Runnable
                final Runnable command = barrierCommand;
                // 若是不为空的话执行其run方法
                if (command != null)
                    command.run();
                ranAction = true;
                // 开启下个周期,这个方法是CyclicBarrier能够复用的缘由,具体实如今下行注释
                nextGeneration();
                /* private void nextGeneration() {
                 *     // 首先叫醒当前周期的其余线程,告诉其周期结束了,能够执行接下来的操做了
                 *     trip.signalAll();
                 *     // 而后开启下个周期,剩余任务数重置
                 *     count = parties;
                 *     // 下个周期
                 *     generation = new Generation();
                 * }
                 */
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 若是还不能结束本周期,就一直等待直到结束或者周期失败
        for (;;) {
            try {
                // await的过程当中是释放锁的
                // 不限时的话就一直等待直到被唤醒或者打断
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    // 不然的话等待一段时间后醒来
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

  到这里就基本理解CyclicBarrier的内部实现了,其余像带参数的await也是同样逻辑,只不过是多了限时的条件而已。

  其实若是你了解ReentrantLock的话,就知道CyclicBarrier整个就是对ReentrantLockcondition的活用而已。

3.总结

  总体来讲CyclicBarrier的实现相对较简单,说是ReentrantLockcondition的升级版也不为过。其关键点为两个,一个为其构造函数,决定任务个数和唤醒前操做;另一个点为await方法,在正常状况下每次await都会减小一个任务数(总数由构造方法决定),在任务数变为0的时候表示周期结束,须要唤醒condition的其余线程,而途中遇到失败的话当前周期失败,唤醒其余线程一块儿抛错。



失败不会让你变得弱小,惧怕失败会。

相关文章
相关标签/搜索