流能够并行执行,以增长大量输入元素的运行时性能。并行流ForkJoinPool经过静态ForkJoinPool.commonPool()方法使用公共可用的流。底层线程池的大小最多使用五个线程 - 具体取决于可用物理CPU核心的数量:html
ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3
在个人机器上,公共池初始化为默认值为3的并行度。经过设置如下JVM参数能够减少或增长此值:java
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支持建立并行元素流的方法parallelStream()。或者,您能够在给定流上调用中间方法parallel(),以将顺序流转换为并行流。api
为了评估并行流的并行执行行为,下一个示例将有关当前线程的信息打印出来:数组
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
经过调查调试输出,咱们应该更好地理解哪些线程实际用于执行流操做:oracle
filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2]
如您所见,并行流利用公共中的全部可用线程ForkJoinPool来执行流操做。输出在连续运行中可能不一样,由于实际使用的特定线程的行为是非肯定性的。函数
让咱们经过一个额外的流操做来扩展该示例:性能
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
结果可能最初看起来很奇怪:线程
filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1]
彷佛sort只在主线程上顺序执行。实际上,sort在并行流上使用新的Java 8方法Arrays.parallelSort()。如Javadoc中所述,若是排序将按顺序或并行执行,则此方法决定数组的长度:调试
若是指定数组的长度小于最小粒度,则使用适当的Arrays.sort方法对其进行排序。code
回到reduce一节的例子。咱们已经发现组合器函数只是并行调用,而不是顺序流调用。让咱们看看实际涉及哪些线程:
List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; });
控制台输出显示累加器和组合器函数在全部可用线程上并行执行:
accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
总之,并行流能够为具备大量输入元素的流带来良好的性能提高。但请记住,某些并行流操做reduce
,collect
须要额外的计算(组合操做),这在顺序执行时是不须要的。
此外,咱们了解到全部并行流操做共享相同的JVM范围ForkJoinPool。所以,您可能但愿避免实施慢速阻塞流操做,由于这可能会减慢严重依赖并行流的应用程序的其余部分。