咱们已经学会如何使用Stream API,用起来真的很爽,但简洁的方法下面彷佛隐藏着无尽的秘密,如此强大的API是如何实现的呢?java
好比Pipeline是怎么执行的,每次方法调用都会致使一次迭代吗?自动并行又是怎么作到的,线程个数是多少?本节咱们学习Stream流水线的原理,这是Stream实现的关键所在。数组
首先回顾一下容器执行Lambda表达式的方式,以ArrayList.forEach()
方法为例,具体代码以下:安全
// ArrayList.forEach() public void forEach(Consumer<? super E> action) { ... for (int i=0; modCount == expectedModCount && i < size; i++) { action.accept(elementData[i]);// 回调方法 } ... }
咱们看到ArrayList.forEach()
方法的主要逻辑就是一个for循环,在该for循环里不断调用action.accept()
回调方法完成对元素的遍历。数据结构
这彻底没有什么新奇之处,回调方法在Java GUI的监听器中普遍使用。Lambda表达式的做用就是至关于一个回调方法,这很好理解。app
Stream API中大量使用Lambda表达式做为回调方法,但这并非关键。理解Stream咱们更关心的是另外两个问题:流水线和自动并行。使用Stream或许很容易写入以下形式的代码:less
int longestStringLengthStartingWithA = strings.stream() .filter(s -> s.startsWith("A")) .mapToInt(String::length) .max();
上述代码求出以字母A开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样作可以实现功能,但效率上确定是没法接受的。ide
类库的实现着使用流水线(Pipeline)的方式巧妙的避免了屡次迭代,其基本思想是在一次迭代中尽量多的执行用户指定的操做。为讲解方便咱们汇总了Stream的全部操做。函数
Stream上的全部操做分为两类:中间操做和结束操做,中间操做只是一种标记,只有结束操做才会触发实际计算。中间操做又能够分为无状态的(Stateless)和有状态的(Stateful),无状态中间操做是指元素的处理不受前面元素的影响,而有状态的中间操做必须等到全部元素处理以后才知道最终结果。学习
好比排序是有状态操做,在读取全部元素以前并不能肯定排序结果;结束操做又能够分为短路操做和非短路操做,短路操做是指不用处理所有元素就能够返回结果,好比找到第一个知足条件的元素。之因此要进行如此精细的划分,是由于底层对每一种状况的处理方式不一样。ui
为了更好的理解流的中间操做和终端操做,能够经过下面的两段代码来看他们的执行过程。
IntStream.range(1, 10) .peek(x -> System.out.print("\nA" + x)) .limit(3) .peek(x -> System.out.print("B" + x)) .forEach(x -> System.out.print("C" + x));
输出为:
A1B1C1 A2B2C2 A3B3C3
中间操做是懒惰的,也就是中间操做不会对数据作任何操做,直到遇到了最终操做。而最终操做,都是比较热情的。他们会往前回溯全部的中间操做。也就是当执行到最后的forEach操做的时候,它会回溯到它的上一步中间操做,上一步中间操做,又会回溯到上上一步的中间操做,...,直到最初的第一步。
第一次forEach执行的时候,会回溯peek 操做,而后peek会回溯更上一步的limit操做,而后limit会回溯更上一步的peek操做,顶层没有操做了,开始自上向下开始执行,输出:A1B1C1 第二次forEach执行的时候,而后会回溯peek 操做,而后peek会回溯更上一步的limit操做,而后limit会回溯更上一步的peek操做,顶层没有操做了,开始自上向下开始执行,输出:A2B2C2
... 当第四次forEach执行的时候,而后会回溯peek 操做,而后peek会回溯更上一步的limit操做,到limit的时候,发现limit(3)这个job已经完成,这里就至关于循环里面的break操做,跳出来终止循环。
再来看第二段代码:
IntStream.range(1, 10) .peek(x -> System.out.print("\nA" + x)) .skip(6) .peek(x -> System.out.print("B" + x)) .forEach(x -> System.out.print("C" + x));
输出为:
A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9
第一次forEach执行的时候,会回溯peek操做,而后peek会回溯更上一步的skip操做,skip回溯到上一步的peek操做,顶层没有操做了,开始自上向下开始执行,执行到skip的时候,由于执行到skip,这个操做的意思就是跳过,下面的都不要执行了,也就是就至关于循环里面的continue,结束本次循环。输出:A1
第二次forEach执行的时候,会回溯peek操做,而后peek会回溯更上一步的skip操做,skip回溯到上一步的peek操做,顶层没有操做了,开始自上向下开始执行,执行到skip的时候,发现这是第二次skip,结束本次循环。输出:A2
...
第七次forEach执行的时候,会回溯peek操做,而后peek会回溯更上一步的skip操做,skip回溯到上一步的peek操做,顶层没有操做了,开始自上向下开始执行,执行到skip的时候,发现这是第七次skip,已经大于6了,它已经执行完了skip(6)的job了。此次skip就直接跳过,继续执行下面的操做。输出:A7B7C7
...直到循环结束。
仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中(好比数组,容器等)。
具体说来,就是调用filter()
方法后当即执行,选出全部以A开头的字符串并放到一个列表list1中,以后让list1传递给mapToInt()
方法并当即执行,生成的结果放到list2中,最后遍历list2找出最大的数字做为最终结果。程序的执行流程如如所示:
这样作实现起来很是简单直观,但有两个明显的弊端:
这些弊端使得效率底下,根本没法接受。若是不使用Stream API咱们都知道上述代码该如何在一次迭代中完成,大体是以下形式:
int longest = 0; for(String str : strings){ if(str.startsWith("A")){// 1. filter(), 保留以A开头的字符串 int len = str.length();// 2. mapToInt(), 转换成长度 longest = Math.max(len, longest);// 3. max(), 保留最长的长度 } }
采用这种方式咱们不但减小了迭代次数,也避免了存储中间结果,显然这就是流水线,由于咱们把三个操做放在了一次迭代当中。只要咱们事先知道用户意图,老是可以采用上述方式实现跟Stream API等价的功能,但问题是Stream类库的设计者并不知道用户的意图是什么。
如何在没法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。
咱们大体可以想到,应该采用某种方式记录用户每一步的操做,当用户调用结束操做时将以前记录的操做叠加到一块儿在一次迭代中所有执行掉。沿着这个思路,有几个问题须要解决:
注意这里使用的是“操做(operation)”一词,指的是“Stream中间操做”的操做,不少Stream操做会须要一个回调函数(Lambda表达式),所以一个完整的操做是<数据来源,操做,回调函数>构成的三元组。
Stream中使用Stage的概念来描述一个完整的操做,并用某种实例化后的PipelineHelper来表明Stage,将具备前后顺序的各个Stage连到一块儿,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。
还有IntPipeline, LongPipeline, DoublePipeline没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline是并列关系。
图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操做;StatelessOp和StatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操做。
Stream流水线组织结构示意图以下:
图中经过Collection.stream()
方法获得Head也就是stage0,紧接着调用一系列的中间操做,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一块儿,构成整个流水线,因为每一个Stage都记录了前一个Stage和本次的操做以及回调函数,依靠这种结构就能创建起对数据源的全部操做。这就是Stream记录操做的方式。
以上只是解决了操做记录的问题,要想让流水线起到应有的做用咱们须要一种将全部操做叠加到一块儿的方案。你可能会以为这很简单,只须要从流水线的head开始依次执行每一步的操做(包括回调函数)就好了。
这听起来彷佛是可行的,可是你忽略了前面的Stage并不知道后面Stage到底执行了哪一种操做,以及回调函数是哪一种形式。换句话说,只有当前Stage自己才知道该如何执行本身包含的动做。这就须要有某种协议来协调相邻Stage之间的调用关系。
这种协议由Sink接口完成,Sink接口包含的方法以下表所示:
有了上面的协议,相邻Stage之间调用就很方便了,每一个Stage都会将本身的操做封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()
方法便可,并不须要知道其内部是如何处理的。
固然对于有状态的操做,Sink的begin()
和end()
方法也是必须实现的。好比Stream.sorted()是一个有状态的中间操做,其对应的Sink.begin()方法可能建立一个盛放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。
对于短路操做,Sink.cancellationRequested()
也是必须实现的,好比Stream.findFirst()是短路操做,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。Sink的四个接口方法经常相互协做,共同完成计算任务。
实际上Stream API内部实现的的本质,就是如何重写Sink的这四个接口方法。
有了Sink对操做的包装,Stage之间的调用问题就解决了,执行时只须要从流水线的head开始对数据源依次调用每一个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就能够了。一种可能的Sink.accept()方法流程是这样的:
void accept(U u){ 1. 使用当前Sink包装的回调函数处理u 2. 将处理结果传递给流水线下游的Sink }
Sink接口的其余几个方法也是按照这种[处理->转发]的模型实现。
下面咱们结合具体例子看看Stream的中间操做是如何将自身的操做包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法:
// Stream.map(),调用该方法将产生一个新的Stream public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { ... return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override /*opWripSink()方法返回由回调函数包装而成Sink*/ Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) { return new Sink.ChainedReference<P_OUT, R>(downstream) { @Override public void accept(P_OUT u) { R r = mapper.apply(u);// 1. 使用当前Sink包装的回调函数mapper处理u downstream.accept(r);// 2. 将处理结果传递给流水线下游的Sink } }; } }; }
上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装到一个Sink当中。因为Stream.map()是一个无状态的中间操做,因此map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将获得一个包装了当前回调函数的Sink。
再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操做,由于读取全部元素以前是无法获得最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操做封装成Sink的呢?sorted()一种可能封装的Sink代码以下:
// Stream.sort()方法用到的Sink实现 class RefSortingSink<T> extends AbstractRefSortingSink<T> { private ArrayList<T> list;// 存放用于排序的元素 RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { super(downstream, comparator); } @Override public void begin(long size) { ... // 建立一个存放排序元素的列表 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); } @Override public void end() { list.sort(comparator);// 只有元素所有接收以后才能开始排序 downstream.begin(list.size()); if (!cancellationWasRequested) {// 下游Sink不包含短路操做 list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink } else {// 下游Sink包含短路操做 for (T t : list) {// 每次都调用cancellationRequested()询问是否能够结束处理。 if (downstream.cancellationRequested()) break; downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink } } downstream.end(); list = null; } @Override public void accept(T t) { list.add(t);// 1. 使用当前Sink包装动做处理t,只是简单的将元素添加到中间列表当中 } }
上述代码完美的展示了Sink的四个接口方法是如何协同工做的:
Sink完美封装了Stream每一步操做,并给出了[处理->转发]的模式来叠加操做。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。
是什么启动这一连串的操做呢?也许你已经想到了启动的原始动力就是结束操做(Terminal Operation),一旦调用某个结束操做,就会触发整个流水线的执行。
结束操做以后不能再有别的操做,因此结束操做不会建立新的流水线阶段(Stage),直观的说就是流水线的链表不会在日后延伸了。
结束操做会建立一个包装了本身操做的Sink,这也是流水线中最后一个Sink,这个Sink只须要处理数据而不须要将结果传递给下游的Sink(由于没有下游)。对于Sink的[处理->转发]模型,结束操做的Sink就是调用链的出口。
咱们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段便可。
但Stream类库的设计者没有这么作,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)
方法来获得Sink,该方法的做用是返回一个新的包含了当前Stage表明的操做以及可以将结果传递给downstream的Sink对象。为何要产生一个新对象而不是返回一个Sink字段?
这是由于使用opWrapSink()能够将当前操做与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,由于stage0表明数据源,不包含操做),就能够获得一个表明了流水线上全部操做的Sink,用代码表示就是这样:
// AbstractPipeline.wrapSink() // 从下游向上游不断包装Sink。若是最初传入的sink表明结束操做, // 函数返回时就能够获得一个表明了流水线上全部操做的Sink。 final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { ... for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
如今流水线上从开始到结束的全部的操做都被包装到了一个Sink里,执行这个Sink就至关于执行整个流水线,执行Sink的代码以下:
// AbstractPipeline.copyInto(), 对spliterator表明的数据执行wrappedSink表明的操做。 final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { ... if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历 spliterator.forEachRemaining(wrappedSink);// 迭代 wrappedSink.end();// 通知遍历结束 } ... }
上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,而后调用spliterator.forEachRemaining()方法对数据进行迭代,最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。
最后一个问题是流水线上全部操做都执行后,用户所须要的结果(若是有)在哪里?首先要说明的是否是全部的Stream结束操做都须要返回结果,有些操做只是为了使用其反作用(Side-effects),好比使用Stream.forEach()
方法将结果打印出来就是常见的使用反作用的场景(事实上,除了打印以外其余场景都应避免使用反作用),对于真正须要返回结果的结束操做结果存在哪里呢?
特别说明:反作用不该该被滥用,也许你会以为在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都没法保证,由于Stream可能会并行执行。大多数使用反作用的地方均可以使用归约操做更安全和有效的完成。
// 错误的收集方式 ArrayList<String> results = new ArrayList<>(); stream.filter(s -> pattern.matcher(s).matches()) .forEach(s -> results.add(s)); // Unnecessary use of side-effects! // 正确的收集方式 List<String>results = stream.filter(s -> pattern.matcher(s).matches()) .collect(Collectors.toList()); // No side-effects!
回到流水线执行结果的问题上来,须要返回结果的流水线结果存在哪里呢?这要分不一样的状况讨论,下表给出了各类有返回结果的Stream结束操做。
本文详细介绍了Stream流水线的组织方式和执行过程,学习本文将有助于理解原理并写出正确的Stream代码,同时打消你对Stream API效率方面的顾虑。如你所见,Stream API实现如此巧妙,即便咱们使用外部迭代手动编写等价代码,也未必更加高效。
注:留下本文所用的JDK版本,以便有考究癖的人考证:
$ java -version java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101-b13) Java HotSpot(TM) Server VM (build 25.101-b13, mixed mode)
欢迎你们关注个人公众号【风平浪静如码】,海量Java相关文章,学习资料都会在里面更新,整理的资料也会放在里面。
以为写的还不错的就点个赞,加个关注呗!点关注,不迷路,持续更新!!!