JAVA8之数据流Stream

  在JAVA8以前的传统编程方式,若是咱们须要操做一个集合数据,须要使用集合提供的API,经过一个循环去获取集合的元素,这种访问数据的方式会使代码显得臃肿,JAVA8新引入的Stream类,用于从新封装集合数据,经过使用流式Stream代替经常使用集合数组、list和map的遍历操做能够极大的提升效率。java

 

1、Stream的组成编程

数据源(Source) + 0个或多个中间操做(intermediate operation)和终止操做(terminal operation)api

数据源:数据源头,可为数组、Collection、I/O资源和生成函数数组

 

2、构造流(Stream)app

Arrays构造流less

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
        return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

由源码可知Arrays类由传入的数组生成一个Stream是委托StreamSupport来构建的,下面看下StreamSupport的源码ide

 

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
}

 

StreamSupport首先对spliterator参数进行判空,而后把它做为参数经过调用ReferencePipeline的内部类Head的构造函数生成一个ReferencePipeline.Head实例返回,经过对Head类源码的阅读可知他是ReferencePipeline的一个内部子类函数

 

static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        
        Head(Spliterator<?> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

}

ReferencePipeline(Spliterator<?> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
}
AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
}

Head构造函数内部调用父类ReferencePipeline的构造函数,而ReferencePipeline的构造函数则调用父类AbstractPipeline构造函数最终完成Stream的构建,阅读源码咱们可知构造流的核心是Spliterator,而最初持有他的是AbstractPipeline,如今咱们来看下Spliterator接口源码分析

 

下面是源码对Spliterator的注释ui

/*
* An object for traversing and partitioning elements of a source.  The source
* of elements covered by a Spliterator could be, for example, an array, a
* {@link Collection}, an IO channel, or a generator function.
**/

翻译过来的意思是该接口是用于对数据源进行遍历和分区,即Spliterator对象封装了对数据源和分区的能力,接口声明的方法以下:

tryAdvance - 单元素遍历

trySplit - 分区抽象

forEachRemaining - 批量遍历

Stream - 实现原理

estimateSize - 默认实现,返回估计的大小

getExactSizeIfKnown - 默认实现,返回元素集合的确切大小

characteristics - 默认实现,返回当前spliterator源数据的一组特征值

hasCharacteristics - 默认实现,是否源数据是否包含该特征值

getComparator - 默认实现,若是该Spliterator操做的数据源是有序的,那么返回他的Comparator

 

好了,如今咱们基本清楚Spliterator的做用和他在流中的定位,如今让咱们回过头看下Arrays根据数组构造Stream流的JDK源码,深刻到AbstractPipeline咱们能够看到,Stream构造流是经过委托StreamSupport实现的,而核心是构建一个ArraySpliterator对象,可见,构造流(Stream)的核心就是构造一个Spliterator

 

Collection构造流

由上述对Arrays构造流分析可知构建流的核心是Spliterator,咱们直接查看JAVA8中Collection构造流源码

 default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
 }
 default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
 }

 

Collection容器构造流与Arrays同样也是委托StreamSupport,代码设计的好的一点是他的流构建是在接口层面实现的,经过把他自身做为参数传入返回一个返回了一个IteratorSpliterator对象,不一样集合的返回的IteratorSpliterator对象的tryAdvance等遍历方法不一样,具体到IteratorSpliterator源码以下

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (action == null) throw new NullPointerException();
            if (it == null) {
                it = collection.iterator();
                est = (long) collection.size();
            }
            if (it.hasNext()) {
                action.accept(it.next());
                return true;
            }
            return false;
        }    

 

可知IteratorSpliterator对源码的遍历处理是经过传入一个Consumer消费函数消费不一样Collection实现类Iterator迭代器获取的元素实现,感谢JAVA8接口的default设计。

 

3、Stream经常使用方法列表

Stream中间操做方法列表

 

方法 参数 用途
concat Stream< ? extends T> a, Stream< ? extends T> b 将两个流合起来造成新流
distinct 将流里的元素按照Ojbect.equal方法进行聚合去重,返回一个去重结果的新流
empty 返回一个空的流
filter Predicate< ? super T> predicate 按照谓词参数predicate过滤,返回过滤后的流Stream
flatMap Function< ? super T, ? extends Stream< ? extends R>> mapper 将流里的元素T,按照参数Function进行处理,处理结果是一个子流Stream< ? extends R>,后续将子流flat打平,造成元素R的新流。相似的有flatToDouble、flatToInt和flatToLong
limit long maxSize 返回一个新流,只包含maxSize个元素,其余被truncate掉
map Function< ? super T, ? extends R> mapper 经典的map操做,对流里的每一个元素,经过参数mapper映射为一个新的元素,返回新元素的流。相似map有mapToDouble、mapToInt和mapToLong
peek Consumer< ? super T> action 这个动做很是有趣,它并不改变流,而是对流里的每一个元素执行一个Consumer,对其进行一次处理。原始流不变继续返回
skip long n 跳过n个元素,从第n+1个元素开始返回一个新的流
sorted Comparator< ? super T> comparator 根据参数排序器对流进行排序,返回新的流。若是参数为空,则按照天然序排

 

Stream终止操做方法列表

 

方法 参数 用途
allMatch Predicate< ? super T> predicate 根据谓词函数判断流里的元素是否都知足,返回对应的boolean值
anyMatch Predicate< ? super T> predicate 根据谓词函数判断流里的元素是否存在一个或多个知足,返回对应的boolean值
noneMatch Predicate< ? super T> predicate 根据谓词函数判断流里的元素是否不存在任何一个元素知足,返回对应的boolean值
count 返回这个流里元素的个数
findAny 返回一个Optional对象,这个等价于对于一个流执行一个select操做,返回一条记录
findFirst 返回这个流里的第一个元素的Optional,若是这个流不是有序的,则返回任意元素
forEach Consumer< ? super T> action 对这个流的每一个元素,执行参数Consumer
forEachOrdered Consumer<? super T> action 针对forEach在并行流里对有序元素的输出不足,这个方法确保并行流中按照原来顺序处理
max Comparator<? super T> comparator 返回一个Optional值,包含了流里元素的max,条件是按照参数排序器排序
min Comparator<? super T> comparator 返回一个Optional值,包含了流里元素的min,条件是按照参数排序器排序
reduce BinaryOperator< T> accumulator 经典的reduce,就是根据一个二元操做算子,将流中的元素逐个累计操做一遍,初始元素以foundAny结果为主
reduce T identity, BinaryOperator< T> accumulator 与上面的方法一致,只不过多了一个初始值,不须要findAny了
reduce U identity,BiFunction< U, ? super T, U> accumulator,BinaryOperator< U> combiner 最复杂的reduce,看到combiner会不会有联想?它作的也是对于一个流里的元素T,使用二元函数accumulator计算,计算的值累计到U上,由于以前的reduce要求流元素和结果元素类型一致,因此有限制。而该reduce函数,支持T和U类型不一样,经过二元函数转换,可是要求combiner会执行这个事情,要求“ combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)”
collect Supplier< R> supplier,BiConsumer< R, ? super T> accumulator,BiConsumer< R, R> combiner 超级强大的方法。常规的reduce是返回不可变的值。而collect能够将reduce后的值升级为一个可变容器。具体这个方法就是对流里每一个元素T,将Supplier提供的值R做为初始值,用BiConsumer的accumulator进行累加计算。combiner的做用和要求和reduce是同样的
collect Collector< ? super T, A, R> collector 和上面的collect一致,只不过Collector封装了一组上面的参数,T是流里的元素,A是累计中间结果,R是返回值的类型(collect的话就是容器了)

 

 

4、Stream工做原理

下面咱们分析下Stream的工做原理

 

        Integer[] array = new Integer[]{1,2,3,4};
        Optional<Integer> result =  Stream.of(array).filter(v -> v > 2).sorted((v1, v2) -> v2.compareTo(v1)).limit(2).reduce((v1, v2) -> v1 + v2);

        System.out.println(result.get());

 

  首先调用Stream.of方法根据一个Integer对象数组构建了流,函数内部是经过调用Arrays.stream方法返回流,对应的Spliterator实现是ArraySpliterator,而后调用filter方法过滤,咱们分析下这个阶段对应的源码

 

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

由源码可知filter把前面构造的流(Stream)自己做为参数返回了一个StatelessOp实现对象,深刻StatelessOp类咱们知道它是ReferencePipeline的一个内部类,继承ReferencePipeline,而上面咱们分析过ReferencePipeline继承自AbstractPipeline,回到filter源码,咱们看到在filter方法里面实现了超类AbstractPipeline的OpWrapSink方法

    /**
     * Accepts a {@code Sink} which will receive the results of this operation,
     * and return a {@code Sink} which accepts elements of the input type of
     * this operation and which performs the operation, passing the results to
     * the provided {@code Sink}.
     *
     * @apiNote
     * The implementation may use the {@code flags} parameter to optimize the
     * sink wrapping.  For example, if the input is already {@code DISTINCT},
     * the implementation for the {@code Stream#distinct()} method could just
     * return the sink it was passed.
     *
     * @param flags The combined stream and operation flags up to, but not
     *        including, this operation
     * @param sink sink to which elements should be sent after processing
     * @return a sink which accepts elements, perform the operation upon
     *         each element, and passes the results (if any) to the provided
     *         {@code Sink}.
     */
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);

 

ChainedReference.java源码

/**
     * Abstract {@code Sink} implementation for creating chains of
     * sinks.  The {@code begin}, {@code end}, and
     * {@code cancellationRequested} methods are wired to chain to the
     * downstream {@code Sink}.  This implementation takes a downstream
     * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
     * implementation of the {@code accept()} method must call the correct
     * {@code accept()} method on the downstream {@code Sink}.
     */
    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

经过对上述源码的分析可知Sink(Consumer的一个派生类)在流中的做用实际上是用于控制流中间阶段的数据、大小等状态信息,在Sink方法中还定义了两个方法,begin和end,begin在Sink的accept方法以前调用,end在accept方法以后调用,主要是用于对流程数据进行一些额外的控制,如今咱们在结合分析filter源码发现,为了维护支持流(Stream)的中间操做状态信息,JAVA8流在结构上其实被设计成一个链表结构,一个Head起始节点,多个中间节点StatelessMap(继承自ReferencePipeline),而流程中管理和控制数据状态信息的实际是其中的Sink。

  接下来是到sorted排序阶段,咱们继续深刻源码。

ReferencePipeline.java

 

@Override
    public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
        return SortedOps.makeRef(this, comparator);
}

SortedOps.java

    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                Comparator<? super T> comparator) {
        return new OfRef<>(upstream, comparator);
    }

排序是一个有状态的中间操做,与filter阶段相似sorted方法实际返回的是一个OfRef对象,深刻SortedOps的makeRef方法,可知返回了一个OfRef实例分析该类可知该类是StatefulOp的子类,该类持有一个排序器,解读StatefulOp类源码可知该类是流中间状态的基类,对比StatelessOp类,接下来经过源码分析方法的逻辑

SortedOps.OfRef

 

        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            // If the input is already naturally sorted and this operation
            // also naturally sorted then this is a no-op
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

 

分析代码可知实现逻辑与filter阶段相似,都是经过Sink控制流的数据和中间状态信息,流程逻辑是先对入参Sink判空,若是流有序,直接返回sink,不然判断是否有界,若是有界返回一个SizedRefSortingSink对象,不然返回一个RefSortingSink对象,深刻两个类,不出意外,前者内部是经过数组保存数据,后者是经过一个ArrayList实例保存,二者均是在end方法里借由内部排序器完成元素排序

 

  接下来的limit截取阶段相似,读者可自行分析,大致的实现逻辑与上述两个阶段并没有二致

  最后到了终止操做阶段reduce

ReferencePipeline.java

    @Override
    public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
        return evaluate(ReduceOps.makeRef(accumulator));
    }

AbstractPipeline.java

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

 

ReduceOps.java

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

 

AbstractPipeline.java

    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

经过对源码的分析可知,在Stream的中间阶段若是没有意外中断,代码执行到终止操做时才开始执行前面定义的各个中间操做,也就是说Stream的中间操做的执行方式都是lazy,读者可自行在中间流的opWrapSink打断点,去掉流的终止操做进行校验。

相关文章
相关标签/搜索