Java 8 并行流(parallel stream)采用共享线程池,对性能形成了严重影响。能够包装流来调用本身的线程池解决性能问题。html
问题java
Java 8 的并行流可让咱们相对轻松地执行并行任务。git
myList.parallelStream.map(obj -> longRunningOperation())
复制代码
可是这样存在一个严重的问题:在 JVM 的后台,使用通用的 fork/join
池来完成上述功能,该池是全部并行流共享的。默认状况,fork/join
池会为每一个处理器分配一个线程。假设你有一台16核的机器,这样你就只能建立16个线程。对 CPU
密集型的任务来讲,这样是有意义的,由于你的机器确实只能执行16个线程。可是真实状况下,不是全部的任务都是 CPU 密集型的。例如:github
myList.parallelStream
.map(this::retrieveFromA)
.map(this::processUsingB)
.forEach(this::saveToC)
myList.parallelStream
.map(this::retrieveFromD)
.map(this::processUsingE)
.forEach(this::saveToD)
复制代码
这两个流很大程度上是受限于IO操做,因此会等待其余系统。但这两个流使用相同的(小)线程池,所以会相互等待而被阻塞。这个很是很差,能够改进。咱们以一个流为例:bash
final List<Integer> firstRange = buildIntRange();
firstRange.parallelStream().forEach((number) -> {
try {
// do something slow
Thread.sleep(5);
} catch (InterruptedException e) { }
});
复制代码
完整的代码能够在gist上查看。异步
在执行期间,我获取了一份线程dump的文件。这是相关的线程(在个人Macbook上):性能
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
复制代码
如今,我要并行的执行这两个并行流ui
Runnable firstTask = () -> {
firstRange.parallelStream().forEach((number) -> {
try {
// do something slow
Thread.sleep(5);
} catch (InterruptedException e) { }
});
};
Runnable secondTask = () -> {
secondRange.parallelStream().forEach((number) -> {
try {
// do something slow
Thread.sleep(5);
} catch (InterruptedException e) { }
});
};
// run threads
复制代码
完整的代码能够在gist上查看。this
此次咱们再看一下线程dump文件:spa
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
复制代码
正如你所见,结果是同样的。咱们只使用了4个线程。
一种变通方案
正如我所提到的,JVM 后台使用 fork/join 池,在 ForkJoinTask 的文档中,咱们能够看到:
若是合适,安排一个异步执行的任务到当前正在运行的池中。若是任务不在inForkJoinPool()中,也能够调用ForkJoinPool.commonPool()获取新的池来执行。
让我试一试……
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
forkJoinPool.submit(() -> {
firstRange.parallelStream().forEach((number) -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) { }
});
});
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
forkJoinPool2.submit(() -> {
secondRange.parallelStream().forEach((number) -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
}
});
});
复制代码
完整的代码能够在gist上查看。
如今,咱们再次查看线程池:
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-4
ForkJoinPool-2-worker-1
ForkJoinPool-2-worker-2
ForkJoinPool-2-worker-3
ForkJoinPool-1-worker-4
复制代码
由于咱们建立本身的线程池,因此能够避免共享线程池,若是有须要,甚至能够分配比处理机数量更多的线程。
ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);
复制代码
欢迎关注知乎专栏《跟上Java8》,分享优秀的Java8中文指南、教程,同时欢迎投稿高质量的文章。
原文连接: tobyhobson 翻译: ImportNew.com - paddx
译文连接: www.importnew.com/16801.html