Java并发编程中级篇(五):更强大的多阶段并发控制Phaser

Java API还提供了一个强大的同步辅助类Pahser,它能够控制多阶段并发辅助任务。当咱们有并发任务,而且须要分阶段执行,每阶段都须要等待全部线程执行本阶段执行完毕才可以继续执行,这种机制就很是好用。Phaser类一样须要一个整形做为初始化参数来肯定有几个线程参与执行。java

下面咱们来看一个例子,在这个例子中咱们把一个并发任务分为三个阶段,每一阶段都须要全部线程完成后才能继续执行下一阶段的任务。并发

咱们来定义一个带有Phaser机制的线程类PhaserRunnable,在线程开始运行的时候调用arriverAndAwaitAdvance()方法来表明线程已经进入执行状态,其实这个也能够算做一步,就是等待全部任务线程都启动后你们一块儿执行。而后开始执行第一步任务,每一个线程随机休眠一段时间来模拟任务执行耗时,执行完毕后调用arriverAndAwaitAdvance()来表示任务执行完毕,而后等待其余线程,等全部线程都滴啊用arriverAndAwaitAdvance()方法后,全部休眠的线程都被唤醒而后继续执行第二步。也是随机休眠一段时间后,第二部执行完毕,可是这里有一个不一样,若是休眠时间是一个双数那么线程将再也不执行第三步操做而是直接返回,这里要调用phaser.arriveAndDeregister()方法来表示线程已经结束,之后再也不须要等待我了。dom

public class PhaserRunnable implements Runnable{
    private Phaser phaser;

    public PhaserRunnable(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: start.\n", Thread.currentThread().getName());

        long duration1 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step1 start duration %d seconds.\n", Thread.currentThread().getName(), duration1);
        try {
            TimeUnit.SECONDS.sleep(duration1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: step1 done.\n", Thread.currentThread().getName());
        phaser.arriveAndAwaitAdvance();

        long duration2 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step2 start duration %d seconds.\n", Thread.currentThread().getName(), duration2);
        try {
            TimeUnit.SECONDS.sleep(duration2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (duration2 % 2 == 0) {
            System.out.printf("%s: step2 done and task finished.\n", Thread.currentThread().getName());
            phaser.arriveAndDeregister();
            return;
        } else {
            System.out.printf("%s: step2 done.\n", Thread.currentThread().getName());
            phaser.arriveAndAwaitAdvance();
        }

        long duration3 = (long) (Math.random() * 10) + 1;
        System.out.printf("%s: step3 start duration %d seconds.\n", Thread.currentThread().getName(), duration3);
        try {
            TimeUnit.SECONDS.sleep(duration3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: step3 done.\n", Thread.currentThread().getName());
        phaser.arriveAndDeregister();
    }
}

主方法类中启动三个任务线程,若是第二步都是休眠单数秒你能够尝试多运行几回,最后打印Phaser状态。ide

public class Main {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Thread[] threads = new Thread[3];

        for (int i = 0; i < 3; i++) {
            threads[i] = new Thread(new PhaserRunnable(phaser));
            threads[i].start();
        }

        for (int i = 0; i < 3; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.printf("%s: Phaser terminated.\n", Thread.currentThread().getName());
    }
}

查看控制台日志: this

Thread-2: start.
Thread-1: start.
Thread-0: start.
Thread-2: step1 start duration 9 seconds.
Thread-0: step1 start duration 2 seconds.
Thread-1: step1 start duration 6 seconds.
Thread-0: step1 done.
Thread-1: step1 done.
Thread-2: step1 done.
Thread-2: step2 start duration 7 seconds.
Thread-0: step2 start duration 2 seconds.
Thread-1: step2 start duration 1 seconds.
Thread-1: step2 done.
Thread-0: step2 done and task finished.
Thread-2: step2 done.
Thread-2: step3 start duration 3 seconds.
Thread-1: step3 start duration 7 seconds.
Thread-2: step3 done.
Thread-1: step3 done.
main: Phaser terminated.

Phaser类有两种状态:Active和Termination。有任务参与的时候Phaser状态为Active;当全部参与同步的线程都结束后Phaser也就没有参与者了,这时Phaser进入了Termination态。当Phaser处于终止态,同步方法arriveAndAwaitAdvance()会当即返回。线程

Phaser类的一个重大特性就是没必要对它的方法进行异常处理。不像其余通不辅助类,被Phaser类置于休眠状态的线程不会响应中断事件,也不会抛出InterruptedException异常。日志

Phaser类还提供了一些改变Phaser对象的方法,这些方法以下。code

  • arriver():这个方法通知phaser对象一个参与者已经完成了当前阶段,可是他等待其余参与者都完成当前阶段。必须当心使用这个方法,由于他不会与其它线程同步。
  • awaitAdvance(int phase):若是传入的阶段参数与当前阶段一致,那么这个方法会将当前线程置于休眠,直到这个阶段的全部参与者都完成运行。若是传入阶段参数与当前阶段不一致,这个方法会当即返回。
  • awaitAdvanceInterruptibly(int phaser):这个方法跟awaitAdvance(int phase)同样,不一样之处在于,若是这个方法中休眠的线程被中断,它将抛出InterruptedException异常。

Phaser类能够动态地改变参与线程的数量:对象

  • register():这个方法能够将一个新的线程注册到Phaser中,这个新的参与者被当成本阶段的任务来执行。
  • bulkRegister(int Parties):这个方法将指定数目的参与者注册到Phaser中,全部这些新的参与者都将被当成本阶段的任务来执行。

Phaser提供了一个方法forceTermination()方法来强制Phaser进入Termination状态,这个方法无论Phaser中是否还有注册的线程。当一个参与的线程出现错误,强制phaser终止是有意义的。当一个Phaser处于Termination状态的时候,awaitAdvance()和arriveAndAwaitAdvance()方法都马上返回一个负数,这样就能够验证Phaser状态是否是终止了,能够根据这个状态来终止线程的执行,直接返回或者作一些处理,好比数据回滚什么的。事件

相关文章
相关标签/搜索