(十一)java多线程之Phaser

本人邮箱: <kco1989@qq.com>
欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kco
github: https://github.com/kco1989/kco
代码已经所有托管github有须要的同窗自行下载java

引言

讲完了CyclicBarrierCountDownLatch,今天讲一个跟这两个类有点相似的Phaser.->移相器git

java7中引入了一种新的可重复使用的同步屏障,称为移相器Phaser.Phaser拥有与CyclicBarrierCountDownLatch相似的功劳.可是这个类提供了更加灵活的应用.CountDownLatchCyclicBarrier都是只适用于固定数量的参与者.移相器适用于可变数目的屏障,在这个意义上,能够在任什么时候间注册新的参与者.而且在抵达屏障是能够注销已经注册的参与者.所以,注册到同步移相器的参与者的数目可能会随着时间的推移而变化.如CyclicBarrier同样,移相器能够重复使用,这意味着当前参与者到达移相器后,能够再一次注册本身并等待另外一次到达.所以,移相器会有多代.一旦为某个特定相位注册的全部参与者都到达移相器,就增长相数.相数从零开始,在达到Integer.MAX_VALUE后,再次绕回0.当移相器发生变化时,经过重写onAdvance方法,能够自行可选操做.这个方法也可用于终止移相器.移相器一旦被终止,全部的同步方法就会当即返回,并尝试注册新的失败的参与者.
移相器的另外一个重要特征是:移相器多是分层的,这容许你以树形结构来安排移相器以减小竞争.很明显,更小的组将拥有更少的竞争同步的参与者.所以,将大量的参与者分红较小的组能够减小竞争.虽然建立移相器能增长中的吞吐量,可是这须要更多的开销.最后,移相器的另外一个重要的特征在于监控功能,使用独立的对象能够监视移相器的当前状态.监视器能够查询注册到移相器的参与者的数量,以及已经到达和尚未到达某个特定相数的参与者的数量.1github

例子1 用Phaser代替CyclicBarrier

将以前(九)java多线程之CyclicBarrier旅游的例子改写一下,微信

Phaser替代CyclicBarrier比较简单,CyclicBarrier的await()方法能够直接用Phaser的arriveAndAwaitAdvance()方法替代
CyclicBarrierPhaser:CyclicBarrier只适用于固定数量的参与者,而Phaser适用于可变数目的屏障.多线程

  • TourismRunnable 旅游类dom

public class TourismRunnable implements Runnable{
    Phaser phaser;
    Random random;
    public TourismRunnable(Phaser phaser) {
        this.phaser = phaser;
        this.random = new Random();
    }

    @Override
    public void run() {
        tourism();
    }

    /**
     * 旅游过程
     */
    private void tourism() {
        goToStartingPoint();
        goToHotel();
        goToTourismPoint1();
        goToTourismPoint2();
        goToTourismPoint3();
        goToEndPoint();
    }

    /**
     * 装备返程
     */
    private void goToEndPoint() {
        goToPoint("飞机场,准备登机回家");
    }

    /**
     * 到达旅游点3
     */
    private void goToTourismPoint3() {
        goToPoint("旅游点3");
    }

    /**
     * 到达旅游点2
     */
    private void goToTourismPoint2() {
        goToPoint("旅游点2");
    }

    /**
     * 到达旅游点1
     */
    private void goToTourismPoint1() {
        goToPoint("旅游点1");
    }

    /**
     * 入住酒店
     */
    private void goToHotel() {
        goToPoint("酒店");
    }

    /**
     * 出发点集合
     */
    private void goToStartingPoint() {
        goToPoint("出发点");
    }

    private int getRandomTime(){
        int time = this.random.nextInt(400) + 100;
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return time;
    }

    private void goToPoint(String point){
        try {
            String name = Thread.currentThread().getName();
            System.out.println(name + " 花了 " + getRandomTime() + " 时间才到了" + point);
            phaser.arriveAndAwaitAdvance();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • TestMain 测试类ide

public class TestMain {

    public static void main(String[] args) {
        String name = "明刚红丽黑白";
        Phaser phaser = new Phaser(name.length());
        List<Thread> tourismThread = new ArrayList<>();
        for (char ch : name.toCharArray()){
            tourismThread.add(new Thread(new TourismRunnable(phaser), "小" + ch));
        }
        for (Thread thread : tourismThread){
            thread.start();
        }
    }
}

运行结果学习

小红 花了 122 时间才到了出发点
小明 花了 259 时间才到了出发点
小白 花了 267 时间才到了出发点
小丽 花了 306 时间才到了出发点
小刚 花了 385 时间才到了出发点
小黑 花了 486 时间才到了出发点
小白 花了 299 时间才到了酒店
小刚 花了 345 时间才到了酒店
小黑 花了 449 时间才到了酒店
小丽 花了 452 时间才到了酒店
小明 花了 462 时间才到了酒店
小红 花了 480 时间才到了酒店
小丽 花了 107 时间才到了旅游点1
小红 花了 141 时间才到了旅游点1
小明 花了 212 时间才到了旅游点1
小黑 花了 286 时间才到了旅游点1
小白 花了 305 时间才到了旅游点1
小刚 花了 386 时间才到了旅游点1
小丽 花了 119 时间才到了旅游点2
小黑 花了 222 时间才到了旅游点2
小明 花了 259 时间才到了旅游点2
小刚 花了 299 时间才到了旅游点2
小红 花了 354 时间才到了旅游点2
小白 花了 422 时间才到了旅游点2
小丽 花了 112 时间才到了旅游点3
小白 花了 182 时间才到了旅游点3
小刚 花了 283 时间才到了旅游点3
小明 花了 295 时间才到了旅游点3
小红 花了 386 时间才到了旅游点3
小黑 花了 483 时间才到了旅游点3
小黑 花了 152 时间才到了飞机场,准备登机回家
小白 花了 178 时间才到了飞机场,准备登机回家
小明 花了 248 时间才到了飞机场,准备登机回家
小红 花了 362 时间才到了飞机场,准备登机回家
小丽 花了 428 时间才到了飞机场,准备登机回家
小刚 花了 432 时间才到了飞机场,准备登机回家
  • Phaser(int parties) 建立一个指定parties个线程参与同步任务.测试

  • ``this

例子2 用Phaser代替CountDownLatch

将以前(十)java多线程之CountDownLatch旅游回来坐飞机的例子改写一下,

CountDownLatch主要使用的有2个方法

  • await()方法,可使线程进入等待状态,在Phaser中,与之对应的方法是awaitAdvance(int n)

  • countDown(),使计数器减一,当计数器为0时全部等待的线程开始执行,在Phaser中,与之对应的方法是arrive()

  • Airplane飞机类

public class Airplane {
    private Phaser phaser;
    private Random random;
    public Airplane(int peopleNum){
        phaser = new Phaser(peopleNum);
        random = new Random();
    }

    /**
     * 下机
     */
    public void getOffPlane(){
        try {
            String name = Thread.currentThread().getName();
            Thread.sleep(random.nextInt(500));
            System.out.println(name + " 在飞机在休息着....");
            Thread.sleep(random.nextInt(500));
            System.out.println(name + " 下飞机了");
            phaser.arrive();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doWork(){

        String name = Thread.currentThread().getName();
        System.out.println(name + "准备作 清理 工做");
        phaser.awaitAdvance(phaser.getPhase());
        System.out.println("飞机的乘客都下机," + name + "能够开始作 清理 工做");

    }

}
  • TestMain 测试类(没有改)

public class TestMain {

    public static void main(String[] args) {
        String visitor = "明刚红丽黑白";
        String kongjie = "美惠花";

        Airplane airplane = new Airplane(visitor.length());
        Set<Thread> threads = new HashSet<>();
        for (int i = 0; i < visitor.length(); i ++){
            threads.add(new Thread(() -> {
                airplane.getOffPlane();
            }, "小" + visitor.charAt(i)));
        }
        for (int i = 0; i < kongjie.length(); i ++){
            threads.add(new Thread(() ->{
                airplane.doWork();
            }, "小" + kongjie.charAt(i) + "空姐"));
        }

        for (Thread thread : threads){
            thread.start();
        }
    }
}

运行结果

小花空姐准备作 清理 工做
小惠空姐准备作 清理 工做
小美空姐准备作 清理 工做
小黑 在飞机在休息着....
小明 在飞机在休息着....
小红 在飞机在休息着....
小丽 在飞机在休息着....
小刚 在飞机在休息着....
小明 下飞机了
小红 下飞机了
小黑 下飞机了
小白 在飞机在休息着....
小丽 下飞机了
小刚 下飞机了
小白 下飞机了
飞机的乘客都下机,小美空姐能够开始作 清理 工做
飞机的乘客都下机,小花空姐能够开始作 清理 工做
飞机的乘客都下机,小惠空姐能够开始作 清理 工做

例子3 高级用法

前面两个例子都比较简单,如今咱们还用Phaser一个比较高级一点用法.仍是用旅游的例子
假若有这么一个场景,在旅游过程当中,有可能很凑巧遇到几个朋友,而后他们据说大家在旅游,因此想要加入一块儿继续接下来的旅游.也有可能,在旅游过程当中,忽然其中有某几我的临时有事,想退出此次旅游了.在自由行的旅游,这是很常见的一些事情.若是如今咱们使用CyclicBarrier这个类来实现,咱们发现是实现不了,这是用Phaser就可实现这个功能.

  • 首先,咱们改写旅游类 TourismRunnable,此次改动相对比较多一点

public class TourismRunnable implements Runnable{
    Phaser phaser;
    Random random;
    /**
     * 每一个线程保存一个朋友计数器,好比小红第一次遇到一个朋友,则取名`小红的朋友0号`,
     * 而后旅游到其余景点的时候,若是小红又遇到一个朋友,这取名为`小红的朋友1号`
     */
    AtomicInteger frientCount = new AtomicInteger();
    public TourismRunnable(Phaser phaser) {
        this.phaser = phaser;
        this.random = new Random();
    }

    @Override
    public void run() {
        tourism();
    }

    /**
     * 旅游过程
     */
    private void tourism() {
        switch (phaser.getPhase()){
            case 0:if(!goToStartingPoint()) break;
            case 1:if(!goToHotel()) break;
            case 2:if(!goToTourismPoint1()) break;
            case 3:if(!goToTourismPoint2()) break;
            case 4:if(!goToTourismPoint3()) break;
            case 5:if(!goToEndPoint()) break;
        }
    }

    /**
     * 准备返程
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean goToEndPoint() {
        return goToPoint("飞机场,准备登机回家");
    }

    /**
     * 到达旅游点3
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean goToTourismPoint3() {
        return goToPoint("旅游点3");
    }

    /**
     * 到达旅游点2
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean goToTourismPoint2() {
        return goToPoint("旅游点2");
    }

    /**
     * 到达旅游点1
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean goToTourismPoint1() {
        return goToPoint("旅游点1");
    }

    /**
     * 入住酒店
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean goToHotel() {
        return goToPoint("酒店");
    }

    /**
     * 出发点集合
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean goToStartingPoint() {
        return goToPoint("出发点");
    }

    private int getRandomTime() throws InterruptedException {
        int time = random.nextInt(400) + 100;
        Thread.sleep(time);
        return time;
    }

    /**
     * @param point 集合点
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean goToPoint(String point){
        try {
            if(!randomEvent()){
                phaser.arriveAndDeregister();
                return false;
            }
            String name = Thread.currentThread().getName();
            System.out.println(name + " 花了 " + getRandomTime() + " 时间才到了" + point);
            phaser.arriveAndAwaitAdvance();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 随机事件
     * @return 返回true,说明还要继续旅游,不然就临时退出了
     */
    private boolean randomEvent() {
        int r = random.nextInt(100);
        String name = Thread.currentThread().getName();
        if (r < 10){
            int friendNum =  1;
            System.out.println(name + ":在这里居然遇到了"+friendNum+"个朋友,他们说要一块儿去旅游...");
            phaser.bulkRegister(friendNum);
            for (int i = 0; i < friendNum; i ++){
                new Thread(new TourismRunnable(phaser), name + "的朋友" + frientCount.getAndAdd(1) + "号").start();
            }
        }else if(r > 90){
            System.out.println(name + ":忽然有事要离开一下,不和他们继续旅游了....");
            return false;
        }
        return true;
    }
}

代码解析

tourism这个方法的case写法看起有点怪异,若是是为了知足咱们这个需求,这里的case的意思是-->case 第几回集合: if(是否继续旅游) 若不继续则break,不然继续后面的旅游
phaser.getPhase() 初始值为0,若是所有人到达集合点这个Phase+1,若是phaser.getPhase()达到Integer的最大值,这从新清空为0,在这里表示第几回集合了
phaser.arriveAndDeregister(); 表示这我的旅游到这个景点以后,就离开这个旅游团了
phaser.arriveAndAwaitAdvance(); 表示这我的在这个景点旅游完,在等待其余人
phaser.bulkRegister(friendNum); 表示这我的在这个景点遇到了friendNum个朋友,他们要加入一块儿旅游

  • 最后咱们的测试代码仍是差很少的,比例子1多了一个到齐后的操做

public class TestMain {

    public static void main(String[] args) {
        String name = "明刚红丽黑白";
        Phaser phaser = new SubPhaser(name.length());
        List<Thread> tourismThread = new ArrayList<>();
        for (char ch : name.toCharArray()){
            tourismThread.add(new Thread(new TourismRunnable(phaser), "小" + ch));
        }
        for (Thread thread : tourismThread){
            thread.start();
        }
    }
    public static class SubPhaser extends Phaser{
        public SubPhaser(int parties) {
            super(parties);
        }

        @Override
        protected boolean onAdvance(int phase, int registeredParties) {

            System.out.println(Thread.currentThread().getName() + ":所有"+getArrivedParties()+"我的都到齐了,如今是第"+(phase + 1)
                    +"次集合准备去下一个地方..................\n");
            return super.onAdvance(phase, registeredParties);
        }
    }
}

运行输出如下结果:

小白 花了 109 时间才到了出发点
小红 花了 135 时间才到了出发点
小丽 花了 218 时间才到了出发点
小黑 花了 297 时间才到了出发点
小明 花了 303 时间才到了出发点
小刚 花了 440 时间才到了出发点
小刚:所有6我的都到齐了,如今是第1次集合准备去下一个地方..................

小明:忽然有事要离开一下,不和他们继续旅游了....
小刚:忽然有事要离开一下,不和他们继续旅游了....
小红 花了 127 时间才到了酒店
小丽 花了 162 时间才到了酒店
小黑 花了 365 时间才到了酒店
小白 花了 474 时间才到了酒店
小白:所有4我的都到齐了,如今是第2次集合准备去下一个地方..................

小黑:忽然有事要离开一下,不和他们继续旅游了....
小丽:忽然有事要离开一下,不和他们继续旅游了....
小红 花了 348 时间才到了旅游点1
小白 花了 481 时间才到了旅游点1
小白:所有2我的都到齐了,如今是第3次集合准备去下一个地方..................

小白 花了 128 时间才到了旅游点2
小红 花了 486 时间才到了旅游点2
小红:所有2我的都到齐了,如今是第4次集合准备去下一个地方..................

小红 花了 159 时间才到了旅游点3
小白 花了 391 时间才到了旅游点3
小白:所有2我的都到齐了,如今是第5次集合准备去下一个地方..................

小白:在这里居然遇到了1个朋友,他们说要一块儿去旅游...
小白 花了 169 时间才到了飞机场,准备登机回家
小红 花了 260 时间才到了飞机场,准备登机回家
小白的朋友0号 花了 478 时间才到了飞机场,准备登机回家
小白的朋友0号:所有3我的都到齐了,如今是第6次集合准备去下一个地方..................

经过结果配合我上面的解释,仍是比较好理解的.

遗漏

这里还有phaser的中断和树形结构没有举例子,后续想到比较后的例子,我会继续作补充的

后记

这篇是我目前为止写的最慢的一篇博文,由于以前没有使用过phaser,致使在写的出现不少问题.因此一边查资料,一边学习,总算仍是把这个phaser给理解了.

打赏

若是以为个人文章写的还过得去的话,有钱就捧个钱场,没钱给我捧我的场(帮我点赞或推荐一下)
微信打赏
支付宝打赏


  1. java7
相关文章
相关标签/搜索