Java 8 并行流:必备技巧

Java 8 并行流(parallel stream)采用共享线程池,对性能形成了严重影响。能够包装流来调用本身的线程池解决性能问题。html

问题java

Java 8 的并行流可让咱们相对轻松地执行并行任务。git

myList.parallelStream.map(obj -> longRunningOperation()) 
复制代码

可是这样存在一个严重的问题:在 JVM 的后台,使用通用的 fork/join
池来完成上述功能,该池是全部并行流共享的。默认状况,fork/join
池会为每一个处理器分配一个线程。假设你有一台16核的机器,这样你就只能建立16个线程。对 CPU
密集型的任务来讲,这样是有意义的,由于你的机器确实只能执行16个线程。可是真实状况下,不是全部的任务都是 CPU 密集型的。例如:github

myList.parallelStream  
   .map(this::retrieveFromA)
   .map(this::processUsingB)
   .forEach(this::saveToC)
 
myList.parallelStream  
   .map(this::retrieveFromD)
   .map(this::processUsingE)
   .forEach(this::saveToD)
复制代码

这两个流很大程度上是受限于IO操做,因此会等待其余系统。但这两个流使用相同的(小)线程池,所以会相互等待而被阻塞。这个很是很差,能够改进。咱们以一个流为例:bash

final List<Integer> firstRange = buildIntRange();  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
});
复制代码

完整的代码能够在gist上查看。异步

在执行期间,我获取了一份线程dump的文件。这是相关的线程(在个人Macbook上):性能

ForkJoinPool.commonPool-worker-1 
ForkJoinPool.commonPool-worker-2 
ForkJoinPool.commonPool-worker-3 
ForkJoinPool.commonPool-worker-4
复制代码

如今,我要并行的执行这两个并行流ui

Runnable firstTask = () -> {  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};
 
Runnable secondTask = () -> {  
   secondRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};
// run threads
复制代码

完整的代码能够在gist上查看。this

此次咱们再看一下线程dump文件:spa

ForkJoinPool.commonPool-worker-1 
ForkJoinPool.commonPool-worker-2 
ForkJoinPool.commonPool-worker-3 
ForkJoinPool.commonPool-worker-4
复制代码

正如你所见,结果是同样的。咱们只使用了4个线程。

一种变通方案

正如我所提到的,JVM 后台使用 fork/join 池,在 ForkJoinTask 的文档中,咱们能够看到:

若是合适,安排一个异步执行的任务到当前正在运行的池中。若是任务不在inForkJoinPool()中,也能够调用ForkJoinPool.commonPool()获取新的池来执行。

让我试一试……

ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {  
    firstRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) { }
    });
});
 
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);  
forkJoinPool2.submit(() -> {  
    secondRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
        }
    });
});
复制代码

完整的代码能够在gist上查看。

如今,咱们再次查看线程池:

ForkJoinPool-1-worker-1 
ForkJoinPool-1-worker-2 
ForkJoinPool-1-worker-3 
ForkJoinPool-1-worker-4 
ForkJoinPool-2-worker-1 
ForkJoinPool-2-worker-2 
ForkJoinPool-2-worker-3 
ForkJoinPool-1-worker-4
复制代码

由于咱们建立本身的线程池,因此能够避免共享线程池,若是有须要,甚至能够分配比处理机数量更多的线程。

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>); 
复制代码


欢迎关注知乎专栏《跟上Java8》,分享优秀的Java8中文指南、教程,同时欢迎投稿高质量的文章。


原文连接: tobyhobson 翻译: ImportNew.com - paddx
译文连接: www.importnew.com/16801.html
相关文章
相关标签/搜索