JDK中为了处理线程之间的同步问题,除了提供锁机制以外,还提供了几个很是有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。html
Phaser 是JDK1.7版本中新增的,是一个可重用的同步barrier,它的功能与 CountDownLatch、CyclicBarrier 类似,可是使用起来更加灵活。能够java
构造方法bash
方法名 | 描述 |
---|---|
Phaser() | 构建一个Phaser |
Phaser(int parties) | 建立一个指定屏障数量的Phaser |
Phaser(Phaser parent) | 至关于 Phaser(parent, 0) |
Phaser(Phaser parent, int parties) | 建立一个指定屏障数量的Phaser,此phaser是注册在另外一个Phaser parent下 |
方法摘要并发
方法名 | 描述 |
---|---|
public int arrive() | 到达此phaser的屏障点,使phaser的到达的线程数加一,但不会阻塞等待其余线程。
返回:
phase值,即当前阶段(周期)的索引,或者是负值(当Phaser 中止时) |
public int arriveAndDeregister() | 到达此phaser的屏障点,使phaser的到达的线程数加一,而且会取消一个屏障点的注册。也不会阻塞等待其余线程。
返回:
phase值,即当前阶段(周期)的索引,或者是负值(当Phaser 中止时) |
public int arriveAndAwaitAdvance() | 到达此phaser的屏障点,而且阻塞等待其余线程到达此屏障点。注意:这是
非中断的阻塞
,此方法与awaitAdvance(arrive())等同。若是你但愿阻塞机制支持timeout、interrupted响应,可使用相似的其余方法(参见下文)。若是你但愿到达后且注销,并且阻塞等到当前phase下其余的parties到达,可使用awaitAdvance(arriveAndDeregister())方法组合。
返回:
phase值,即当前阶段(周期)的索引;若是Phaser 中止,则返回负值 |
public int awaitAdvance(int phase) | 在指定的阶段(周期)phase下等待其余线程到达屏障点,注意:这是
非中断的阻塞
。若是指定的phase与Phaser当前的phase不一致,或者Phaser 中止了,则当即返回。
参数 phase:
一般就是arrive()、arriveAndDeregister()的返回值; |
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException |
此方法是可中断的,其余与awaitAdvance()一致 |
public int awaitAdvanceInterruptibly( int phase, long timeout,TimeUnit unit) throws InterruptedException, TimeoutException |
超时等待方法,其余与awaitAdvance()一致 |
public int register() | 新注册一个party,致使Phaser内部registerPaties数量加1;若是此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,若是Phaser已经中断,将会返回负数。 |
public int bulkRegister(int parties) | 批量注册多个party,与register()类似 |
protected boolean onAdvance(int phase, int registeredParties) | barrier action(屏障方法)。若是须要,则必须继承Phaser类,重写此方法。若是返回true表示此Phaser应该终止(此后将会把Phaser的状态为termination,即isTermination()将返回true。),不然能够继续进行。phase参数表示当前周期数,registeredParties表示当前已经注册的parties个数。 默认实现为:return registeredParties == 0;在不少状况下,开发者能够经过重写此方法,来实现自定义的 |
public void forceTermination() | 强制终止,此后Phaser对象将不可用,即register等将再也不有效。此方法将会致使Queue中全部的waiter线程被唤醒。这个方法对于在一个或多个任务遇到意外异常以后协调恢复是颇有用的。 |
public int getArrivedParties() | 获取已经到达的parties个数。 |
public int getUnarrivedParties() | 获取没有到达的parties个数。 |
public Phaser getParent() | 获取其父亲类Phaser,没有则返回null |
public Phaser getRoot() | 返回该phaser的根祖先,若是没有父类,返回此phaser。 |
public boolean isTerminated() | 若是该phaser被终止,则返回true。 |
例子很简单,模拟跑步比赛的过程,分为三个阶段:一、参赛者到达起跑点,并在起跑点等待其余参赛者;二、参赛者齐人后,开始准备,并等待枪声。三、参赛这到达终点,并结束比赛,再也不等待任何状况。ide
public class PhaserTest{
public static MyPhaser myPhaser = new MyPhaser();
public static void main(String[] args) {
MyPhaser myPhaser = new MyPhaser();
// 一次性注册5个party,即创建5个屏障点
myPhaser.bulkRegister(5);
for (int i = 0; i < 5; i++) {
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
// 第一阶段(周期),phaser的周期数初始值为0
System.out.println(Thread.currentThread().getName() + "到达了起跑点!");
// 到达了屏障点(起跑点),阻塞等待其余线程
myPhaser.arriveAndAwaitAdvance();
// 继续运行,将进入第二阶段,phaser的周期数加一
System.out.println(Thread.currentThread().getName() + "准备起跑!");
// 到达了屏障点(准备起跑),阻塞等待其余线程
myPhaser.arriveAndAwaitAdvance();
// 进入第三阶段
System.out.println(Thread.currentThread().getName() + "到达了终点!");
// 参数者到达了终点,结束比赛,再也不等待其余参赛者
myPhaser.arriveAndDeregister();// 取消注册一个party
}
}, "参赛者" + i + "号");
runner.start();
}
}
}
复制代码
MyPhaser类,定制 barrier action(屏障事件)
函数
public class MyPhaser extends Phaser {
@Override
//改写onAdvance方法
public boolean onAdvance(int phase, int registeredParties) {
//判断当前的Phaser是否终止
if (!isTerminated()) {
// 分红三个阶段,在不一样的阶段(周期),执行不一样的屏障事件
if (phase == 0) {
// ....
System.out.println("第一阶段:全部参赛者都到达了起跑点!");
} else if (phase == 1) {
// ....
System.out.println("第二阶段:全部参赛者都已经就位,并准备好!比赛正式开始");
} else if (phase == 2) {
// ....
System.out.println("第三阶段:全部参赛者都到达终点,比赛结束!!");
}
}
return super.onAdvance(phase, registeredParties);
}
}
复制代码
运行结果:
工具
参赛者0号到达了起跑点!
参赛者3号到达了起跑点!
参赛者4号到达了起跑点!
参赛者2号到达了起跑点!
参赛者1号到达了起跑点!
第一阶段:全部参赛者都到达了起跑点!
参赛者0号准备起跑!
参赛者1号准备起跑!
参赛者2号准备起跑!
参赛者3号准备起跑!
参赛者4号准备起跑!
第二阶段:全部参赛者都已经就位,并准备好!比赛正式开始
参赛者4号到达了终点!
参赛者1号到达了终点!
参赛者0号到达了终点!
参赛者2号到达了终点!
参赛者3号到达了终点!
第三阶段:全部参赛者都到达终点,比赛结束!
复制代码
下面的例子:每个Phaser周期类注册的线程数目不能超过TASKS_PER_PHASER
(例子中是4个),不然就要增长一层子phaser层。ui
public class PhaserTest6 {
//
private static final int = 4;
public static void main(String args[]) throws Exception {
//
final int phaseToTerminate = 3;
//建立一个Phaser父类对象
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) { //屏障方法
System.out.println("====== " + phase + " ======");
return phase == phaseToTerminate || registeredParties == 0;
}
};
//建立10个任务
final Task tasks[] = new Task[10];
build(tasks, 0, tasks.length, phaser);
for (int i = 0; i < tasks.length; i++) {
System.out.println("starting thread, id: " + i);
final Thread thread = new Thread(tasks[i]);
thread.start();
}
}
//递归分层,
public static void build(Task[] tasks, int lo, int hi, Phaser ph) {
//若是任务的数量超过每一层的phaser的阈值TASKS_PER_PHASER,则要继续分层
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
//当前的phaser(ph)做为父周期,来建立一个子phaser
build(tasks, i, j, new Phaser(ph));
}
} else {
//线程的数量在阈值内,无需分红,能够直接注册线程到当前的Phaser
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(i, ph);
}
}
public static class Task implements Runnable {
//
private final int id;
private final Phaser phaser;
public Task(int id, Phaser phaser) {
this.id = id;
this.phaser = phaser;
this.phaser.register();
}
@Override
public void run() {
while (!phaser.isTerminated()) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// NOP
}
System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
phaser.arriveAndAwaitAdvance();
}
}
}
}
复制代码
须要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),能够考虑使用较小的TASKS_PER_PHASER值,例如4。反之能够适当增大this
运行结果:spa
in Task.run(), phase: 0, id: 2
in Task.run(), phase: 0, id: 1
in Task.run(), phase: 0, id: 3
in Task.run(), phase: 0, id: 0
in Task.run(), phase: 0, id: 8
in Task.run(), phase: 0, id: 5
in Task.run(), phase: 0, id: 9
in Task.run(), phase: 0, id: 7
in Task.run(), phase: 0, id: 4
in Task.run(), phase: 0, id: 6
====== 0 ======
in Task.run(), phase: 1, id: 9
in Task.run(), phase: 1, id: 6
in Task.run(), phase: 1, id: 1
in Task.run(), phase: 1, id: 7
in Task.run(), phase: 1, id: 8
in Task.run(), phase: 1, id: 5
in Task.run(), phase: 1, id: 0
in Task.run(), phase: 1, id: 4
in Task.run(), phase: 1, id: 3
in Task.run(), phase: 1, id: 2
====== 1 ======
in Task.run(), phase: 2, id: 6
in Task.run(), phase: 2, id: 0
in Task.run(), phase: 2, id: 2
in Task.run(), phase: 2, id: 3
in Task.run(), phase: 2, id: 7
in Task.run(), phase: 2, id: 5
in Task.run(), phase: 2, id: 8
in Task.run(), phase: 2, id: 9
in Task.run(), phase: 2, id: 1
in Task.run(), phase: 2, id: 4
====== 2 ======
in Task.run(), phase: 3, id: 3
in Task.run(), phase: 3, id: 4
in Task.run(), phase: 3, id: 9
in Task.run(), phase: 3, id: 5
in Task.run(), phase: 3, id: 8
in Task.run(), phase: 3, id: 1
in Task.run(), phase: 3, id: 7
in Task.run(), phase: 3, id: 0
in Task.run(), phase: 3, id: 2
in Task.run(), phase: 3, id: 6
====== 3 ======
复制代码
文章源地址:https://www.cnblogs.com/jinggod/p/8494624.html