Java8 Stream源码分析

Stream

Stream是在Java SE 8 API添加的用于加强集合的操做接口,可让你以一种声明的方式处理集合数据。将要处理的集合看做一种流的建立者,将集合内部的元素转换为流而且在管道中传输, 而且能够在管道的节点上进行处理, 好比筛选,排序,聚合等。元素流在管道中通过中间操做(intermediate operation)的处理,最后由最终操做(terminal operation)获得前面处理的结果。Stream的继承关系图以下,且容我慢慢抽丝剥茧细细道来。java

 

ReferencePipeline

 

过滤,转换,聚合,归约
 Stream.of("one", "two", "three", "four")
       .filter(e -> e.length() > 3)
       .peek(e -> System.out.println("Filtered value: " + e))
       .map(String::toUpperCase)
       .peek(e -> System.out.println("Mapped value: " + e))
       .collect(Collectors.toList());
 

在没有Stream以前,咱们对集合数据的处理到可能是外部遍历,而后作数据的聚合用算,排序,merge等等。这属于OO思想,在引入Java SE 8引入FP以后,FP的操做能够提升Java程序员的生产力,,基于类型推断的lambda表达式能够 让程序员写出高效率、干净、简洁的代码。能够避免冗余的代码。根据给定的集合操做经过stream()方法建立初始流,配合map(),flatMap(),filter()对集合数据进行过滤,转换。api调用我这里就很少说了。直接从源码入手,看上图最核心的就是类为AbstractPipelineReferencePipelineSink接口.AbstractPipeline抽象类是整个Stream中流水线的高度抽象了源头sourceStage,上游previousStage,下游nextStage,定义evaluate结束方法,而ReferencePipeline则是抽象了过滤,转换,聚合,归约等功能,每个功能的添加实际上能够理解为卷心菜,菜心就是源头,每一次加入一个功能就至关于从新长出一片叶子包住了菜心,最后一个功能集成完毕以后整颗卷心菜就长大了。而Sink接口呢负责把整个流水线串起来,而后在执行聚合,归约时候调AbstractPipeline抽象类的evaluate结束方法,根据是不是并行执行,调用不一样的结束逻辑,若是不是并行方法则执行terminalOp.evaluateSequential不然就执行terminalOp.evaluateParallel,非并行执行模式下则是执行的是AbstractPipeline抽象类的wrapAndCopyInto方法去调用copyInto,调用前会先执行一下wrapSink,用于剥开这个咱们在流水线上产生的卷心菜。从下游向上游去遍历AbstractPipeline,而后包装到Sink,而后在copyInto方法内部迭代执行对应的方法。最后完成调用,node

并行执行其实是构建一个ForkJoinTask并执行invoke去提交到ForkJoinPool线程池。程序员

BaseStream
BaseStream

 

流的基本接口,该接口制定流能够支持无序,顺序,并行的,Stream实现了BaseStream接口。api

  • Iterator iterator();数组

    外部迭代器app

  • Spliterator spliterator();ide

    用于建立一个内部迭代器函数

  • isParallel性能

    用于判断该stream是不是并行的ui

  • S sequential();

    标识该stream建立是顺序执行的

  • S parallel();

    标识该stream建立是并行的,须要使用ForkJoinPool

  • S unordered();

    标识该stream建立是无序的

  • S onClose(Runnable closeHandler);

    当stream关闭的时候执行一个方法回调去关闭流。

PipelineHelper
PipelineHelper

 

该抽象类主要定义了操做管道的核心方法,而且能收集到流管道内的全部信息。如经过TerminalOp#evaluateParallel用于执行并行流操做,经过TerminalOp#evaluateSequential执行顺序流的操做。

  • abstract StreamShape getSourceShape();

    用于定义该流的中元素的原型,返回一个枚举值,用于切片操做limit或者skip

    枚举值取值范围 {REFERENCE:引用类型元素,INT_VALUE:int类型元素,LONG_VALUE:long类型元素,DOUBLE_VALUE:double类型元素}

  • abstract int getStreamAndOpFlags();

    用于获取流的中元素的原型和全部操做的组合,Stream中全部的定义流类型和操做的指令都包含在`StreamOpFlag``枚举类中。先看下补码 掩码的运算

     位掩码的经常使用CRUD操做
          a&~b: 清除标志位b;
          a|b : 添加标志位b;
          a&b : 取出标志位b;
          a^b : 取出a与b的不一样部分;
      下面是对应流的标志位对应的表。
      /*
       * Characteristics belong to certain types, see the Type enum. Bit masks for
       * the types are constructed as per the following table:
       *
       *                        DISTINCT  SORTED  ORDERED  SIZED  SHORT_CIRCUIT
       *          SPLITERATOR      01       01       01      01        00
       *               STREAM      01       01       01      01        00
       *                   OP      11       11       11      10        01
       *          TERMINAL_OP      00       00       10      00        01
       * UPSTREAM_TERMINAL_OP      00       00       10      00        00
       *
       * 01 = set/inject SET_BITS=0b01设置指令
       * 10 = clear CLEAR_BITS=0b10清除指令
       * 11 = preserve PRESERVE_BITS=0b11保存指令
       */
      构造函数
       private StreamOpFlag(int position, MaskBuilder maskBuilder) {
          this.maskTable = maskBuilder.build();
          // Two bits per flag
          position *= 2;
          this.bitPosition = position;
          this.set = SET_BITS << position;
          this.clear = CLEAR_BITS << position;
          this.preserve = PRESERVE_BITS << position;
      }
    • StreamOpFlag.DISTINCT

      DISTINCT(0,set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP))

      output:StreamOpFlag.DISTINCT: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=3, TERMINAL_OP=0, UPSTREAM_TERMINAL_OP=0}, bitPosition=0, set=1, clear=2, preserve=3)

    ok,咱们知道了StreamOpFlag.DISTINCT的[设置]偏移位是1,16进制表示:0x00000001。当getStreamAndOpFlags返回的包含IS_DISTINCT也就是0x00000001就表示对于流中遇到的X,Y元素{@code!X.equals(Y)}。对应的是包含Spliterator.DISTINCT,标识该stream已是distinct的了。

    • StreamOpFlag.SIZED

      SIZED(3, set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP))

      output:StreamOpFlag.SIZED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=2, TERMINAL_OP=0, UPSTREAM_TERMINAL_OP=0}, bitPosition=6, set=64, clear=128, preserve=192)【0x00000040】->[Spliterator.SIZED]

    表示遍历或拆分前从estimateSize()返回的值的特征值表示一个有限大小,在没有修改源结构的状况下,该值表示完整遍历流中元素数量的精确值,若是stream没有SIZED|SUBSIZED属性,则能够将estimateSize返回为Long.MAX_VALUE.这说明这个stream的estimateSize计算很复杂或自己就是一个infinite的steam流。这样设置后,性能上会差一些,可是,不会对sorted方法产生影响。若是要对流进行并行操做,实现自定义的Spliterator时,则须要重写trySplit()方法和long estimateSize()方法。经过拆分Spliterator加入fork/join线程池中,而后实现并行的处理。

    • StreamOpFlag.SORTED

      SORTED(1, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP))

      output:StreamOpFlag.SORTED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=3, TERMINAL_OP=0, UPSTREAM_TERMINAL_OP=0}, bitPosition=2, set=4, clear=8, preserve=12) 【0x00000004】->[Spliterator.SORTED]

    表示流里顺序遵循定义的排序顺序。若是包含该属性,方法getComparator()返回关联的比较器,或者返回null,若是设置了该属性而且,方法getComparator()返回null,这代表改流已经排好序了,若是方法getComparator()返回不为null,那么在fromCharacteristics方法处,该SORTED属性会被取消掉。若是流里面的全部元素都是实现了Comparable,那排序顺序就是按它们的天然顺序,在sorted(x->{...})方法执行能够传一个lambda进去。若是有值传输进去,那么都回按照该lambda对该流进行排序

    • StreamOpFlag.ORDERED

      ORDERED(2, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP) .clear(Type.UPSTREAM_TERMINAL_OP))

      output:StreamOpFlag.ORDERED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=3, TERMINAL_OP=2, UPSTREAM_TERMINAL_OP=2}, bitPosition=4, set=16, clear=32, preserve=48)【0x00000010】->[Spliterator.ORDERED]

    表示该流中的元素已经定义了顺序。包含了ORDERED属性,是拆分器保证trySplit拆分元素的强制前置条件,tryAdvance方法也会按定义了的顺序逐个元素进行拆分,forEachRemaining方法也按定义了的顺序执行内部迭代操做。通常集合的顺序是升序。可是对于基于哈希的集合,例如HashSet,不保证顺序。因此应该在不进行交换场景的并行计算中强制保证排序约束。

    • StreamOpFlag.SHORT_CIRCUIT

      SHORT_CIRCUIT(12, set(Type.OP).set(Type.TERMINAL_OP))

      output:StreamOpFlag.SHORT_CIRCUIT: StreamOpFlag(maskTable={SPLITERATOR=0, STREAM=0, OP=1, TERMINAL_OP=1, UPSTREAM_TERMINAL_OP=0}, bitPosition=24, set=16777216, clear=33554432, preserve=50331648)【0x01000000】->[表示操做可能使流短路]

    表示操做可能使流短路

  • abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);

将此时间的管道内的元素应用到提供的Spliterator,并将结果发送到提供的接收器sink里

  • abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);

用于输出返回值的大小。

  • abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

用于将从Spliterator得到的元素推入提供的接收器中Sink。若是已知流管道中有短路阶段(包含StreamOpflag#SHORT_CURRENT),则在每一个元素以后执行一下Sink#cancellationRequested(),若是返回请求true,则执行终止。这个方法被实现以后须要遵照Sink的协议即:Sink#begin->Sink#accept->Sink->end

  • abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

用于将从Spliterator得到的元素推入提供的接收器中Sink。在每一个元素以后执行一下Sink#cancellationRequested(),若是返回请求true,则执行终止。这个方法被实现以后须要遵照Sink的协议即:Sink#begin->Sink#accept->Sink->end

  • abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);

该方法主要用于包装sink,从下游向上游去遍历AbstractPipeline,而后包装到一个Sink内,用于而后在copyInto方法内部迭代执行对应的方法。

  • abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,IntFunction<P_OUT[]> generator);

用于构造一个节点Builder,转换为数组去处理数组类型和PipelineHelper定义的输出类型同样。

  • abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<P_OUT[]> generator);

该方法将源拆分器应用到管道内的全部元素。针对数组处理。若是管道没有中间(filter,map)操做,而且源由一个节点支持(源头),则该节点将被返回(内部遍历而后返回)。这减小了由有状态操做和返回数组的终端操做组成的管道的复制.例如:stream.sorted().toArray();该方法对应到AbstractPipeline内部,代码以下:

 @Override
  @SuppressWarnings("unchecked")
  final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                    boolean flatten,
                                    IntFunction<E_OUT[]> generator) {
      if (isParallel()) {
          // @@@ Optimize if op of this pipeline stage is a stateful op
          return evaluateToNode(this, spliterator, flatten, generator);
      }
      else {
          Node.Builder<E_OUT> nb = makeNodeBuilder(
                  exactOutputSizeIfKnown(spliterator), generator);
          return wrapAndCopyInto(nb, spliterator).build();
      }
  }
AbstractPipeline

“管道”类的抽象基类,是流接口及其原始专门化的核心实现。用来表示流管道的初始部分,封装流源和零个或多个中间操做。对于顺序流和没有状态中间操做的并行流、并行流,管道中数据的处理是在一次“阻塞”全部操做的过程当中完成的也就是最后才去处理。对于具备状态操做的并行流,执行被分红多个段,其中每一个状态操做标记一个段的结束,每一个段被单独评估,结果被用做下一个段的输入。上述全部状况,都是达到终端操做才开始处理源数据。

  • AbstractPipeline(Supplier> source,
    int sourceFlags, boolean parallel)

建立源Source stage 第一个参数指定一个Supplier接口(工厂模式,只能生成Spliterator<?>的对象,根据传入的lambda实现,<? extends Spliterator<?泛型的PECS原则了解一下。)

  • AbstractPipeline(Spliterator<?> source,
    int sourceFlags, boolean parallel)

建立源Source stage 第一个参数制定这个拆分器,和上面的构造方式同样,直接分析一下这个方法:

 AbstractPipeline(Spliterator<?> source,
                    int sourceFlags, boolean parallel) {
       this.previousStage = null;
       this.sourceSpliterator = source;
       this.sourceStage = this;
       this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
       // The following is an optimization of:
       // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
       this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
       this.depth = 0;
       this.parallel = parallel;
   }
 

建立Stream 源阶段的时候previousStagenullthis.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;用于设置当前阶段的标识位。this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; 添加源阶段的对流的操做标识,这个combinedFlags是流在整个管道内部全部操做的合集,在最后的规约操做的时候去解析出来。

  • AbstractPipeline(AbstractPipeline previousStage, int opFlags)

根据上游建立下游Pipeline

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
      if (previousStage.linkedOrConsumed)
          throw new IllegalStateException(MSG_STREAM_LINKED);
      previousStage.linkedOrConsumed = true;
      previousStage.nextStage = this;

      this.previousStage = previousStage;
      this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
      this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
      this.sourceStage = previousStage.sourceStage;
      if (opIsStateful())
          sourceStage.sourceAnyStateful = true;
      this.depth = previousStage.depth + 1;
  }
 

this.sourceStage = previousStage.sourceStage;,用于上游和下游关联,this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);将上游的操做标识位添加到本阶段的操做标识位中。depth记录整个管道的中间操做数。

  • final R evaluate(TerminalOp<E_OUT, R> terminalOp)

进行终端汇聚计算。执行最终的计算,获得结果,根据是不是并行执行,调用不一样的结束逻辑,若是不是并行方法则执行terminalOp.evaluateSequential不然就执行terminalOp.evaluateParallel

  • final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator)

处理流转换数组。

final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        if (isParallel() && previousStage != null && opIsStateful()) {
            depth = 0;
            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
        }
        else {
            return evaluate(sourceSpliterator(0), true, generator);
        }
    }
 

转换数组的时候,若是是并行流而且不是源阶段,并且调用过sorted||limit||skip||distinct这些有状态的操做以后,这里是个模版方法调用。其实是经过调用DistinctOps||SortedOps||SliceOps这些实现的opEvaluateParallel方法,提交到ForkJoin线程池来转换数组。串行执行的时候直接执行evaluate(sourceSpliterator(0), true, generator);

  • evaluate(sourceSpliterator(0), true, generator);

具体的执行方法,用于吧管道内部的输出结果放到Node中。

@Override
    @SuppressWarnings("unchecked")
    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                      boolean flatten,
                                      IntFunction<E_OUT[]> generator) {
        if (isParallel()) {
            // @@@ Optimize if op of this pipeline stage is a stateful op
            return evaluateToNode(this, spliterator, flatten, generator);
        }
        else {
            Node.Builder<E_OUT> nb = makeNodeBuilder(
                    exactOutputSizeIfKnown(spliterator), generator);
            return wrapAndCopyInto(nb, spliterator).build();
        }
    }
 @Override
    final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
                                        Spliterator<P_IN> spliterator,
                                        boolean flattenTree,
                                        IntFunction<P_OUT[]> generator) {
        return Nodes.collect(helper, spliterator, flattenTree, generator);
    }
    // Nodes.collect方法
    public static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_OUT> helper,
                                                    Spliterator<P_IN> spliterator,
                                                    boolean flattenTree,
                                                    IntFunction<P_OUT[]> generator) {
        long size = helper.exactOutputSizeIfKnown(spliterator);
        if (size >= 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
            if (size >= MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(BAD_SIZE);
            P_OUT[] array = generator.apply((int) size);
            new SizedCollectorTask.OfRef<>(spliterator, helper, array).invoke();
            return node(array);
        } else {
            Node<P_OUT> node = new CollectorTask.OfRef<>(helper, generator, spliterator).invoke();
            return flattenTree ? flatten(node, generator) : node;
        }
    }
 

若是是源是并行流的状况,以ReferencePipeline引用管道来看主要执行的是 return Nodes.collect(helper, spliterator, flattenTree, generator);,该collect方法内部根据切割器有无Spliterator.SUBSIZED肯定了生成的Node的长度,主要工做是建立一个Task提交到线程池。而后调用invoke拿到结果。示例代码Arrays.asList("2","22","222").parallelStream().skip(2).toArray(); 整个流程以下:

image

 

串行执行示例代码Arrays.asList("2","22","222").stream().skip(2).toArray(); 整个流程以下:

 

image

 

  • final Spliterator<E_OUT> sourceStageSpliterator()

获取Stream源头设置的拆分器,若是设置有则返回而且把源拆分器置空,若是有Supplier则调用get方法返回拆分器而且把源拆分器置空。

  • public final S sequential()

设置为串行流 ,设置源的paraller属性为false。终态方法不容许重写

  • public final S sequential()

设置为并行流 ,设置源的paraller属性为true。终态方法不容许重写

  • public void close()

关闭管道的方法,在关闭的时候会把管道使用标志设置为false,拆分器设置为null,若是源的回调关闭Job存在不为null时则invoker这个回调Job。

  • public S onClose(Runnable closeHandler)

用于注册关闭的回调job,在调用close的时候用于去执行这个回调job。

  • public Spliterator<E_OUT> spliterator()

sourceStageSpliterator方法同样的功能,只不过不是终态方法,能够重写用于自定义的拓展。

  • public final boolean isParallel()

用于盘带你当前管道是不是并行流。

  • final int getStreamFlags()

获取流的标志和Stream的包含的全部操做。

  • private Spliterator<?> sourceSpliterator(int terminalFlags) {

获取源拆分器,和sourceStageSpliterator方法同样的功能,针对是并行流时候,而且是建立Stream阶段的话有中间状态,会组合流标志和操做构建拆分器。若是传入的操做码不等于0,那么则添加到拆分器的操做码中。

  • final StreamShape getSourceShape()

输出Stream源的类型。(引用 OR int OR Double OR Long)

  • final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator)

获取指望的size,若是拆分器若是有SIZE标志,调用拆分器的getExactSizeIfKnown方法,不然返回-1。

  • final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)

封装整个管道的阶段,包装在Sink中。把每个阶段串联起来。包装在Sink内部的downstream.

wrapAndCopyInto代码执行流程以下:

 

image

 

看完三件事❤️

若是你以为这篇内容对你还蛮有帮助,我想邀请你帮我三个小忙:

  1. 点赞,转发,有大家的 『点赞和评论』,才是我创造的动力。

  2. 关注公众号 『 java烂猪皮 』,不按期分享原创知识。

  3. 同时能够期待后续文章ing🚀

 

相关文章
相关标签/搜索