Java基础系列-Java8 Stream 简明教程

Stream 是 Java8 中一个重大的更新。Stream 为Java 真正带来了函数式编程的特性。对函数式编程不了解的人每每不知道如何动手,经过Benjamin 的教程来完整的学习一下 Java 的这个特性,学会这些技能会让你的代码看起来更酷。html


这是一个经过代码示例来深度讲解 Java8 Stream 的教程。当我第一次看到 Stream 的 API 时,我感到很迷惑,由于这个名称听起来和 Java I/O 包中的 InputStreamOutputStream 有关系。可是实际上它们是彻底不一样的东西。 Stream 是 Monad(函数式编程),它为 Java 带来了函数式编程的特性,下面是维基百科对 Monad 的解释:java

In functional programming, a monad is a structure that represents computations defined as sequences of steps. A type with a monad structure defines what it means to chain operations, or nest functions of that type together.shell

这份教程会讲解 Java8 Stream 的原理以及不一样操做之间的区别。你将会学习到 Stream 操做的处理顺序以及不一样的顺序对性能的影响。还会对经常使用的操做如 ReducecollectflatMap 进行详细讲解。在教程的最后会说明并行 Stream 的优势。编程

注:Stream 中的 API 称之为操做api

若是你还不熟悉 Java8 的 lambda 表达式、函数式接口以及方法引用,能够先去读一下这份Java8 教程数组

Stream 原理

一个 Stream 表明着一组元素以及支持对这些元素进行计算的不一样操做微信

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2
复制代码

Stream 操做分为中间操做终端操做。中间操做会返回一个 Stream 对象,因此能够对中间操做进行链式操做。终端操做会返回一个 void 或者非 Stream 的对象。在上面的例子中,filtermapsorted 都是中间操做,而 forEach 则是一个终端操做。Stream 完整的操做 API 能够查看文档。Stream 链式操做能够查看上面的例子,链式操做也称之为管道操做数据结构

许多 Stream 操做接受 Lambda 或者函数式接口来限定操做范围。这些操做中绝大多数都必须是non-interfering无状态的,这是什么意思呢?oracle

注:在函数式编程中,函数自己是能够做为参数的app

non-interfering 表示方法在执行的过程当中不会改动流中原数据,好比在前面的例子中没有 lambda 表达式修改了 myList 中的元素。

无状态表示方法屡次执行的结果是肯定的,好比前面的例子中没有 lambda 表达式会依赖在执行过程当中会被修改的外部做用域中的变量。

不一样种类的 Stream

Stream 能够经过多种方式建立,尤为是各类容器对象。List 和 Set 都支持 stream()parallelStream() 方法来建立串行或者并行的 Stream。并行 Stream 能够同时运行在多个线程上,在下文会详细讲解,当前先经过串行 Stream 来演示:

Arrays.asList("a1", "a2", "a3")
        .stream()
        .findFirst()
        .ifPresent(System.out::println); //a1
复制代码

调用 List 的 stream() 方法会返回一个 Stream 对象。可是获得 Stream 对象不必定要建立 Collection 对象,看下面的代码:

Stream.of("a1", "a2", "a3")
         .findFirst()
         .ifPresent(System.out.println);
复制代码

只须要经过 Stream.of() 就能够把一堆对象建立为 Stream。

另外在 Java8 还能够经过 IntStreamLongStreamDoubleStream 等来操做原生数据类型 intlongdouble

IntStream 经过 range() 方法能够替代 for 循环:

IntStream.range(1,4)
            .forEach(System.out::println);
 // 1
 // 2
 // 3
复制代码

全部的原生类型均可以和其余对象同样使用 Stream,可是全部的原生类型 Stream 都使用专门的 lambda 表达式,好比 int 使用 IntFunction 而不是 Function,使用 IntPredicate 而不是 Predicate

而且原生类型 Stream 还另外支持终端聚合操做 sum() 以及 average():

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0
复制代码

这些操做在将对象转化为原生类型的时候很是有用,反之亦然。出于这个目的,普通 Stream 支持特别的 map 操做,好比 mapToInt()mapToLong()mapToDouble()

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3
复制代码

原生数据类型能够经过 mapToObj() 转化为对象:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3
复制代码

下面这个例子是一个组合操做:double Stream 的元素首先被转成 int 最后被转化成 String:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3
复制代码

处理次序

上文中已经详细描述了如何建立和使用不一样类型的 Stream,下面会深刻研究 Stream 的操做是如何进行的。

中间操做的一个重要特征是延迟,看下面这个没有终端操做的例子:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });
复制代码

当上面的代码片断执行完成的时候,控制台并无输出任何东西。这是由于中间操做在有终端操做的时候才会执行。

给上面的例子加上终端操做 forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));
复制代码

执行这段代码会有以下的输出:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c
复制代码

输出结果的顺序可能会让人惊讶。以前你可能会认为 Stream 中的元素会在一个操做中所有处理完以后才会进入到下一个操做。但实际的状况是一个元素在全部的操做执行完成以后才会轮到下一个元素。"d2" 首先被 filterforEach 的处理,而后 "a2" 才会被处理。

这样能够减小每一个操做实际处理元素的个数,看下面这个例子:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
复制代码

这个 anyMatch 操做只在输入元素知足条件的状况下才会返回 true。在上面的例子中,运行到第二个元素 "a2" 时就会返回 true,而后就会中止处理其余元素,因此 map 操做也只是执行了两次,这正是得益于 Stream 的链式处理次序。

为何次序很关键

下面的这个例子由两个中间操做 mapfilter 以及一个终端操做 forEach 组成。再看一下这些操做是如何执行的:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
复制代码

正如上面的例子所分析,map 和 filter 对每一个字符串各执行了 5 次,而 forEach 仅仅执行了一次。

能够简单的调整操做的顺序来减小操做执行的总次数,下面的例子中把 filter 操做放到了 map 前面:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
复制代码

调整后,map 只执行了一次,整个操做管道在输入大量元素时的执行速度会快不少。若是 Stream 有不少的操做,时序考虑一下能不能经过调整持续来优化。

在上面的例子中另外加上 sorted 操做:

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
复制代码

sotred 是一个另类的中间操做,它是有状态的。由于在排序的过程当中必需要维护数据的状态。

执行上面的例子会产生以下输出:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2
复制代码

首先,sorted 会把输入的全部元素排好序以后才会进入下一个操做,和其余操做不一样,sorted 是水平执行的。因此在上面的例子中 sorted 才会被执行 8 次。

经过调整操做的次序能够再一次提高执行的性能:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
复制代码

在这个例子中 sorted 永远也不会被执行,在 filter 执行完了以后就剩下一个元素,也就没有排序的必要。在输入大量元素的状况下,性能也会获得极大的提高。

重用 Stream

Java8 中的 Stream 是不能被重用的。一旦执行了终端操做,那么 Stream 就会被关闭:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception
复制代码

在 anyMatch 以后调用 noneMatch 会产生以下的异常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
复制代码

若是须要解决这一点,能够为每个终端操做建立一个新的 Stream,好比可使用 Supplier 来建立全部中间操做已经执行完成的 Stream:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
复制代码

每调用一次 get() 方法都会建立一个新的 Stream,而后就能够执行须要执行的终端操做了。

进阶操做

Stream 支持大量不一样的操做,在上面的例子中已经介绍了最重要的操做如 filtermap。完整的操做能够在官方文档中查看。下面会重点介绍更加复杂的操做 collectflatMapreduce

这节绝大部分的代码例子都会使用下面 Person list 做为演示数据:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));
复制代码

Collect

若是须要将 Stream 中运行的结果转成一个不一样的类型,好比 List、Set 或者 Map,collect 就很是有用。collect 操做接受由 suppileraccumulatorcombinerfinisher 等四个部分组成的 Collector 对象。听起来很复杂,但 java8 中 Collectors 类中的大量方法开箱即用,对不少通用的操做并不须要本身去实现:

注:suppiler, accumulator, combiner, finisher 都是函数式接口

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]
复制代码

很简单就能够从 Stream 中获取一个 List,若是须要一个 Set,调用 Collectors.toSet() 就好了。

下面的这个例子是经过年龄来给 person 分组:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
复制代码

Collectors 功能不少,还能够用来对 Stream 中的元素作聚合操做,好比计算全部 person 的平均年龄:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0
复制代码

还能够用来作统计,summarizing 会返回一个内建的统计对象,经过这个对象能够很方便的获得最大年龄、最小年龄、平均年龄等统计结果:

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
复制代码

下面的例子中把全部 person 的名字拼成了一个字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
复制代码

joining 接收一个间隔符和可选的前缀、后缀字符串。

为了输出 map 结果。必须指定 map 的 key 和 value。须要注意 key 必须是惟一的,不然会报 IllegalStateException 异常。能够经过传入另一个合并方法做为参数来避免这个异常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
复制代码

上面介绍了一些很强大 Collectors 的内建方法。下面来实现一个自定义的 collector。将全部 Person 的名称转成大写并输入到一个字符串中,每一个名字使用 | 来隔开。自定义的 collecotr 使用 Collecotr.of() 来实现,须要实现其中的四个部分:supplieraccumulatorcombinerfinisher

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID
复制代码

在 Java 中,String 对象是不可变的。因此须要一个 StringJoiner 来组合字符串,suppiler 实例化一个带 | 分隔符的 StringJoiner 对象。accumulator 把字符串转成大写而且放进 StringJoiner 对象,combiner 将两个 StringJoiner 对象合成一个,最后 finisher 把 StringJoiner 对象输出为 String 对象。

flatMap

在上面已经介绍了如何使用 map 将 Stream 中的对象转成另一种类型的对象。map 只能把一种类型转成另一种特定的类型,在把一种类型转成任意种类型的状况下,map 就有点受限制了。而 flatMap 正是来解决这个问题的。

flatMap 会把 Stream 中的每一个元素转成另外一个 Stream 中的其余对象。因此每一个元素依赖 STream 会被转成 0 个,1 个或者多个其余对象。这些生成的新的 stream 会在 flatMap 操做结束的时候返回。

在使用 flatMap 以前,须要定义如下的数据结构:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}
复制代码

接下来,利用 Stream 初始化一些对象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i))); 

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
复制代码

如今,生成了包含三个 foo 对象的 list,每一个 foo 对象中又包含三个 bar 对象。

flatMap 接收一个返回 Stream 对象的方法做为参数,为了分解 foo 中的每一个 bar 对象,传入一个合适的方法:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
复制代码

上面那个例子成功的把一个包含三个 foo 对象的 Stream 转成了包含 9 个 bar 对象的 Stream。

并且,上面的那些代码能够被简化成一个 Stream 管道操做:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
复制代码

flatMap 操做对 java8 中的 Optional 对象也有用,Optional 对象的操做会返回另外一个类型的 Optional 对象。因此这个特性能够用来消除空指针检查。

定义类的抽象层次以下:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}
复制代码

为了从 Outer 对象中调用 Inner 对象中的 foo 字符串,须要作不少的空指针检查来避免潜在的空指针异常:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}
复制代码

这些操做能够经过 flatMap 来进行优化:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);
复制代码

每调用一次都会返回一个 Optional 对象,对象中包裹着目标对象或者 null。

Reduce

Reduce 组合 Stream 中全部的元素,而后产生一个单独的结果。Java8 支持三种 reduce 方法。第一种 reduce 对于每一个 Stream 只会返回一个元素。下面这个例子计算除了年龄最大的人的名字:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela
复制代码

reduce 方法接受一个 BinaryOperator 函数。在 Person 这个例子中,其实是一个 BiFunction,两个操做数的类型都是一致的。BiFunction 与 Function 很像,可是前者接收两个参数。这个例子中比较全部 person 的年龄属性来找出最大年龄的 person。

第二种 reduce 方法接受一个目标对象和一个 BinaryOperator。下面这个方法能够聚合全部的 person 属性来建立一个新的 person:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
复制代码

第三种 reduce 接受三个参数:一个目标对象、一个 BiFunction、一个 BinaryOperator 类型的 combiner。由于这个传入的值不必定是 Person 类型,因此咱们能够利用这个特性来计算全部 Person 年龄的总和:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76
复制代码

最后的结果是 76,那么中间的计算过程是什么样的的呢?下面 debug 了计算的过程:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
复制代码

能够看到 accumulator 函数完成了全部的计算,调用的第一次获得的是初始值 0 和 Max person。而后后续的三步完成了全部年龄的的累加。在最后一步获得了全部年龄的累加结果 76。

可是上面的例子看起来稍微有点问题,由于 combiner 函数根本没有执行,可是真的是这样的吗?看下面的代码咱们就能发现秘密所在:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
复制代码

在并行执行的状况下有着彻底不一样的执行行为。在这里 combiner 执行了,accumulator 在并行状况下被执行的时候,combiner 用来累加 accumulator 的执行结果。

在下一节会详细分析并行 Stream。

并行 Stream

在输入元素数量不少的状况下,经过并行执行 Stream 能够提高执行性能。并行 Stream 使用了 ForkJoinPool,这个对象能够经过 ForkJoinPool.commonPool() 来获得。底层的线程池最多能够有五个线程,取决于物理机器能够用的 CPU 有几个。

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
复制代码

在个人机器上这个线程的数量被设定为 3。这个值能够经过 JVM 的参数来进行调整:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
复制代码

Collection 对象能够经过 parallelStream() 来建立一个并行的 Stream。或者也能够对一个串行的 Stream 对象调用 parallel() 来转成并行 Stream。

为了理解 Stream 是如何并行执行的,下面这个例子把线程的状况都打印出来了:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
复制代码

经过研究 debug 输出,能够看到 Stream 执行过程当中哪些线程确实用到了:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
复制代码

并行 Stream 执行操做的过程当中用到了线程池中全部的线程。上面输出的结果顺序可能每次都是不同的,这是由于线程执行的顺序自己就是不同的。

给上面的例子加上 sort 操做:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
复制代码

执行的结果看起来有点奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
复制代码

能够看到了 sort 操做只会在主线程中执行。并行 Stream 中的 sort 操做实际用到了 Java8 中的新接口 Arrays.parallelSort()。在 Javadoc 中说明了数组的长度决定了这个排序操做是串行仍是并行执行:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上面的例子,能够发现 combiner 函数只会在并行状况下执行。下面来看一下哪些线程确实执行了:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
复制代码

上面例子的输出说明了 accumulator 和 combiner 在并行 Stream 中会在全部的可用线程上执行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]
复制代码

全部在输入元素的量很大的状况下,并行 Stream 会带来很大的性能提高。可是须要注意一些操做好比 reducecollect 须要额外的 combine 操做,可是在串行 Stream 中并不须要。

此外,全部的并行 Stream 都依赖 ForkJoinPool 线程池。因此应当尽可能避免实现一些阻塞 Stream 的操做,由于这样会下降那些依赖并行 Stream 的程序的性能。

(完)

原文

关注微信公众号,聊点其余的

相关文章
相关标签/搜索