在ExecutorService的submit方法中能够获取返回值,经过Future的get方法,可是这个Future类存在缺陷,Future接口调用get()方法取得处理后的返回结果时具备阻塞性,也就是说调用Future的get方法时,任务没有执行完成,则get方法要一直阻塞等到任务完成为止。 这样大大的影响了系统的性能,这就是Future的最大缺点。为此,java1.5之后提供了CompletionServlice来解决这个问题。java
CompletionService 接口CompletionService的功能是异步的方式,一边生产任务,一边处理完成的任务结果,这样能够将执行的任务与处理任务隔离开来进行处理,使用submit执行任务,使用塔克获取已完成的任务,并按照这些任务的完成的时间顺序来处理他们的结果。dom
向ExecutorService 提交一组任务,哪一个任务先完成,就把完成任务的返回结果打印出来。异步
public class CompletionServiceExecutorDemo { public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(10); // 同时运行多个任务,那个任务先返回数据,就先获取该数据 CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool); for (int i = 1; i <= 10; i++) { final int seq = i; completionService.submit(new Callable<String>() { @Override public String call() throws Exception { int waitTime = new Random().nextInt(10); TimeUnit.SECONDS.sleep(waitTime); return "callable:"+seq+" 执行时间:"+waitTime+"s"; } }); } for (int i = 1; i <= 10; i++) { try { Future<String> future = completionService.take(); System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } threadPool.shutdown(); } }
执行结果以下:ide
callable:6 执行时间:1s callable:2 执行时间:3s callable:10 执行时间:3s callable:1 执行时间:4s callable:4 执行时间:5s callable:8 执行时间:5s callable:7 执行时间:7s callable:5 执行时间:8s callable:9 执行时间:9s callable:3 执行时间:9s
从打印结果能够看出,这些任务是按照任务执行完成的顺序打印的,先执行完就先返回结果。源码分析
ExecutorCompletionService 类结构以下性能
public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; //线程池 private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; //任务完成队列 private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
ExecutorCompletionService 类中定义了一个QueueingFuture 的内部类,继承于FutureTask类,内部重写了FutureTask的done方法,该方法是在FutureTask任务执行完成后会调用的方法,在FutureTask中该方法未实现任何逻辑。this
重写done方法,在任务处理完成后把该FutureTask任务放入到阻塞队列(BlockingQueue)中,而后咱们就能够从阻塞队列中take执行完成的任务,进行想用的处理。spa
这里是实现ExecutorCompletionService的核心逻辑。线程
private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
ExecutorCompletionService 支持Callable和Runnable任务code
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
该构造能够指定一个阻塞队列,其它功能同上构造方法。
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
该方法能够向ExecutorCompletionService 中提交要执行的任务。
支持Callable和Runnable两种类型的任务。
若是提交的Runnable任务,则执行完后返回的结果为null。
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
从阻塞队列中获取执行完成的任务的,若是队列为空且任务没有所有完成,则阻塞当前线程,直到有任务执行完成。
public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); } }
ExecutorCompletionService支持非阻塞方式从阻塞队列中获取已完成的任务
ExecutorCompletionService的实现原理是内部使用了FutureTask来实现异步的任务执行。经过一个内部类继承FutureTask,并实现了FutureTask的一个done方法。该done方法会在任务执行完成以后调用该方法,在任务执行完以后把当前的FutureTask放入到阻塞队列中。这样就实现了先执行完成的任务先存放到阻塞队列中,应用程序能够从阻塞队列中提早获取先执行完的任务。