线程池和Exector框架javascript
什么是线程池? java
线程池要作些什么? 程序员
public class MyThreadPool { //默认的线程个数 private int work_num = 5; //线程的容器 private WorkThread[] workThreads; //任务队列 private List<Runnable> taskQueue = new LinkedList<>(); public MyThreadPool(int work_num) { this.work_num = work_num; workThreads = new WorkThread[work_num]; for(int i=0;i<work_num;i++){ workThreads[i] = new WorkThread(); workThreads[i].start(); } } //提交任务的接口 public void execute(Runnable task){ synchronized (taskQueue){ taskQueue.add(task); taskQueue.notify(); } } //销毁线程池 public void destroy(){ System.out.println("ready stop pool...."); for(int i=0;i<work_num;i++){ workThreads[i].stopWorker(); workThreads[i] = null;//加速垃圾回收 } taskQueue.clear(); } //工做线程 private class WorkThread extends Thread{ private volatile boolean on = true; public void run(){ Runnable r = null; try{ while(on&&!isInterrupted()){ synchronized (taskQueue){ //任务队列中无任务,工做线程等待 while(on&&!isInterrupted()&&taskQueue.isEmpty()){ taskQueue.wait(1000); } //任务队列中有任务,拿任务作事 if(on&&!isInterrupted()&&!taskQueue.isEmpty()){ r = taskQueue.remove(0); } } if (r!=null){ System.out.println(getId()+" ready execute...."); r.run(); } //加速垃圾回收 r = null; } }catch(InterruptedException e){ System.out.println(Thread.currentThread().getId()+" is Interrupted"); } } public void stopWorker(){ on = false; interrupt(); } } }
public class TestMyThreadPool { public static void main(String[] args) throws InterruptedException { // 建立3个线程的线程池 MyThreadPool t = new MyThreadPool(3); t.execute(new MyTask("testA")); t.execute(new MyTask("testB")); t.execute(new MyTask("testC")); t.execute(new MyTask("testD")); t.execute(new MyTask("testE")); System.out.println(t); Thread.sleep(3000); // t.destroy();// 全部线程都执行完成才destory System.out.println(t); } // 任务类 static class MyTask implements Runnable { private String name; private Random r = new Random(); public MyTask(String name) { this.name = name; } public String getName() { return name; } @Override public void run() {// 执行任务 try { Thread.sleep(r.nextInt(1000) + 2000); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getId() + " sleep InterruptedException:" + Thread.currentThread().isInterrupted()); } System.out.println("任务 " + name + " 完成"); } } }
运行结果 13 ready execute.... com.dongnaoedu.mypool.MyThreadPool@7852e922 12 ready execute.... 11 ready execute.... 任务 testC 完成 13 ready execute.... 任务 testA 完成 11 ready execute.... 任务 testB 完成 com.dongnaoedu.mypool.MyThreadPool@7852e922 任务 testD 完成 任务 testE 完成
线程池的主要处理流程 小程序
1)线程池判断核心线程池里的线程是否都在执行任务。若是不是,则建立一个新的工做服务器
线程来执行任务。若是核心线程池里的线程都在执行任务,则进入下个流程。多线程
2)线程池判断工做队列是否已经满。若是工做队列没有满,则将新提交的任务存储在这并发
个工做队列里。若是工做队列满了,则进入下个流程。框架
3)线程池判断线程池的线程是否都处于工做状态。若是没有,则建立一个新的工做线程dom
来执行任务。若是已经满了,则交给饱和策略来处理这个任务。异步
ThreadPoolExecutor执行execute()方法的示意
1)若是当前运行的线程少于corePoolSize,则建立新线程来执行任务(注意,执行这一步骤
须要获取全局锁)。
2)若是运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)若是没法将任务加入BlockingQueue(队列已满),则建立新的线程来处理任务(注意,执行这一步骤须要获取全局锁)。
4)若是建立新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
线程池中的核心线程数,当提交一个任务时,线程池建立一个新线程执行任务,直到当前线程数等于corePoolSize;
若是当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
若是执行了线程池的prestartAllCoreThreads()方法,线程池会提早建立并启动全部核心线程。
线程池中容许的最大线程数。若是当前阻塞队列满了,且继续提交任务,则建立新的线程执行任务,前提是当前线程数小于maximumPoolSize
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认状况下,该参数只在线程数大于corePoolSize时才有用
keepAliveTime的时间单位
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。经过workQueue,线程池实现了阻塞功能
建立线程的工厂,经过自定义的线程工厂能够给每一个新建的线程设置一个具备识别度的线程名
Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”
线程池的饱和策略,当阻塞队列满了,且没有空闲的工做线程,若是继续提交任务,必须采起一种策略处理该任务,线程池提供了4种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
固然也能够根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
线程池使用示例
public class UseThreadPool { static class MyTask implements Runnable { private String name; public MyTask(String name) { this.name = name; } public String getName() { return name; } @Override public void run() {// 执行任务 try { Random r = new Random(); Thread.sleep(r.nextInt(1000) + 2000); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getId() + " sleep InterruptedException:" + Thread.currentThread().isInterrupted()); } System.out.println("任务 " + name + " 完成"); } } public static void main(String[] args) { // 建立线程池 2 核心线程数 4最大线程数 60存活时间 TimeUnit时间单位 任务队列大小为10 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); for (int i = 0; i <= 5; i++) { MyTask task = new MyTask("Task_" + i); System.out.println("A new task will add:" + task.getName()); // 提交到线程池 threadPoolExecutor.execute(task); } threadPoolExecutor.shutdown(); } }
Execute提交不须要用返回值的任务
Submit 提交须要返回值的任务,返回值是个Future类型的对象,调用futrure的get方法(阻塞方法)来获取返回值
ShutDown():interrupt方法来终止线程
shutDownNow() 尝试中止全部正在执行的线程
线程数配置:
任务:计算密集型,IO密集型,混合型
计算密集型=计算机的cpu数或计算机的cpu数+1(应付页缺失)
IO密集型=计算机的cpu数*2
混合型,拆分红计算密集型,IO密集型
Runtime.getRuntime().availableProcessors();当前机器中的cpu核心个数
尽可能用有界队列,不要使用无界队列
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操做系统线程。Java线程启动时会建立一个本地操做系统线程;当该Java线程终止时,这个操做系统线程也会被回收。操做系统会调度全部线程并将它们分配给可用的CPU。
在上层,Java多线程程序一般把应用分解为若干个任务,而后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操做系统内核将这些线程映射到硬件处理器上。
从图中能够看出,应用程序经过Executor框架控制上层的调度;而下层的调度由操做系统内核控制,下层的调度不受应用程序的控制。
包括被执行任务须要实现的接口:Runnable接口或Callable接口。
包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
包括接口Future和实现Future接口的FutureTask类。
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
ExecutorService接口继承了Executor,在其上作了一些shutdown()、submit()的扩展,能够说是真正的线程池接口;
AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;
ScheduledThreadPoolExecutor是一个实现类,能够在给定的延迟后运行命令,或者按期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
Future接口和实现Future接口的FutureTask类,表明异步计算的结果。
Runnable接口和Callable接口的实现类,均可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。
主线程首先要建立实现Runnable或者Callable接口的任务对象。
工具类Executors能够把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。而后能够把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnablecommand));或者也能够把Runnable对象或Callable对象提交给ExecutorService执行(Executor-Service.submit(Runnable task)或ExecutorService.submit(Callable<T>task))。
若是执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。因为FutureTask实现了Runnable,程序员也能够建立FutureTask,而后直接交给ExecutorService执行。
最后,主线程能够执行FutureTask.get()方法来等待任务执行完成。主线程也能够执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
一般使用工厂类Executors来建立。Executors能够建立3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
ExecutorService threadPool2 = Executors.newFixedThreadPool(2); ExecutorService threadPool3 = Executors.newSingleThreadExecutor(); ExecutorService threadPool4 = Executors.newCachedThreadPool(); ExecutorService threadPool6 = Executors.newWorkStealingPool();
建立使用固定线程数的FixedThreadPool的API。适用于为了知足资源管理的需求,而须要限制当前线程数量的应用场景,适用于负载比较重的服务器。FixedThreadPool的corePoolSize和maximumPoolSize都被设置为建立FixedThreadPool时指定的参数nThreads。
当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被当即终止。
FixedThreadPool使用无界队列LinkedBlockingQueue做为线程池的工做队列(队列的容量为Integer.MAX_VALUE)。使用无界队列做为工做队列会对线程池带来以下影响。
1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,所以线程池中的线程数不会超过corePoolSize。
2)因为1,使用无界队列时maximumPoolSize将是一个无效参数。
3)因为1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)因为使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或
shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。
建立使用单个线程的SingleThread-Executor的API,适用于须要保证顺序地执行各个任务;而且在任意时间点,不会有多个线程是活动的应用场景。
corePoolSize和maximumPoolSize被设置为1。其余参数与FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue做为线程池的工做队列(队列的容量为Integer.MAX_VALUE)。
建立一个会根据须要建立新线程的CachedThreadPool的API。大小无界的线程池,适用于执行不少的短时间异步任务的小程序,或者是负载较轻的服务器。
corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue做为线程池的工做队列。CachedThreadPool使用没有容量的SynchronousQueue做为线程池的工做队列,但CachedThreadPool的maximumPool是无界的。这意味着,若是主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断建立新线程。极端状况下,CachedThreadPool会由于建立过多线程而耗尽CPU和内存资源。
利用全部运行的处理器数目来建立一个工做窃取的线程池,使用forkjoin实现。
使用工厂类Executors来建立。Executors能够建立2种类
型的ScheduledThreadPoolExecutor,以下。
·ScheduledThreadPoolExecutor。包含若干个线程的ScheduledThreadPoolExecutor。
·SingleThreadScheduledExecutor。只包含一个线程的ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor适用于须要多个后台线程执行周期任务,同时为了知足资源管理的需求而须要限制后台线程的数量的应用场景。
SingleThreadScheduledExecutor适用于须要单个后台线程执行周期任务,同时须要保证顺序地执行各个任务的应用场景。
对这4个步骤的说明。
1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行这个ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
4)线程1把这个修改time以后的ScheduledFutureTask放回DelayQueue中(Delay-Queue.add())。
有关提交定时任务的四个方法:
//向定时任务线程池提交一个延时Runnable任务(仅执行一次) public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
//向定时任务线程池提交一个延时的Callable任务(仅执行一次) public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//向定时任务线程池提交一个固定时间间隔执行的任务 public ScheduledFuture<?> scheduleAtFixedRate( Runnablecommand,long initialDelay,long period,TimeUnit unit)
//向定时任务线程池提交一个固定延时间隔执行的任务 public ScheduledFuture<?> scheduleWithFixedDelay( Runnable command, long initialDelay,long delay, TimeUnit unit);
固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间是肯定的,固然执行任务的时间不能超过执行周期。
固定延时间隔的任务是指每次执行完任务之后都延时一个固定的时间。因为操做系统调度以及每次任务执行的语句可能不一样,因此每次任务执行所花费的时间是不肯定的,也就致使了每次任务的执行周期存在必定的波动。
注意:定时或延时任务中所涉及到时间、周期不能保证明时性及准确性,实际运行中会有必定的偏差。
ScheduleThreadPoolExecutor与Timer相比的优点。
(1)Timer是基于绝对时间的延时执行或周期执行,当系统时间改变,则任务的执行会受到的影响。而ScheduleThreadPoolExecutore中,任务时基于相对时间进行周期或延时操做。
(2)Timer也能够提交多个TimeTask任务,但只有一个线程来执行全部的TimeTask,这样并发性受到影响。而ScheduleThreadPoolExecutore能够设定池中线程的数量。
(3)Timer不会捕获TimerTask的异常,只是简单地中止,这样势必会影响其余TimeTask的执行。而ScheduleThreadPoolExecutore中,若是一个线程因某些缘由中止,线程池能够自动建立新的线程来维护池中线程的数量。
public class ScheduleTask implements Runnable { public static enum OperType{ None,OnlyThrowException,CatheException } public static SimpleDateFormat formater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private OperType operType; public ScheduleTask(OperType operType) { this.operType = operType; } @Override public void run() { switch (operType){ case OnlyThrowException: System.out.println("Exception not catch:"+formater.format(new Date())); throw new RuntimeException("OnlyThrowException"); case CatheException: try { throw new RuntimeException("CatheException"); } catch (RuntimeException e) { System.out.println("Exception be catched:"+formater.format(new Date())); } break; case None: System.out.println("None :"+formater.format(new Date())); } } }
public class TestSchedule { public static void main(String[] args) { ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1); /** * 每隔一段时间打印系统时间,互不影响的建立并执行一个在给定初始延迟后首次启用的按期操做, * 后续操做具备给定的周期; * 也就是将在 initialDelay 后开始执行,周期为period。 */ exec.scheduleAtFixedRate(new ScheduleTask(ScheduleTask.OperType.None), 1000,5000, TimeUnit.MILLISECONDS); // 开始执行后就触发异常,next周期将不会运行 exec.scheduleAtFixedRate(new ScheduleTask(ScheduleTask.OperType.OnlyThrowException), 1000,5000, TimeUnit.MILLISECONDS); // 虽然抛出了运行异常,当被拦截了,next周期继续运行 exec.scheduleAtFixedRate(new ScheduleTask(ScheduleTask.OperType.CatheException), 1000,5000, TimeUnit.MILLISECONDS); /** * 建立并执行一个在给定初始延迟后首次启用的按期操做, * 随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。 */ exec.scheduleWithFixedDelay(new Runnable() { @Override public void run() { System.out.println("scheduleWithFixedDelay:begin" +ScheduleTask.formater.format(new Date())); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("scheduleWithFixedDelay:end" +ScheduleTask.formater.format(new Date())); } },1000,5000,TimeUnit.MILLISECONDS); /** * 建立并执行在给定延迟后启用的一次性操做。 */ exec.schedule(new Runnable() { @Override public void run() { System.out.println("schedule running....."); } },5000,TimeUnit.MILLISECONDS); } }
运行结果 None :2018-06-28 20:01:12 (5秒一次) Exception not catch:2018-06-28 20:01:12 (执行后抛异常没抓取 只执行了一次) Exception be catched:2018-06-28 20:01:12 (5秒一次) scheduleWithFixedDelay:begin2018-06-28 20:01:12 (5秒一次 停2秒) scheduleWithFixedDelay:end2018-06-28 20:01:14 schedule running..... (只执行了一次) None :2018-06-28 20:01:17 Exception be catched:2018-06-28 20:01:17 scheduleWithFixedDelay:begin2018-06-28 20:01:19 scheduleWithFixedDelay:end2018-06-28 20:01:21 None :2018-06-28 20:01:22 Exception be catched:2018-06-28 20:01:22 scheduleWithFixedDelay:begin2018-06-28 20:01:26 scheduleWithFixedDelay:end2018-06-28 20:01:28 ......... .........
若任务处理时长超出设置的定时频率时长,本次任务执行完才开始下次任务,下次任务已经处于超时状态,会立刻开始执行.
若任务处理时长小于定时频率时长,任务执行完后,定时器等待,下次任务会在定时器等待频率时长后执行
以下例子:
设置定时任务每60s执行一次
若第一次任务时长80s,第二次任务时长20ms,第三次任务时长50ms
第一次任务第0s开始,第80s结束;
第二次任务第80s开始,第110s结束;(上次任务已超时,本次不会再等待60s,会立刻开始),
第三次任务第150s开始,第200s结束.
第四次任务第210s开始.....
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。
当咱们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向咱们返回一个FutureTask对象。
Runnable接口和Callable接口的实现类,均可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable能够返回结果。
除了能够本身建立实现Callable接口的对象外,还可使用工厂类Executors来把一个Runnable包装成一个Callable。
Executors提供的,把一个Runnable包装成一个Callable的API。
public static Callable<Object> callable(Runnable task) // 假设返回对象Callable1
Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的API。
public static <T> Callable<T> callable(Runnable task, T result) // 假设返回对象Callable2
当任务成功完成后FutureTask.get()将返回该任务的结果。例如,若是提交的是对象Callable1,FutureTask.get()方法将返回null;若是提交的是对象Callable2,FutureTask.get()方法将返回result对象。
FutureTask除了实现Future接口外,还实现了Runnable接口。所以,FutureTask能够交给Executor执行,也能够由调用线程直接执行(FutureTask.run())。
当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将致使调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将致使调用线程当即返回结果或抛出异常。
当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将致使此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图中止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。
public class ComputeTask implements Callable<Integer> { private Integer result =0; private String taskName =""; public ComputeTask(Integer result, String taskName) { this.result = result; this.taskName = taskName; System.out.println(taskName+"子任务已经建立"); } @Override public Integer call() throws Exception { for(int i=0;i<100;i++){ result = result+i; } Thread.sleep(2000); System.out.println(taskName+"子任务已经完成"); return result; } }
public class FutureSample { public static void main(String[] args) { FutureSample futureSample = new FutureSample(); futureSample.futureTask(); } // 使用FutureTask private void futureTask() { // 建立任务集合 List<FutureTask<Integer>> taskList = new ArrayList<>(); ExecutorService exec = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { // 传入Callable对象建立FutureTask对象 FutureTask<Integer> ft = new FutureTask<Integer>(new ComputeTask(i,"task_" + i)); taskList.add(ft); exec.submit(ft); } System.out.println("主线程已经提交任务,作本身的事!"); // 开始统计各计算线程计算结果 int totalResult = 0; for (FutureTask<Integer> ft : taskList) { try { // FutureTask的get方法会自动阻塞,直到获取计算结果为止 totalResult = totalResult + ft.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println("total = " + totalResult); exec.shutdown(); } // 使用Future private void future() { // 建立任务集合 List<Future<Integer>> futureList = new ArrayList<>(); ExecutorService exec = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { // 提交Callable对象建立Future对象 Future<Integer> result = exec.submit(new ComputeTask(i, "task_" + i)); futureList.add(result); } System.out.println("主线程已经提交任务,作本身的事!"); // 开始统计各计算线程计算结果 int totalResult = 0; for (Future<Integer> ft : futureList) { try { // FutureTask的get方法会自动阻塞,直到获取计算结果为止 totalResult = totalResult + ft.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println("total = " + totalResult); exec.shutdown(); } }
运行结果 task_0子任务已经建立 task_1子任务已经建立 task_2子任务已经建立 task_3子任务已经建立 task_4子任务已经建立 task_5子任务已经建立 task_6子任务已经建立 task_7子任务已经建立 task_8子任务已经建立 task_9子任务已经建立 主线程已经提交任务,作本身的事! task_4子任务已经完成 task_1子任务已经完成 task_0子任务已经完成 task_3子任务已经完成 task_2子任务已经完成 task_5子任务已经完成 task_9子任务已经完成 task_6子任务已经完成 task_7子任务已经完成 task_8子任务已经完成 total = 49545
CompletionService实际上能够看作是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,经过相似BlockingQueue的put和take得到任务执行的结果。
CompletionService的一个实现是ExecutorCompletionService。
ExecutorCompletionService把具体的计算任务交给Executor完成。在实现上,ExecutorCompletionService在构造函数中会建立一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的做用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,而后改写FutureTask的done方法,以后把Executor执行的计算结果放入BlockingQueue中。
与ExecutorService最主要的区别在于submit的task不必定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,若是Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。因此,先完成的一定先被取出。这样就减小了没必要要的等待时间。
public class WorkTask implements Callable<String> { private String name; public WorkTask(String name) { this.name = name; } @Override public String call() throws Exception { //休眠随机时间,观察获取结果的行为。 int sleepTime = new Random().nextInt(1000); Thread.sleep(sleepTime); String str = name+" sleept time:"+sleepTime; System.out.println(str+" finished...."); return str; } }
public class CompletionTest { private final int POOL_SIZE = 5; private final int TOTAL_TASK = 10; // 方法一,本身写集合来实现获取线程池中任务的返回结果 public void testByQueue() throws InterruptedException, ExecutionException { ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); BlockingQueue<Future<String>> queue = new LinkedBlockingDeque<>(); // 向里面扔任务 for (int i = 0; i < TOTAL_TASK; i++) { Future<String> future = pool.submit(new WorkTask("ExecTask" + i)); queue.add(future); } // 检查线程池任务执行结果 for (int i = 0; i < TOTAL_TASK; i++) { System.out.println("ExecTask:" + queue.take().get()); } pool.shutdown(); } // 方法二,经过CompletionService来实现获取线程池中任务的返回结果 public void testByCompletion() throws InterruptedException, ExecutionException { ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); CompletionService<String> service = new ExecutorCompletionService<String>(pool); // 向里面扔任务 for (int i = 0; i < TOTAL_TASK; i++) { service.submit(new WorkTask("ExecTask" + i)); } // 检查线程池任务执行结果 for (int i = 0; i < TOTAL_TASK; i++) { Future<String> future = service.take(); System.out.println("CompletionService:" + future.get()); } } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletionTest completionTest = new CompletionTest(); // completionTest.testByQueue(); completionTest.testByCompletion(); } }
运行结果 completionTest.testByQueue() ExecTask4 sleept time:12 finished.... ExecTask1 sleept time:148 finished.... ExecTask3 sleept time:652 finished.... ExecTask6 sleept time:522 finished.... ExecTask5 sleept time:666 finished.... ExecTask0 sleept time:714 finished.... ExecTask:ExecTask0 sleept time:714 ExecTask:ExecTask1 sleept time:148 ExecTask8 sleept time:136 finished.... ExecTask2 sleept time:829 finished.... ExecTask:ExecTask2 sleept time:829 ExecTask:ExecTask3 sleept time:652 ExecTask:ExecTask4 sleept time:12 ExecTask:ExecTask5 sleept time:666 ExecTask:ExecTask6 sleept time:522 ExecTask9 sleept time:230 finished.... ExecTask7 sleept time:796 finished.... ExecTask:ExecTask7 sleept time:796 ExecTask:ExecTask8 sleept time:136 ExecTask:ExecTask9 sleept time:230 =============================== completionTest.testByCompletion(); ExecTask2 sleept time:89 finished.... CompletionService:ExecTask2 sleept time:89 ExecTask4 sleept time:137 finished.... CompletionService:ExecTask4 sleept time:137 ExecTask6 sleept time:495 finished.... CompletionService:ExecTask6 sleept time:495 ExecTask3 sleept time:738 finished.... CompletionService:ExecTask3 sleept time:738 ExecTask0 sleept time:812 finished.... CompletionService:ExecTask0 sleept time:812 ExecTask1 sleept time:972 finished.... CompletionService:ExecTask1 sleept time:972 ExecTask5 sleept time:921 finished.... CompletionService:ExecTask5 sleept time:921 ExecTask9 sleept time:473 finished.... CompletionService:ExecTask9 sleept time:473 ExecTask7 sleept time:711 finished.... CompletionService:ExecTask7 sleept time:711 ExecTask8 sleept time:880 finished.... CompletionService:ExecTask8 sleept time:880
总结:
使用方法一,本身建立一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先得到的是最早完成任务的线程返回值。它只是按加入线程池的顺序返回。由于take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。
使用方法二,使用CompletionService来维护处理线程不的返回结果时,主线程老是可以拿到最早完成的任务的返回值,而无论它们加入线程池的顺序。