深刻理解Java Stream流水线

前面咱们已经学会如何使用Stream API,用起来真的很爽,但简洁的方法下面彷佛隐藏着无尽的秘密,如此强大的API是如何实现的呢?Pipeline是怎么执行的,每次方法调用都会致使一次迭代吗?自动并行又是怎么作到的,线程个数是多少?本节咱们学习Stream流水线的原理,这是Stream实现的关键所在。html

首先回顾一下容器执行Lambda表达式的方式,以ArrayList.forEach()方法为例,具体代码以下:java

// ArrayList.forEach()
public void forEach(Consumer<? super E> action) {
    ...
    for (int i=0; modCount == expectedModCount && i < size; i++) {
        action.accept(elementData[i]);// 回调方法
    }
    ...
}

咱们看到ArrayList.forEach()方法的主要逻辑就是一个for循环,在该for循环里不断调用action.accept()回调方法完成对元素的遍历。这彻底没有什么新奇之处,回调方法在Java GUI的监听器中普遍使用。Lambda表达式的做用就是至关于一个回调方法,这很好理解。git

Stream API中大量使用Lambda表达式做为回调方法,但这并非关键。理解Stream咱们更关心的是另外两个问题:流水线和自动并行。使用Stream或许很容易写入以下形式的代码:github

int longestStringLengthStartingWithA
        = strings.stream()
              .filter(s -> s.startsWith("A"))
              .mapToInt(String::length)
              .max();

上述代码求出以字母A开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样作可以实现功能,但效率上确定是没法接受的。类库的实现着使用流水线(Pipeline)的方式巧妙的避免了屡次迭代,其基本思想是在一次迭代中尽量多的执行用户指定的操做。为讲解方便咱们汇总了Stream的全部操做。数组

Stream操做分类
中间操做(Intermediate operations) 无状态(Stateless) unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()
有状态(Stateful) distinct() sorted() sorted() limit() skip()
结束操做(Terminal operations) 非短路操做 forEach() forEachOrdered() toArray() reduce() collect() max() min() count()
短路操做(short-circuiting) anyMatch() allMatch() noneMatch() findFirst() findAny()

Stream上的全部操做分为两类:中间操做和结束操做,中间操做只是一种标记,只有结束操做才会触发实际计算。中间操做又能够分为无状态的(Stateless)和有状态的(Stateful),无状态中间操做是指元素的处理不受前面元素的影响,而有状态的中间操做必须等到全部元素处理以后才知道最终结果,好比排序是有状态操做,在读取全部元素以前并不能肯定排序结果;结束操做又能够分为短路操做和非短路操做,短路操做是指不用处理所有元素就能够返回结果,好比找到第一个知足条件的元素。之因此要进行如此精细的划分,是由于底层对每一种状况的处理方式不一样。安全

一种直白的实现方式

Stream_pipeline_naive

仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中(好比数组,容器等)。具体说来,就是调用filter()方法后当即执行,选出全部以A开头的字符串并放到一个列表list1中,以后让list1传递给mapToInt()方法并当即执行,生成的结果放到list2中,最后遍历list2找出最大的数字做为最终结果。程序的执行流程如如所示:数据结构

这样作实现起来很是简单直观,但有两个明显的弊端:app

  1. 迭代次数多。迭代次数跟函数调用的次数相等。
  2. 频繁产生中间结果。每次函数调用都产生一次中间结果,存储开销没法接受。

这些弊端使得效率底下,根本没法接受。若是不使用Stream API咱们都知道上述代码该如何在一次迭代中完成,大体是以下形式:less

int longest = 0;
for(String str : strings){
    if(str.startsWith("A")){// 1. filter(), 保留以A开头的字符串
        int len = str.length();// 2. mapToInt(), 转换成长度
        longest = Math.max(len, longest);// 3. max(), 保留最长的长度
    }
}

采用这种方式咱们不但减小了迭代次数,也避免了存储中间结果,显然这就是流水线,由于咱们把三个操做放在了一次迭代当中。只要咱们事先知道用户意图,老是可以采用上述方式实现跟Stream API等价的功能,但问题是Stream类库的设计者并不知道用户的意图是什么。如何在没法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。ide

Stream流水线解决方案

咱们大体可以想到,应该采用某种方式记录用户每一步的操做,当用户调用结束操做时将以前记录的操做叠加到一块儿在一次迭代中所有执行掉。沿着这个思路,有几个问题须要解决:

  1. 用户的操做如何记录?
  2. 操做如何叠加?
  3. 叠加以后的操做如何执行?
  4. 执行后的结果(若是有)在哪里?

>> 操做如何记录

Java_stream_pipeline_classes

注意这里使用的是“操做(operation)”一词,指的是“Stream中间操做”的操做,不少Stream操做会须要一个回调函数(Lambda表达式),所以一个完整的操做是<数据来源,操做,回调函数>构成的三元组。Stream中使用Stage的概念来描述一个完整的操做,并用某种实例化后的PipelineHelper来表明Stage,将具备前后顺序的各个Stage连到一块儿,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。

还有IntPipeline, LongPipeline, DoublePipeline没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline是并列关系。图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操做;StatelessOpStatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操做。

Stream流水线组织结构示意图以下:

Stream_pipeline_example

图中经过Collection.stream()方法获得Head也就是stage0,紧接着调用一系列的中间操做,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一块儿,构成整个流水线,因为每一个Stage都记录了前一个Stage和本次的操做以及回调函数,依靠这种结构就能创建起对数据源的全部操做。这就是Stream记录操做的方式。

>> 操做如何叠加

以上只是解决了操做记录的问题,要想让流水线起到应有的做用咱们须要一种将全部操做叠加到一块儿的方案。你可能会以为这很简单,只须要从流水线的head开始依次执行每一步的操做(包括回调函数)就好了。这听起来彷佛是可行的,可是你忽略了前面的Stage并不知道后面Stage到底执行了哪一种操做,以及回调函数是哪一种形式。换句话说,只有当前Stage自己才知道该如何执行本身包含的动做。这就须要有某种协议来协调相邻Stage之间的调用关系。

这种协议由Sink接口完成,Sink接口包含的方法以下表所示:

方法名 做用
void begin(long size) 开始遍历元素以前调用该方法,通知Sink作好准备。
void end() 全部元素遍历完成以后调用,通知Sink没有更多的元素了。
boolean cancellationRequested() 是否能够结束操做,可让短路操做尽早结束。
void accept(T t) 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把本身包含的操做和回调方法封装到该方法里,前一个Stage只须要调用当前Stage.accept(T t)方法就好了。

有了上面的协议,相邻Stage之间调用就很方便了,每一个Stage都会将本身的操做封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法便可,并不须要知道其内部是如何处理的。固然对于有状态的操做,Sink的begin()end()方法也是必须实现的。好比Stream.sorted()是一个有状态的中间操做,其对应的Sink.begin()方法可能建立一个乘放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。对于短路操做,Sink.cancellationRequested()也是必须实现的,好比Stream.findFirst()是短路操做,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。Sink的四个接口方法经常相互协做,共同完成计算任务。实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法

有了Sink对操做的包装,Stage之间的调用问题就解决了,执行时只须要从流水线的head开始对数据源依次调用每一个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就能够了。一种可能的Sink.accept()方法流程是这样的:

void accept(U u){
    1. 使用当前Sink包装的回调函数处理u
    2. 将处理结果传递给流水线下游的Sink
}

Sink接口的其余几个方法也是按照这种[处理->转发]的模型实现。下面咱们结合具体例子看看Stream的中间操做是如何将自身的操做包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法:

// Stream.map(),调用该方法将产生一个新的Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    ...
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override /*opWripSink()方法返回由回调函数包装而成Sink*/
        Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
            return new Sink.ChainedReference<P_OUT, R>(downstream) {
                @Override
                public void accept(P_OUT u) {
                    R r = mapper.apply(u);// 1. 使用当前Sink包装的回调函数mapper处理u
                    downstream.accept(r);// 2. 将处理结果传递给流水线下游的Sink
                }
            };
        }
    };
}

上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装到一个Sink当中。因为Stream.map()是一个无状态的中间操做,因此map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将获得一个包装了当前回调函数的Sink。

再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操做,由于读取全部元素以前是无法获得最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操做封装成Sink的呢?sorted()一种可能封装的Sink代码以下:

// Stream.sort()方法用到的Sink实现
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;// 存放用于排序的元素
    RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
        super(downstream, comparator);
    }
    @Override
    public void begin(long size) {
        ...
        // 建立一个存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }
    @Override
    public void end() {
        list.sort(comparator);// 只有元素所有接收以后才能开始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操做
            list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink
        }
        else {// 下游Sink包含短路操做
            for (T t : list) {// 每次都调用cancellationRequested()询问是否能够结束处理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink
            }
        }
        downstream.end();
        list = null;
    }
    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用当前Sink包装动做处理t,只是简单的将元素添加到中间列表当中
    }
}

上述代码完美的展示了Sink的四个接口方法是如何协同工做的:

  1. 首先beging()方法告诉Sink参与排序的元素个数,方便肯定中间结果容器的的大小;
  2. 以后经过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历全部元素;
  3. 最后end()方法告诉Sink全部元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;
  4. 若是下游的Sink是短路操做,将结果传递给下游时不断询问下游cancellationRequested()是否能够结束处理。

>> 叠加以后的操做如何执行

Stream_pipeline_Sink

Sink完美封装了Stream每一步操做,并给出了[处理->转发]的模式来叠加操做。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操做呢?也许你已经想到了启动的原始动力就是结束操做(Terminal Operation),一旦调用某个结束操做,就会触发整个流水线的执行。

结束操做以后不能再有别的操做,因此结束操做不会建立新的流水线阶段(Stage),直观的说就是流水线的链表不会在日后延伸了。结束操做会建立一个包装了本身操做的Sink,这也是流水线中最后一个Sink,这个Sink只须要处理数据而不须要将结果传递给下游的Sink(由于没有下游)。对于Sink的[处理->转发]模型,结束操做的Sink就是调用链的出口。

咱们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段便可。但Stream类库的设计者没有这么作,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来获得Sink,该方法的做用是返回一个新的包含了当前Stage表明的操做以及可以将结果传递给downstream的Sink对象。为何要产生一个新对象而不是返回一个Sink字段?这是由于使用opWrapSink()能够将当前操做与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,由于stage0表明数据源,不包含操做),就能够获得一个表明了流水线上全部操做的Sink,用代码表示就是这样:

// AbstractPipeline.wrapSink()
// 从下游向上游不断包装Sink。若是最初传入的sink表明结束操做,
// 函数返回时就能够获得一个表明了流水线上全部操做的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

如今流水线上从开始到结束的全部的操做都被包装到了一个Sink里,执行这个Sink就至关于执行整个流水线,执行Sink的代码以下:

// AbstractPipeline.copyInto(), 对spliterator表明的数据执行wrappedSink表明的操做。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍历结束
    }
    ...
}

上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,而后调用spliterator.forEachRemaining()方法对数据进行迭代(Spliterator是容器的一种迭代器,参阅),最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。

>> 执行后的结果在哪里

最后一个问题是流水线上全部操做都执行后,用户所须要的结果(若是有)在哪里?首先要说明的是否是全部的Stream结束操做都须要返回结果,有些操做只是为了使用其反作用(Side-effects),好比使用Stream.forEach()方法将结果打印出来就是常见的使用反作用的场景(事实上,除了打印以外其余场景都应避免使用反作用),对于真正须要返回结果的结束操做结果存在哪里呢?

特别说明:反作用不该该被滥用,也许你会以为在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都没法保证,由于Stream可能会并行执行。大多数使用反作用的地方均可以使用归约操做更安全和有效的完成。

// 错误的收集方式
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
// 正确的收集方式
List<String>results =
     stream.filter(s -> pattern.matcher(s).matches())
             .collect(Collectors.toList());  // No side-effects!

回到流水线执行结果的问题上来,须要返回结果的流水线结果存在哪里呢?这要分不一样的状况讨论,下表给出了各类有返回结果的Stream结束操做。

返回类型 对应的结束操做
boolean anyMatch() allMatch() noneMatch()
Optional findFirst() findAny()
归约结果 reduce() collect()
数组 toArray()
  1. 对于表中返回boolean或者Optional的操做(Optional是存放 一个 值的容器)的操做,因为值返回一个值,只须要在对应的Sink中记录这个值,等到执行结束时返回就能够了。
  2. 对于归约操做,最终结果放在用户调用时指定的容器中(容器类型经过收集器指定)。collect(), reduce(), max(), min()都是归约操做,虽然max()和min()也是返回一个Optional,但事实上底层是经过调用reduce()方法实现的。
  3. 对于返回是数组的状况,毫无疑问的结果会放在数组当中。这么说固然是对的,但在最终返回数组以前,结果实际上是存储在一种叫作Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,而且一个叶子节点能够存放多个元素。这样作是为了并行执行方便。关于Node的具体结构,咱们会在下一节探究Stream如何并行执行时给出详细说明。

结语

本文详细介绍了Stream流水线的组织方式和执行过程,学习本文将有助于理解原理并写出正确的Stream代码,同时打消你对Stream API效率方面的顾虑。如你所见,Stream API实现如此巧妙,即便咱们使用外部迭代手动编写等价代码,也未必更加高效。

本文github地址

相关文章
相关标签/搜索