Java多线程(2):使用线程池 ThreadPoolExecutor

首先,咱们为何须要线程池
让咱们先来了解下什么是 对象池 技术。某些对象(好比线程,数据库链接等),它们建立的代价是很是大的 —— 相比于通常对象,它们建立消耗的时间和内存都很大(并且这些对象销毁的代价比通常对象也大)。因此,若是咱们维护一个 ,每次使用完这些对象以后,并不销毁它,而是将其放入池中,下次须要使用时就直接从池中取出,即可以免这些对象的重复建立;同时,咱们能够固定 池的大小,好比设置池的大小为 N —— 即池中只保留 N 个这类对象 —— 当池中的 N 个对象都在使用中的时候,为超出数量的请求设置一种策略,好比 排队等候 或者 直接拒绝请求 等,从而避免频繁的建立此类对象。
线程池 即对象池的一种(池中的对象为线程 Thread),相似的还有 数据库链接池(池中对象为数据库链接 Connection)。合理利用线程池可以带来三个好处(参考文末的 References[1]):html

  1. 下降资源消耗,经过重复利用已建立的线程,下降线程建立和销毁时形成的时间和内存上的消耗;
  2. 提高响应速度,当任务到达时,直接使用线程池中的线程来运行任务,使得任务能够不须要等到线程建立就能当即执行;
  3. 提升线程的可管理性,线程是开销很大的对象,若是无限制的建立线程,不只会快速消耗系统资源,还会下降系统的稳定性;而使用线程池能够对线程进行统一的分配和调控。

本文只介绍 Java 中线程池的基本使用,不会过多的涉及到线程池的原理。若是有兴趣的读者须要深刻理解线程池的实现原理,能够参考文末的 Referencesjava

JDK 中线程池的基础架构以下:
JDK 中线程池的基础架构数据库

执行器 Executor 是顶级接口,只包含了一个 execute 方法,用来执行一个 Runnable 任务:
Executorsegmentfault

执行器服务 ExecutorService 接口继承了 Executor 接口,ExecutorService 是全部线程池的基础接口,它定义了 JDK 中线程池应该实现的基本方法:
ExecutorService缓存

线程池执行器 ThreadPoolExecutor 是基础线程池的核心实现,而且能够经过定制 ThreadPoolExecutor 的构造参数或者继承 ThreadPoolExecutor,实现本身的线程池;多线程

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,是能执行周期性任务或定时任务的线程池;架构

ForkJoinPool 是 JDK1.7 时添加的类,做为对 Fork/Join 型线程池的实现。ide

本文只介绍 ThreadPoolExecutor 线程池的使用,ScheduledThreadPoolExecutorForkJoinPool 会在以后的文章中介绍。工具


查看 ThreadPoolExecutor 的源码可知,在 ThreadPoolExecutor 的内部,将每一个池中的线程包装为了一个 Worker性能

Worker 类图
Worker

而后在 ThreadPoolExecutor 中定义了一个 HashSet<Worker>,做为 “池”
workers


设置一个合适的线程池(即自定义 ThreadPoolExecutor)是比较麻烦的,所以 JDK 经过 Executors 这个工厂类为咱们提供了一些预先定义好的线程池:

一、固定大小的线程池
构造固定大小的线程池的工厂方法

建立一个包含 nThreads 个工做线程的线程池,这 nThreads 个线程共享一个无界队列(即不限制大小的队列);当新任务提交到线程池时,若是当前没有空闲线程,那么任务将放入队列中进行等待,直到有空闲的线程来从队列中取出该任务并运行。

(经过 Runtime.getRuntime().availableProcessors() 能够得到当前机器可用的处理器个数,对于计算密集型的任务,固定大小的线程池的 nThreads 设置为这个值时,通常能得到最大的 CPU 使用率)

二、单线程线程池
构造单线程线程池的工厂方法

建立一个只包含一个工做线程的线程池,它的功能能够简单的理解为 即 newFixedThreadPool 方法传入参数为 1 的状况。可是与 newFixedThreadPool(1) 不一样的是,若是线程池中这个惟一的线程意外终止,线程池会建立一个新线程继续执行以后的任务。

三、可缓存线程的线程池
构造可缓存线程的线程池的工厂方法

建立一个可缓存线程的线程池。当新任务提交到线程池时,若是当前线程池中有空闲线程可用,则使用空闲线程来运行任务,不然新建一个线程来运行该任务,并将该线程添加到线程池中;并且该线程池会终止并移除那些超过 60 秒未被使用的空闲线程。因此这个线程池表现得就像缓存,缓存的资源为线程,缓存的超时时间为 60 秒。根据 JDK 的文档,当任务的运行时间都较短的时候,该线程池有利于提升性能。

咱们看到每一个构造线程池的工厂方法都有一个带 ThreadFactory 的重载形式。ThreadFactory 即线程池用来新建线程的工厂,每次线程池须要新建一个线程时,调用的就是这个 ThreadFactorynewThread 方法:
ThreadFactory

(若是不提供自定义的 ThreadFactory,那么使用的就是 DefaultThreadFactory —— Executors 内定义的内部类)
好比咱们要为线程池中的每一个线程提供一个特定的名字,那么咱们就能够自定义 ThreadFactory 并重写其 newThread 方法:

public class SimpleThreadFactory implements ThreadFactory {

    private AtomicInteger id = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("Test_Thread-" + id.getAndIncrement());
        return thread;
    }

}

经过 JDK 的源码咱们能够知道,以上三种线程池的实现都是基于 ThreadPoolExecutor

可缓存线程池的实现

单线程池的实现

固定大小线程池的实现


下面咱们来看一下线程池的基础接口 ExecutorService 中每一个方法的含义。
首先是从 Executor 接口继承到的 execute 方法:
execute 方法

使用该方法即将一个 Runnable 任务交给线程池去执行。


submit 方法:
submit 方法

submit 方法会提交一个任务去给线程池执行,该任务能够是带返回结果的 Callable<V> 任务,也能够是一开始就指定结果的 Runnable 任务,或者不带结果的 Runnable 任务(此时即一开始指定结果为 null)。submit 方法会返回一个与所提交任务相关联的 Future<V>。经过 上一篇文章 咱们能够知道,Future<V>get 方法能够等待任务执行完毕并返回结果。因此经过 Future<V>,咱们能够与已经提交到线程池的任务进行交互。submit 提交任务及任务运行过程大体以下:

  1. 向线程池提交一个 Runnable 或者 Callable<V> 任务;
  2. 将 任务 做为参数使用 newTaskFor 方法构造出 FutureTask<V>

(由 上一篇文章 可知,FutureTask<V> 实现了 RunnableFuture<V> 两个接口,从而 FutureTask<V> 能够做为 Runnable 交给 WorkerThread)去运行,也能够做为一个 Future<V> 与任务交互)

![newTaskFor 方法][19]
  1. 线程池使用 execute 方法将 FutureTask<V> 交给当前的 Worker 去运行,并将 FutureTask<V>Future<V> 返回;

    submit 方法

  2. 而后 Worker 执行任务(即运行 run 方法),在任务完成后,为 Future<V>FutureTask<V>) 设置结果 —— 设置结果以前,调用 Future<V>get 方法会让调用线程处于阻塞状态;
    FutureTask 实现的 run 方法
  3. 经过 Future<V>get 方法,得到任务的结果。

invokeAll 方法:
invokeAll 方法

invokeAll 方法能够一次执行多个任务,但它并不一样等于屡次调用 submit 方法。submit 方法是非阻塞的,每次调用 submit 方法提交任务到线程池以后,会当即返回与任务相关联的 Future<V>,而后当前线程继续向后执行;

invokeAll 方法是阻塞的,只有当提交的多个任务都执行完毕以后,invokeAll 方法才会返回,执行结果会以List<Future<V>> 返回,该 List<Future<V>> 中的每一个 Future<V> 是和提交任务时的 Collection<Callable<V>> 中的任务 Callable<V> 一 一对应的。带 timeout 参数的 invokeAll 就是设置一个超时时间,若是超过这个时间 invokeAll 中提交的全部任务还有没所有执行完,那么没有执行完的任务会被取消(中断),以后一样以一个 List<Future<V>> 返回执行的结果。


invokeAny 方法:
invokeAny 方法

invokeAny 方法也是阻塞的,与 invokeAll 方法的不一样之处在于,当所提交的一组任务中的任何一个任务完成以后,invokeAny 方法便会返回(返回的结果即是那个已经完成的任务的返回值),而其余任务会被取消(中断)。

举一个 invokeAny 使用的例子:电脑有 C、D、E、F 四个盘,咱们须要找一个文件,可是咱们不知道这个文件位于哪一个盘中,咱们即可以使用 invokeAny,并提交四个任务(对应于四个线程)分别查找 C、D、E、F 四个盘,若是哪一个线程找到了这个文件,那么此时 invokeAny 便中止阻塞并返回结果,同时取消其余任务。


shutdown 方法:
shutdown 方法

shutdown 方法的做用是向线程池发送关闭的指令。一旦在线程池上调用 shutdown 方法以后,线程池便不能再接受新的任务;若是此时还向线程池提交任务,那么将会抛出 RejectedExecutionException 异常。以后线程池不会马上关闭,直到以前已经提交到线程池中的全部任务(包括正在运行的任务和在队列中等待的任务)都已经处理完成,才会关闭。


shutdownNow 方法:
shutdownNow 方法

shutdown 不一样,shutdownNow 会当即关闭线程池 —— 当前在线程池中运行的任务会所有被取消,而后返回线程池中全部正在等待的任务。


(值得注意的是,咱们 必须显式的关闭线程池,不然线程池不会本身关闭)


awaitTermination 方法:
awaitTermination 方法

awaitTermination 能够用来判断线程池是否已经关闭。调用 awaitTermination 以后,在 timeout 时间内,若是线程池没有关闭,则阻塞当前线程,不然返回 true;当超过 timeout 的时间后,若线程池已经关闭则返回 true,不然返回 false。该方法通常这样使用:

  1. 任务所有提交完毕以后,咱们调用 shutdown 方法向线程池发送关闭的指令;
  2. 而后咱们经过 awaitTermination 来检测到线程池是否已经关闭,能够得知线程池中全部的任务是否已经执行完毕;
  3. 线程池执行完已经提交的全部任务,并将本身关闭;
  4. 调用 awaitTermination 方法的线程中止阻塞,并返回 true

isShutdown() 方法,若是线程池已经调用 shutdown 或者 shutdownNow,则返回 true,不然返回 false


isTerminated() 方法,若是线程池已经调用 shutdown 而且线程池中全部的任务已经执行完毕,或者线程池调用了 shutdownNow,则返回 true,不然返回 false


经过以上介绍,咱们已经了解了 ExecutorService 中全部方法的功能,如今让咱们来实践 ExecutorService 的功能。

咱们继续使用 上一篇文章 的两个例子中的任务,首先是任务类型为 Runnable 的状况:

import java.util.*;
import java.util.concurrent.*;

public class RunnableTest {

    public static void main(String[] args) throws Exception {
        System.out.println("使用线程池运行 Runnable 任务:");
        
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定为 5 的线程池

        List<AccumRunnable> tasks = new ArrayList<>(10);

        for (int i = 0; i < 10; i++) {
            AccumRunnable task = new AccumRunnable(i * 10 + 1, (i + 1) * 10);
            tasks.add(task);
            
            threadPool.execute(task); // 让线程池执行任务 task
        }
        threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕以后,线程池会关闭

        threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待线程池关闭,等待的最大时间为 1 小时

        int total = 0;
        for (AccumRunnable task : tasks) {
            total += task.getResult(); // 调用在 AccumRunnable 定义的 getResult 方法得到返回的结果
        }

        System.out.println("Total: " + total);
    }

    static final class AccumRunnable implements Runnable {

        private final int begin;
        private final int end;

        private int result;

        public AccumRunnable(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        public void run() {
            result = 0;
            try {
                for (int i = begin; i <= end; i++) {
                    result += i;
                    Thread.sleep(100);
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace(System.err);
            }
            System.out.printf("(%s) - 运行结束,结果为 %d\n",
                    Thread.currentThread().getName(), result);
        }

        public int getResult() {
            return result;
        }
    }
}

运行结果:
线程池的运行 Runnable 任务的结果

能够看到 NetBeans 给出的运行时间为 2 秒 —— 由于每一个任务须要 1 秒的时间,而线程池中的线程个数固定为 5 个,因此线程池最多同时处理 5 个任务,于是 10 个任务总共须要 2 秒的运行时间。


任务类型为 Callable

import java.util.*;
import java.util.concurrent.*;

public class CallableTest {

    public static void main(String[] args) throws Exception {
        System.out.println("使用线程池运行 Callable 任务:");
        
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定为 5 的线程池
        
        List<Future<Integer>> futures = new ArrayList<>(10);
        
        for (int i = 0; i < 10; i++) {
            AccumCallable task = new AccumCallable(i * 10 + 1, (i + 1) * 10);
            Future<Integer> future = threadPool.submit(task); // 提交任务
            futures.add(future);
        }
        threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕以后,线程池会关闭

        int total = 0;
        for (Future<Integer> future : futures) {
            total += future.get(); // 阻塞,直到任务结束,返回任务的结果
        }

        System.out.println("Total: " + total);
    }

    static final class AccumCallable implements Callable<Integer> {

        private final int begin;
        private final int end;

        public AccumCallable(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        public Integer call() throws Exception {
            int result = 0;
            for (int i = begin; i <= end; i++) {
                result += i;
                Thread.sleep(100);
            }
            System.out.printf("(%s) - 运行结束,结果为 %d\n",
                    Thread.currentThread().getName(), result);

            return result;
        }

    }

}

运行结果:
使用线程池运行 Callable 任务的结果


改写上面的代码,使用 invokeAll 来直接执行一组任务:

public static void main(String[] args) throws Exception {
    System.out.println("使用线程池 invokeAll 运行一组 Callable 任务:");

    ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定为 5 的线程池

    List<AccumCallable> tasks = new ArrayList<>(10); // tasks 为一组任务
    for (int i = 0; i < 10; i++) {
        tasks.add(new AccumCallable(i * 10 + 1, (i + 1) * 10)); 
    }

    List<Future<Integer>> futures = threadPool.invokeAll(tasks); // 阻塞,直到全部任务都运行完毕

    int total = 0;
    for (Future<Integer> future : futures) {
        total += future.get(); // 返回任务的结果
    }

    System.out.println("Total: " + total);

    threadPool.shutdown(); // 向线程池发送关闭的指令
}

运行结果:
使用线程池 invokeAll 运行一组 Callable 任务的结果


线程池是很强大并且很方便的工具,它提供了对线程进行统一的分配和调控的各类功能。自 JDK1.5 时 JDK 添加了线程池的功能以后,通常状况下更推荐使用线程池来编写多线程程序,而不是直接使用 Thread

invokeAny 也是很实用的方法,请有兴趣的读者本身实践)


References:

  1. http://www.infoq.com/cn/artic...
  2. http://www.cnblogs.com/absfre...
相关文章
相关标签/搜索