java.util.Concurrent.CyclicBarrier 源码

类图

源码html

package java.util.concurrent;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class CyclicBarrier {
    //使用ReentrantLock可重入独占锁
    private final ReentrantLock lock = new ReentrantLock();
    //建立一个条件队列
    private final Condition trip = lock.newCondition();
    //经过构造器传入的参数.表示总的等待线程的数量
    private final int parties;
    //当屏障正常打开后运行的程序,经过最后一个调用await的线程来执行
    private final Runnable barrierCommand;
    //当前的Generation。每当屏障失效或者开闸以后都会自动替换掉。从而实现重置的功能
    private Generation generation = new Generation();
    //实际仍在等待的线程数.当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties
    private int count;

    //内部类
    private static class Generation {
        boolean broken = false;//表示当前的屏障是否被打破
    }

    //建立一个CyclicBarrier实例,parties指定参与相互等待的线程数
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    //建立一个CyclicBarrier实例,parties指定参与相互等待的线程数
    //barrierAction指定当全部线程到达屏障点以后,首先执行的操做,该操做由最后一个进入屏障点的线程执行。
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    //返回参与相互等待的线程数
    public int getParties() {
        return parties;
    }

    //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态
    //直到全部线程都到达屏障点,当前线程才会被唤醒
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }

    //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态
    //在timeout指定的超时时间内,等待其余参与线程到达屏障点
    //若是超出指定的等待时间,则抛出TimeoutException异常,若是该时间小于等于零,则此方法根本不会等待
    public int await(long timeout, TimeUnit unit) throws  InterruptedException,BrokenBarrierException,TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态
    //在timeout指定的超时时间内,等待其余参与线程到达屏障点
    //若是超出指定的等待时间,则抛出TimeoutException异常,若是该时间小于等于零,则此方法根本不会等待
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)//若是当前Generation是处于打破状态则传播这个BrokenBarrierExcption
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();//若是当前线程被中断则使得当前generation处于打破状态,重置剩余count,而且唤醒状态变量.这时候其余线程会传播BrokenBarrierException
                throw new InterruptedException();
            }

            int index = --count;//尝试下降当前count
            if (index == 0) {
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //当全部参与的线程都到达屏障点,当即去唤醒全部处于休眠状态的线程,恢复执行
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)//若是运行command失败也会致使当前屏障被打破
                        breakBarrier();
                }
            }


            for (;;) {
                try {
                    if (!timed)
                        //让当前执行的线程阻塞,处于休眠状态
                        trip.await();
                    else if (nanos > 0L)
                        //让当前执行的线程阻塞,在超时时间内处于休眠状态
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    //唤醒全部处于休眠状态的线程,恢复执行
    //重置count值为parties
    //重置中断状态为false
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

    //唤醒全部处于休眠状态的线程,恢复执行
    //重置count值为parties
    //重置中断状态为true
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    //判断此屏障是否处于中断状态。
    //若是由于构造或最后一次重置而致使中断或超时,从而使一个或多个参与者摆脱此屏障点,或由于异常而致使某个屏障操做失败,则返回true;不然返回false
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    //将屏障重置为其初始状态
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //唤醒全部等待的线程继续执行,并设置屏障中断状态为true
            breakBarrier();
            //唤醒全部等待的线程继续执行,并设置屏障中断状态为false
            nextGeneration();
        } finally {
            lock.unlock();
        }
    }

    //返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

类 CyclicBarrier

    extends Objectjava

    一个同步辅助类:它容许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。api

    在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 颇有用。函数

    由于该 barrier 在释放等待线程后能够重用,因此称它为循环 的 barrier。this

    CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达以后(但在释放全部线程以前),该命令只在每一个屏障点运行一次。若在继续全部参与线程以前更新共享状态,此屏障操做 颇有用。spa

 

构造方法摘要.net

CyclicBarrier(int parties) 
          建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预约义的操做。
CyclicBarrier(int parties, Runnable barrierAction) 
          建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操做,该操做由最后一个进入 barrier 的线程执行。

 

方法摘要线程

 int await() 
          在全部参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。
 int await(long timeout, TimeUnit unit) 
          在全部参与者都已经在此屏障上调用 await 方法以前将一直等待,或者超出了指定的等待时间。
 int getNumberWaiting() 
          返回当前在屏障处等待的参与者数目。
 int getParties() 
          返回要求启动此 barrier 的参与者数目。
 boolean isBroken() 
          查询此屏障是否处于损坏状态。
 void reset() 
          将屏障重置为其初始状态。

 

CyclicBarrier:将使count等于parties

public CyclicBarrier(int parties,Runnable barrierAction)

    建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操做,该操做由最后一个进入 barrier 的线程执行。调试

    参数:code

    parties - 在启动 barrier 前必须调用 await() 的线程数

    barrierAction - 在启动 barrier 时执行的命令;若是不执行任何操做,则该参数为 null

    抛出:

    IllegalArgumentException - 若是 parties 小于 1

 

CyclicBarrier:将使count等于parties

public CyclicBarrier(int parties)

    建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预约义的操做。

    参数:

    parties - 在启动 barrier 前必须调用 await() 的线程数

    抛出:

    IllegalArgumentException - 若是 parties 小于 1

 

getParties:获得parties的值

public int getParties()

    返回要求启动此 barrier 的参与者数目。

    返回:

        要求启动此 barrier 的参与者数目

 

await:count减1,若count等于0,则唤醒等待的线程;不等于0,则挂起等待

public int await() throws InterruptedException, BrokenBarrierException

    在全部 参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。

    若是当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生如下状况之一前,该线程将一直处于休眠状态:

  • 最后一个线程到达;或者
  • 其余某个线程中断当前线程;或者
  • 其余某个线程中断另外一个等待线程;或者
  • 其余某个线程在等待 barrier 时超时;或者
  • 其余某个线程在此 barrier 上调用 reset()

    若是当前线程:

  • 在进入此方法时已经设置了该线程的中断状态;或者
  • 在等待时被中断

    则抛出 InterruptedException,而且清除当前线程的已中断状态。

    若是在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。

    若是任何线程在等待时被 中断,则其余全部等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。

    若是当前线程是最后一个将要到达的线程,而且构造方法中提供了一个非空的屏障操做,则在容许其余线程继续运行以前,当前线程将运行该操做。若是在执行屏障操做过程当中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。

    返回:

        到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程

    抛出:

    InterruptedException - 若是当前线程在等待时被中断

    BrokenBarrierException - 若是 另外一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用 await 时 barrier 被损坏,抑或因为异常而致使屏障操做(若是存在)失败。

 

await:count减1,若count等于0,则唤醒等待的线程;不等于0,则挂起等待指定时间。在过了指定时间以后,count仍不等于0,则抛出异常。

public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

    在全部 参与者都已经在此屏障上调用 await 方法以前将一直等待,或者超出了指定的等待时间。

    若是当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生如下状况之一前,该线程将一直处于休眠状态:

  • 最后一个线程到达;
  • 超出指定的超时时间;
  • 其余某个线程中断当前线程;
  • 其余某个线程中断另外一个等待线程;
  • 其余某个线程在等待 barrier 时超时;
  • 其余某个线程在此 barrier 上调用 reset()

    若是当前线程,在如下状况中的一种时:

  • 在进入此方法时已经设置了该线程的中断状态;
  • 在等待时被中断

    则抛出 InterruptedException,而且清除当前线程的已中断状态。

    若是超出指定的等待时间,则抛出 TimeoutException 异常。若是该时间小于等于零,则此方法根本不会等待。

    若是在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。

    若是任何线程在等待时被中断,则其余全部等待线程都将抛出 BrokenBarrierException,并将屏障置于损坏状态。

    若是当前线程是最后一个将要到达的线程,而且构造方法中提供了一个非空的屏障操做,则在容许其余线程继续运行以前,当前线程将运行该操做。若是在执行屏障操做过程当中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。

    参数:

    timeout - 等待 barrier 的时间

    unit - 超时参数的时间单位

    返回:

        到达的当前线程的索引,其中,索引 getParties() - 1 指示第一个将要到达的线程,零指示最后一个到达的线程

    抛出:

    InterruptedException - 若是当前线程在等待时被中断

    TimeoutException - 若是超出了指定的超时时间

    BrokenBarrierException - 若是 另外一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者调用 await 时 barrier 被损坏,抑或因为异常而致使屏障操做(若是存在)失败。

 

isBroken:判断屏障是否损坏

public boolean isBroken()

    查询此屏障是否处于损坏状态。

    返回:

        若是屡次调用构造函数或者使用重置函数reset(),在屏障等待的参与者的等待状态会被中断或超时,从而抛出异常。由于异常而致使某个屏障操做失败,则返回 true;不然返回 false

 

reset:屏障状态重置为true,count等于parties。

public void reset()

    将屏障重置为其初始状态。若是全部参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException

    注意:在因为其余缘由形成损坏(broken)以后,实行重置可能会变得很复杂;此时须要使用其余方式从新同步线程,并选择其中一个线程来执行重置。与为后续使用建立一个新 barrier 相比,这种方法可能更好一些。

 

getNumberWaiting:获得当前在屏障处等待的参与数

public int getNumberWaiting()

    返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。

    返回:

        当前阻塞在 await() 中的参与者数目。

 

使用实例:

  • 1.新建5个线程,这5个线程达到必定的条件时,它们才继续日后运行。
package com.thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest1 extends Thread {
    private static int SIZE = 5;
    private static CyclicBarrier cb;

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
            // CyclicBarrier的count减1,若count等于0,则唤醒在屏障处等待的全部线程
            cb.await();
            System.out.println(Thread.currentThread().getName() + " continued.");
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        cb = new CyclicBarrier(SIZE);
        // 新建5个任务
        for (int i = 0; i < SIZE; i++)
            new CyclicBarrierTest1().start();
    }
}

    运行结果:

Thread-1 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-4 continued.
Thread-3 continued.
Thread-2 continued.
Thread-1 continued.
Thread-0 continued.

 

  • 2.新建5个线程,当这5个线程达到必定的条件时,执行某项任务。
package com.thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest2 extends Thread {
    private static int SIZE = 5;
    private static CyclicBarrier cb;

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
            // CyclicBarrier的count减1,若count等于0,则唤醒在屏障处等待的全部线程
            cb.await();
            System.out.println(Thread.currentThread().getName() + " continued.");
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        cb = new CyclicBarrier(SIZE, new Runnable() {
            public void run() {//当其余的线程都已达到barrier,先执行当前任务,再让其余线程继续执行
                System.out.println("CyclicBarrier's parties is: " + cb.getParties());
            }
        });
        // 新建5个任务
        for (int i = 0; i < SIZE; i++)
            new CyclicBarrierTest1().start();
    }
}

    运行结果:

Thread-0 wait for CyclicBarrier. Thread-2 wait for CyclicBarrier. Thread-1 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. CyclicBarrier's parties is: 5 Thread-3 continued. Thread-1 continued. Thread-4 continued. Thread-0 continued. Thread-2 continued.

相关文章
相关标签/搜索