「Java8系列」流式编程Stream

前言

「Java8系列」神秘的Lambda
「Java8系列」神奇的函数式接口
继上两篇以后,本文已经java8系列的第三篇了。本篇文章比较长,但我但愿你们都能认真读完。读不完能够先收藏,在找时间读。没看过前两篇的能够点上边的连接看看,前两篇文章算是对是用Stream铺垫的一点基础吧,不过不看也能够学会使用Stream,但看了会有助于更好的理解和使用。在没有深刻了解以前,我觉得Stream也是数据的载体,但后来发现并非。那么它究竟是什么?听我慢慢道来。java

什么是Stream?

Stream它并非一个容器,它只是对容器的功能进行了加强,添加了不少便利的操做,例如查找、过滤、分组、排序等一系列的操做。而且有串行、并行两种执行模式,并行模式充分的利用了多核处理器的优点,使用fork/join框架进行了任务拆分,同时提升了执行速度。简而言之,Stream就是提供了一种高效且易于使用的处理数据的方式。git

  • 特色:
  1. Stream本身不会存储元素。
  2. Stream的操做不会改变源对象。相反,他们会返回一个持有结果的新Stream。
  3. Stream 操做是延迟执行的。它会等到须要结果的时候才执行。也就是执行终端操做的时候。
  • 图解:
    在这里插入图片描述
    一个Stream的操做就如上图,在一个管道内,分为三个步骤,第一步是建立Stream,从集合、数组中获取一个流,第二步是中间操做链,对数据进行处理。第三步是终端操做,用来执行中间操做链,返回结果。

怎么建立Stream?

  • 由集合建立: Java8 中的 Collection 接口被扩展,提供了两个获取流的方法,这两个方法是default方法,也就是说全部实现Collection接口的接口都不须要实现就能够直接使用:
  1. default Stream stream() : 返回一个顺序流。
  2. default Stream parallelStream() : 返回一个并行流。
    例如:
        List<Integer> integerList = new ArrayList<>();
        integerList.add(1);
        integerList.add(2);
        Stream<Integer> stream = integerList.stream();
        Stream<Integer> stream1 = integerList.parallelStream();
    复制代码
  • 由数组建立: Java8 中的 Arrays 的静态方法 stream() 能够获取数组流:
  1. static Stream stream(T[] array): 返回一个流
  2. 重载形式,可以处理对应基本类型的数组:
    public static IntStream stream(int[] array)
    public static LongStream stream(long[] array)
    public static DoubleStream stream(double[] array)
    例如:
        int[] intArray = {1,2,3};
        IntStream stream = Arrays.stream(intArray);
    复制代码
  • 由值建立: 可使用静态方法 Stream.of(), 经过显示值 建立一个流。它能够接收任意数量的参数。
  1. public static Stream of(T... values) : 返回一个流。
    例如:
    Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
    复制代码
  • 由函数建立:建立无限流 可使用静态方法 Stream.iterate() 和 Stream.generate()建立无限流。
  1. 迭代 public static Stream iterate(final T seed, final UnaryOperator f)
  2. 生成 public static Stream generate(Supplier s)
    例如:
     Stream.generate(Math::random).limit(5).forEach(System.out::print); 
     List<Integer> collect = Stream.iterate(0,i -> i + 1).limit(5).collect(Collectors.toList());
    复制代码
    注意:使用无限流必定要配合limit截断,否则会无限制建立下去。

Stream的中间操做

若是Stream只有中间操做是不会执行的,当执行终端操做的时候才会执行中间操做,这种方式称为延迟加载或惰性求值。多个中间操做组成一个中间操做链,只有当执行终端操做的时候才会执行一遍中间操做链,具体是由于什么咱们在后面再说明。下面看下Stream有哪些中间操做。github

  • Stream<T> distinct(): 去重,经过流所生成元素的 hashCode() 和 equals() 去除重复元素。
  • Stream<T> filter(Predicate<? super T> predicate): Predicate函数在上一篇当中咱们已经讲过,它是断言型接口,因此filter方法中是接收一个和Predicate函数对应Lambda表达式,返回一个布尔值,从流中过滤某些元素。
  • Stream<T> sorted(Comparator<? super T> comparator): 指定比较规则进行排序。
  • Stream<T> limit(long maxSize): 截断流,使其元素不超过给定数量。若是元素的个数小于maxSize,那就获取全部元素。
  • Stream<T> skip(long n): 跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit(n) 互补。
  • Stream<R> map(Function<? super T, ? extends R> mapper): 接收一个Function函数做为参数,该函数会被应用到每一个元素上,并将其映射成一个新的元素。也就是转换操做,map还有三个应用于具体类型方法,分别是:mapToInt,mapToLong和mapToDouble。这三个方法也比较好理解,好比mapToInt就是把原始Stream转换成一个新的Stream,这个新生成的Stream中的元素都是int类型。这三个方法能够免除自动装箱/拆箱的额外消耗。
  • Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper): 接收一个Function函数做为参数,将流中的每一个值都转换成另外一个流,而后把全部流链接成一个流。flatMap也有三个应用于具体类型的方法,分别是:flatMapToInt、flatMapToLong、flatMapToDouble,其做用于map的三个衍生方法相同。

Stream的终端操做

终端操做执行中间操做链,并返回结果。终端操做咱们就不一一介绍了,只介绍一下经常使用的操做。详细可看java.util.stream.Stream接口中的方法。算法

  • void forEach(Consumer<? super T> action): 内部迭代(须要用户去作迭代,称为外部迭代。相反,Stream API使用内部迭代帮你把迭代作了)
users.stream().forEach(user -> System.out.println(user.getName()));
复制代码
  • <R, A> R collect(Collector<? super T, A, R> collector): 收集、将流转换为其余形式,好比转换成List、Set、Map。collect方法是用Collector做为参数,Collector接口中方法的实现决定了如何对流执行收集操做(如收集到 List、Set、Map)。可是 Collectors 实用类提供了不少静态方法,能够方便地建立常见收集器实例。例举一些经常使用的:
List<User> users = Lists.newArrayList();
 users.add(new User(15, "A", ImmutableList.of("1元", "5元")));
 users.add(new User(25, "B", ImmutableList.of("10元", "50元")));
 users.add(new User(21, "C", ImmutableList.of("100元")));
 //收集名称到List
 List<String> nameList = users.stream().map(User::getName).collect(Collectors.toList());
 //收集名称到List
 Set<String> nameSet = users.stream().map(User::getName).collect(Collectors.toSet());
 //收集到map,名字做为key,user对象做为value
 Map<String, User> userMap = users.stream()
                .collect(Collectors.toMap(User::getName, Function.identity(), (k1, k2) -> k2));
复制代码
  • 其余终端操做:
  1. boolean allMatch(Predicate<? super T> predicate); 检查是否匹配全部元素。
  2. boolean anyMatch(Predicate<? super T> predicate); 检查是否至少匹配一个元素。
  3. boolean noneMatch(Predicate<? super T> predicate); 检查是否没有匹配全部元素。
  4. Optional<T> findFirst(); 返回当前流中的第一个元素。
  5. Optional<T> findAny(); 返回当前流中的任意元素。
  6. long count(); 返回流中元素总数。
  7. Optional<T> max(Comparator<? super T> comparator); 返回流中最大值。
  8. Optional<T> min(Comparator<? super T> comparator); 返回流中最小值。
  9. T reduce(T identity, BinaryOperator<T> accumulator); 能够将流中元素反复结合起来,获得一个值。 返回 T。这是一个归约操做。

Fork/Join框架

上面咱们提到过,说Stream的并行模式使用了Fork/Join框架,这里简单说下Fork/Join框架是什么?Fork/Join框架是java7中加入的一个并行任务框架,能够将任务拆分为多个小任务,每一个小任务执行完的结果在合并成为一个结果。在任务的执行过程当中使用工做窃取(work-stealing)算法,减小线程之间的竞争。segmentfault

  • Fork/Join图解
  • 工做窃取图解

Stream是怎么实现的

先看下总体类图:蓝色箭头表明继承,绿色箭头表明实现,红色箭头表明内部类。 api

实际上Stream只有两种操做,中间操做、终端操做,中间操做只是一种标记,只有终端操做才会实际触发执行。因此Stream流水线式的操做大体应该是用某种方式记录中间操做,只有调用终端操做才会将全部的中间操做叠加在一块儿在一次迭代中所有执行。这里只作简单的介绍,想详细了解的能够参考下面的参考资料中的连接。

  • 操做怎么记录?
    Stream的操做记录是经过ReferencePipeline记录的,ReferencePipeline有三个内部类Head、StatelessOp、StatefulOp,Stream中使用Stage的概念来描述一个完整的操做,并用某种实例化后的ReferencePipeline来表明Stage,Head用于表示第一个Stage,即调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操做,StatelessOp和StatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操做。
  • 操做怎么叠加?
    操做是记录完了,可是前面的Stage并不知道后面Stage到底执行了哪一种操做,以及回调函数是哪一种形式。这就须要有某种协议来协调相邻Stage之间的调用关系。 这种协议由Sink接口完成,Sink接口包含的方法以下表所示:
  1. void begin(long size),开始遍历元素以前调用该方法,通知Sink作好准备。
  2. void end(),全部元素遍历完成以后调用,通知Sink没有更多的元素了。
  3. boolean cancellationRequested(),是否能够结束操做,可让短路操做尽早结束。
  4. void accept(T t),遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把本身包含的操做和回调方法封装到该方法里,前一个Stage只须要调用当前Stage.accept(T t)方法就好了。

每一个Stage都会将本身的操做封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法便可,并不须要知道其内部是如何处理的。有了Sink对操做的包装,Stage之间的调用问题就解决了,执行时只须要从流水线的head开始对数据源依次调用每一个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就能够了。数组

  • 操做怎么执行?
    Sink完美封装了Stream每一步操做,并给出了[处理->转发]的模式来叠加操做。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操做呢?也许你已经想到了启动的原始动力就是结束操做(Terminal Operation),一旦调用某个结束操做,就会触发整个流水线的执行。

参考资料

ifeve.com/stream
www.ibm.com/developerwo…
segmentfault.com/a/119000001…
github.com/CarpenterLe…
bash

你们看后辛苦点个赞点个关注哦!后续还会后更多的博客。有兴趣能够扫码加群。若有错误,烦请指正。 app

相关文章
相关标签/搜索