Java 8 的 Stream 使得代码更加简洁易懂,本篇文章深刻分析 Java Stream 的工做原理,并探讨 Steam 的性能问题。java
Java 8 集合中的 Stream 至关于高级版的 Iterator,它能够经过 Lambda 表达式对集合进行各类很是便利、高效的聚合操做(Aggregate Operation),或者大批量数据操做 (Bulk Data Operation)。git
Stream的聚合操做与数据库SQL的聚合操做sorted、filter、map等相似。咱们在应用层就能够高效地实现相似数据库SQL的聚合操做了,而在数据操做方面,Stream不只能够经过串行的方式实现数据操做,还能够经过并行的方式处理大批量数据,提升数据的处理效率。github
官方将 Stream 中的操做分为两大类:数据库
中间操做(Intermediate operations)
,只对操做进行了记录,即只会返回一个流,不会进行计算操做。终结操做(Terminal operations)
,实现了计算操做。中间操做又能够分为:数组
无状态(Stateless)操做
,元素的处理不受以前元素的影响。有状态(Stateful)操做
,指该操做只有拿到全部元素以后才能继续下去。终结操做又能够分为:安全
短路(Short-circuiting)
操做,指遇到某些符合条件的元素就能够获得最终结果非短路(Unshort-circuiting)
操做,指必须处理完全部元素才能获得最终结果。操做分类详情以下图所示:服务器
Stream 相关类和接口的继承关系以下图所示:app
最顶端的接口类,定义了流的基本接口方法,最主要的方法为 spliterator、isParallel。框架
最顶端的接口类。定义了流的经常使用方法,例如 map、filter、sorted、limit、skip、collect 等。less
ReferencePipeline 是一个结构类,定义内部类组装了各类操做流,定义了Head
、StatelessOp
、StatefulOp
三个内部类,实现了 BaseStream 与 Stream 的接口方法。
Sink 接口定义了 Stream 之间的操做行为,包含 begin()
、end()
、cancellationRequested()
、accpt()
四个方法。ReferencePipeline最终会将整个 Stream 流操做组装成一个调用链,而这条调用链上的各个 Stream 操做的上下关系就是经过 Sink 接口协议来定义实现的。
Stream 的基础用法就再也不叙述了,这里从一段代码开始,分析 Stream 的工做原理。
@Test public void testStream() { List<String> names = Arrays.asList("kotlin", "java", "go"); int maxLength = names.stream().filter(name -> name.length() <= 4).map(String::length) .max(Comparator.naturalOrder()).orElse(-1); System.out.println(maxLength); }
当使用 Stream 时,主要有 3 部分组成,下面一一讲解。
调用 names.stream()
方法,会初次加载 ReferencePipeline 的 Head 对象,此时为加载数据源操做。
java.util.Collection#stream
default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }
StreamSupport 类中的 stream 方法,初始化了一个 ReferencePipeline的 Head 内部类对象。
java.util.stream.StreamSupport#stream(java.util.Spliterator
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
接着为 filter(name -> name.length() <= 4).mapToInt(String::length)
,是中间操做,分为无状态中间操做 StatelessOp 对象和有状态操做 StatefulOp 对象,此时的 Stage 并无执行,而是经过AbstractPipeline 生成了一个中间操做 Stage 链表。
java.util.stream.ReferencePipeline#filter
@Override public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
java.util.stream.ReferencePipeline#map
@Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }
能够看到 filter 和 map 方法都返回了一个新的 StatelessOp
对象。new StatelessOp 将会调用父类 AbstractPipeline 的构造函数,这个构造函数将先后的 Stage 联系起来,生成一个 Stage 链表:
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
最后为 max(Comparator.naturalOrder())
,是终结操做,会生成一个最终的 Stage,经过这个 Stage 触发以前的中间操做,从最后一个Stage开始,递归产生一个Sink链。
java.util.stream.ReferencePipeline#max
@Override public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) { return reduce(BinaryOperator.maxBy(comparator)); }
最终调用到 java.util.stream.AbstractPipeline#wrapSink,这个方法会调用 opWrapSink 生成一个 Sink 链表,对应到本文的例子,就是 filter 和 map 操做。
@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
在上面 opWrapSink 上断点调试,发现最终会调用到本例中的 filter 和 map 操做。
wrapAndCopyInto 生成 Sink 链表后,会经过 copyInfo 方法执行 Sink 链表的具体操做。
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
上面的核心代码是:
spliterator.forEachRemaining(wrappedSink);
java.util.Spliterators.ArraySpliterator#forEachRemaining
@Override public void forEachRemaining(Consumer<? super T> action) { Object[] a; int i, hi; // hoist accesses and checks from loop if (action == null) throw new NullPointerException(); if ((a = array).length >= (hi = fence) && (i = index) >= 0 && i < (index = hi)) { do { action.accept((T)a[i]); } while (++i < hi); } }
断点调试,能够发现首先进入了 filter 的 Sink,其中 accept 方法的入参是 list 中的第一个元素“kotlin”(代码中的 3 个元素是:"kotlin", "java", "go")。filter 的传入是一个 Lambda 表达式:
filter(name -> name.length() <= 4)
显然这个第一个元素“kotlin”的 predicate 是不会进入的。
对于第二个元素“java”,predicate.test 会返回 true(字符串“java”的长度<=4),则会进入 map 的 accept 方法。
本次调用 accept 方法时,empty 为 false,会将 map 后的结果(int 类型的 4)赋值给 t。
public static <T> TerminalOp<T, Optional<T>> makeRef(BinaryOperator<T> operator) { Objects.requireNonNull(operator); class ReducingSink implements AccumulatingSink<T, Optional<T>, ReducingSink> { private boolean empty; private T state; public void begin(long size) { empty = true; state = null; } @Override public void accept(T t) { if (empty) { empty = false; state = t; } else { state = operator.apply(state, t); } } …… } }
对于第三个元素“go”,也会进入 accept 方法,此时 empty 为 true, map 后的结果(int 类型的 2)会与上次的结果 4 经过自定义的比较器相比较,存入符合结果的值。
public static <T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) { Objects.requireNonNull(comparator); return (a, b) -> comparator.compare(a, b) >= 0 ? a : b; }
本文代码中的 max 传入的比较器为:
max(Comparator.naturalOrder())
至此会返回 int 类型的 4。
上面的例子是串行处理的,若是要改为并行也很简单,只须要在 stream() 方法后加上 parallel()
就能够了,并行代码能够写成:
@Test public void testStream() { List<String> names = Arrays.asList("kotlin", "java", "go"); int maxLength = names.stream().parallel().filter(name -> name.length() <= 4) .map(String::length).max(Comparator.naturalOrder()).orElse(-1); System.out.println(maxLength); }
Stream 的并行处理在执行终结操做以前,跟串行处理的实现是同样的。而在调用终结方法以后,实现的方式就有点不太同样,会调用 TerminalOp 的 evaluateParallel 方法进行并行处理。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
核心是使用了 ForkJoin 框架,对 Stream 处理进行分片,最终会调用下面的代码,这里就不展开分析了。
java.util.stream.AbstractTask#compute
@Override public void compute() { Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) { forkRight = false; rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } taskToFork.fork(); sizeEstimate = rs.estimateSize(); } task.setLocalResult(task.doLeaf()); task.tryComplete(); }
@Test public void testParallelWrong() { List<Integer> parallelList = new ArrayList<>(); IntStream.range(0, 1000).boxed().parallel().filter(i -> i % 2 == 1) .forEach(parallelList::add); System.out.println(parallelList.size()); }
上面的输出结果会常常小于500,这是由于 parallelList 的类型是 ArrayList,并非线程安全的,在执行 add 操做时,可能正好遇上扩容或者线程被占用,会覆盖其余线程的赋好的值。
@Test public void testParallelRight() { List<Integer> parallelList = IntStream.range(0, 1000).boxed().parallel() .filter(i -> i % 2 == 1).collect(Collectors.toList()); System.out.println(parallelList.size()); }
下面的文章参考自:JavaLambdaInternals/8-Stream Performance.md,侵删。
为保证测试结果然实可信,咱们将JVM运行在-server
模式下,测试数据在GB量级,测试机器采用常见的商用服务器,配置以下:
OS | CentOS 6.7 x86_64 |
CPU | Intel Xeon X5675, 12M Cache 3.06 GHz, 6 Cores 12 Threads |
内存 | 96GB |
JDK | java version 1.8.0_91, Java HotSpot(TM) 64-Bit Server VM |
性能测试并非容易的事,Java性能测试更费劲,由于虚拟机对性能的影响很大,JVM对性能的影响有两方面:
-XX:+UseConcMarkSweepGC -Xms10G -Xmx10G
-XX:CompileThreshold=10000
。Stream并行执行时用到ForkJoinPool.commonPool()
获得的线程池,为控制并行度咱们使用Linux的taskset
命令指定JVM可用的核数。
测试数据由程序随机生成。为防止一次测试带来的抖动,测试4次求出平均时间做为运行时间。
测试内容:找出整型数组中的最小值。对比for循环外部迭代和Stream API内部迭代性能。
测试程序IntTest,测试结果以下图:
图中展现的是for循环外部迭代耗时为基准的时间比值。分析以下:
并行迭代性能跟可利用的核数有关,上图中的并行迭代使用了所有12个核,为考察使用核数对性能的影响,咱们专门测试了不一样核数下的Stream并行迭代效果:
分析,对于基本类型:
以上两个测试说明,对于基本类型的简单迭代,Stream串行迭代性能更差,但多核状况下Stream迭代时性能较好。
再来看对象的迭代效果。
测试内容:找出字符串列表中最小的元素(天然顺序),对比for循环外部迭代和Stream API内部迭代性能。
测试程序StringTest,测试结果以下图:
结果分析以下:
再来单独考察Stream并行迭代效果:
分析,对于对象类型:
以上两个测试说明,对于对象类型的简单迭代,Stream串行迭代性能更差,但多核状况下Stream迭代时性能较好。
从实验1、二的结果来看,Stream串行执行的效果都比外部迭代差(不少),是否是说明Stream真的不行了?先别下结论,咱们再来考察一下更复杂的操做。
测试内容:给定订单列表,统计每一个用户的总交易额。对比使用外部迭代手动实现和Stream API之间的性能。
咱们将订单简化为<userName, price, timeStamp>
构成的元组,并用Order
对象来表示。测试程序ReductionTest,测试结果以下图:
分析,对于复杂的归约操做:
再来考察并行度对并行效果的影响,测试结果以下:
分析,对于复杂的归约操做:
以上两个实验说明,对于复杂的归约操做,Stream串行归约效果好于手动归约,在多核状况下,并行归约效果更佳。咱们有理由相信,对于其余复杂的操做,Stream API也能表现出类似的性能表现。
上述三个实验的结果能够总结以下:
因此,若是出于性能考虑,1. 对于简单操做推荐使用外部迭代手动实现,2. 对于复杂操做,推荐使用Stream API, 3. 在多核状况下,推荐使用并行Stream API来发挥多核优点,4.单核状况下不建议使用并行Stream API。
coding 笔记、点滴记录,之后的文章也会同步到公众号(Coding Insight)中,但愿你们关注_
代码和思惟导图在 GitHub 项目中,欢迎你们 star!