本文已同步至我的博客 liaosi's blog-Executor框架(一)Callable、Future、Executor和ExecutorService
Executor框架是指JDK 1.5中引入的一系列并发库中与Executor相关的功能类,包括Executor、Executors、ExecutorService、Future、Callable等。java
若是使用new Thread(...).start()
的方法处理多线程,有以下缺点:多线程
使用线程池的方式,有以下优势:并发
下面开始分析一下Executor框架中几个比较重要的接口和类。框架
Callable 位于java.util.concurrent包下,它是一个接口,只声明了一个叫作call()的方法。ide
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
Callable 接口相似于Runnable,二者都是为那些其实例可能被另外一个线程执行的类设计的。和 Runnable 接口中的run()
方法相似,Callable 提供一个call()
方法做为线程的执行体。可是call()
方法比run()
方法更增强大,这要体如今:测试
1)call 方法能够有返回值。返回值的类型便是 Callable 接口传递进来的V类型。 2)call 方法能够声明抛出异常。
Future 接口位于java.util.concurrent包下,是Java 1.5中引入的接口.
Future主要用来对具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时能够经过get()
方法获取执行结果,get()
方法会阻塞直到任务返回结果。this
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
当你提交一个Callable对象给线程池时,将获得一个Future对象,而且它和你传入的Callable示例有相同泛型。spa
Future 接口中的5个方法:线程
mayInterruptIfRunning
表示是否容许取消正在执行却没有执行完毕的任务,若是设置true,则表示能够取消正在执行过程当中的任务。mayInterruptIfRunning
为true仍是false,此方法确定返回false,即若是取消已经完成的任务会返回false;mayInterruptIfRunning
设置为true,则返回true,若mayInterruptIfRunning
设置为false,则返回false;mayInterruptIfRunning
为true仍是false,确定返回true。TimeoutException
异常。也就是说Future提供了三种功能:设计
1)判断任务是否完成; 2)可以中断任务; 3)可以获取任务执行结果
Executor是一个接口,它将任务的提交与任务的执行分离开来,定义了一个接收Runnable对象的方法execute。Executor是Executor框架中最基础的一个接口,相似于集合中的Collection接口。
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
ExecutorService继承了Executor,是一个比Executor使用更普遍的子类接口。定义了终止任务、提交任务、跟踪任务返回结果等方法。
一个ExecutorService是能够关闭的,关闭以后它将不能再接收任何任务。对于再也不使用的ExecutorService,应该将其关闭以释放资源。
package java.util.concurrent; import java.util.List; import java.util.Collection; public interface ExecutorService extends Executor { /** * 平滑地关闭线程池,已经提交到线程池中的任务会继续执行完。 */ void shutdown(); /** * 当即关闭线程池,返回尚未开始执行的任务列表。 * 会尝试中断正在执行的任务(每一个线程调用 interruput方法),但这个行为不必定会成功。 */ List<Runnable> shutdownNow(); /** * 判断线程池是否已经关闭 */ boolean isShutdown(); /** * 判断线程池的任务是否已经执行完毕。 * 注意此方法调用以前须要先调用shutdown()方法或者shutdownNow()方法,不然老是会返回false */ boolean isTerminated(); /** * 判断线程池的任务是否都执行完。 * 若是没有任务没有执行完毕则阻塞,直至任务完成或者达到了指定的timeout时间就会返回 */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * 提交带有一个返回值的任务到线程池中去执行(回调),返回的 Future 表示任务的待定结果。 * 当任务成功完成后,经过 Future 实例的 get() 方法能够获取该任务的结果。 * Future 的 get() 方法是会阻塞的。 */ <T> Future<T> submit(Callable<T> task); /** *提交一个Runnable的任务,当任务完成后,能够经过Future.get()获取的是提交时传递的参数T result * */ <T> Future<T> submit(Runnable task, T result); /** * 提交一个Runnable的人无语,它的Future.get()得不到任何内容,它返回值老是Null。 * 为何有这个方法?为何不直接设计成void submit(Runnable task)这种方式? * 这是由于Future除了get这种获取任务信息外,还能够控制任务, 具体体如今 Future的这个方法上:boolean cancel(boolean mayInterruptIfRunning) 这个方法可以去取消提交的Rannable任务。 */ Future<?> submit(Runnable task); /** * 执行一组给定的Callable任务,返回对应的Future列表。列表中每个Future都将持有该任务的结果和状态。 * 当全部任务执行完毕后,方法返回,此时而且每个Future的isDone()方法都是true。 * 完成的任务多是正常结束,也能够是异常结束 * 若是当任务执行过程当中,tasks集合被修改了,那么方法的返回结果将是不肯定的, 即不能肯定执行的是修改前的任务,仍是修改后的任务 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** * 执行一组给定的Callable任务,返回对应的Future列表。列表中每个Future都将持有该任务的结果和状态。 * 当全部任务执行完毕后或者超时后,方法将返回,此时而且每个Future的isDone()方法都是true。 * 一旦方法返回,未执行完成的任务被取消,而完成的任务可能正常结束或者异常结束, * 完成的任务能够是正常结束,也能够是异常结束 * 若是当任务执行过程当中,tasks集合被修改了,那么方法的返回结果将是不肯定的 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * 执行一组给定的Callable任务,当成功执行完(没抛异常)一个任务后此方法便返回,返回的是该任务的结果 * 一旦此正常返回或者异常结束,未执行的任务都会被取消。 * 若是当任务执行过程当中,tasks集合被修改了,那么方法的返回结果将是不肯定的 */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** * 执行一组给定的Callable任务,当在timeout(超时)以前成功执行完(没抛异常)一个任务后此方法便返回,返回的是该任务的结果 * 一旦此正常返回或者异常结束,未执行的任务都会被取消。 * 若是当任务执行过程当中,tasks集合被修改了,那么方法的返回结果将是不肯定的 */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
shutdown() 和 shutdownNow() 是用来关闭链接池的两个方法,并且这两个方法都是在当前线程当即返回,不会阻塞至线程池中的方法执行结束。调用这两个方法以后,链接池将不能再接受任务。
下面给写几个示例来加深ExecutorService的方法的理解。
先写两个任务类:ShortTask和LongTask,这两个类都继承了Runnable接口,ShortTask的run()
方法执行很快,LongTask的run()
方法执行时间为10s。
public class LongTask implements Runnable { @Override public void run() { try { TimeUnit.SECONDS.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("complete a long task"); } }
public class ShortTask implements Runnable { @Override public void run() { System.out.println("complete a short task..."); } }
shutdown()
方法@Test public void testShutdown() throws InterruptedException { ExecutorService threadpool = Executors.newFixedThreadPool(4); //将4个任务提交到有4个线程的线程池 threadpool.submit(new ShortTask()); threadpool.submit(new ShortTask()); threadpool.submit(new LongTask()); threadpool.submit(new ShortTask()); //关闭线程池 threadpool.shutdown(); boolean isShutdown = threadpool.isShutdown(); System.out.println("线程池是否已经关闭:" + isShutdown); final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)) { System.out.println("线程池中还有任务在执行,当前时间:" + sdf.format(new Date())); } System.out.println("线程池中已经没有在执行的任务,线程池已彻底关闭!"); }
控制台输出的结果:
complete a short task... 线程池是否已经关闭:true complete a short task... complete a short task... 线程池中还有任务在执行,当前时间:19:53:08 线程池中还有任务在执行,当前时间:19:53:09 线程池中还有任务在执行,当前时间:19:53:10 线程池中还有任务在执行,当前时间:19:53:11 线程池中还有任务在执行,当前时间:19:53:12 线程池中还有任务在执行,当前时间:19:53:13 线程池中还有任务在执行,当前时间:19:53:14 线程池中还有任务在执行,当前时间:19:53:15 线程池中还有任务在执行,当前时间:19:53:16 complete a long task 线程池中已经没有在执行的任务,线程池已彻底关闭!
shutdownNow()
方法@Test public void testShutdownNow() throws InterruptedException { ExecutorService threadpool = Executors.newFixedThreadPool(3); //将5个任务提交到有3个线程的线程池 threadpool.submit(new LongTask()); threadpool.submit(new LongTask()); threadpool.submit(new LongTask()); threadpool.submit(new LongTask()); threadpool.submit(new LongTask()); //主线程睡眠2秒钟,让3个线程池的任务都开始执行 TimeUnit.SECONDS.sleep(1L); //关闭线程池 List<Runnable> waiteRunnables = threadpool.shutdownNow(); System.out.println("尚未执行的任务数:" + waiteRunnables.size()); boolean isShutdown = threadpool.isShutdown(); System.out.println("线程池是否已经关闭:" + isShutdown); final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)) { System.out.println("线程池中还有任务在执行,当前时间:" + sdf.format(new Date())); } System.out.println("线程池中已经没有在执行的任务,线程池已彻底关闭!"); }
控制台输出:
java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.lzumetal.multithread.threadpooltest.LongTask.run(LongTask.java:11) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) 尚未执行的任务数:2 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 线程池是否已经关闭:true at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) complete a long task at java.lang.Thread.run(Thread.java:748) complete a long task java.lang.InterruptedException: sleep interrupted complete a long task at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.lzumetal.multithread.threadpooltest.LongTask.run(LongTask.java:11) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.lzumetal.multithread.threadpooltest.LongTask.run(LongTask.java:11) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 线程池中已经没有在执行的任务,线程池已彻底关闭!
从上面咱们看到当调用了试图shutdownNow()
后,那三个执行的任务都被interrupt了。并且awaitTermination(long timeout, TimeUnit unit)
方法返回的是true。
submit(Callable<T> task)
方法Callabel任务类:
package com.lzumetal.multithread.threadpooltest; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; public class CallableTask implements Callable<String> { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(5L); return "success"; } }
测试:
@Test public void testSubmitCallable() { ExecutorService threadpool = null; try { threadpool = Executors.newFixedThreadPool(3); final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); System.out.println("提交一个callable任务到线程池,如今时间是:" + sdf.format(new Date())); Future<String> future = threadpool.submit(new CallableTask()); System.out.println("获取callable任务的结果:" + future.get() + ",如今时间是:" + sdf.format(new Date())); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { if (threadpool != null) { threadpool.shutdown(); } } }
控制台输出:
提交一个callable任务到线程池,如今时间是:20:25:27 获取callable任务的结果:success,如今时间是:20:25:32