(接上文《线程基础:JDK1.5+(8)——线程新特性(上)》)css
从这个小节開始,咱们将以一个“赛跑”的样例。解说JDK1.5环境下一些线程控制工具(包含Semaphore、CountDownLatch和java.util.concurrent.atomic子包),并且复习这个专题讲到的知识点:同步快、锁、线程池、BlockingQueue、Callable等。java
现在您不只可以经过咱们已经介绍的知识点,实现对100米田径比赛的初赛和决赛的模拟,而后发布出比赛的冠亚季军。还可以基于这些知识,模拟机场T1和T2跑道的起降工做。这里咱们一块儿实现前者的需求,首先来看看100米田径比赛的业务需求:安全
选手要參加比赛,首先就要报名。markdown
为了使功能足够简单,參赛选手的基本仅仅包含:姓名、起跑指数(最低速度)、參赛号三个信息。数据结构
同一个选手的状态不稳定性。也就是说某一个选手,在初赛阶段的速度多是A,但是决赛阶段因为发挥失常,可能速度就变成了B。而这一切都是随机进行的多线程
选手们首先进行“初赛”。所有选手的“初赛”成绩将进行汇总。成绩最好的5名选手。将參加“决赛”。“决赛”成绩最好的三名选手,将分别得到冠亚季军。并发布出来。并发
比赛场地仅仅有一个,总共同拥有5条跑道可供使用。dom
因此不论是“初赛”仍是“决赛”。同一时间參加比赛的选手都不能超过5名。ide
本小节兴许的内容中,咱们将对跑步比赛的实现代码进行屡次更改优化,但是无论实现代码怎样变化,有几个基本的模型是不会变化的:选手描写叙述和比赛结果描写叙述。函数
选手除了名字、參赛编号的描写叙述外,另外一个“最低速度”的描写叙述,这是为了保证无论这个选手跑多少次。其状态都不会太过失常。
“最低速度”是在建立选手时,系统随机生成的。
下面是Player选手类的定义代码:
package test.thread.track;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
/** * 这就是一个选手。 * 为了简单起见,咱们仅仅记录这个选手名字、选手编号、最低速度(建立时随机生成)。* 固然。最为一名选手,最重要的工做就是“跑步” * @author yinwenjie */ public class Player implements Callable<Result> , Comparable<Player>{ /** * 选手编号 */ private int number; /** * 选手的名字 */ private String name; /** * 最低速度 */ private float minSpeed; /** * 本次比赛结果 */ private Result result; /** * 跑道 */ private Semaphore runway; public Player(String name , int number , Semaphore runway) { this.name = name; this.number = number; this.runway = runway; // 这个最低速度设置是 8米/秒(不然就真是‘龟速’了) this.minSpeed = 8f; } /* (non-Javadoc) * @see java.util.concurrent.Callable#call() */ @Override public Result call() throws Exception { try { // 申请上跑道 this.runway.acquire(); return this.doRun(); } catch(Exception e) { e.printStackTrace(System.out); } finally { // 都要进入初赛结果排序(中途退赛的成绩就为0) this.runway.release(); } // 假设运行到这里,说明异常发生了 this.result = new Result(Float.MAX_VALUE); return result; } /** * 開始跑步 * @return * @throws Exception */ private Result doRun() throws Exception { /* * 为了表现一个选手每一次跑步都有不一样的状态(但是都不会低于其最低状态), * 因此每一次跑步,系统都会为这个选手分配一个即时速度。 * * 这个即时速度不会低于其最小速度。但是也不会高于 14米/秒(不然就是‘超人’咯) * */ // 生成即时速度 float presentSpeed = 0f; presentSpeed = this.minSpeed * (1.0f + new Random().nextFloat()); if(presentSpeed > 14f) { presentSpeed = 14f; } // 计算跑步结果(BigDecimal的使用可自行查阅资料) BigDecimal calculation = new BigDecimal(100).divide(new BigDecimal(presentSpeed) , 3, RoundingMode.HALF_UP); float presentTime = calculation.floatValue(); // 让线程等待presentSpeed的时间。模拟该选手跑步的过程 synchronized (this) { this.wait((long)(presentTime * 1000f)); } // 返回跑步结果 this.result = new Result(presentTime); return result; } /** * @return the result */ public Result getResult() { return result; } /** * @return the number */ public int getNumber() { return number; } /** * @return the name */ public String getName() { return name; } /* (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) */ @Override public int compareTo(Player o) { /* * 两个选手间。还可以经过他们的result进行比較 * 耗时越小,固然越靠前 * */ Result myReslut = this.getResult(); Result targetReslut = o.getResult(); // 假设出现了reslut为null或者targetReslut为null,说明比赛结果出现了问题 // 固然假设真的出现这种问题,最可能的选手中途退赛了 if(myReslut == null) { return 1; } if(targetReslut == null) { return -1; } // 耗时越少的选手,固然应该排在“成绩”队列的越前面 if(myReslut.getTime() < targetReslut.getTime()) { return -1; } else { return 1; } } }
为何Player选手类要实现Comparable接口呢?在实现代码中,我将使用PriorityBlockingQueue队列,将选手根据其比赛成绩进行排序。为了可以保证PriorityBlockingQueue队列可以正常排序。因此需要实现该接口。
固然有的读者会说,实现Comparable接口后,使用普通的List也可以排序。但是List接口的实现类(ArrayList、LinkedList、Vector等等)并不是线程安全的。它们常用的处理场景仍是在某一个线程内进行数据线性化处理时使用。
而就眼下咱们的场景来看。程序猿根本就不知道某一个选手何时可以跑完100米,并且多个选手跑步的处理结果都将随机的送入队列。因此保证线程安全性是需求实现中重要的一部分。
固然。假设您硬是要使用传统的List也行。
能可以经过JDK提供的“同步包装器”(Collections.synchronizedList)将它变成线程安全的。但这个问题不是本小节讨论的范围。
另外。作为一个选手来讲。最根本的功能就是“跑”这个动做。并且根据需求,很是明显咱们需要在选手“跑完后”知道“跑”的成绩。
因此咱们还需要Player类实现Callable接口,以便让选手可以跑起来。
为了模拟跑的过程和选手的状态有关,代码中使用随机数肯定本次选手“跑”的速度。但是这个速度不会低于选手的“最低速度”(眼下给定的是14秒)。
另一个不会变更的基本类就是Result成绩:
package test.thread.track;
/** * 选手某一次跑步的成绩 * @author yinwenjie * */
public class Result {
/** * 记录了本次赛跑的用时状况 */
private float time;
public Result(float time) {
this.time = time;
}
/** * @return the time */
public float getTime() {
return time;
}
/** * @param time the time to set */
public void setTime(float time) {
this.time = time;
}
}
每一次选手“跑”的成绩都是不同的。成绩中仅仅包含一个属性。就是跑完100米的用时状况。
Semaphore信号量,是concurrent包的一个重要工具类。它经过申请和回收“证书”。实现多个线程对同一资源的訪问控制。详细的作法是,某个线程在訪问某个(可能出现资源抢占的)资源的时候,首先向Semaphore对象申请“证书”,假设没有拿到“证书”就一直堵塞;当拿到“证书”后。线程就解除堵塞状态,而后訪问资源;在完毕资源操做后,再向Semaphore对象归还“证书”;让咱们先来看看Semaphore信号的简单演示样例:
package test.thread.semaphore;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Throwable {
new SemaphoreTest().doTest();
}
public void doTest() {
Semaphore semp = new Semaphore(5 , false);
// 咱们建立10个线程,并经过0-9的index进行编号
for(int index = 0 ; index < 10 ; index++) {
Thread semaphoreThread = new Thread(new SemaphoreRunnableNonfair(semp , index));
semaphoreThread.start();
}
}
/** * 測试Semaphore的非公平模式 * @author yinwenjie */
private static class SemaphoreRunnableNonfair implements Runnable {
private Semaphore semp;
/** * 编号 */
private Integer index;
public SemaphoreRunnableNonfair(Semaphore semp , Integer index) {
this.semp = semp;
this.index = index;
}
@Override
public void run() {
try {
System.out.println("线程" + this.index + "等待信号。。。。
。。"); this.semp.acquire(); // 中止一段时间,模拟业务处理过程 synchronized(this) { System.out.println("index 为 " + this.index + " 的线程,得到信号。開始处理业务"); this.wait(5000); } } catch (InterruptedException e) { e.printStackTrace(System.out); } finally { // 最后都要释放这个信号/证书 this.semp.release(); } } } }
以上代码咱们建立了10个线程。分别编号为0-9(这里咱们没有使用Thread自带的id,主要仍是为了读者可以看得清楚)。Semaphore信号量对象中。咱们放置了5个“证书”,也就是说最多同一时候可以有5个线程进行业务处理。处理完毕后向线程向Semaphore信号对象归还“证书”。以上代码的处理结果,可能例如如下图所看到的(注意。是“可能”):
线程0等待信号。。。。。。
线程2等待信号。。
。
。。
。 index 为 2 的线程。得到信号,開始处理业务 index 为 0 的线程,得到信号,開始处理业务 线程3等待信号。
。。
。。。 index 为 3 的线程。得到信号,開始处理业务 线程4等待信号。。。。。。 index 为 4 的线程,得到信号,開始处理业务 线程5等待信号。
。。。
。。 index 为 5 的线程,得到信号,開始处理业务 线程7等待信号。。。。。。 线程8等待信号。。
。。
。。 线程6等待信号。
。。。
。。 线程9等待信号。。。。
。。 线程1等待信号。。。。。。 index 为 8 的线程,得到信号。開始处理业务 index 为 7 的线程。得到信号,開始处理业务 index 为 6 的线程,得到信号,開始处理业务 index 为 9 的线程。得到信号。開始处理业务 index 为 1 的线程。得到信号。開始处理业务
为了方便读者查阅,这里咱们列举了Semaphore中常用的操做方式
申请/获取证书:
void acquire():今后信号量获取一个许可,在Semaphore可以提供一个许可前。当前线程将一直堵塞等待。
假设在等待过程当中。当前线程收到了interrupt信号,那么将抛出InterruptedException异常。
void acquire(permits):今后信号量获取permits个许可,在Semaphore可以提供permits个许可前,当前线程将一直堵塞等待。
假设在等待过程当中。当前线程收到了interrupt信号,那么将抛出InterruptedException异常。
void acquireUninterruptibly():今后信号量获取一个许可,在Semaphore可以提供一个许可前,当前线程将一直堵塞等待。使用这种方法获取许可时。不会受到线程interrupt信号的影响。
void acquireUninterruptibly(permits):今后信号量获取permits个许可。在Semaphore可以提供permits个许可前,当前线程将一直堵塞等待。使用这种方法获取许可时,不会受到线程interrupt信号的影响。
boolean tryAcquire():今后信号量获取一个许可,假设没法获取,线程并不会堵塞在这里。
假设获取到了许可,则返回true,其它状况返回false。
boolean tryAcquire(permits):今后信号量获取permits个许可,假设没法获取,线程并不会堵塞在这里。
假设获取到了许可,则返回true,其它状况返回false。
boolean tryAcquire(int permits, long timeout, TimeUnit unit):今后信号量获取permits个许可,假设没法获取,则当前线程等待设定的时间。假设超过等待时间后。仍是没有拿到许可。则解除等待继续运行。
假设获取到了许可,则返回true,其它状况返回false。
证书状态:
int availablePermits():返回此信号量中当前可用的许可数。
int getQueueLength():返回正在等待获取的线程的预计数目。该值仅是预计的数字,因为在此方法遍历内部数据结构的同一时候。线程的数目可能动态地变化。此方法用于监视系统状态,不用于同步控制。
boolean hasQueuedThreads():查询是否有线程正在等待获取。注意,因为同一时候可能发生取消,因此返回 true 并不保证有其它线程等待获取许可。此方法主要用于监视系统状态。
boolean isFair():假设此信号量的公平设置为 true,则返回 true。
释放/返还证书:
void release():释放一个许可,将其返回给信号量。
最好将这种方法的调用,放置在finally程序块中运行。
void release(permits):释放给定数目的许可,将其返回到信号量。
最好将这种方法的调用,放置在finally程序块中运行。
fair:公平与非公平
Semaphore一共同拥有两个构造函数,各自是:Semaphore(int permits)和Semaphore(int permits, boolean fair);permits是指由Semaphore信号量控制的“证书”数量。
fair參数是设置这个信号量对象的工做方式。
当fair參数为true时,信号量将以“公平方式”运行。
即首先申请证书,并进入堵塞状态的线程,将有权利首先获取到证书。当fair參数为false时。信号量对象将不会保证“先来先得”。默认状况下。Semaphore採用“非公平”模式运行。
在介绍了Semaphore的使用方式后,现在咱们就要将Semaphore增长“赛跑比赛”的代码实现中。
很是显然Semaphore在咱们需求中的应用任务是:给选手使用“跑道”的证书/权利,以便让选手“跑步”,并且在选手使用完跑道后,回收跑道的使用证书/权利,给下一位选手。
...... // 这就是跑道。需求上说了仅仅有5条跑道。因此仅仅有5个permits。 Semaphore runway = new Semaphore(5); ......
这个代码片断控制着所有选手的跑步动做:仅仅有在得到跑道的使用权限后,才干运行“跑步”动做。
什么状况下视为“初赛”、“决赛”完毕?
那么最直观的描写叙述就是:所有报名的选手都完毕了跑步过程(中途退赛也算),才干算“初赛”完毕;“初赛”排名最靠前的前5名选手都完毕了跑步过程(中途退赛也算)才算是“决赛”完毕。
假设没有完毕“初赛”,那么比赛进程就必须停在那里。直到“初赛”过程完毕;假设没有完毕“决赛”过程,比赛进程就必须停在那里,知道“决赛”完毕:
......
//! 仅仅有当PLAYERNAMES.length位选手的成绩都产生了,才干进入决赛。这很是重要
synchronized (this.preliminaries) {
while(this.preliminaries.size() < OneTrack.PLAYERNAMES.length) {
try {
this.preliminaries.wait();
} catch(InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
......
//! 仅仅有当5位选手的决赛成绩都产生了,才干到下一步:发布成绩
synchronized (this.finals) {
while(this.finals.size() < 5) {
try {
this.finals.wait();
} catch(InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
......
在咱们定义的Player选手类中,已经实现了Callable接口,并且将会在运行完毕后。返回Result结果信息。因此看选手是否完毕了跑步过程,仅仅需要监控Player的Future就可以了。
但是监控Player的Future可不能在100米比赛的主线程上进行。不然就会出现上一个选手没有跑完就不能启动下一个选手的跑步线程的状况。
因此咱们需要为每一个选手都建立一个“监控线程”FutureThread:
/** * 这是计分线程,是为了保证产生比赛结果后,在计入PriorityBlockingQueue * 这样才有排列成绩的根据 * @author yinwenjie * */
private class FutureThread extends Thread {
/** * 选手跑步任务(Player)的运行状态对象 */
private Future<Result> future;
/** * 跑步成绩出来后,需要操做的队列 * (要将相应的选手增长到队列,以便根据成绩进行排序) */
private PriorityBlockingQueue<Player> achievementQueue;
/** * 当前进行跑步的选手 */
private Player player;
public FutureThread(Future<Result> future , Player player , PriorityBlockingQueue<Player> achievementQueue) {
this.future = future;
this.player = player;
this.achievementQueue = achievementQueue;
}
/* (non-Javadoc) * @see java.lang.Thread#run() */
@Override
public void run() {
// 假设条件成立,最有可能的就是选手在比胜过程中,
// 因为某种缘由退赛了!if(this.future == null) { System.out.println("选手退赛,计分为0"); } else { try { // 假设选手没有跑完。FutureThread将堵塞在这里 // 固然出现跑步过程当中退赛。就会抛出异常 this.future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 运行到这里,就说明这个选手跑完了(或者退赛了) // 无论什么状况,都计入队列,而后通知主线程 this.achievementQueue.put(this.player); synchronized (this.achievementQueue) { this.achievementQueue.notify(); } } }
这样,每一个选手在跑步过程当中,就会有两个线程:一个用来跑步的线程:Player-Callable;另外一个用来监控跑步状况。并操做成绩队列的线程:FutureThread。
实现代码中基本的问题都攻克了,现在咱们可以给出完毕的实现代码了(注意,以前已经给出的代码,就不在赘述了):
package test.thread.track;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
/** * 这是第一个比赛程序。 * @author yinwenjie * */
public class OneTrack {
private static final String[] PLAYERNAMES = new String[]{"白银圣斗士","黄金圣斗士"
,"青铜圣斗士","神斗士","冥斗士","哈迪斯","龟仙人","孙悟空","孙悟饭","贝吉塔","孙悟天"};
/** * 报名队列(非线程安全) */
private List<Player> signupPlayers = new LinkedList<Player>();
/** * 初赛结果队列(有排序功能。且线程安全) */
private PriorityBlockingQueue<Player> preliminaries = new PriorityBlockingQueue<Player>();
/** * 决赛结果队列(有排序功能。且线程安全) */
private PriorityBlockingQueue<Player> finals = new PriorityBlockingQueue<Player>();
public void track() {
/* * 赛跑分为下面几个阶段进行; * * 一、报名 * 二、初赛。10名选手,分红两组,每组5名选手。 * 分两次进行初赛(因为场地仅仅有5条赛道,仅仅有拿到进场许可的才干使用赛道,进行比赛) * * 三、决赛:初赛结果将被写入到一个队列中进行排序。仅仅有成绩最好的前五名选手。可以參加决赛。* * 四、决赛结果的前三名将分别做为冠亚季军被发布出来 * */ //一、================报名 // 这就是跑道。需求上说了仅仅有5条跑道,因此仅仅有5个permits。 Semaphore runway = new Semaphore(5); this.signupPlayers.clear(); for(int index = 0 ; index < OneTrack.PLAYERNAMES.length ; ) { Player player = new Player(OneTrack.PLAYERNAMES[index], ++index , runway); this.signupPlayers.add(player); } //二、================进行初赛 // 这是裁判 ExecutorService refereeService = Executors.newFixedThreadPool(5); for (final Player player : this.signupPlayers) { Future<Result> future = null; future = refereeService.submit(player); new FutureThread(future, player, this.preliminaries).start(); } //! 仅仅有当PLAYERNAMES.length位选手的成绩都产生了,才干进入决赛,这很是重要 synchronized (this.preliminaries) { while(this.preliminaries.size() < OneTrack.PLAYERNAMES.length) { try { this.preliminaries.wait(); } catch(InterruptedException e) { e.printStackTrace(System.out); } } } // 三、============决赛(仅仅有初赛结果的前5名可以參见) for(int index = 0 ; index < 5 ; index++) { Player player = this.preliminaries.poll(); Future<Result> future = null; future = refereeService.submit(player); new FutureThread(future, player, this.finals).start(); } //! 仅仅有当5位选手的决赛成绩都产生了,才干到下一步:发布成绩 synchronized (this.finals) { while(this.finals.size() < 5) { try { this.finals.wait(); } catch(InterruptedException e) { e.printStackTrace(System.out); } } } // 四、============发布决赛成绩(前三名) for(int index = 0 ; index < 3 ; index++) { Player player = this.finals.poll(); switch (index) { case 0: System.out.println("第一名:" + player.getName() + "[" + player.getNumber() + "],成绩:" + player.getResult().getTime() + "秒"); break; case 1: System.out.println("第二名:" + player.getName() + "[" + player.getNumber() + "]。成绩:" + player.getResult().getTime() + "秒"); break; case 2: System.out.println("第三名:" + player.getName() + "[" + player.getNumber() + "]。成绩:" + player.getResult().getTime() + "秒"); break; default: break; } } } public static void main(String[] args) throws RuntimeException { new OneTrack().track(); } //......这里是FutureThread的代码。上面已给出了 }
下面是可能的运行结果。
“可能的运行结果”那是因为结果全然是随机的,您的运行结果可能和我给出的不同:
第一名:龟仙人[7],成绩:7.143秒
第二名:白银圣斗士[1],成绩:7.477秒
第三名:哈迪斯[6],成绩:7.531秒
(接下文:CountDownLatch同步器、java.util.concurrent.atomic子包)