Executor,Executors和ExecutorService

Executor

Executor是java.util.concurrent包中的一个接口,是一个执行提交的Runnable任务的对象。这个接口提供了一种方式把任务提交从每一个任务会如何执行的方法中解耦,包括线城市用,调度等的细节。使用Executor代替了显式建立线程。例如,比起对一组task中的每个调用new Thread(new(RunnableTask())).start(),你能够用:java

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

可是,Executor接口不是严格须要执行是异步的。在最简单的状况中,一个executor可以在调用者的线程上当即运行提交的任务:网络

class DirectExecutor implements Executor {
  public void execute(Runnable r) {
    r.run();
  }
}

更典型的是,任务执行在非调用者线程。下面的executor为每一个task产出一个新的线程:app

class ThreadPerTaskExecutor implements Executor {
  public void execute(Runnable r) {
    new Thread(r).start();
  }
}

不少Executor的实现强制加入了一些关于如何以及什么时候任务被调度的限制。下面的executor串行提交的任务到第二个executor,代表它是一个混合的executor:异步

class SerialExecutor implements Executor {
  final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
  final Executor executor;
  Runnable active;

  SerialExecutor(Executor executor) {
    this.executor = executor;
  }

  public synchronized void execute(final Runnable r) {
    tasks.offer(new Runnable() {
      public void run() {
        try {
          r.run();
        } finally {
          scheduleNext();
        }
      }
    });
    if (active == null) {
      scheduleNext();
    }
  }

  protected synchronized void scheduleNext() {
    if ((active = tasks.poll()) != null) {
      executor.execute(active);
    }
  }
}

在java.util.concurrent包中提供的Executor接口的实现(如ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPoolAbstractExecutorService)也同时实现了ExecutorService,这是一个更普遍的接口。ThreadPoolExecutor类提供了一个可扩展的线程池实现。Executors类为这些Executors提供了方便的工厂方法。socket

内存一致性效应:线程中在提交一个Runnable对象给一个Executor以前发生的操做happen-before执行这个Runnable(可能在另外一个线程中执行)。this

实现Executor接口须要实现execute方法,定义以下:线程

void execute(Runnbale command)

这个方法在将来某个时间点执行给定的command。这个command可能执行在一个新的线程中,在一个池化的线程中,或在调用者线程中,这取决于Executor的实现。code

ExecutorService

ExecutorService接口继承Executor接口,是提供管理终止的方法以及produce出Future去跟踪一个或多个异步任务进度的方法的Executor。server

一个ExecutorService能够被shutdown,会致使它拒绝新的tasks。提供了两个不一样的方法去关闭一个ExecutorService。shutdown方法容许以前提交的任务在终止以前执行,shutdownNow方法禁止等待已经开始任务并试图结束正在执行的任务。若是一个ExecutorService终止了,一个executor没有正在执行的活跃任务,没有等待执行的任务,也没有新任务能被提交。一个没有使用的ExecutorService应该被关闭以回收资源。对象

submit方法根据Executor的execute(Runnable)方法扩展,建立并返回一个Future,可以用来cancel执行以及等待执行完成。invokeAny方法以及invokeAll方法执行最普通的批量执行,执行一组任务而后等待至少一个,或者全部任务完成。

Executors类为ExecutorService提供工厂方法。

使用举例

这里有一个网络服务,其中一个线程池中的线程为请求提供服务。它使用预配置的Executors的newFixedThreadPool工厂方法:

class NetworkService implements Runnable {
  private final ServerSocket serverSocket;
  private final ExecutorService pool;

  public NetworkService(int port, int poolSize) 
      throws IOException {
    serverSocket = new ServerSocket(port);
    pool = Executors.newFixedTreadPool(poolSize);
  }

  public void run() { // run the service
    try {
      for (;;) {
        pool.execute(new Handler(serverSocket.accept()));
      }
    } catch (IOException ex) {
      pool.shutdown();
    }
  }
}

class Handler implements Runnable {
  private final Socket socket;
  Handler(Socket socket) { this.socket = socket; }
  public void run() {
    // read and service request on socket
  }
}

下面的方法哦经过两步关闭一个ExecutorService,首先调用shutdown以拒绝新来的tasks,而后调用shutdownNow(若是有必要的话),去cancel任何执行的任务:

void shutdownAndAwaitTermination(ExecutorService pool) {
  pool.shutdown(); // Disable new tasks from being submitted
  try {
    // Wait a while for existing tasks to terminate
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
      pool.shutdownNow(); // Cancel currently executing tasks
      // Wait a while for tasks to respond to being cancelled
      if (!pool.awaitTermination(60, TimeUnit.SECONDS))
        System.err.println("Pool did not terminate");
    }
  } catch (InterruptedException ie) {
    // (Re-)Cancel if current thread also interrupted
    pool.shutdownNow();
    // Preserve interrupt status
    Thread.currentThread.interrupt();
  }
}

ExecutorService中定义的方法

shutdown

void shutdown()

启动有序关闭,执行先前提交的任务,可是不接受新的任务。若是已经关闭再次执行没有影响。

这个方法不等待先前提交的任务完成执行,使用awaitTermination去等待任务执行完毕。

shutdownNow

List<Runnable> shutdownNow()

试图中止全部正在执行的任务,中止等待任务的处理,返回一个等待执行的任务列表。

这个方法不等待正在执行的任务终止,使用awaitTermination去等待任务终止。

尽力(best-effort)去中止正在执行的任务。例如,标准实现会经过Thread的interrupt去cancel任务,所以任何回应终止失败的任务永远不会终止。

isShutdown

boolean isShutdown()

若是executor已经被shutdown则返回true。

isTerminated

boolean isTerminated()

若是全部任务在shutdown以后都执行完成则返回true。注意isTerminated永远不会为true除非shutdown或shutdownNow先执行。

awaitTermination

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

在一个shutdown请求后阻塞直到全部任务完成执行,或者timeout发生,或者当前线程被interrupt,看哪一个先发生。

submit

<T> Future<T> submit(Callable<T> task)

提交一个返回值的任务去执行并返回一个表明任务挂起结果的Future对象。Future的get方法会返回成功完成的任务结果。

若是你想要当即阻塞等待一个任务完成,你可使用result = exec.submit(aCallable).get()

submit

<T> Future<T> submit(Runnable task, T result)

提交一个Runnable任务去执行并返回一个表明这个任务的Future。Future的get方法会返回成功完成的任务结果。

submit

Future<?> submit(Runnable task)

提交一个Runnable任务去执行并返回一个表明这个任务的Future。Future的get方法会返回成功完成的任务结果null。

invokeAll

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException

执行给定的一组任务,返回一个包含这些任务状态和结果的Future列表。Future的isDone方法对返回列表中的每一个元素调用都返回true。

这个方法会阻塞,等待全部task完成。所以返回的list中的每一个Future都是完成状态。

invokeAll

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
       throws InterruptedException

执行给定的一组任务,返回一个包含这些任务状态和结果的列表。全部任务完成或timeout超时时方法返回。Future的isDone方法对返回列表中的每一个元素调用都返回true。

返回后,没有完成的任务被cancel。

invokeAny

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
      throws InterruptedException, ExcutionException

执行给定的一组任务,若是其中一个任务成功完成(没有抛出异常)则返回。返回后,全部没完成的任务都被cancel。

invokeAny

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException

执行给定的一组任务,若是其中一个任务成功完成(没有抛出异常)则返回。

相关文章
相关标签/搜索