[译] 一文带你玩转 Java8 Stream 流,今后操做集合 So Easy

本文翻译自 winterbe.com/posts/2014/…html

做者: @Winterbejava

欢迎关注我的微信公众号: 小哈学Java,便可免费无套路领取10G面试学习资料哦,文末资料截图。面试

我的网站: www.exception.site/java8/java8…shell

Stream 流能够说是 Java8 新特性中用起来最爽的一个功能了,有了它,今后操做集合告别繁琐的 for 循环。可是还有不少小伙伴对 Stream 流不是很了解。今天就经过这篇 @Winterbe 的译文,一块儿深刻了解下如何使用它吧。编程

目录

1、Stream 流是如何工做的?api

2、不一样类型的 Stream 流bash

3、Stream 流的处理顺序微信

4、中间操做顺序这么重要?多线程

5、数据流复用问题oracle

6、高级操做

  • 6.1 Collect
  • 6.2 FlatMap
  • 6.3 Reduce

7、并行流

8、结语


当我第一次阅读 Java8 中的 Stream API 时,说实话,我很是困惑,由于它的名字听起来与 Java I0 框架中的 InputStreamOutputStream 很是相似。可是实际上,它们彻底是不一样的东西。

Java8 Stream 使用的是函数式编程模式,如同它的名字同样,它能够被用来对集合进行链状流式的操做。

本文就将带着你如何使用 Java 8 不一样类型的 Stream 操做。同时您还将了解流的处理顺序,以及不一样顺序的流操做是如何影响运行时性能的。

咱们还将学习终端操做 API reducecollect 以及flatMap的详细介绍,最后咱们再来深刻的探讨一下 Java8 并行流。

注意:若是您还不熟悉 Java 8 lambda 表达式,函数式接口以及方法引用,您能够先阅读一下小哈的另外一篇译文 《Java8 新特性教程》

接下来,就让咱们进入正题吧!

1、Stream 流是如何工做的?

流表示包含着一系列元素的集合,咱们能够对其作不一样类型的操做,用来对这些元素执行计算。听上去可能有点拗口,让咱们用代码说话:

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream() // 建立流
    .filter(s -> s.startsWith("c")) // 执行过滤,过滤出以 c 为前缀的字符串
    .map(String::toUpperCase) // 转换成大写
    .sorted() // 排序
    .forEach(System.out::println); // for 循环打印

// C1
// C2
复制代码

咱们能够对流进行中间操做或者终端操做。小伙伴们可能会疑问?什么是中间操做?什么又是终端操做?

Stream中间操做,终端操做
Stream中间操做,终端操做

  • :中间操做会再次返回一个流,因此,咱们能够连接多个中间操做,注意这里是不用加分号的。上图中的filter 过滤,map 对象转换,sorted 排序,就属于中间操做。
  • :终端操做是对流操做的一个结束动做,通常返回 void 或者一个非流的结果。上图中的 forEach循环 就是一个终止操做。

看完上面的操做,感受是否是很像一个流水线式操做呢。

实际上,大部分流操做都支持 lambda 表达式做为参数,正确理解,应该说是接受一个函数式接口的实现做为参数。

2、不一样类型的 Stream 流

咱们能够从各类数据源中建立 Stream 流,其中以 Collection 集合最为常见。如 ListSet 均支持 stream() 方法来建立顺序流或者是并行流。

并行流是经过多线程的方式来执行的,它可以充分发挥多核 CPU 的优点来提高性能。本文在最后再来介绍并行流,咱们先讨论顺序流:

Arrays.asList("a1", "a2", "a3")
    .stream() // 建立流
    .findFirst() // 找到第一个元素
    .ifPresent(System.out::println);  // 若是存在,即输出

// a1
复制代码

在集合上调用stream()方法会返回一个普通的 Stream 流。可是, 您大可没必要刻意地建立一个集合,再经过集合来获取 Stream 流,您还能够经过以下这种方式:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1
复制代码

例如上面这样,咱们能够经过 Stream.of() 从一堆对象中建立 Stream 流。

除了常规对象流以外,Java 8还附带了一些特殊类型的流,用于处理原始数据类型intlong以及double。说道这里,你可能已经猜到了它们就是IntStreamLongStream还有DoubleStream

其中,IntStreams.range()方法还能够被用来取代常规的 for 循环, 以下所示:

IntStream.range(1, 4)
    .forEach(System.out::println); // 至关于 for (int i = 1; i < 4; i++) {}

// 1
// 2
// 3
复制代码

上面这些原始类型流的工做方式与常规对象流基本是同样的,但仍是略微存在一些区别:

  • 原始类型流使用其独有的函数式接口,例如IntFunction代替FunctionIntPredicate代替Predicate

  • 原始类型流支持额外的终端聚合操做,sum()以及average(),以下所示:

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1) // 对数值中的每一个对象执行 2*n + 1 操做
    .average() // 求平均值
    .ifPresent(System.out::println);  // 若是值不为空,则输出
// 5.0
复制代码

可是,偶尔咱们也有这种需求,须要将常规对象流转换为原始类型流,这个时候,中间操做 mapToInt()mapToLong() 以及mapToDouble就派上用场了:

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1)) // 对每一个字符串元素从下标1位置开始截取
    .mapToInt(Integer::parseInt) // 转成 int 基础类型类型流
    .max() // 取最大值
    .ifPresent(System.out::println);  // 不为空则输出

// 3
复制代码

若是说,您须要将原始类型流装换成对象流,您可使用 mapToObj()来达到目的:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i) // for 循环 1->4, 拼接前缀 a
    .forEach(System.out::println); // for 循环打印

// a1
// a2
// a3
复制代码

下面是一个组合示例,咱们将双精度流首先转换成 int 类型流,而后再将其装换成对象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue) // double 类型转 int
    .mapToObj(i -> "a" + i) // 对值拼接前缀 a
    .forEach(System.out::println); // for 循环打印

// a1
// a2
// a3
复制代码

3、Stream 流的处理顺序

上小节中,咱们已经学会了如何建立不一样类型的 Stream 流,接下来咱们再深刻了解下数据流的执行顺序。

在讨论处理顺序以前,您须要明确一点,那就是中间操做的有个重要特性 —— 延迟性。观察下面这个没有终端操做的示例代码:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });
复制代码

执行此代码段时,您可能会认为,将依次打印 "d2", "a2", "b1", "b3", "c" 元素。然而当你实际去执行的时候,它不会打印任何内容。

为何呢?

缘由是:当且仅当存在终端操做时,中间操做操做才会被执行。

是否是不信?接下来,对上面的代码添加 forEach终端操做:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));
复制代码

再次执行,咱们会看到输出以下:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c
复制代码

输出的顺序可能会让你很惊讶!你脑海里确定会想,应该是先将全部 filter 前缀的字符串打印出来,接着才会打印 forEach 前缀的字符串。

事实上,输出的结果倒是随着链条垂直移动的。好比说,当 Stream 开始处理 d2 元素时,它实际上会在执行完 filter 操做后,再执行 forEach 操做,接着才会处理第二个元素。

是否是很神奇?为何要设计成这样呢?

缘由是出于性能的考虑。这样设计能够减小对每一个元素的实际操做数,看完下面代码你就明白了:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 转大写
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A"); // 过滤出以 A 为前缀的元素
    });

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
复制代码

终端操做 anyMatch()表示任何一个元素以 A 为前缀,返回为 true,就中止循环。因此它会从 d2 开始匹配,接着循环到 a2 的时候,返回为 true ,因而中止循环。

因为数据流的链式调用是垂直执行的,map这里只须要执行两次。相对于水平执行来讲,map会执行尽量少的次数,而不是把全部元素都 map 转换一遍。

4、中间操做顺序这么重要?

下面的例子由两个中间操做mapfilter,以及一个终端操做forEach组成。让咱们再来看看这些操做是如何执行的:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 转大写
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A"); // 过滤出以 A 为前缀的元素
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循环输出

// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
复制代码

学习了上面一小节,您应该已经知道了,mapfilter会对集合中的每一个字符串调用五次,而forEach却只会调用一次,由于只有 "a2" 知足过滤条件。

若是咱们改变中间操做的顺序,将filter移动到链头的最开始,就能够大大减小实际的执行次数:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s)
        return s.startsWith("a"); // 过滤出以 a 为前缀的元素
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 转大写
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循环输出

// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
复制代码

如今,map仅仅只需调用一次,性能获得了提高,这种小技巧对于流中存在大量元素来讲,是很是颇有用的。

接下来,让咱们对上面的代码再添加一个中间操做sorted

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2); // 排序
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a"); // 过滤出以 a 为前缀的元素
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 转大写
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循环输出
复制代码

sorted 是一个有状态的操做,由于它须要在处理的过程当中,保存状态以对集合中的元素进行排序。

执行上面代码,输出以下:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2
复制代码

咦咦咦?此次怎么又不是垂直执行了。你须要知道的是,sorted是水平执行的。所以,在这种状况下,sorted会对集合中的元素组合调用八次。这里,咱们也能够利用上面说道的优化技巧,将 filter 过滤中间操做移动到开头部分:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
复制代码

从上面的输出中,咱们看到了 sorted从未被调用过,由于通过filter事后的元素已经减小到只有一个,这种状况下,是不用执行排序操做的。所以性能被大大提升了。

5、数据流复用问题

Java8 Stream 流是不能被复用的,一旦你调用任何终端操做,流就会关闭:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception
复制代码

当咱们对 stream 调用了 anyMatch 终端操做之后,流即关闭了,再调用 noneMatch 就会抛出异常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
复制代码

为了克服这个限制,咱们必须为咱们想要执行的每一个终端操做建立一个新的流链,例如,咱们能够经过 Supplier 来包装一下流,经过 get() 方法来构建一个新的 Stream 流,以下所示:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
复制代码

经过构造一个新的流,来避开流不能被复用的限制, 这也是取巧的一种方式。

6、高级操做

Streams 支持的操做很丰富,除了上面介绍的这些比较经常使用的中间操做,如filtermap(参见Stream Javadoc)外。还有一些更复杂的操做,如collectflatMap以及reduce。接下来,就让咱们学习一下:

本小节中的大多数代码示例均会使用如下 List<Person>进行演示:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

// 构建一个 Person 集合
List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));
复制代码

6.1 Collect

collect 是一个很是有用的终端操做,它能够将流中的元素转变成另一个不一样的对象,例如一个ListSetMap。collect 接受入参为Collector(收集器),它由四个不一样的操做组成:供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)。

这些都是个啥?别慌,看上去很是复杂的样子,但好在大多数状况下,您并不须要本身去实现收集器。由于 Java 8经过Collectors类内置了各类经常使用的收集器,你直接拿来用就好了。

让咱们先从一个很是常见的用例开始:

List<Person> filtered =
    persons
        .stream() // 构建流
        .filter(p -> p.name.startsWith("P")) // 过滤出名字以 P 开头的
        .collect(Collectors.toList()); // 生成一个新的 List

System.out.println(filtered);    // [Peter, Pamela]
复制代码

你也看到了,从流中构造一个 List 异常简单。若是说你须要构造一个 Set 集合,只须要使用Collectors.toSet()就能够了。

接下来这个示例,将会按年龄对全部人进行分组:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age)); // 以年龄为 key,进行分组

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
复制代码

除了上面这些操做。您还能够在流上执行聚合操做,例如,计算全部人的平均年龄:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age)); // 聚合出平均年龄

System.out.println(averageAge);     // 19.0
复制代码

若是您还想获得一个更全面的统计信息,摘要收集器能够返回一个特殊的内置统计对象。经过它,咱们能够简单地计算出最小年龄、最大年龄、平均年龄、总和以及总数量。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age)); // 生成摘要统计

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
复制代码

下一个这个示例,能够将全部人名链接成一个字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18) // 过滤出年龄大于等于18的
    .map(p -> p.name) // 提取名字
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); // 以 In Germany 开头,and 链接各元素,再以 are of legal age. 结束

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
复制代码

链接收集器的入参接受分隔符,以及可选的前缀以及后缀。

对于如何将流转换为 Map集合,咱们必须指定 Map 的键和值。这里须要注意,Map 的键必须是惟一的,不然会抛出IllegalStateException 异常。

你能够选择传递一个合并函数做为额外的参数来避免发生这个异常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2)); // 对于一样 key 的,将值拼接

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
复制代码

既然咱们已经知道了这些强大的内置收集器,接下来就让咱们尝试构建自定义收集器吧。

好比说,咱们但愿将流中的全部人转换成一个字符串,包含全部大写的名称,并以|分割。为了达到这种效果,咱们须要经过Collector.of()建立一个新的收集器。同时,咱们还须要传入收集器的四个组成部分:供应器、累加器、组合器和终止器。

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier 供应器
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator 累加器
        (j1, j2) -> j1.merge(j2),               // combiner 组合器
        StringJoiner::toString);                // finisher 终止器

String names = persons
    .stream()
    .collect(personNameCollector); // 传入自定义的收集器

System.out.println(names);  // MAX | PETER | PAMELA | DAVID
复制代码

因为Java 中的字符串是 final 类型的,咱们须要借助辅助类StringJoiner,来帮咱们构造字符串。

最开始供应器使用分隔符构造了一个StringJointer

累加器用于将每一个人的人名转大写,而后加到StringJointer中。

组合器将两个StringJointer合并为一个。

最终,终结器从StringJointer构造出预期的字符串。

6.2 FlatMap

上面咱们已经学会了如经过map操做, 将流中的对象转换为另外一种类型。可是,Map只能将每一个对象映射到另外一个对象。

若是说,咱们想要将一个对象转换为多个其余对象或者根本不作转换操做呢?这个时候,flatMap就派上用场了。

FlatMap 可以将流的每一个元素, 转换为其余对象的流。所以,每一个对象能够被转换为零个,一个或多个其余对象,并以流的方式返回。以后,这些流的内容会被放入flatMap返回的流中。

在学习如何实际操做flatMap以前,咱们先新建两个类,用来测试:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}
复制代码

接下来,经过咱们上面学习到的流知识,来实例化一些对象:

List<Foo> foos = new ArrayList<>();

// 建立 foos 集合
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// 建立 bars 集合
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
复制代码

咱们建立了包含三个foo的集合,每一个foo中又包含三个 bar

flatMap 的入参接受一个返回对象流的函数。为了处理每一个foo中的bar,咱们须要传入相应 stream 流:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
复制代码

如上所示,咱们已成功将三个 foo对象的流转换为九个bar对象的流。

最后,上面的这段代码能够简化为单一的流式操做:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
复制代码

flatMap也可用于Java8引入的Optional类。OptionalflatMap操做返回一个Optional或其余类型的对象。因此它能够用于避免繁琐的null检查。

接下来,让咱们建立层次更深的对象:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}
复制代码

为了处理从 Outer 对象中获取最底层的 foo 字符串,你须要添加多个null检查来避免可能发生的NullPointerException,以下所示:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}
复制代码

咱们还可使用OptionalflatMap操做,来完成上述相同功能的判断,且更加优雅:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);
复制代码

若是不为空的话,每一个flatMap的调用都会返回预期对象的Optional包装,不然返回为nullOptional包装类。

笔者补充:关于 Optional 可参见我另外一篇译文《Java8 新特性如何防止空指针异常》

6.3 Reduce

规约操做能够将流的全部元素组合成一个结果。Java 8 支持三种不一样的reduce方法。第一种将流中的元素规约成流中的一个元素。

让咱们看看如何使用这种方法,来筛选出年龄最大的那我的:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela
复制代码

reduce方法接受BinaryOperator积累函数。该函数其实是两个操做数类型相同的BiFunctionBiFunction功能和Function同样,可是它接受两个参数。示例代码中,咱们比较两我的的年龄,来返回年龄较大的人。

第二种reduce方法接受标识值和BinaryOperator累加器。此方法可用于构造一个新的 Person,其中包含来自流中全部其余人的聚合名称和年龄:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
复制代码

第三种reduce方法接受三个参数:标识值,BiFunction累加器和类型的组合器函数BinaryOperator。因为初始值的类型不必定为Person,咱们可使用这个归约函数来计算全部人的年龄总和:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76
复制代码

结果为76,可是内部究竟发生了什么呢?让咱们再打印一些调试日志:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
复制代码

你能够看到,累加器函数完成了全部工做。它首先使用初始值0和第一我的年龄相加。接下来的三步中sum会持续增长,直到76。

等等?好像哪里不太对!组合器历来都没有调用过啊?

咱们以并行流的方式运行上面的代码,看看日志输出:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
复制代码

并行流的执行方式彻底不一样。这里组合器被调用了。实际上,因为累加器被并行调用,组合器须要被用于计算部分累加值的总和。

让咱们在下一章深刻探讨并行流。

7、并行流

流是能够并行执行的,当流中存在大量元素时,能够显著提高性能。并行流底层使用的ForkJoinPool, 它由ForkJoinPool.commonPool()方法提供。底层线程池的大小最多为五个 - 具体取决于 CPU 可用核心数:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
复制代码

在个人机器上,公共池初始化默认值为 3。你也能够经过设置如下JVM参数能够减少或增长此值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
复制代码

集合支持parallelStream()方法来建立元素的并行流。或者你能够在已存在的数据流上调用中间方法parallel(),将串行流转换为并行流,这也是能够的。

为了详细了解并行流的执行行为,咱们在下面的示例代码中,打印当前线程的信息:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
复制代码

经过日志输出,咱们能够对哪一个线程被用于执行流式操做,有个更深刻的理解:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
复制代码

如您所见,并行流使用了全部的ForkJoinPool中的可用线程来执行流式操做。在持续的运行中,输出结果可能有所不一样,由于所使用的特定线程是非特定的。

让咱们经过添加中间操做sort来扩展上面示例:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
复制代码

运行代码,输出结果看上去有些奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
复制代码

貌似sort只在主线程上串行执行。可是实际上,并行流中的sort在底层使用了Java8中新的方法Arrays.parallelSort()。如 javadoc官方文档解释的,这个方法会按照数据长度来决定以串行方式,或者以并行的方式来执行。

若是指定数据的长度小于最小数值,它则使用相应的Arrays.sort方法来进行排序。

回到上小节 reduce的例子。咱们已经发现了组合器函数只在并行流中调用,而不不会在串行流中被调用。

让咱们来实际观察一下涉及到哪一个线程:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
复制代码

经过控制台日志输出,累加器和组合器均在全部可用的线程上并行执行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]
复制代码

总之,你须要记住的是,并行流对含有大量元素的数据流提高性能极大。可是你也须要记住并行流的一些操做,例如reducecollect操做,须要额外的计算(如组合操做),这在串行执行时是并不须要。

此外,咱们也了解了,全部并行流操做都共享相同的 JVM 相关的公共ForkJoinPool。因此你可能须要避免写出一些又慢又卡的流式操做,这颇有可能会拖慢你应用中,严重依赖并行流的其它部分代码的性能。

8、结语

Java8 Stream 流编程指南到这里就结束了。若是您有兴趣了解更多有关 Java 8 Stream 流的相关信息,我建议您使用 Stream Javadoc 阅读官方文档。若是您想了解有关底层机制的更多信息,您也能够阅读 Martin Fowlers 关于 Collection Pipelines 的文章。

最后,祝您学习愉快!

赠送 10G 面试&学习福利资源

获取方式: 关注微信公众号: 小哈学Java, 后台回复"666",既可免费无套路获取资源连接,下面是目录以及部分截图:

关注微信公众号【小哈学Java】,回复“666”,便可免费无套路领取哦
关注微信公众号【小哈学Java】,回复“666”,便可免费无套路领取哦

欢迎关注微信公众号: 小哈学Java

小哈学Java,关注领取10G面试学习资料哦
小哈学Java,关注领取10G面试学习资料哦
相关文章
相关标签/搜索