死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题

(1)CyclicBarrier是什么?java

(2)CyclicBarrier具备什么特性?app

(3)CyclicBarrier与CountDownLatch的对比?源码分析

简介

CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行。它与CountDownLatch很相似,但又不一样,CountDownLatch须要调用countDown()方法触发事件,而CyclicBarrier不须要,它就像一个栅栏同样,当一组线程都到达了栅栏处才继续往下走。学习

使用方法

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                System.out.println("before");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after");
            }).start();
        }
    }
}

这段方法很简单,使用一个CyclicBarrier使得三个线程保持同步,当三个线程同时到达cyclicBarrier.await();处你们再一块儿往下运行。this

源码分析

主要内部类

private static class Generation {
    boolean broken = false;
}

Generation,中文翻译为代,一代人的代,用于控制CyclicBarrier的循环使用。线程

好比,上面示例中的三个线程完成后进入下一代,继续等待三个线程达到栅栏处再一块儿执行,而CountDownLatch则作不到这一点,CountDownLatch是一次性的,没法重置其次数。翻译

主要属性

// 重入锁
private final ReentrantLock lock = new ReentrantLock();
// 条件锁,名称为trip,绊倒的意思,多是指线程来了先绊倒,等达到必定数量了再唤醒
private final Condition trip = lock.newCondition();
// 须要等待的线程数量
private final int parties;
// 当唤醒的时候执行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 当前这一代还须要等待的线程数
private int count;

经过属性能够看到,CyclicBarrier内部是经过重入锁的条件锁来实现的,那么你能够脑补一下这个场景吗?code

彤哥来脑补一下:假如初始时count = parties = 3,当第一个线程到达栅栏处,count减1,而后把它加入到Condition的队列中,第二个线程到达栅栏处也是如此,第三个线程到达栅栏处,count减为0,调用Condition的signalAll()通知另外两个线程,而后把它们加入到AQS的队列中,等待当前线程运行完毕,调用lock.unlock()的时候依次从AQS的队列中唤醒一个线程继续运行,也就是说实际上三个线程先依次(排队)到达栅栏处,再依次往下运行。队列

以上纯属彤哥脑补的内容,真实状况是否是如此呢,且日后看。事件

构造方法

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化parties
    this.parties = parties;
    // 初始化count等于parties
    this.count = parties;
    // 初始化都到达栅栏处执行的命令
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

构造方法须要传入一个parties变量,也就是须要等待的线程数。

await()方法

每一个须要在栅栏处等待的线程都须要显式地调用await()方法等待其它线程的到来。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 调用dowait方法,不须要超时
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        
        // 检查
        if (g.broken)
            throw new BrokenBarrierException();

        // 中断检查
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        
        // count的值减1
        int index = --count;
        // 若是数量减到0了,走这段逻辑(最后一个线程走这里)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 若是初始化的时候传了命令,这里执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 调用下一代方法
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 这个循环只有非最后一个线程能够走到
        for (;;) {
            try {
                if (!timed)
                    // 调用condition的await()方法
                    trip.await();
                else if (nanos > 0L)
                    // 超时等待方法
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            
            // 检查
            if (g.broken)
                throw new BrokenBarrierException();

            // 正常来讲这里确定不相等
            // 由于上面打破栅栏的时候调用nextGeneration()方法时generation的引用已经变化了
            if (g != generation)
                return index;
            
            // 超时检查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
private void nextGeneration() {
    // 调用condition的signalAll()将其队列中的等待者所有转移到AQS的队列中
    trip.signalAll();
    // 重置count
    count = parties;
    // 进入下一代
    generation = new Generation();
}

dowait()方法里的整个逻辑分红两部分:

(1)最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration()方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。

(2)非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await()方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换“代”了,这时候返回。

图解

CyclicBarrier

学习过前面的章节,看这个图很简单了,看不懂的同窗还须要把推荐的内容好好看看哦^^

总结

(1)CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程你们再继续往下走;

(2)CyclicBarrier不是直接使用AQS实现的一个同步器;

(3)CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑;

彩蛋

CyclicBarrier与CountDownLatch的异同?

(1)二者都能实现阻塞一组线程等待被唤醒;

(2)前者是最后一个线程到达时自动唤醒;

(3)后者是经过显式地调用countDown()实现的;

(4)前者是经过重入锁及其条件锁实现的,后者是直接基于AQS实现的;

(5)前者具备“代”的概念,能够重复使用,后者只能使用一次;

(6)前者只能实现多个线程到达栅栏处一块儿运行;

(7)后者不只能够实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立(详见CountDownLatch那章使用案例);

推荐阅读

一、死磕 java同步系列之开篇

二、死磕 java魔法类之Unsafe解析

三、死磕 java同步系列之JMM(Java Memory Model)

四、死磕 java同步系列之volatile解析

五、死磕 java同步系列之synchronized解析

六、死磕 java同步系列之本身动手写一个锁Lock

七、死磕 java同步系列之AQS起篇

八、死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

九、死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

十、死磕 java同步系列之ReentrantLock VS synchronized

十一、死磕 java同步系列之ReentrantReadWriteLock源码解析

十二、死磕 java同步系列之Semaphore源码解析

1三、死磕 java同步系列之CountDownLatch源码解析

1四、死磕 java同步系列之AQS终篇

1五、死磕 java同步系列之StampedLock源码解析


欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。

qrcode

相关文章
相关标签/搜索