我在《JDK1.5引入的concurrent包》中,曾经介绍过CountDownLatch、CyclicBarrier两个类,还给出了CountDownLatch的演示案例。这里再系统总结下Java并发编程中的4个类CountDownLatch、CyclicBarrier、Semaphore、Phaser。html
1.CountDownLatchjava
CountDownLatch能够理解为一个计数器在初始化时设置初始值,当一个线程须要等待某些操做先完成时,须要调用await()方法。这个方法让线程进入休眠状态直到等待的全部线程都执行完成。每调用一次countDown()方法,内部计数器减1,直到计数器为0时唤醒。这个能够理解为特殊的CyclicBarrier。react
核心方法两个:countDown()和await() countDown():使CountDownLatch维护的内部计数器减1,每一个被等待的线程完成的时候调用 await():线程在执行到CountDownLatch的时候会将此线程置于休眠
案例场景:视频会议室里等与会人员到齐了会议才能开始。编程
package com.itszt.test3; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 视频会议室里等与会人员到齐了会议才能开始 */ public class CountDownLatchTest { private static int num=10;//与会人员数量 public static void main(String[] args) { VideoConference conference = new VideoConference(num); Thread threadConference = new Thread(conference); threadConference.start();//开启await()方法,在内部计数器为0以前线程处于等待状态 for (int i = 0; i < num; i++) { Participant p = new Participant(conference, "Participant " + i); Thread t = new Thread(p); t.start(); } } } //视频会议类 class VideoConference implements Runnable { private final CountDownLatch controller; public VideoConference(int number) { //计数器内等待其余线程的初始化数目 controller = new CountDownLatch(number); } public void arrive(String name) { System.out.printf("%s has arrived.\n", name); controller.countDown();//调用countDown()方法,使内部计数器减1 System.out.printf("VideoConference: Waiting for %d participants.\n", controller.getCount()); } @Override public void run() { synchronized (VideoConference.class){ if(controller.getCount()!=0){ System.out.printf("VideoConference: Initialization: %d participants.\n", controller.getCount()); } } try { controller.await();//等待,直到CoutDownLatch计数器为0 System.out.printf("VideoConference: All the participants have come\n"); System.out.printf("VideoConference: Let's start...\n"); } catch (InterruptedException e) { e.printStackTrace(); } } } //参加会议的人员类 class Participant implements Runnable { private VideoConference conference; private String name; public Participant(VideoConference conference, String name) { this.conference = conference; this.name = name; } @Override public void run() { Long duration = (long) (Math.random() * 10); try { TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } conference.arrive(name);//每到一我的员,CountDownLatch计数器就减小1 } }
代码执行结果以下:多线程
VideoConference: Initialization: 10 participants. Participant 3 has arrived. Participant 7 has arrived. VideoConference: Waiting for 9 participants. VideoConference: Waiting for 8 participants. Participant 4 has arrived. VideoConference: Waiting for 7 participants. Participant 9 has arrived. VideoConference: Waiting for 6 participants. Participant 2 has arrived. Participant 1 has arrived. VideoConference: Waiting for 5 participants. VideoConference: Waiting for 4 participants. Participant 5 has arrived. Participant 8 has arrived. VideoConference: Waiting for 3 participants. VideoConference: Waiting for 2 participants. Participant 0 has arrived. VideoConference: Waiting for 1 participants. Participant 6 has arrived. VideoConference: Waiting for 0 participants. VideoConference: All the participants have come VideoConference: Let's start...
须要注意的是,CountDownLatch是一个线程计数器。等计数器为0时,那些先前因调用await()方法休眠的线程被唤醒。CountDownLatch可以控制的线程是哪些呢?是那些调用了CountDownLatch的await()方法的线程。案例中,先运行await()方法的线程是视频会议的线程,而后执行与会者 线程,这里的处理是每到一位(每建立一个线程并运行run()方法时就使计数器减1)就让计数器减1,等计数器减为0时唤醒因调用await()方法进入休眠的线程。这里的与会者线程就是视频会议线程要等待的线程。并发
2.CyclicBarrierapp
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。
当一个线程到达集合点时,它将调用await()方法等待其它的线程。线程调用await()方法后,CyclicBarrier将阻塞这个线程,并将它置入休眠状态等待其它线程的到来。等最后一个线程调用await()方法时,CyclicBarrier将唤醒全部等待的线程,而后这些线程将继续执行。CyclicBarrier能够传入另外一个Runnable对象做为初始化参数。当全部的线程都到达集合点后,CyclicBarrier类将Runnable对象做为线程执行。框架
方法 await():使线程置入休眠直到最后一个线程的到来以后唤醒全部休眠的线程 CyclicBarrier类有两个经常使用的构造方法: (1)CyclicBarrier(int parties) 这里的parties也是一个计数器,例如,初始化时parties里的计数是3,因而拥有该CyclicBarrier对象的线程当parties的计数为3时就唤醒,注意:这里parties里的计数在运行时当调用CyclicBarrier:await()时,计数就加1,一直加到初始的值。 (2)CyclicBarrier(int parties, Runnable barrierAction) 这里的parties与上一个构造方法的解释是同样的,这里须要解释的是第二个入参(Runnable barrierAction),这个参数是一个实现Runnable接口的类的对象,也就是说当parties加到初始值时就触发barrierAction的内容。
案例场景:有4个游戏玩家玩游戏,游戏有三个关卡,每一个关卡必需要全部玩家都到达后才能容许经过。其实这个场景里的玩家中若是有玩家A先到了关卡1,他必须等到其余全部玩家都到达关卡1时才能经过,也就是说线程之间须要相互等待。这和CountDownLatch的应用场景有区别,CountDownLatch里的线程是到了运行的目标后继续干本身的其余事情,而这里的线程须要等待其余线程后才能继续完成后面的工做。 dom
package com.itszt.test3; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 测试CyclicBarrier */ public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() { @Override public void run() { System.out.println("全部玩家进入第 2 关!"); } }); for (int i = 1; i <= 4; i++) { new Thread(new Player(i, cyclicBarrier)).start(); } } } /** * 玩家类 * * @author itmyhome */ class Player implements Runnable { private CyclicBarrier cyclicBarrier; private int id; public Player(int id, CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; this.id = id; } @Override public void run() { try { System.out.println("玩家" + id + "正在玩第 1 关..."); cyclicBarrier.await(); System.out.println("玩家" + id + "进入第 2 关..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
代码执行结果以下:jsp
玩家1正在玩第 1 关... 玩家3正在玩第 1 关... 玩家2正在玩第 1 关... 玩家4正在玩第 1 关... 全部玩家进入第 2 关! 玩家4进入第 2 关... 玩家1进入第 2 关... 玩家3进入第 2 关... 玩家2进入第 2 关...
3.Semaphore
信号量就是能够声明多把锁(包括一把锁,此时为互斥信号量)。
举个例子:一个房间若是只能容纳5我的,多出来的人必须在门外面等着。如何去作呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一我的就取走一把钥匙,没有钥匙的不能进入该房间,而是在外面等待。每出来一我的就把钥匙放回原处以方便别人再次进入。
经常使用方法 acquire():获取信号量,信号量内部计数器减1 release():释放信号量,信号量内部计数器加1 tryAcquire():这个方法试图获取信号量,若是可以获取返回true,不然返回false 信号量控制的线程数量在声明时肯定。例如: Semaphore s = new Semaphore(2);
能够说,Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们可以正确、合理的使用公共资源的设施,也是操做系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它至关于给线程规定一个量从而控制容许活动的线程数。
AQS(AbstractQueuedSynchronizer,抽象的队列式同步器)是 java.util.concurrent的基础。 Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask等虽然各自都有不一样特征,
可是简单看一下源码,每一个类内部都包含一个以下的内部类定义: abstract static class Sync extends AbstractQueuedSynchronizer;
全部java.util.concurrent包中的同步器类都声明了一个私有的继承了AbstractQueuedSynchronizer
的内部类,而且把全部同步方法都委托给这个内部类。这样各个同步器类的公开方法就可使用适合本身的名称。子类只需定义状态的检查与更新相关的方法,这些方法控制着acquire和 release操做。
AQS维护了一个volatile int state(表明共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词。state的访问方式有三种:
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不一样的自定义同步器争用共享资源的方式也不一样。自定义同步器在实现时只须要实现共享资源state的获取与释放方式便可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现如下几种方法:
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其余线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。固然,释放锁以前,A线程本身是能够重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每一个子线程执行完后countDown()一次,state会CAS减1。等到全部子线程都执行完后(即state=0),会unpark()主调用线程,而后主调用线程就会从await()函数返回,继续后余动做。
通常来讲,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种便可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
同步器背后的基本思想很是简单。acquire操做以下:
while (synchronization state does not allow acquire) { enqueue current thread if not already queued; possibly block current thread; } dequeue current thread if it was queued;
release操做以下:
update synchronization state; if (state may permit a blocked thread to acquire) unblock one or more queued threads;
为了实现上述操做,须要下面三个基本组件的相互协做:
同步器框架的核心决策是为这三个组件选择一个具体实现,同时在使用方式上又有大量选项可用。这里有意地限制了其适用范围,可是提供了足够的效率,使得实际上没有理由在合适的状况下不用这个框架而去从新建造一个。
到此,咱们再继续看Semaphore同步器。为了简单起见,咱们以一个停车场的运做为例。假设停车场只有三个车位,一开始三个车位都是空的。这时,若是同时来了五辆车,看门人容许其中三辆不受阻碍地进入,而后放下车拦,剩下的车则必须在停车场外的入口处等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,升起车拦,放入一辆,若是又离开两辆,则又能够放入两辆,如此往复。在这个场景中,每辆车就比如一个线程,看门人就比如一个信号量,看门人限制了能够活动的线程。假如里面依然是三个车位,可是看门人改变了规则,要求每次只能停两辆车,那么停车场在进入两辆车后,其后的车辆就要等到有车离开才能获准许进入。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。
Semaphore的主要方法有:
Semaphore(int permits):构造方法,建立具备给定许可数的计数信号量并设置为非公平信号量。 Semaphore(int permits,boolean fair):构造方法,当fair等于true时,建立具备给定许可数的计数信号量并设置为公平信号量。 void acquire():从该信号量获取一个许可前,线程将一直阻塞。至关于一辆车占了一个车位。 void acquire(int n):从该信号量获取给定数目许可,在提供这些许可前,一直将线程阻塞。好比n=2,就至关于一辆车占了两个车位。 void release():释放一个许可,将其返回给信号量。就如同车开走返回一个车位。 void release(int n):释放n个许可。 int availablePermits():当前可用的许可数。
接下来写一个案例,有7我的,各自获取信号量的许可后,再释放许可。
package com.itszt.test3; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 测试Semaphore */ public class SemaphoreTest { private static final Semaphore semaphore = new Semaphore(3);//默认为非公平信号量 private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); //信号量控制的线程 private static class InformationThread extends Thread { private final String name; private final int age; public InformationThread(String name, int age) { this.name = name; this.age = age; } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":你们好,我是" + name + "我今年" + age + "岁当前时间为:" + System.currentTimeMillis()); Thread.sleep(1000); System.out.println(name + "要准备释放许可证了,当前时间为:" + System.currentTimeMillis()); System.out.println("当前可以使用的许可数为:" + semaphore.availablePermits()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { String[] name = {"李明", "王五", "张杰", "王强", "赵二", "李四", "张三"}; int[] age = {26, 27, 33, 45, 19, 23, 41}; for (int i = 0; i < 7; i++) { Thread t1 = new InformationThread(name[i], age[i]); threadPool.execute(t1); } } }
上述代码执行结果以下:
pool-1-thread-1:你们好,我是李明我今年26岁当前时间为:1524367640560 pool-1-thread-3:你们好,我是张杰我今年33岁当前时间为:1524367640560 pool-1-thread-2:你们好,我是王五我今年27岁当前时间为:1524367640560 李明要准备释放许可证了,当前时间为:1524367641560 王五要准备释放许可证了,当前时间为:1524367641560 张杰要准备释放许可证了,当前时间为:1524367641560 当前可以使用的许可数为:0 当前可以使用的许可数为:0 当前可以使用的许可数为:0 pool-1-thread-4:你们好,我是王强我今年45岁当前时间为:1524367641560 pool-1-thread-5:你们好,我是赵二我今年19岁当前时间为:1524367641560 pool-1-thread-2:你们好,我是李四我今年23岁当前时间为:1524367641560 李四要准备释放许可证了,当前时间为:1524367642563 赵二要准备释放许可证了,当前时间为:1524367642563 王强要准备释放许可证了,当前时间为:1524367642563 当前可以使用的许可数为:0 当前可以使用的许可数为:0 pool-1-thread-3:你们好,我是张三我今年41岁当前时间为:1524367642563 当前可以使用的许可数为:0 张三要准备释放许可证了,当前时间为:1524367643563 当前可以使用的许可数为:2
咱们上面用的是非公平信号量,改成公平信号量:
private static final Semaphore semaphore = new Semaphore(3,true);
这时运行结果以下:
pool-1-thread-2:你们好,我是王五我今年27岁当前时间为:1524367824968 pool-1-thread-3:你们好,我是张杰我今年33岁当前时间为:1524367824968 pool-1-thread-1:你们好,我是李明我今年26岁当前时间为:1524367824968 李明要准备释放许可证了,当前时间为:1524367825968 王五要准备释放许可证了,当前时间为:1524367825968 张杰要准备释放许可证了,当前时间为:1524367825968 当前可以使用的许可数为:0 当前可以使用的许可数为:0 当前可以使用的许可数为:0 pool-1-thread-5:你们好,我是赵二我今年19岁当前时间为:1524367825968 pool-1-thread-4:你们好,我是王强我今年45岁当前时间为:1524367825968 pool-1-thread-3:你们好,我是李四我今年23岁当前时间为:1524367825968 王强要准备释放许可证了,当前时间为:1524367826968 李四要准备释放许可证了,当前时间为:1524367826968 赵二要准备释放许可证了,当前时间为:1524367826968 当前可以使用的许可数为:0 当前可以使用的许可数为:0 pool-1-thread-1:你们好,我是张三我今年41岁当前时间为:1524367826968 当前可以使用的许可数为:0 张三要准备释放许可证了,当前时间为:1524367827968 当前可以使用的许可数为:2
Semaphore信号量的实现和ReetrantLock相似,都是经过内部类Sync,Sync是一个继承于AQS的抽象类; Semaphore信号量和ReentrantLock互斥锁的实现区别在于,ReentrantLock互斥锁的state若是为0则表示锁未被占用,若是为0以外的数值表示锁被重入的次数;Semaphore信号量的state表示许可的数目; Sync包括两个子类:公平信号量FairSync和非公平信号量NonfailrSync,默认是非公平信号量NonfairSync。其中,公平信号量是指若是线程不在同步队列头部则排队等候;非公平信号量是指不管当前线程是否在同步队列头部,都会尝试获取信号量。
信号量若是要实现单例模式,能够这样修改:
private static final Semaphore semaphore=new Semaphore(1);
再执行代码,结果则以下:
pool-1-thread-1:你们好,我是李明我今年26岁当前时间为:1524368235314 李明要准备释放许可证了,当前时间为:1524368236317 当前可以使用的许可数为:0 pool-1-thread-3:你们好,我是张杰我今年33岁当前时间为:1524368236317 张杰要准备释放许可证了,当前时间为:1524368237317 当前可以使用的许可数为:0 pool-1-thread-3:你们好,我是张三我今年41岁当前时间为:1524368237317 张三要准备释放许可证了,当前时间为:1524368238317 当前可以使用的许可数为:0 pool-1-thread-5:你们好,我是赵二我今年19岁当前时间为:1524368238317 赵二要准备释放许可证了,当前时间为:1524368239317 当前可以使用的许可数为:0 pool-1-thread-2:你们好,我是王五我今年27岁当前时间为:1524368239317 王五要准备释放许可证了,当前时间为:1524368240317 当前可以使用的许可数为:0 pool-1-thread-4:你们好,我是王强我今年45岁当前时间为:1524368240317 王强要准备释放许可证了,当前时间为:1524368241317 当前可以使用的许可数为:0 pool-1-thread-1:你们好,我是李四我今年23岁当前时间为:1524368241317 李四要准备释放许可证了,当前时间为:1524368242317 当前可以使用的许可数为:0
可见,Semaphore将给定许可数设置为1,就如同一个单例模式,即单个停车位,只有一辆车进,而后这辆车出来后,下一辆车才能进。
另外,咱们在上面的案例中用到了线程池:
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
其中,ThreadPoolExecutor的构造方法体系有:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
对于构造方法的参数说明以下:
corePoolSize 核心线程数,默认状况下核心线程会一直存活,即便处于闲置状态也不会受keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。 maximumPoolSize 线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。 keepAliveTime 非核心线程的闲置超时时间,超过这个时间就会被回收。 unit 指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。 workQueue 线程池中的任务队列。 经常使用的队列有:LinkedBlockingQueue,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。 threadFactory 线程工厂,提供建立新线程的功能。ThreadFactory是一个接口,只有一个方法: public interface ThreadFactory { Thread newThread(Runnable r); } RejectedExecutionHandler RejectedExecutionHandler也是一个接口,只有一个方法 public interface RejectedExecutionHandler { void rejectedExecution(Runnable var1, ThreadPoolExecutor var2); } 当线程池中的资源已经所有使用,添加新线程又被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法。
线程池的线程执行规则跟任务队列有很大的关系。其中:
(1)在任务队列没有大小限制时: ①若是线程数量<=核心线程数量,那么直接启动一个核心线程来执行任务,不会放入队列中。 ② 若是线程数量>核心线程数,但<=最大线程数,而且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。 ③若是线程数量>核心线程数,但<=最大线程数,而且任务队列是SynchronousQueue的时候,线程池会建立新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。 ④若是线程数量>核心线程数,而且>最大线程数,当任务队列是LinkedBlockingDeque时,会将超过核心线程的任务放在任务队列中排队。也就是说,当任务队列是LinkedBlockingDeque而且没有大小限制时,线程池的最大线程数设置是无效的,它的线程数最多不会超过核心线程数。 ⑤若是线程数量>核心线程数,而且>最大线程数,当任务队列是SynchronousQueue的时候,会由于线程池拒绝添加任务而抛出异常。 (2)在任务队列大小有限时: ①当LinkedBlockingDeque塞满时,新增的任务会直接建立新线程来执行,当建立的线程数量超过最大线程数量时会抛异常。 ②SynchronousQueue没有数量限制。由于它根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。
在ThreadPoolExecutor中用到了BlockingQueue阻塞队列的接口。请参考个人另外一篇博文《Java中的BlockingQueue》。
4.Phaser
Phaser是一个更加复杂和强大的同步辅助类,它容许并发执行多阶段任务。当咱们有并发任务而且须要分解成几步执行时,(CyclicBarrier是分红两步),就能够选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当全部的线程都完成了这一步,才容许执行下一步。
能够说,Phaser容许并发多阶段任务。Phaser类机制是在每一步结束的位置对线程进行同步,当全部的线程都完成了这一步,才容许执行下一步。
跟其余同步工具同样,必须对Phaser类中参与同步操做的任务数进行初始化,不一样的是,能够动态的增长或者减小任务数。
一个Phaser对象有两种状态:
arriveAndAwaitAdvance():相似于CyclicBarrier的await()方法,等待其它线程都到来以后同步继续执行 arriveAndDeregister():把执行到此的线程从Phaser中注销掉 isTerminated():判断Phaser是否终止 register():将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程 forceTermination():强制Phaser进入终止态
案例场景:Phaser将同步三个并发任务。这三个任务将在三个不一样的文件夹及其子文件夹中查找过去24小时内改过扩展名为.txt的文件。这个任务分解为三个步骤:①在指定文件夹及其子文件夹中得到扩展名为.txt的文件;②对第一步的结果过滤,删除修改时间超过24小时的文件;③将结果打印数据到控制台。
package com.itszt.test3; import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; /** * 测试Phaser */ public class PhaserTest { public static void main(String[] args) { Phaser phaser=new Phaser(3); FileSearch system=new FileSearch("E:\\a", ".txt", phaser); FileSearch apps=new FileSearch("E:\\b", ".txt", phaser); FileSearch documents=new FileSearch("E:\\c", ".txt", phaser); Thread systemThread=new Thread(system, "system-a"); systemThread.start(); Thread appsThread=new Thread(apps, "apps-b"); appsThread.start(); Thread documentsThread=new Thread(documents, "documents-c"); documentsThread.start(); try { systemThread.join(); appsThread.join(); documentsThread.join(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Terminated:"+ phaser.isTerminated()); } } class FileSearch implements Runnable { private String initPath;// 查找路径 private String end;// 文件后缀 private List<String> results;// 结果集 private Phaser phaser; public FileSearch(String initPath, String end, Phaser phaser) { this.initPath = initPath; this.end = end; this.phaser = phaser; this.results = new ArrayList<String>(); } private void direactoryProcess(File file) { File list[] = file.listFiles(); if (list != null) { for (File f : list) { if (f.isDirectory()) { direactoryProcess(f); } else { fileProcess(f); } } } } private void fileProcess(File file) { if (file.getName().endsWith(end)) { results.add(file.getAbsolutePath()); } } private void filterResult() { List<String> newResult = new ArrayList<String>(); long actualDate = new Date().getTime(); for (int i = 0; i < results.size(); i++) { File file = new File(results.get(i)); long lastModifyTime = file.lastModified(); if (actualDate - lastModifyTime < TimeUnit.MICROSECONDS. convert(1, TimeUnit.DAYS)) { newResult.add(results.get(i)); } } results = newResult; } private boolean checkResults() { if (results.isEmpty()) { System.out.println(Thread.currentThread(). getName() + ": Phase " + phaser.getPhase() + " 0 result"); System.out.println(Thread.currentThread(). getName() + ": Phase " + phaser.getPhase() + " end"); phaser.arriveAndDeregister(); return false; } else { System.out.println(Thread.currentThread(). getName() + ": Phase " + phaser.getPhase() + " " + results.size() + " result"); phaser.arriveAndAwaitAdvance(); return true; } } private void showInfo() { for (int i = 0; i < results.size(); i++) { System.out.println(Thread.currentThread(). getName() + ":" + results.get(i)); } phaser.arriveAndAwaitAdvance(); } @Override public void run() { phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread(). getName()+": Starting"); File file=new File(initPath); if(file.isDirectory()){ direactoryProcess(file); } if(!checkResults()){ return; } filterResult(); if(!checkResults()){ return; } showInfo(); phaser.arriveAndDeregister(); System.out.println(Thread.currentThread(). getName()+": Work completed"); } }
控制台打印以下:
system-a: Starting system-a: Phase 1 1 result apps-b: Starting documents-c: Starting documents-c: Phase 1 1 result apps-b: Phase 1 1 result apps-b: Phase 2 1 result system-a: Phase 2 1 result documents-c: Phase 2 1 result documents-c:E:\c\jsp技术.txt apps-b:E:\b\jsp技术.txt system-a:E:\a\jsp技术.txt system-a: Work completed documents-c: Work completed apps-b: Work completed Terminated:true