使用ThreadPoolExecutor并行执行独立的单线程任务

Java SE 5.0中引入了任务执行框架,这是简化多线程程序设计开发的一大进步。使用这个框架能够方便地管理任务:管理任务的生命周期以及执行策略。html

在这篇文章中,咱们经过一个简单的例子来展示这个框架所带来的灵活与简单。java

基础

执行框架引入了Executor接口来管理任务的执行。Executor是一个用来提交Runnable任务的接口。这个接口将任务提交与任务执行隔离起来:拥有不一样执行策略的executor都实现了同一个提交接口。改变执行策略不会影响任务的提交逻辑。算法

若是你要提交一个Runnable对象来执行,很简单:api

Executor exec = …;

exec.execute(runnable);


如前所述,executor如何去执行提交的runnable任务并无在Executor接口中规定,这取决于你所用的executor的具体类型。这个框架提供了几种不一样的executor,执行策略针对不一样的场景而不一样。线程池

你可能会用到的最多见的executor类型就是线程池executor,也就是ThreadPoolExecutor类(及其子类)的实例。ThreadPoolExecutor管理着一个线程池和一个工做队列,线程池存放着用于执行任务的工做线程。多线程

你确定在其余技术中也了解过“池”的概念。使用“池”的一个最大的好处就是减小资源建立的开销,用过并释放后,还能够重用。另外一个间接的好处是你能够控制使用资源的多少。好比,你能够调整线程池的大小达到你想要的负载,而不损害系统的资源。并发

这个框架提供了一个工厂类,叫Executors,来建立线程池。使用这个工程类你能够建立不一样特性的线程池。尽管底层的实现经常是同样的(ThreadPoolExecutor),但工厂类可使你没必要使用复杂的构造函数就能够快速地设置一个线程池。工程类的工厂方法有:oracle

  • newFixedThreadPool:该方法返回一个最大容量固定的线程池。它会按需建立新线程,线程数量不大于配置的数量大小。当线程数达到最大之后,线程池会一直维持这么多不变。
  • newCachedThreadPool:该方法返回一个无界的线程池,也就是没有最大数量限制。但当工做量减少时,这类线程池会销毁没用的线程。
  • newSingleThreadedExecutor:该方法返回一个executor,它能够保证全部的任务都在一个单线程中执行。
  • newScheduledThreadPool:该方法返回一个固定大小的线程池,它支持延时和定时任务的执行。

这仅仅是一个开端。Executor还有一些其余用法已超出了这篇文章的范围,我强烈推荐你研究如下内容:框架

  • 生命周期管理的方法,这些方法由ExecutorService接口声明(好比shutdown()和awaitTermination())。
  • 使用CompletionService来查询任务状态、获取返回值,若是有返回值的话。

ExecutorService接口特别重要,由于它提供了关闭线程池的方法,并确保清理了再也不使用的资源。使人欣慰的是,ExecutorService接口至关简单、一目了然,我建议全面地学习下它的文档。ide

大体来讲,当你向ExecutorService发送了一个shutdown()消息后,它就不会接收新提交的任务,可是仍在队列中的任务会被继续处理完。你可使用isTerminated()来查询ExecutorService终止状态,或使用awaitTermination(…)方法来等待ExecutorService终止。若是传入一个最大超时时间做为参数,awaitTermination方法就不会永远等待。函数

警告: 对JVM进程永远不会退出的理解上,存在着一些错误和迷惑。若是你不关闭executorService,只是销毁了底层的线程,JVM就不会退出。当最后一个普通线程(非守护线程)退出后,JVM也会退出。

配置ThreadPoolExecutor

若是你决定不使用Executor的工厂类,而是手动建立一个 ThreadPoolExecutor,你须要使用构造函数来建立并配置。下面是这个类使用最普遍的一个构造函数:

public ThreadPoolExecutor(

    int corePoolSize,

    int maxPoolSize,

    long keepAlive,

    TimeUnit unit,

    BlockingQueue<Runnable> workQueue,

    RejectedExecutionHandler handler);


核心池的大小(线程池将会使用的大小)如你所见,你能够配置如下内容:

  • 最大池大小
  • 存活时间,空闲线程在这个时间后被销毁
  • 存听任务的工做队列
  • 任务提交拒绝后要执行的策略

限制队列中任务数

限制执行任务的并发数、限制线程池大小对应用程序以及程序执行结果的可预期性与稳定性有很大的好处。无尽地建立线程,最终会耗尽运行时资源。你的应用程序所以会产生严重的性能问题,甚至致使程序不稳定。

这只解决了部分问题:限制了并发任务数,但并无限制提交到等待队列的任务数。若是任务提交的速率一直高于任务执行的速率,那么应用程序最终会出现资源短缺的情况。

解决方法是:

  • 为Executor提供一个存放待执行任务的阻塞队列。若是队列填满,之后提交的任务会被“拒绝”。
  • 当任务提交被拒绝时会触发RejectedExecutionHandler,这也是为何这个类名中引用动词“rejected”。你能够实现本身的拒绝策略,或者使用框架内置的策略。

默认的拒绝策略可让executor抛出一个RejectedExecutionException异常。然而,还有其余的内建策略:

  • 悄悄地丢弃一个任务
  • 丢弃最旧的任务,从新提交最新的
  • 在调用者的线程中执行被拒绝的任务

何时以及为何咱们才会这样配置线程池?让咱们看一个例子。

示例:并行执行独立的单线程任务

最近,我被叫去解决一个好久之前的任务的问题,个人客户以前就运行过这个任务。大体来讲,这个任务包含一个组件,这个组件监听目录树所产生的文件系统事件。每当一个事件被触发,必须处理一个文件。一个专门的单线程执行文件处理。说真的,根据任务的特色,即便我能把它并行化,我也不想那么作。一天的某些时候,事件到达率才很高,文件也不必实时处理,在次日以前处理完便可。

当前的实现采用了一些混合且匹配的技术,包括使用UNIX SHELL脚本扫描目录结构,并检测是否发生改变。实现完成后,咱们采用了双核的执行环境。一样,事件的到达率至关低:目前为止,事件数以百万计,总共要处理1~2T字节的原始数据。

运行处理程序的主机是12核的机器:很好机会去并行化这些旧的单线程任务。基本上,咱们有了食谱的全部原料,咱们须要作的仅仅是把程序创建起来并调节。在写代码前,咱们必须了解下程序的负载。我列一下我检测到的内容:

  • 有很是多的文件须要被周期性地扫描:每一个目录包含1~2百万个文件
  • 扫描算法很快,能够并行化
  • 处理一个文件至少须要1s,甚至上升到2s或3s
  • 处理文件时,性能瓶颈主要是CPU
  • CPU利用率必须可调,根据一天时间的不一样而使用不一样的负载配置。

我须要这样一个线程池,它的大小在程序运行的时候经过负载配置来设置。我倾向于根据负载策略建立一个固定大小的线程池。因为线程的性能瓶颈在CPU,它的核心使用率是100%,不会等待其余资源,那么负载策略就很好计算了:用执行环境的CPU核心数乘以一个负载因子(保证计算的结果在峰值时至少有一个核心):

int cpus = Runtime.getRuntime().availableProcessors();

int maxThreads = cpus * scaleFactor;

maxThreads = (maxThreads > 0 ? maxThreads : 1);


并且,我将使用ThreadPoolExecutor.CallerRunsPolicy做为拒绝策略。为何?由于当队列已满时,线程池的线程忙于处理文件,我让提交任务的线程去执行它(被拒绝的任务)。这样,扫面会中止,转而去处理一个文件,处理结束后立刻又会扫描目录。而后我须要使用阻塞队列建立一个ThreadPoolExecutor,能够限制提交的任务数。为何?是这样,扫描算法执行很快,很快就产生庞大数量须要处理的文件。数量有多庞大呢?很难预测,由于变更太大了。我不想让executor内部的队列不加选择地填满了要执行的任务实例(这些实例包含了庞大的文件描述符)。我宁愿在队列填满时,拒绝这些文件。

下面是建立executor的代码:

ExecutorService executorService =

    new ThreadPoolExecutor(

        maxThreads, // core thread pool size

        maxThreads, // maximum thread pool size

        1, // time to wait before resizing pool

        TimeUnit.MINUTES,

        new ArrayBlockingQueue<Runnable>(maxThreads, true),

        new ThreadPoolExecutor.CallerRunsPolicy());

下面是程序的框架(极其简化版):

// scanning loop: fake scanning

while (!dirsToProcess.isEmpty()) {

    File currentDir = dirsToProcess.pop();



    // listing children

    File[] children = currentDir.listFiles();



    // processing children

    for (final File currentFile : children) {

        // if it's a directory, defer processing

        if (currentFile.isDirectory()) {

            dirsToProcess.add(currentFile);

            continue;

        }



        executorService.submit(new Runnable() {

            @Override

            public void run() {

                try {

                    // if it's a file, process it

                    new ConvertTask(currentFile).perform();

                } catch (Exception ex) {

                    // error management logic

                }

            }

        });

    }

}



// ...

// wait for all of the executor threads to finish

executorService.shutdown();

try {

    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {

        // pool didn't terminate after the first try

        executorService.shutdownNow();

    }



    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {

        // pool didn't terminate after the second try

    }

} catch (InterruptedException ex) {

    executorService.shutdownNow();

    Thread.currentThread().interrupt();

}

总结

看到了吧,Java并发API很是简单易用,十分灵活,也很强大。真但愿我多年前能够多花点功夫写一个这样简单的程序。这样我就能够在几小时内解决由传统单线程组件所引起的扩展性问题。

相关文章
相关标签/搜索