本文已收录 【修炼内功】跃迁之路
Java8中新增的Stream,相信使用过的同窗都已经感觉到了它的便利,容许你以声明性的方式处理集合,而不用去作繁琐的for-loop/while-loop,而且能够以极低的成本并行地处理集合数据java
若是须要从菜单中筛选出卡路里在400如下的菜品,并按卡路里排序后,输出菜品名称segmentfault
在java8以前,须要进行两次显示迭代,而且还须要借助中间结果存储并发
List<Menu> lowCaloricDishes = new LinkedList<>(); // 按照热量值进行筛选 for(Dish dish : dishes) { if (dish.getCalories() < 400) { lowCaloricDishes.add(dish); } } // 按照热量进行排序 lowCaloricDishes.sort(new Comparator<Dish>() { @Override public int compare(Dish d1, Dish d2) { return d1.getCalories().compareTo(d2.getCalories); } }) // 提取名称 List<String> lowCaloricDishesName = new LinkedList<>(); for(Dish dish : lowCaloricDishes) { lowCaloricDishesName.add(dish.getName()); }
若是使用Stream API,只须要app
List<String> lowCaloricDishesName = dishes.parallelStream() // 开启并行处理 .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选 .sorted(Comparator.comparing(Dish::getCalories)) // 按照热量进行排序 .map(Dish::getName) // 提取名称 .collect(Collectors.toList()); // 将结果存入List
甚至,能够写出更复杂的功能框架
Map<Integer, List<String>> lowCaloricDishesNameGroup = dishes.parallelStream() // 开启并行处理 .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选 .sorted(comparing(Dish::getCalories)) // 按照热量进行排序 .collect(Collectors.groupingBy( // 将菜品名按照热量进行分组 Dish::getCalories, Collectors.mapping(Dish::getName, Collectors.toList()) ));
是否是很是简洁,而且愈加形似SQL
如此简洁的API是如何实现的?中间过程是如何衔接起来的?每一步都会进行一次迭代么,须要中间结果存储么?并行处理是怎么作到的?less
Stream使用一种相似SQL语句的方式,提供对集合运算的高阶抽象,能够将其处理的元素集合看作一种数据流,流在管道中传输,数据在管道节点上进行处理,好比筛选、排序、聚合等ide
数据流在管道中通过中间操做(intermediate operation)处理,由终止操做(terminal operation)获得前面处理的结果
和以往的集合操做不一样,Stream操做有两个基础特征:函数
pipelining:中间操做
会返回流对象,多个操做最终串联成一个管道,管道并不直接操做数据,最终由终止操做
触发数据在管道中的流动及处理,并收集最终的结果oop
Stream的实现使用流水线( pipelining)的方式巧妙的避免了屡次迭代,其基本思想是在 一次迭代中尽量多的执行用户指定的操做
Stream操做分为两类:中间操做
及终止操做
ui
中间操做:将流一层层的进行处理,并向下一层进行传递,如 filter
map
sorted
等
中间操做又分为有状态(stateful)及无状态(stateless)
sorted
filter
map
终止操做:触发数据的流动,并收集结果,如collect
findFirst
forEach
等
终止操做又分为短路操做(short-circuiting)及非短路操做(non-short-circuiting)
anyMatch
findFirst
等collect
max
等Stream采用某种方式记录用户每一步的操做,当用户调用终止操做时将以前记录的操做叠加到一块儿,尽量地在一次迭代中所有执行掉,那么
Stream中使用Stage的概念来描述一个完整的操做,并用某种实例化后的PipelineHelper来表明Stage,将各Pipeline按照前后顺序链接到一块儿,就构成了整个流水线
与Stream相关类和接口的继承关系以下图
Head用于表示第一个Stage,该Stage不包含任何操做
StatelessOp和StatefulOp分别表示无状态和有状态的Stage
使用Collection.stream
Arrays.stream
或Stream.of
等接口会生成Head
,其内部均采用StreamSupport.stream
方法,将原始数据包装为Spliterator
存放在Stage中
Head StatelessOp StatefulOp三个操做实例化会指向其父类AbstractPipeline
对于Head
/** * Constructor for the head of a stream pipeline. * * @param source {@code Spliterator} describing the stream source * @param sourceFlags the source flags for the stream source, described in * {@link StreamOpFlag} * @param parallel {@code true} if the pipeline is parallel */ AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; }
其会将包装为Spliterator的原始数据存放在Stage中,并将自身存放在sourceStage中
对于StatelessOp及StatefulOp
/** * Constructor for appending an intermediate operation stage onto an * existing pipeline. * * @param previousStage the upstream pipeline stage * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
每个Stage都会存放原始的sourceStage,即Head
经过previousStage及nextStage,将各Stage串联为一个双向链表,使得每一步都知道上一步与下一步的操做
以上已经解决了如何记录操做的问题,想要让pipeline运行起来,须要一种将全部操做叠加到一块儿的方案
因为前面的Stage并不知道后面的Stage致使须要执行何种操做,只有当前Stage自己知道该如何执行本身包含的动做,这就须要某种协议来协调相邻Stage之间的调用关系
Stream类库采用了Sink接口来协调各Stage之间的关系
interface Sink<T> extends Consumer<T> { /** * Resets the sink state to receive a fresh data set. This must be called * before sending any data to the sink. After calling {@link #end()}, * you may call this method to reset the sink for another calculation. * @param size The exact size of the data to be pushed downstream, if * known or {@code -1} if unknown or infinite. * * <p>Prior to this call, the sink must be in the initial state, and after * this call it is in the active state. * * 开始遍历前调用,通知Sink作好准备 */ default void begin(long size) {} /** * Indicates that all elements have been pushed. If the {@code Sink} is * stateful, it should send any stored state downstream at this time, and * should clear any accumulated state (and associated resources). * * <p>Prior to this call, the sink must be in the active state, and after * this call it is returned to the initial state. * * 全部元素遍历完成后调用,通知Sink没有更多元素了 */ default void end() {} /** * Indicates that this {@code Sink} does not wish to receive any more data. * * @implSpec The default implementation always returns false. * * @return true if cancellation is requested * * 是否能够结束操做,可让短路操做尽早结束 */ default boolean cancellationRequested() {} /** * Accepts a value. * * @implSpec The default implementation throws IllegalStateException. * * @throws IllegalStateException if this sink does not accept values * * 遍历时调用,接收的一个待处理元素,并对元素进行处理 * Stage把本身包含的操做和回调方法封装到该方法里,前一个Stage只须要调用当前Stage.accept方法便可 */ default void accept(T value) {} }
Sink的四个接口方法经常相互协做,共同完成计算任务
实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法,下面结合具体源码来理解Stage是如何将自身的操做包装秤Sink,以及Sink是如何将处理结果转发给下一个Sink的
无状态Stage,Stream.map
// Stream.map 将生成一个新Stream public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { // 该方法将回调函数(处理逻辑)包装成Sink @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { // 接收数据,使用当前包装的回调函数处理数据,并传递给下游Sink downstream.accept(mapper.apply(u)); } }; } }; }
上述代码逻辑很是简单,接下来能够看一下有状态Stage,Stream.sorted
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { // 存放用于排序的元素 private ArrayList<T> list; RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { super(sink, comparator); } @Override public void begin(long size) { if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); // 建立用于存放排序元素的列表 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); } @Override public void end() { // 只有在接收到全部元素后才开始排序 list.sort(comparator); downstream.begin(list.size()); // 排序完成后,将数据传递给下游Sink if (!cancellationWasRequested) { // 下游Sink不包含短路操做,将数据依次传递给下游Sink list.forEach(downstream::accept); } else { // 下游Sink包含短路操做 for (T t : list) { // 对于每个元素,都要询问是否能够结束处理 if (downstream.cancellationRequested()) break; // 将元素传递给下游Sink downstream.accept(t); } } // 告知下游Sink数据传递完毕 downstream.end(); list = null; } @Override public void accept(T t) { // 依次将须要排序的元素加入到临时列表中 list.add(t); } }
Stream.sorted会在接收到全部元素以后再进行排序,在此以后才开始将数据依次传递给下游Sink
Sink就如齿轮,每一步的操做逻辑是封装在Sink中的,那各Sink是如何串联咬合在一块儿的,首个Sink又是如何启动来触发整个pipeline执行的?
结束操做(TerminalOp)以后不能再有别的操做,结束操做会建立一个包装了本身操做的Sink,这个Sink只处理数据而不会将数据传递到下游Sink
TerminalOp的类图很是简单
FindOp: 用于查找,如findFirst
,findAny
,生成FindSink
ReduceOp: 用于规约,如reduce
collect
,生成ReduceSink
MatchOp: 用于匹配,如allMatch
anyMatch
,生成MatchSink
ForEachOp: 用于遍历,如forEach
,生成ForEachSink
在调用Stream的终止操做时,会执行AbstractPipeline.evaluate
/** * Evaluate the pipeline with a terminal operation to produce a result. * * @param <R> the type of result * @param terminalOp the terminal operation to be applied to the pipeline. * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各类终止操做 */) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */ : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */ }
最终会根据是否并行执行TerminalOp中不一样的的evaluate方法,在TerminalOp的evaluate方法中会调用helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get()
来串联各层Sink,触发pipeline,并获取最终结果,那TerminalOp究竟是如何串联各层Sink的?
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; }
其中玄机尽在warpSink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); // AbstractPipeline.this,最后一层Stage for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { // 从下游向上游遍历,不断包装Sink sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层Stage的Sink */); } return (Sink<P_IN>) sink; }
还记得opWrapSink
么?它会返回一个新的Sink,实现begin
end
accept
等方法,当前Stage的处理逻辑封装在其中,并将处理后的结果传递给下游的Sink
这样,便将从开始到结束的全部操做都包装到了一个Sink里,执行这个Sink就至关于执行首个Sink,并带动全部下游的Sink,使整个pipeline运行起来
有了包含全部操做的Sink,如何执行Sink呢?wrapAndCopyInto
中还有一个copyInto
方法
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { // 不包含短路操做 // 1. begin wrappedSink.begin(spliterator.getExactSizeIfKnown()); // 2. 遍历调用 sink.accept spliterator.forEachRemaining(wrappedSink); // 3. end wrappedSink.end(); } else { // 包含短路操做 copyIntoWithCancel(wrappedSink, spliterator); } } final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { @SuppressWarnings({"rawtypes","unchecked"}) AbstractPipeline p = AbstractPipeline.this; while (p.depth > 0) { p = p.previousStage; } // 1. begin wrappedSink.begin(spliterator.getExactSizeIfKnown()); // 2. 遍历调用 sink.accept // 每一次遍历都询问cancellationRequested结果 // 若是cancellationRequested为true,则中断遍历 p.forEachWithCancel(spliterator, wrappedSink); // 3. end wrappedSink.end(); }
copyInto
会根据不一样的状况依次
sink.bigin
sink.accept
若是包含短路操做,则每次遍历都须要询问cancellationRequested,适时中断遍历
sink.end
各层Stage经过Sink协议将全部的操做串联到一块儿,遍历原始数据并执行,终止操做会建立一个包装了本身操做的TerminalSink,该Sink中处理最终的数据并作数据收集(若是须要),每一种TerminalSink中均会提供一个获取最终数据的方法
TerminalOp经过调用TerminalSink中的对应方法,获取最终的数据并返回,如ReduceOp中
@Override public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator)/* 执行各Sink */.get()/* 获取最终数据 */; }
使用Collection.parallelStream
或Stream.parallel
等方法能够将当前的流标记
为并发,从新来看AbstractPipeline.evaluate
,该方法会在终止操做时被执行
/** * Evaluate the pipeline with a terminal operation to produce a result. * * @param <R> the type of result * @param terminalOp the terminal operation to be applied to the pipeline. * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各类终止操做 */) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */ : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */ }
若是被标记为sequential
,则会调用TerminalOp.evaluateSequential
,evaluateSequential的调用过程上文已经讲述的很清楚
若是被标记为parallel
,则会调用TerminalOp.evaluateParallel
,对于该方法不一样的TerminalOp会有不一样的实现,但都使用了ForkJoin框架,将原始数据不断拆分为更小的单元,对每个单元作上述evaluateSequential
相似的动做,最后将每个单元计算的结果依次整合,获得最终结果
默认状况下,ForkJoin的线程数即为机器的CPU核数,若是想自定义Stream并行执行的线程数,能够参考Custom Thread Pools In Java 8 Parallel Streams
在将原始数据进行拆分的时候,拆分的策略是什么?拆分的粒度又是什么(拆分到什么程度)?
还记得上文所说,原始数据是如何存放的么?Spliterator
(可分迭代器 splitable iterator),不管使用何种API,均会将原始数据封装为Spliterator
后存放在Stage中,在进行parallel计算时,对原始数据的拆分以及拆分粒度都是基于Spliterator
的,和Iterator同样,Spliterator也用于遍历数据源中的数据,但它是专门为并行执行而设计的
public interface Spliterator<T> { /** * 若是还有元素须要遍历,则遍历该元素并执行action,返回true,不然返回false */ boolean tryAdvance(Consumer<? super T> action); /** * 若是能够,则将一部分元素划分出去,构造另外一个Spliterator,使得两个Spliterator能够并行处理 */ Spliterator<T> trySplit(); /** * 估算还有多少元素须要遍历 */ long estimateSize(); /** * 遍历全部未遍历的元素 */ default void forEachRemaining(Consumer<? super T> action) { do { } while (tryAdvance(action)); } }
动图以下
在使用Stream parallel时,若是默认Spliterator的拆分逻辑不能知足你的需求,即可以自定义Spliterator,具体示例能够参考《Java 8 in Action》中『7.3.2 实现你本身的Spliterator』
Head
会生成一个不包含任何操做的Stage,并将原始数据Spliterator
存放在sourceStage
中StagelessOp
StagefulOp
将当前操做封装在Sink中,生成一个新的Stage,并使用双链表结构将先后的Stage连接在一块儿,Sink用于调用当前指定的操做处理数据,并将处理后的结果传递给下游SinkTerminalOp
生成一个TerminalSink
,从下游向上游遍历Stage,不断包装各Stage中的Sink,最终生成一个串联了全部操做的TerminalSink,适时调用该Sink的begin
accept
end
等方法,触发整个pipeline的数据流动及处理,最终调用TerminalSink的get
方法,获取最终结果(若是有)JavaLambdaInternals - 6-Stream Pipelines)
java8实战:Stream执行原理