我关注的任务提交方法为:invokeAll, submit, executor
java
1 单任务提交:submit , executor函数
AbstractExecutorService 类的方法实现this
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); //1 生成一个 RunnableFuture 对象,并调用 Executor接口中的void execute(Runnable command) 函数 execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); //2 生成一个 RunnableFuture 对象,并调用 Executor接口中的void execute(Runnable command) 函数 execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); //3 生成一个 RunnableFuture 对象,并调用 Executor接口中的void execute(Runnable command) 函数 execute(ftask); return ftask; }
不论何种形式的submit函数,都是把task封装成一个RunnableFuture对象,并调用execute函数。spa
//4 newTaskFor 函数,把Runnable,Callable封装成统一的FutureTask对象 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
那么FutureTask和RunnableFuture的关系是什么?线程
可知,FutureTask 是 RunnableFuture 的实现类。FutureTask对象既有任务,又会有任务执行结果。code
//5 RunnableFuture 接口继承了Runnable, Future public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); } public class FutureTask<V> implements RunnableFuture<V> { //6 构造函数,把Callable 封装成 FutureTask public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } //7构造函数,把Runnable封装成FutrueTask public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
2 批量提交 invokeAll对象
//整体思路: 一一提交任务, 等待返回结果 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); // 存放线程的执行结果 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); //不一样的线程池,不一样的实现方式 execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); // 是否线程执行完毕, 若是没有执行完毕,则调用get() if (!f.isDone()) { try { f.get();//阻塞至执行完毕 } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // 只有全部的线程都执行完, 才返回 return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }