Java 8 Stream 教程

Java 8 Stream Tutorial


本文采用实例驱动的方式,对JAVA8的stream API进行一个深刻的介绍。虽然JAVA8中的stream API与JAVA I/O中的InputStream和OutputStream在名字上比较相似,可是实际上是另一个东西,Stream API是JAVA函数式编程中的一个重要组成部分。html

本文描述如何使用JAVA8的Stream API。经过本文,你能够了解Stream API的执行顺序,不一样的执行顺序会对stream api的执行效率有较大的影响。本文会详细描述Stream API中的reducecollectflatMap等操做,结尾部分会深刻讲解parallel streamsjava

若是你对JAVA8中新增的概念:lambda表达式,函数式接口,方法引用不熟悉。能够从:Java 8 Tutorial一文中获取相关的知识。git


Streams如何工做?

stream是一个能够对个序列中的元素执行各类计算操做的一个元素序列。github

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2

stream包含中间(intermediate operations)和最终(terminal operation)两种形式的操做。中间操做(intermediate operations)的返回值仍是一个stream,所以能够经过链式调用将中间操做(intermediate operations)串联起来。最终操做(terminal operation)只能返回void或者一个非stream的结果。在上述例子中:filter, map ,sorted是中间操做,而forEach是一个最终操做。更多关于stream的中可用的操做能够查看java doc。上面例子中的链式调用也被称为操做管道流。shell

大多stream操做接受某种形式的lambda表达式做为参数,经过方法接口的形式指定操做的具体行为,这些方法接口的行为基本上都是无干扰(non-interfering)和无状态(stateless)。无干扰(non-interfering)的方法的定义是:该方法不修改stream的底层数据源,好比上述例子中:没有lambda表达式添加或者删除myList中的元素。无状态(stateless)方法的定义:操做的执行是独立的,好比上述例子中,没有lambda表达式在执行中依赖可能发生变化的外部变量或状态。编程


streams分类

能够从不一样的数据源建立stream。java collection包中的Collections,Lists,Sets这些类中新增stream()和parallelStream()方法,经过这些方法能够建立一个顺序stream(sequential streams)或者一个并发的stream(Parallel streams)。并发stream(Parallel streams)更适合在多线程中使用,本文先介绍顺序流(sequential streams)在结尾会描述并发stream(Parallel streams),api

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

List对象上调用stream()方法能够返回一个常规的对象流。在下面的例子中咱们不须要建立一个collection对象也可使用stream:数组

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

直接使用Stream.of()方法就能从一组对象建立一个stream对象,多线程

除了常规的对象流,JAVA 8中的IntStream,LongStream,DoubleStream这些流可以处理基本数据类型如:int,long,double。好比:IntStream可使用range()方法可以替换掉传统的for循环并发

IntStream.range(1, 4)
    .forEach(System.out::println);

// 1
// 2
// 3

基本类型流(primitive streams)使用方式与常规对象流类型(regular object streams)大部分相同,可是基本类型流(primitive streams)能使用一些特殊的lambda表达式,好比:用IntFunction代替Function,用IntPredicate代替Predicate,同时基本类型流(primitive streams)中能够支持一些聚合方法,如:sum(),average()等。

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

能够经过常规对象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本类型对象流(primitive streams)中的mapToObj()等方法完成常规对象流和基本类型流之间的相互转换

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

下面这个例子中doubles stream先被映射成int stream,而后又被映射成String类型的对象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

处理顺序

前面描述了如何建立和使用各类stream,如今开始深刻了解stream执行引擎的工做原理。

Laziness(延迟加载)是中间操做(intermediate operations)的一个重要特性。以下面这个例子:中间操做(terminal operation)缺失,当执行这个代码片断的时候,并不会在控制台打印相应的内容,这是由于只有最终操做(terminal operation)存在的时候,中间操做(intermediate operations)才会执行。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

给上面的例子添加最终操做(terminal operation)forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

执行结果以下:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

执行结果比较让人惊奇,想固然的作法是水平执行此流上的全部元素。可是其实是每个元素沿着链垂直移动,第一个字符串"d2"执行完filterforEach后第二个元素"a2"才开始执行。

这种沿着链垂直移动的行为能够下降每个元素上进行操做的数量,如咱们在下面的例子中所示:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {最终操做
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

当对给定元素执行判断为真时anyMatch操做会马上返回true,在上面例子中执行到元素“A2”的时候,元素判断为真anyMatch马上返回true,因为流是沿着链垂直移动的,所以上面的map操做只会执行两次。

注:stream的执行流程相似shell中管道:ps xxx | grep "sss" | grep "ccc",是按照输入行的形式进行处理。


执行效率与steream执行链顺序的关系

下面的例子由两个中间操做(intermediate operations)map和filter以及一个最终操做(terminal operation)forEach构成,咱们观察这些动做是如何执行的。

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

你可能已经猜测到:mapfilter操做被执行了5次,可是forEach操做只被执行了1次。咱们能够经过修改操做的执行顺序(如:将filter操做移到操做链的头部),大幅度下降执行次数

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1中间操做
// filter:  b3
// filter:  c

修改后map只被执行了1次,若是此时数据量比较大则操做管道的执行效率会有较大的提高,在处理复杂方法链的时候须要注意执行顺序对执行效率的影响。

给上面的例子添加sort操做。

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

执行结果以下:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

Sorting 是一种特殊的中间操做(intermediate operation),在对集合中元素进行排序过程当中须要保存元素的状态,所以Sorting 是一种有状态的操做(stateful operation)。

首先,在整个输入集上执行排序操做(即先对集合进行水平操做),因为输入集合中的元素间存在多种组合,所以上面的例子中sorted操做被执行了8次。

能够经过对执行链重排序的方式,提高stream的执行效率。修改执行链顺序以后因为filter操做的过滤,致使sorted操做的输入集只有一个元素,在大数据量的状况下可以大幅度提升执行效率。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

流复用

Java 8 streams不能被复用,当你执行完任何一个最终操做(terminal operation)的时候流就被关闭了。

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

在同一个stream中执行完anyMatch后再执行noneMatch就会抛出以下异常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

能够经过为每一个最终操做(terminal operation)建立一个新的stream链的方式来解决上面的重用问题,Stream api中已经提供了一个stream supplier类来在已经存在的中间操做(intermediate operations )的stream基础上构建一个新的stream。

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

streamSupplier的每一个get()方法会构造一个新的stream,咱们能够在这个stream上执行指望的最终操做(terminal operation)。


高级操做

Streams支持多种不一样的操做(operations),咱们已经了解过filter,map等比较重要的操做。你能够经过Stream Javadoc进一步了解更多的操做。如今咱们开始深刻探讨更复杂的操做:collect flatMap reduce

假设存在以下的用户列表:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect(收集)

Collect(收集)是一种是十分有用的最终操做,它能够把stream中的元素转换成另一种形式,好比;list,set,map。Collect使用Collector做为参数,Collector包含四种不一样的操做:supplier(初始构造器), accumulator(累加器), combiner(组合器), finisher(终结者)。这听起来很复杂,可是一个好消息是java 8经过Collectors类内置了各类复杂的收集操做,所以对于大部分经常使用的操做来讲,你不须要本身去实现collector类。

从一个十分常见的用类开始:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

经过上面的demo能够看出,将stream转换为List十分简单,若是想转换为Set的话,只需使用Collectors.toSet()就能够了。

下面的例子暂时将用户按年龄分组:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors类功能繁多,你能够经过Collectors对stream中的元素进行汇聚,好比:计算全部用户的年纪。

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

能够经过summarizing collectors能返回一个内置的统计对象,经过这个对象可以获取更加全面的统计信息,好比用户年纪中的最大值,最小值,平均年纪等结果。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子展现如何将全部用户链接成一个字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

join collector的三个参数分别表示:链接符,字符串前缀,字符串后缀(可选)。

将一个stream转换为map,咱们必须指定map的key和value如何映射。要注意的是key的值必须是惟一性的,不然会抛出IllegalStateException,可是能够经过使用合并函数(可选)绕过这个IllegalStateException异常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

前文已经介绍了jdk内置的一些颇有用的collectors,接下来开始介绍如何构造咱们本身所需的collector,咱们的目标是将stream中全部用户的用户名变成大写并用"|"符号链接成一个字符串。为了达成这个目标咱们经过Collector.of()方法建立了一个新的collector,咱们必须给这个collector提供四种功能:supplier, accumulator, combiner,finisher.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

因为JAVA中String是一个不可变对象,所以咱们须要一个辅助类(好比StringJoiner)来帮助collect构造咱们的字符串。supplier建立了一个包含适当分隔符的StringJoiner对象,accumulator用来将每一个用户名转为大写并添加到supplier建立的StringJoiner中,combiner将两个StringJoiners对象链接成一个,最后一步的finisher从StringJoiner中构建出所但愿的获得的string对象。


FlatMap

咱们已经了解:经过map方法能够将stream中的一种对象转换成另一种对象。可是map方法仍是有使用场景限制,只能将一种对象映射为另一种特定的已经存在的对象。是否可以将一个对象映射为多种对象,或者映射成一个根本不存在的对象呢。这就是flatMap方法出现的目的。

FlatMap方法能够将一个stream中的每个元素对象转换为另外一个stream中的另外一种元素对象,所以能够将stream中的每一个对象改形成零,一个或多个。flatMap操做的返回流包含这些改造后的对象。

为了演示flatMap,定义一个继承关系以下:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

经过流实例化一队对象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

完成上述操做以后咱们获得三个foos,每一个foos包含三个bars。

FlatMap接收一个返回值为stream的函数作参数,经过传递合适的函数,就能够解析每个foo下对应的bar对象

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

正如所见,咱们成功地将三个对象的stream转换成一个包含九个对象的stream

最后,上面的示例代码能够简化为一个单一管道流:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

FlatMap也支持JAVA8中新引入的Optional类,Optionals flatMap能返回一个另外的类的optional包装类,能够用来减小对null的检查。

假设有以下这种多层级结构:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

为了获取内部outer实例的内部foo对象,须要添加一系列空指针判断

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

能够采用optionals flatMap 操做得到相同的结果:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

上面的例子中flatMap的每次调用都会返回一个用Optional对象,若是有返回值则这个Optional对象是这个返回值的包装类,若是返回值不存在则返回null。


Reduce(减小)

reduce操做能够将stream中全部元素组合起来获得一个元素,JAVA8支持三中不一样的reduce方法。

第一种能从stream元素序列中提取一个特定的元素。好比下面的从用户列表中选择年纪最大的用户操做:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

上面的实例中reduce方法接收一个二元累加计算函数(BinaryOperator accumulator function)做为参数,二元操做(BinaryOperator)实际就是上在两个操做数共享同一类型。示例中函数比较两人年龄,返回的最大年龄的人。

第二种reduce操做接收一个标识值和一个二元操做累加器做为参数,这个reduce方法能够把stream中全部用户的名字和年龄汇总获得一个新用户。

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三种reduce方法,接收三个参数:一个标示值(identity value),一个二元操做累加器(BiFunction accumulator),一个二元组合方法。因为标识符参数未被严格限制为person类型,所以咱们能够用这个reduce方法来获取用户的总年龄。

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

计算的结果是76,经过添加调试输出,咱们能够详细地了解执行引擎中发生了什么。

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

从调试输出中能够看到,累加器作了全部的工做,它首先获取值为0的标示值和第一个用户Max,接下来的三步中持续sum值因为累加不断变大,在最后一步汇总的年纪增加到76。

注意,上面的调试输出中combiner没有执行,经过parallel执行上面相同stream。

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

Executing this stream in parallel results in an entirely different execution behavior. Now the combiner is actually called. Since the accumulator is called in parallel, the combiner is needed to sum up the separate accumulated values.

经过并行的方式执行上面的stream操做,获得的是另一种彻底不相同的执行动做。在并行stream中combiner方法会被调用。这是因为累加器是被并行调用的,所以组合器须要对分开的累加操做进行求和。

下一章会详细描述并行stream。


Parallel Streams(并行流)

为了提升大量输入时的执行效率,stream能够采用并行的放行执行。并行流(Parallel Streams)经过ForkJoinPool.commonPool() 方法获取一个可用的ForkJoinPool。这个ForkJoinPool使用5个线程(其实是由底层可用的物理cpu核数决定的)。

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

On my machine the common pool is initialized with a parallelism of 3 per default. This value can be decreased or increased by setting the following JVM parameter:
在个人机器上公共池初始化为每一个默认3并行,这个值能够经过调整jvm参数来修改:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Collections中包含parallelStream()方法,经过这个方法可以为Collections中的元素建立并行流。另外也能够调用stream的parallel()方法将一个顺序流转变为一个并行流的拷贝。

为了了解并行流的执行动做,下面的例子会打印当前线程的执行信息。

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

执行的结果以下:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

经过分析调试输出,咱们能够更好地了解哪个线程执行了哪些stream操做。从上面的输出中咱们能够看到parallel stream使用了ForkJoinPool提供的全部可用的线程来执行流的各类操做。因为不能肯定哪一个线程会执行并行流的哪一个操做,所以反复执行上面的代码,打印的结果会不一样。

扩充上面的例子,添加sort操做

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

执行结果以下:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

这个执行结果看起来比较奇怪,看起来sort操做只是在main线程中顺序执行的。实际上,parallel stream中的sort操做使用了JAVA 8的一个新方法:Arrays.parallelSort()。JAVA doc中是这样描述Arrays.parallelSort()的:待排序数组的长度决定了排序操做是顺序执行仍是并行执行。java doc 描述以下:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上一章的例子,咱们已经了解combiner方法只能在parallel streams中调用,让咱们来看下那些线程被实际调用:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

执行结果以下:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

从控制台输出能够看到accumulator和combiner操做都被可用的线程并行执行了。

总结起来:在大数据量输入的时候,parallel streams能够带来比较大的性能提高。可是应该记住,一些并行操做,好比:reduce,collect须要额外的计算(组合操做),可是在顺序流中,这些组合操做是不须要的。

另外,咱们知道全部的parallel stream操做共享一个jvm范围内的ForkJoinPool,因此你应该注意避免在parallel stream上执行慢阻塞流操做,由于这些操做可能致使你应用中依赖parallel streams操做的其余部分也会响应变慢。


结尾

若是你想更多了解JAVA 8 的stream,你能够阅读stream的JAVA doc,若是你想更深刻了解stream的底层机制,你能够阅读Martin Fowlers的文章Collection Pipelines

若是你对js也感兴趣,你能够查看Stream.js(一个用js实现的java 8 stream api),你也能够查看我写的java8教程。

但愿这个教程对你有帮助,你也喜欢阅读这个教程。这个教程的源码和例子在github上,你能够免费fork或者在twitter上给我反馈。


相关文章
相关标签/搜索