并发工具类(五) Phaser类

前言

  JDK中为了处理线程之间的同步问题,除了提供锁机制以外,还提供了几个很是有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
  CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。html

简介

  Phaser 是JDK1.7版本中新增的,是一个可重用的同步barrier,它的功能与 CountDownLatch、CyclicBarrier 类似,可是使用起来更加灵活。能够java

用来解决控制多个线程分阶段共同完成任务的情景问题。

   Phaser中有两个重要的计数:

  • phase
    :当前的周期索引(或者 阶段索引),初始值为0,当全部线程执行完本阶段的任务后,phase就会加一,进入下一阶段;能够结合onAdvance()方法,在不一样的阶段,执行不一样的屏障方法。
  • parties
    :注册的线程数,即Phaser要监控的线程数量,或者说是 创建的屏障的数量。屏障的数量不是固定的,每一个阶段的屏障的数量均可以是不同。

下面详细介绍Phaser一些机制

一、Registration(注册机制):
与其余barrier不一样的是,Phaser中的
“注册的同步者(parties)”
会随时间而变化,Phaser能够经过构造器初始化parties个数,也能够在Phaser运行期间随时加入(方法
register( ), bulkRegister(int)
)新的parties,以及在运行期间注销(方法
arriveAndDeregister( )
)parties。
运行时能够随时加入、注销parties,只会影响Phaser内部的计数器,它创建任何内部的bookkeeping(帐本),所以task不能查询本身是否已经注册了,固然你能够经过实现子类来达成这一设计要求。

二、Synchronization(同步机制):
相似于CyclicBarrier,Phaser也能够awaited屡次,它的arrivedAndAwaitAdvance()方法的效果相似于CyclicBarrier的await()。Phaser的每一个周期(generation)都有一个phase数字,phase 从0开始,当全部的已注册的parties都到达后(arrive)将会致使此phase数字自增(advance),当达到Integer.MAX_VALUE后继续从0开始。这个phase数字用于表示当前parties所处于的“阶段周期”,它既能够标记和控制parties的wait行为、唤醒等待的时机。

  • Arrival:
    Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞(block),可是会返回相应的phase数字,当此phase中最后一个party也arrive之后,phase数字将会增长,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点相似于CyclicBarrier的barrier到达机制;更灵活的是,咱们能够经过重写onAdvance方法来实现更多的触发行为。
  • Waiting:
    Phaser中的awaitAdvance()方法,须要指定一个phase数字,表示此Thread阻塞直到phase推动到此周期,arriveAndAwaitAdvance()方法阻塞到下一周期开始(或者当前phase结束)。不像CyclicBarrier,即便等待Thread已经interrupted,awaitAdvance方法会继续等待。Phaser提供了Interruptible和Timout的阻塞机制,不过当线程Interrupted或者timout以后将会抛出异常,而不会修改Phaser的内部状态。若是必要的话,你能够在遇到此类异常时,进行相应的恢复操做,一般是在调用forceTermination()方法以后。
    Phaser一般在ForJoinPool中执行tasks,它能够在有task阻塞等待advance时,确保其余tasks的充分并行能力。

三、Termination(终止):
Phaser能够进入Termination状态,能够经过isTermination()方法判断;当Phaser被终止后,全部的同步方法将会当即返回(解除阻塞),不须要等到advance(即advance也会解除阻塞),且这些阻塞方法将会返回一个负值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。固然,向一个termination状态的Phaser注册party将不会有效;此时onAdvance()方法也将会返回true(默认实现),即全部的parties都会被deregister,即register个数为0。

四、Tiering(分层):
Phaser能够“分层”,以tree的方式构建Phaser来下降“竞争”。若是一个Phaser中有大量parties,这会致使严重的同步竞争,因此咱们能够将它们分组并共享一个parent Phaser,这样能够提升吞吐能力;Phaser中注册和注销parties都会有Child 和parent Phaser自动管理。当Child Phaser中中注册的parties变为非0时(在构造函数Phaser(Phaser parent,int parties),或者register()方法),Child Phaser将会注册到其Parent上;当Child Phaser中的parties变为0时(好比由arrivedAndDegister()方法),那么此时Child Phaser也将从其parent中注销出去。

五、Monitoring.(监控):
同步的方法只会被register操做调用,对于当前state的监控方法能够在任什么时候候调用,好比getRegisteredParties()获取已经注册的parties个数,getPhase()获取当前phase周期数等;由于这些方法并不是同步,因此只能反映当时的瞬间状态。

Phaser的API介绍

构造方法bash

方法名 描述
Phaser() 构建一个Phaser
Phaser(int parties) 建立一个指定屏障数量的Phaser
Phaser(Phaser parent) 至关于 Phaser(parent, 0)
Phaser(Phaser parent, int parties) 建立一个指定屏障数量的Phaser,此phaser是注册在另外一个Phaser parent下

方法摘要并发

方法名 描述
public int arrive() 到达此phaser的屏障点,使phaser的到达的线程数加一,但不会阻塞等待其余线程。
返回:
phase值,即当前阶段(周期)的索引,或者是负值(当Phaser 中止时)
public int arriveAndDeregister() 到达此phaser的屏障点,使phaser的到达的线程数加一,而且会取消一个屏障点的注册。也不会阻塞等待其余线程。
返回:
phase值,即当前阶段(周期)的索引,或者是负值(当Phaser 中止时)
public int arriveAndAwaitAdvance() 到达此phaser的屏障点,而且阻塞等待其余线程到达此屏障点。注意:这是
非中断的阻塞
,此方法与awaitAdvance(arrive())等同。若是你但愿阻塞机制支持timeout、interrupted响应,可使用相似的其余方法(参见下文)。若是你但愿到达后且注销,并且阻塞等到当前phase下其余的parties到达,可使用awaitAdvance(arriveAndDeregister())方法组合。
返回:
phase值,即当前阶段(周期)的索引;若是Phaser 中止,则返回负值
public int awaitAdvance(int phase) 在指定的阶段(周期)phase下等待其余线程到达屏障点,注意:这是
非中断的阻塞
。若是指定的phase与Phaser当前的phase不一致,或者Phaser 中止了,则当即返回。
参数 phase:
一般就是arrive()、arriveAndDeregister()的返回值;
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException
此方法是可中断的,其余与awaitAdvance()一致
public int awaitAdvanceInterruptibly(
int phase, long timeout,TimeUnit unit)
throws InterruptedException, TimeoutException
超时等待方法,其余与awaitAdvance()一致
public int register() 新注册一个party,致使Phaser内部registerPaties数量加1;若是此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,若是Phaser已经中断,将会返回负数。
public int bulkRegister(int parties) 批量注册多个party,与register()类似
protected boolean onAdvance(int phase, int registeredParties) barrier action(屏障方法)。若是须要,则必须继承Phaser类,重写此方法。若是返回true表示此Phaser应该终止(此后将会把Phaser的状态为termination,即isTermination()将返回true。),不然能够继续进行。phase参数表示当前周期数,registeredParties表示当前已经注册的parties个数。
默认实现为:return registeredParties == 0;在不少状况下,开发者能够经过重写此方法,来实现自定义的
public void forceTermination() 强制终止,此后Phaser对象将不可用,即register等将再也不有效。此方法将会致使Queue中全部的waiter线程被唤醒。这个方法对于在一个或多个任务遇到意外异常以后协调恢复是颇有用的。
public int getArrivedParties() 获取已经到达的parties个数。
public int getUnarrivedParties() 获取没有到达的parties个数。
public Phaser getParent() 获取其父亲类Phaser,没有则返回null
public Phaser getRoot() 返回该phaser的根祖先,若是没有父类,返回此phaser。
public boolean isTerminated() 若是该phaser被终止,则返回true。

@ Example1 多阶段(周期)、带屏障事件示例

  例子很简单,模拟跑步比赛的过程,分为三个阶段:一、参赛者到达起跑点,并在起跑点等待其余参赛者;二、参赛者齐人后,开始准备,并等待枪声。三、参赛这到达终点,并结束比赛,再也不等待任何状况。ide

public class PhaserTest{

public static MyPhaser myPhaser = new MyPhaser();

    public static void main(String[] args) {
        MyPhaser myPhaser = new MyPhaser();
        // 一次性注册5个party,即创建5个屏障点
        myPhaser.bulkRegister(5);
        for (int i = 0; i < 5; i++) {
            Thread runner = new Thread(new Runnable() {

                @Override
                public void run() {
                    // 第一阶段(周期),phaser的周期数初始值为0
                    System.out.println(Thread.currentThread().getName() + "到达了起跑点!");
                    // 到达了屏障点(起跑点),阻塞等待其余线程
                    myPhaser.arriveAndAwaitAdvance();

                    // 继续运行,将进入第二阶段,phaser的周期数加一
                    System.out.println(Thread.currentThread().getName() + "准备起跑!");
                    // 到达了屏障点(准备起跑),阻塞等待其余线程
                    myPhaser.arriveAndAwaitAdvance();

                    // 进入第三阶段
                    System.out.println(Thread.currentThread().getName() + "到达了终点!");
                    // 参数者到达了终点,结束比赛,再也不等待其余参赛者
                    myPhaser.arriveAndDeregister();// 取消注册一个party
                }
            }, "参赛者" + i + "号");
            runner.start();
        }
    }
    }
复制代码

MyPhaser类,定制 barrier action(屏障事件)
函数

public class MyPhaser extends Phaser {

    @Override
    //改写onAdvance方法
    public boolean onAdvance(int phase, int registeredParties) {
        //判断当前的Phaser是否终止
        if (!isTerminated()) {
            // 分红三个阶段,在不一样的阶段(周期),执行不一样的屏障事件
            if (phase == 0) {
                // ....
                System.out.println("第一阶段:全部参赛者都到达了起跑点!");
            } else if (phase == 1) {
                // ....
                System.out.println("第二阶段:全部参赛者都已经就位,并准备好!比赛正式开始");
            } else if (phase == 2) {
                // ....
                System.out.println("第三阶段:全部参赛者都到达终点,比赛结束!!");
            }
        }
        return super.onAdvance(phase, registeredParties);
    }
    }
复制代码

运行结果:
工具

参赛者0号到达了起跑点!
参赛者3号到达了起跑点!
参赛者4号到达了起跑点!
参赛者2号到达了起跑点!
参赛者1号到达了起跑点!
第一阶段:全部参赛者都到达了起跑点!
参赛者0号准备起跑!
参赛者1号准备起跑!
参赛者2号准备起跑!
参赛者3号准备起跑!
参赛者4号准备起跑!
第二阶段:全部参赛者都已经就位,并准备好!比赛正式开始
参赛者4号到达了终点!
参赛者1号到达了终点!
参赛者0号到达了终点!
参赛者2号到达了终点!
参赛者3号到达了终点!
第三阶段:全部参赛者都到达终点,比赛结束!
复制代码

@ Example2 分层示例

下面的例子:每个Phaser周期类注册的线程数目不能超过TASKS_PER_PHASER(例子中是4个),不然就要增长一层子phaser层。ui

public class PhaserTest6 {  
    // 
    private static final int  = 4;  
  
    public static void main(String args[]) throws Exception {  
        // 
        final int phaseToTerminate = 3;  
        //建立一个Phaser父类对象
        final Phaser phaser = new Phaser() {  
            @Override  
            protected boolean onAdvance(int phase, int registeredParties) { //屏障方法 
                System.out.println("====== " + phase + " ======");  
                return phase == phaseToTerminate || registeredParties == 0;  
            }  
        };  
          
        //建立10个任务 
        final Task tasks[] = new Task[10];  
        build(tasks, 0, tasks.length, phaser);  
        for (int i = 0; i < tasks.length; i++) {  
            System.out.println("starting thread, id: " + i);  
            final Thread thread = new Thread(tasks[i]);  
            thread.start();  
        }  
    }  
  
  //递归分层,
    public static void build(Task[] tasks, int lo, int hi, Phaser ph) {  
        
        //若是任务的数量超过每一层的phaser的阈值TASKS_PER_PHASER,则要继续分层
        if (hi - lo > TASKS_PER_PHASER) {  
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {  
                int j = Math.min(i + TASKS_PER_PHASER, hi);  
                //当前的phaser(ph)做为父周期,来建立一个子phaser
                build(tasks, i, j, new Phaser(ph));  
            }  
        } else { 
            //线程的数量在阈值内,无需分红,能够直接注册线程到当前的Phaser
            for (int i = lo; i < hi; ++i)  
                tasks[i] = new Task(i, ph);  
        }  
    }  
  
    public static class Task implements Runnable {  
        // 
        private final int id;  
        private final Phaser phaser;  
  
        public Task(int id, Phaser phaser) {  
            this.id = id;  
            this.phaser = phaser;  
            this.phaser.register();  
        }  
  
        @Override  
        public void run() {  
            while (!phaser.isTerminated()) {  
                try {  
                    Thread.sleep(200);  
                } catch (InterruptedException e) {  
                    // NOP 
                }  
                System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);  
                phaser.arriveAndAwaitAdvance();  
            }  
        }  
    }  
}  
复制代码

须要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),能够考虑使用较小的TASKS_PER_PHASER值,例如4。反之能够适当增大this

运行结果:spa

in Task.run(), phase: 0, id: 2
in Task.run(), phase: 0, id: 1
in Task.run(), phase: 0, id: 3
in Task.run(), phase: 0, id: 0
in Task.run(), phase: 0, id: 8
in Task.run(), phase: 0, id: 5
in Task.run(), phase: 0, id: 9
in Task.run(), phase: 0, id: 7
in Task.run(), phase: 0, id: 4
in Task.run(), phase: 0, id: 6
====== 0 ======
in Task.run(), phase: 1, id: 9
in Task.run(), phase: 1, id: 6
in Task.run(), phase: 1, id: 1
in Task.run(), phase: 1, id: 7
in Task.run(), phase: 1, id: 8
in Task.run(), phase: 1, id: 5
in Task.run(), phase: 1, id: 0
in Task.run(), phase: 1, id: 4
in Task.run(), phase: 1, id: 3
in Task.run(), phase: 1, id: 2
====== 1 ======
in Task.run(), phase: 2, id: 6
in Task.run(), phase: 2, id: 0
in Task.run(), phase: 2, id: 2
in Task.run(), phase: 2, id: 3
in Task.run(), phase: 2, id: 7
in Task.run(), phase: 2, id: 5
in Task.run(), phase: 2, id: 8
in Task.run(), phase: 2, id: 9
in Task.run(), phase: 2, id: 1
in Task.run(), phase: 2, id: 4
====== 2 ======
in Task.run(), phase: 3, id: 3
in Task.run(), phase: 3, id: 4
in Task.run(), phase: 3, id: 9
in Task.run(), phase: 3, id: 5
in Task.run(), phase: 3, id: 8
in Task.run(), phase: 3, id: 1
in Task.run(), phase: 3, id: 7
in Task.run(), phase: 3, id: 0
in Task.run(), phase: 3, id: 2
in Task.run(), phase: 3, id: 6
====== 3 ======
复制代码

文章源地址:https://www.cnblogs.com/jinggod/p/8494624.html

相关文章
相关标签/搜索