使用ExecutorService提交多个任务时,须要保存Callable对应的Future。java
须要反复循环判断future是否完成。dom
@Test public void test() throws InterruptedException { LinkedBlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>(); Random random = new Random(); ExecutorService pool = Executors.newFixedThreadPool(5); Thread producerThread = new Thread(){ @Override public void run() { for (int i = 0; i < 10; i++) { int finalI = i; Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { int time = random.nextInt(10000); Thread.sleep(time); return "task_" + finalI; } }); System.out.println("submit_" + i); futures.add(future); } } }; producerThread.start(); Thread consumerThread = new Thread(){ @Override public void run() { while (true) { for (Future<String> future : futures) { if (future.isDone()) { try { System.out.println(future.get()); futures.remove(future); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } } } }; consumerThread.start(); producerThread.join(); consumerThread.join(); }
使用CompletionService能够简化操做。ide
@Test public void test2() throws InterruptedException { //LinkedBlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>(); Random random = new Random(); ExecutorService pool = Executors.newFixedThreadPool(5); //使用ExecutorCompletionService包装ExecutorService ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(pool); Thread producerThread = new Thread(){ @Override public void run() { for (int i = 0; i < 10; i++) { int finalI = i; //Future<String> future = pool.submit(new Callable<String>() { completionService.submit(new Callable<String>() { @Override public String call() throws Exception { int time = random.nextInt(10000); Thread.sleep(time); return "task_" + finalI; } }); System.out.println("submit_" + i); //futures.add(future); } } }; producerThread.start(); Thread consumerThread = new Thread(){ @Override public void run() { while (true) { try { Future<String> take = completionService.take(); System.out.println(take.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } /*while (true) { for (Future<String> future : futures) { if (future.isDone()) { try { System.out.println(future.get()); futures.remove(future); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }*/ } }; consumerThread.start(); producerThread.join(); consumerThread.join(); }
//构造方法 public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
初始化自身属性:this
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); //包装成QueueingFuture,传递给executor executor.execute(new QueueingFuture(f)); return f; }
ExecutorCompletionService中的内部类线程
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
重写done方法,将task添加到completionQueue。completionQueue是ExecutorCompletionService中的属性。因此,执行完一个任务,就将执行完的RunnableFuture添加到ExecutorCompletionService的阻塞队列completionQueue中。code
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
而take操做就是从阻塞队列中取出,已经完成的任务结果(RunnableFuture)。队列