《Java8实战》-第六章读书笔记(用流收集数据-02)

使用流收集数据

分区

分区是分组的特殊状况:由一个谓词(返回一个布尔值的函数)做为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着获得的分组 Map 的键类型是 Boolean ,因而它最多能够分为两组—— true 是一组, false 是一组。例如,若是你是素食者或是请了一位素食的朋友来共进晚餐,可能会想要把菜单按照素食和非素食分开:java

Map<Boolean, List<Dish>> partitionedMenu =
                // 分区函数
                menu.stream().collect(partitioningBy(Dish::isVegetarian));

这会返回下面的 Map :git

{false=[Dish{name='pork'}, Dish{name='beef'}, Dish{name='chicken'}, Dish{name='prawns'}, Dish{name='salmon'}], 
true=[Dish{name='french fries'}, Dish{name='rice'}, Dish{name='season fruit'}, Dish{name='pizza'}]}

那么经过 Map 中键为 true 的值,就能够找出全部的素食菜肴了:github

List<Dish> vegetarianDishes = partitionedMenu.get(true);

请注意,用一样的分区谓词,对菜单 List 建立的流做筛选,而后把结果收集到另一个 List中也能够得到相同的结果:算法

List<Dish> vegetarianDishes =
                        menu.stream().filter(Dish::isVegetarian).collect(toList());

分区的优点

分区的好处在于保留了分区函数返回 true 或 false 的两套流元素列表。在上一个例子中,要获得非素食 Dish 的 List ,你可使用两个筛选操做来访问 partitionedMenu 这个 Map 中 false键的值:一个利用谓词,一个利用该谓词的非。并且就像你在分组中看到的, partitioningBy工厂方法有一个重载版本,能够像下面这样传递第二个收集器:小程序

Map<Boolean, Map<Dish.Type, List<Dish>>> vegetarianDishesByType =
                menu.stream().collect(
                        // 分区函数
                        partitioningBy(Dish::isVegetarian,
                                // 第二个收集器
                                groupingBy(Dish::getType)));

这将产生一个二级 Map :安全

{false={MEAT=[Dish{name='pork'}, Dish{name='beef'}, Dish{name='chicken'}], FISH=[Dish{name='prawns'}, Dish{name='salmon'}]}, 
true={OTHER=[Dish{name='french fries'}, Dish{name='rice'}, Dish{name='season fruit'}, Dish{name='pizza'}]}}

这里,对于分区产生的素食和非素食子流,分别按类型对菜肴分组,获得了一个二级 Map,和上面的相似。再举一个例子,你能够重用前面的代码来找到素食和非素食中热量最高的菜:框架

Map<Boolean, Dish> mostCaloricPartitionedByVegetarian = menu.stream().collect(
                partitioningBy(Dish::isVegetarian, collectingAndThen(
                        maxBy(comparingInt(Dish::getCalories)),
                        Optional::get
                )));

这将产生如下结果:ide

{false=Dish{name='pork'}, true=Dish{name='pizza'}}

你能够把分区看做分组一种特殊状况。 groupingBy 和partitioningBy 收集器之间的类似之处并不止于此。函数

将数字按质数和非质数分区

假设你要写一个方法,它接受参数 int n,并将前n个天然数分为质数和非质数。但首先,找出可以测试某一个待测数字是不是质数的谓词会颇有帮助:性能

private static boolean isPrime(int candidate) {
    // 产生一个天然数范围,从2开始,直至但不包括待测数
    return IntStream.range(2, candidate)
            // 若是待测数字不能被流中任何数字整除则返回 true
            .noneMatch(i -> candidate % i == 0);
}

一个简单的优化是仅测试小于等于待测数平方根的因子:

private static boolean isPrime(int candidate) {
    int candidateRoot = (int) Math.sqrt((double) candidate);
    return IntStream.rangeClosed(2, candidateRoot)
            .noneMatch(i -> candidate % i == 0);
}

如今最主要的一部分工做已经作好了。为了把前n个数字分为质数和非质数,只要建立一个包含这n个数的流,用刚刚写的 isPrime 方法做为谓词,再给 partitioningBy 收集器归约就行了:

private static Map<Boolean, List<Integer>> partitionPrimes(int n) {
    return IntStream.rangeClosed(2, n).boxed()
            .collect(
                    partitioningBy(candidate -> isPrime(candidate)));
}

如今咱们已经讨论过了 Collectors 类的静态工厂方法可以建立的全部收集器,并介绍了使用它们的实际例子。

收集器接口

Collector 接口包含了一系列方法,为实现具体的归约操做(即收集器)提供了范本。咱们已经看过了 Collector 接口中实现的许多收集器,例如 toList 或 groupingBy 。这也意味着,你能够为 Collector 接口提供本身的实现,从而自由地建立自定义归约操做。

要开始使用 Collector 接口,咱们先看看本章开始时讲到的一个收集器—— toList 工厂方法,它会把流中的全部元素收集成一个 List 。咱们当时说在平常工做中常常会用到这个收集器,并且它也是写起来比较直观的一个,至少理论上如此。经过仔细研究这个收集器是怎么实现的,咱们能够很好地了解 Collector 接口是怎么定义的,以及它的方法所返回的函数在内部是如何为collect 方法所用的。

首先让咱们在下面的列表中看看 Collector 接口的定义,它列出了接口的签名以及声明的五个方法。

public interface Collector<T, A, R> {
        Supplier<A> supplier();
        BiConsumer<A, T> accumulator();
        Function<A, R> finisher();
        BinaryOperator<A> combiner();
        Set<Characteristics> characteristics();
}

本列表适用如下定义。

  1. T 是流中要收集的项目的泛型。
  2. A 是累加器的类型,累加器是在收集过程当中用于累积部分结果的对象。
  3. R 是收集操做获得的对象(一般但并不必定是集合)的类型。

例如,你能够实现一个 ToListCollector<T> 类,将 Stream<T> 中的全部元素收集到一个List<T> 里,它的签名以下:

public class ToListCollector<T> implements Collector<T, List<T>, List<T>>

咱们很快就会澄清,这里用于累积的对象也将是收集过程的最终结果。

理解 Collector 接口声明的方法

如今咱们能够一个个来分析 Collector 接口声明的五个方法了。经过分析,你会注意到,前四个方法都会返回一个会被 collect 方法调用的函数,而第五个方法 characteristics 则提供了一系列特征,也就是一个提示列表,告诉 collect 方法在执行归约操做的时候能够应用哪些优化(好比并行化)。

1. 创建新的结果容器: supplier 方法

supplier 方法必须返回一个结果为空的 Supplier ,也就是一个无参数函数,在调用时它会建立一个空的累加器实例,供数据收集过程使用。很明显,对于将累加器自己做为结果返回的收集器,好比咱们的 ToListCollector ,在对空流执行操做的时候,这个空的累加器也表明了收集过程的结果。在咱们的 ToListCollector 中, supplier 返回一个空的 List ,以下所示:

@Override
public Supplier<List<T>> supplier() {
    return () -> new ArrayList<>();
}

请注意你也能够只传递一个构造函数引用:

@Override
public Supplier<List<T>> supplier() {
    return ArrayList::new;
}

2. 将元素添加到结果容器: accumulator 方法

accumulator 方法会返回执行归约操做的函数。当遍历到流中第n个元素时,这个函数执行时会有两个参数:保存归约结果的累加器(已收集了流中的前 n-1 个项目),还有第n个元素自己。该函数将返回void ,由于累加器是原位更新,即函数的执行改变了它的内部状态以体现遍历的元素的效果。对于ToListCollector ,这个函数仅仅会把当前项目添加至已经遍历过的项目的列表:

@Override
public BiConsumer<List<T>, T> accumulator() {
    return (list, item) -> list.add(item);
}

你也可使用方法引用,这会更为简洁:

@Override
public BiConsumer<List<T>, T> accumulator() {
    return List::add;
}

3. 对结果容器应用最终转换: finisher 方法

在遍历完流后, finisher 方法必须返回在累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操做的最终结果。一般,就像 ToListCollector 的状况同样,累加器对象刚好符合预期的最终结果,所以无需进行转换。因此 finisher 方法只需返回 identity 函数:

@Override
public Function<List<T>, List<T>> finisher() {
    return Function.identity();
}

这三个方法已经足以对流进行循序规约。实践中的实现细节可能还要复杂一点,一方面是应为流的延迟性质,可能在collect操做以前还需完成其余中间操做的流水线,另外一方面则是理论上可能要进行并行规约。

4. 合并两个结果容器: combiner 方法

四个方法中的最后一个————combiner方法会返回一个供归约操做的使用函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。对于toList而言,这个方法的实现很是简单,只要把从流的第二个部分收集到的项目列表加到遍历第一部分时获得的列表后面就好了:

@Override
public BinaryOperator<List<T>> combiner() {
    return (list1, list2) -> {
        list1.addAll(list2);
        return list1;
    };
}

有了这第四个方法,就能够对流进行并行归约了。它会用到Java7中引入的分支/合并框架和Spliterator抽象。

5. characteristics 方法

最后一个方法—— characteristics 会返回一个不可变的 Characteristics 集合,它定义了收集器的行为——尤为是关于流是否能够并行归约,以及可使用哪些优化的提示。Characteristics 是一个包含三个项目的枚举。

  1. UNORDERED ——归约结果不受流中项目的遍历和累积顺序的影响。
  2. CONCURRENT —— accumulator 函数能够从多个线程同时调用,且该收集器能够并行归约流。若是收集器没有标为 UNORDERED ,那它仅在用于无序数据源时才能够并行归约。
  3. IDENTITY_FINISH ——这代表完成器方法返回的函数是一个恒等函数,能够跳过。这种状况下,累加器对象将会直接用做归约过程的最终结果。这也意味着,将累加器 A 不加检查地转换为结果 R 是安全的。

咱们迄今开发的 ToListCollector 是 IDENTITY_FINISH 的,由于用来累积流中元素的List 已是咱们要的最终结果,用不着进一步转换了,但它并非 UNORDERED ,由于用在有序流上的时候,咱们仍是但愿顺序可以保留在获得的 List 中。最后,它是 CONCURRENT 的,但咱们刚才说过了,仅仅在背后的数据源无序时才会并行处理。

所有融合到一块儿

前一小节中谈到的五个方法足够咱们开发本身的 ToListCollector 了。你能够把它们都融合起来,以下面的代码清单所示。

public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT));
    }
}

请注意,这个是实现与Collections.toList()方法并不彻底相同,但区别仅仅是一些小的优化。这些优化的一个主要方面是Java API所提供的收集器在须要返回空列表时使用了 Collections.emptyList() 这个单例(singleton)。这意味着它可安全地替代原生Java,来收集菜单流中的全部 Dish 的列表:

List<Dish> dishes = menuStream.collect(new ToListCollector<>());

这个实现和标准的

List<Dish> dishes = menuStream.collect(toList());

构造之间的其余差别在于 toList 是一个工厂,而 ToListCollector 必须用 new 来实例化。

进行自定义收集而不去实现 Collector

对于 IDENTITY_FINISH 的收集操做,还有一种方法能够获得一样的结果而无需从头实现新的 Collectors 接口。 Stream 有一个重载的 collect 方法能够接受另外三个函数—— supplier 、accumulator 和 combiner ,其语义和 Collector 接口的相应方法返回的函数彻底相同。因此好比说,咱们能够像下面这样把菜肴流中的项目收集到一个 List 中:

List<Dish> dishes = menuStream.collect(
                ArrayList::new,
                List::add,
                List::addAll);

咱们认为,这第二种形式虽然比前一个写法更为紧凑和简洁,却不那么易读。此外,以恰当的类来实现本身的自定义收集器有助于重用并可避免代码重复。另外值得注意的是,这第二个collect 方法不能传递任何 Characteristics ,因此它永远都是一个 IDENTITY_FINISH 和CONCURRENT 但并不是 UNORDERED 的收集器。

在下一节中,咱们一块儿来实现一个收集器的,让咱们对收集器的新知识更上一层楼。你将会为一个更为复杂,但更为具体、更有说服力的用例开发本身的自定义收集器。

开发你本身的收集器以得到更好的性能

咱们用 Collectors 类提供的一个方便的工厂方法建立了一个收集器,它将前n个天然数划分为质数和非质数,以下所示。

将前n个天然数按质数和非质数分区:

private static Map<Boolean, List<Integer>> partitionPrimes(int n) {
    return IntStream.rangeClosed(2, n).boxed()
            .collect(
                    partitioningBy(candidate -> isPrime(candidate)));
}

当时,经过限制除数不超过被测试数的平方根,咱们对最初的 isPrime 方法作了一些改进:

private static boolean isPrime(int candidate) {
    int candidateRoot = (int) Math.sqrt((double) candidate);
    return IntStream.rangeClosed(2, candidateRoot)
            .noneMatch(i -> candidate % i == 0);
}

还有没有办法来得到更好的性能呢?答案是“有”,但为此你必须开发一个自定义收集器。

仅用质数作除数

一个可能的优化是仅仅看看被测试数是否是可以被质数整除。要是除数自己都不是质数就用不着测了。因此咱们能够仅仅用被测试数以前的质数来测试。然而咱们目前所见的预约义收集器的问题,也就是必须本身开发一个收集器的缘由在于,在收集过程当中是没有办法访问部分结果的。这意味着,当测试某一个数字是不是质数的时候,你无法访问目前已经找到的其余质数的列表。

假设你有这个列表,那就能够把它传给 isPrime 方法,将方法重写以下:

private static boolean isPrime(List<Integer> primes, int candidate) {
    return primes.stream().noneMatch(i -> candidate % i == 0);
}

并且还应该应用先前的优化,仅仅用小于被测数平方根的质数来测试。所以,你须要想办法在下一个质数大于被测数平方根时当即中止测试。不幸的是,Stream API中没有这样一种方法。你可使用 filter(p -> p <= candidateRoot) 来筛选出小于被测数平方根的质数。但 filter要处理整个流才能返回恰当的结果。若是质数和非质数的列表都很是大,这就是个问题了。你用不着这样作;你只需在质数大于被测数平方根的时候停下来就能够了。所以,咱们会建立一个名为 takeWhile 的方法,给定一个排序列表和一个谓词,它会返回元素知足谓词的最长前缀:

public static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {
    int i = 0;
    for (A item : list) {
        if (!p.test(item)) {
            return list.subList(0, i);
        }
        i++;
    }
    return list;
}

利用这个方法,你就能够优化 isPrime 方法,只用不大于被测数平方根的质数去测试了:

private static boolean isPrime(List<Integer> primes, int candidate){
    int candidateRoot = (int) Math.sqrt((double) candidate);
    return takeWhile(primes, i -> i <= candidateRoot)
            .stream()
            .noneMatch(p -> candidate % p == 0);
}

请注意,这个 takeWhile 实现是即时的。理想状况下,咱们会想要一个延迟求值的takeWhile ,这样就能够和 noneMatch 操做合并。不幸的是,这样的实现超出了本章的范围,你须要了解Stream API的实现才行。

有了这个新的 isPrime 方法在手,你就能够实现本身的自定义收集器了。首先要声明一个实现 Collector 接口的新类,而后要开发 Collector 接口所需的五个方法。

1. 第一步:定义 Collector 类的签名

让咱们从类签名开始吧,记得 Collector 接口的定义是:

public interface Collector<T, A, R>

其中 T 、 A 和 R 分别是流中元素的类型、用于累积部分结果的对象类型,以及 collect 操做最终结果的类型。这里应该收集 Integer 流,而累加器和结果类型则都是 Map<Boolean,List<Integer>>,键是 true 和 false ,值则分别是质数和非质数的 List :

public class PrimeNumbersCollector implements Collector<Integer, Map<Boolean, List<Integer>>,
        Map<Boolean, List<Integer>>>

2. 第二步:实现归约过程

接下来,你须要实现 Collector 接口中声明的五个方法。 supplier 方法会返回一个在调用时建立累加器的函数:

@Override
public Supplier<Map<Boolean, List<Integer>>> supplier() {
    return () -> new HashMap<Boolean, List<Integer>>(2) {
        {
            put(true, new ArrayList<>());
            put(false, new ArrayList<>());
        }
    };
}

这里不但建立了累积器的Map,还为true和false两个键下面出实话了对应的空列表。在收集过程当中会把质数和非指数分别添加到这里。收集器重要的方法是accumulator,由于它定义了如何收集流中元素的逻辑。这里它也是实现了前面所讲的优化的关键。如今在任何一次迭代中,均可以访问收集过程的部分结果,也就是包含迄今找到的质数的累加器:

@Override
public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
    return ((Map<Boolean, List<Integer>> acc, Integer candidate) -> acc.get(isPrime(acc.get(true), candidate)).add(candidate));
}

在这个个方法中,你调用了isPrime方法,将待测试是否为质数的数以及迄今为止找到的质数列表(也就是累积Map中true键对应的值)传递给它。此次调用的结果随后被用做获取质数或非质数列表的键,这样就能够把新的被测数添加到恰当的列表中。

3.第三步:让收集器并行工做(若是可能)

下一个方法要在并行收集时把两个部分累加器合并起来,这里,它只须要合并两个Map,即将第二个Map中质数和非质数列表中的全部数字合并到第一个Map的对应列表中就好了:

@Override
public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
    return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {
        map1.get(true).addAll(map2.get(true));
        map1.get(false).addAll(map2.get(false));
        return map1;
    };
}

请注意,实际上这个收集器是不能并行的,由于该算法自己是顺序的。这意味着永远都不会调用combiner方法,你能够把它的实现留空。为了让这个例子完整,咱们仍是决定实现它。

4.第四步:finisher方法和收集器的characteristics方法

最后两个方法实现都很简单。前面说过,accumulator正好就是收集器的结果,也用不着进一步转换,那么finisher方法就返回identity函数:

@Override
public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
    return Function.identity();
}

就characteristics方法而言,咱们已经说过,它既不是CONCURRENT也不是UNOREDERED,但倒是IDENTITY_FINISH的:

@Override
public Set<Characteristics> characteristics() {
    return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
}

如今,你能够用这个新的自定义收集器来替代partitioningBy工厂方法建立的那个,并得到彻底相同的结果了:

private static Map<Boolean, List<Integer>> partitionPrimesWithCustomCollector(int n) {
    return IntStream.rangeClosed(2, n).boxed()
            .collect(new PrimeNumbersCollector());
}
Map<Boolean, List<Integer>> primes = partitionPrimesWithCustomCollector(10);
// {false=[4, 6, 8, 9, 10], true=[2, 3, 5, 7]}
System.out.println(primes);

收集器性能比较

用partitioningBy工厂方法穿件的收集器和你刚刚开发的自定义收集器在功能上是同样的,可是咱们没有实现用自定义收集器超越partitioningBy收集器性能的目标呢?如今让咱们写个小程序测试一下吧:

public class CollectorHarness {
    public static void main(String[] args) {
        long fastest = Long.MAX_VALUE;
        // 运行十次
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            // 将前100万个天然数按指数和非质数区分
            partitionPrimes(1_000_000);
            long duration = (System.nanoTime() - start) / 1_000_000;
            // 检查这个执行是不是最快的一个
            if (duration < fastest) {
                fastest = duration;
            }
            System.out.println("done in " + duration);
        }
        System.out.println("Fastest execution done in " + fastest + " msecs");
    }
}

在因特尔I5 6200U 2.4HGz的笔记上运行获得如下的结果:

done in 976
done in 1091
done in 866
done in 867
done in 760
done in 759
done in 777
done in 894
done in 765
done in 763
Fastest execution done in 759 msecs

如今把测试框架的 partitionPrimes 换成 partitionPrimesWithCustomCollector ,以便测试咱们开发的自定义收集器的性能。

public class CollectorHarness {
    public static void main(String[] args) {
        excute(PrimeNumbersCollectorExample::partitionPrimesWithCustomCollector);
    }

    private static void excute(Consumer<Integer> primePartitioner) {
        long fastest = Long.MAX_VALUE;
        // 运行十次
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            // 将前100万个天然数按指数和非质数区分
            // partitionPrimes(1_000_000);
            primePartitioner.accept(1_000_000);
            long duration = (System.nanoTime() - start) / 1_000_000;
            // 检查这个执行是不是最快的一个
            if (duration < fastest) {
                fastest = duration;
            }
            System.out.println("done in " + duration);
        }
        System.out.println("Fastest execution done in " + fastest + " msecs");
    }
}

如今,程序打印:

done in 703
done in 649
done in 715
done in 434
done in 386
done in 403
done in 449
done in 416
done in 353
done in 405
Fastest execution done in 353 msecs

还不错!看来咱们没有白费功夫开发这个自定义收集器。

总结

  1. collect 是一个终端操做,它接受的参数是将流中元素累积到汇总结果的各类方式(称为收集器)。
  2. 预约义收集器包括将流元素归约和汇总到一个值,例如计算最小值、最大值或平均值。
  3. 预约义收集器能够用 groupingBy 对流中元素进行分组,或用 partitioningBy 进行分区。
  4. 收集器能够高效地复合起来,进行多级分组、分区和归约。
  5. 你能够实现 Collector 接口中定义的方法来开发你本身的收集器。

代码

Github:chap6

Gitee:chap6

相关文章
相关标签/搜索