Java8新特性之:流(三)

四. 用流收集数据java

收集器 -- Stream API的功能安全

    reduce方法和collect的区别:reduce方法旨在把两个值结合起来生成一个新的值,他是一个不可变的归约。而collect方法的设计就是要改变容器,从而累积要输出的结果。数据结构

    1.分组:groupingBy框架

        -- 多级分组ide

            Collectors.groupingBy工厂方法建立收集器,它除了普通的分类函数外,还能够接受collector类型的第二个参数。函数

            进行二级分组时能够把一个内层groupingBy传递给外层groupingBy,并定义一个为流中项目分类的二级标准。性能

            eg:测试

//多级分组:先根据种类分组,再在种类中根据卡路里进行分组
public void testDoubleGroup() {
    Map<DishType, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = getMenu().stream().
            collect(
                Collectors.groupingBy(Dish:: getType, //一级分类函数
                    Collectors.groupingBy(dish -> { //二级分类函数
                            if(dish.getCalories() <= 400) return CaloricLevel.DIET;
                            else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                            else return CaloricLevel.FAT;
                        })
                )
            );
}

        -- 按子组收集数据编码

            把收集器的结果转换为另外一种类型,可使用Collectors.collectingAndThen工厂方法返回的收集器。spa

            与groupingBy联合使用的其余收集器:经过groupingBy工厂方法的第二个参数传递的收集器将会对分到同一组中的全部流元素执行进一步的规约操做。

            eg:

//按子组收集数据:与groupingby联合使用的其余收集器
public void testGroupMap() {
    Map<DishType, Integer> totalCaloriesByType = getMenu().stream().collect(Collectors.groupingBy(Dish::getType, Collectors.summingInt(Dish::getCalories)));
    for (Map.Entry<DishType, Integer> entry : totalCaloriesByType.entrySet()) {
        System.out.println("key = " + entry.getKey() + " and value = " + entry.getValue());
    }
}
//按子组收集数据
public void testGroupByMaxType() {
    Map<DishType, Dish> mostValoricByType = getMenu().stream()
        .collect(
            Collectors.groupingBy( //最外层
                Dish::getType, //根据菜肴的类型把菜单流分组,获得三个子流
                Collectors.collectingAndThen( //分组操做的每一个子流都用这第二个收集器作进一步规约
                    Collectors.maxBy(Comparator.comparingInt(Dish::getCalories)), //第三个收集器maxBy
                    Optional::get //由规约收集器进行子流的规约操做并进行Optional get转换函数获得各种型中热量最高的Dish
                )
            )
        );
    for (Map.Entry<DishType, Dish> entry : mostValoricByType.entrySet()) {
        System.out.println("key = " + entry.getKey() + " and value = " + entry.getValue().getName());
    }
}


    2.分区:partitioningBy

        分区是分组的特殊状况。

        分区:由一个谓词(返回一个布尔值的函数)做为分类函数,它成分区函数。

        分区函数返回一个布尔值,故获得的分组Map的键类型是Boolean,最多能够分为两组(true,false)。

        由于分区是分组的特殊状况,分区函数也能进行二级分组和将收集器结果转换为另外一种类型。

        Collectors类的静态工厂方法:

工厂方法

返回类型

用于

例子

toList

List<T>

把流中全部项目收集到一个List

List<Dish> dishes = menu.Stream.collect(toList());

toSet

Set<T>

把流中全部项目收集到一个Set,删除重复项

Set<Dish> dishes = menu.Stream.collect(toSet());

toCollection

Collection<T>

把流中全部项目收集到给定的供应源建立的集合

Collection<Dish> dishes = menuStream.collect(toCollection(), ArrayList::new);

counting

Long

计算流中元素的个数

long howManyDishes = menuStream.collection(counting());

summingInt

Integer

对流中项目的一个整数属性求和

int totalCalories = menuStream.collect(summingInt(Dish::getCalories));

averagingInt

Double

计算流中项目Integer属性的平均值

double avgCalories = menuStream.collect(averagingInt(Dish::getCalories));

summarizingInt

IntSummaryStatistics

收集关于流中项目Integer属性的统计值,例如最大/最小/总和与平均值

IntSummaryStatistics = menuStream.collect(summarizingInt(Dish::getCalories));

joining

String

链接对流中每一个项目调用toString方法所生成的字符串

String shortMenu = menuStream.map(Dish::getName).collect(joining(", "));

maxBy

Optional<T>

一个包裹了流中按照给定比较器选出最大元素的Optional,或若是流为空Optional.empty()

Optional<Dish> fattest = menuStream.collect(maxBy(comparingInt(Dish::getCaleries)));

minBy

Optional<T>

一个包裹了流中按照给定比较器选出最小元素的Optional,或若是流为空Optional.empty()

Optional<Dish> lightest = menuStream.collect(minBy(comparingInt(Dish::getCaleries)));

reducing

规约操做产生的类型

从一个做为累加器的初始值开始,利用BinaryOperator与流中元素逐个结合,从而将流规约为单个值

int totalCalories = menuStream.collect(reducing(0, Dish::getCalories, Integer::sum));

collectingAndThen

转换函数返回的类型

包裹另外一个收集器,对其结果应用转换函数

int howManyDishes = menuStream.collect(collectingAndThen(toList(), List::size));

groupingBy

Map<K, List<T>>

根据项目的一个属性的值对流中的项目做问组,并将属性值做为结果Map的键

Map<Dish.Type, List<Dish>> dishesByType = menuStream.collect(groupingBy(Dish::getType));

partitioningBy

Map<Boolean, List<T>>

根据对流中每一个项目应用谓词的结果来对项目进行分区

Map<Boolean, List<Dish>> vegetarianDishes = menuStream.collect(partitioningBy(Dish::isVegetarian));


    3.Collector接口

public interface Collector<T, A, R> {
    Supplier<A> supplier();        //创建新的结果容器
    BiConsumer<A, T> accumulator();    //将元素添加到结果容器
    Function<A, R> finisher();                //对结果容器应用最终转换(返回累积过程的最后一个要调用的函数)
    BinaryOperator<A> combiner();        //合并两个结果容器
    Set<Characteristics> characteristics();    //返回Characteristic集合,定义了收集器的行为(尤为是关于流是否能够并行规约)
}

        -- T是流中要收集的项目的泛型;

        -- A是累加器的类型,累加器是在收集过程当中用于累积部分结果的对象;

        -- R是收集操做获得的对象(一般单并不必定是集合)的类型。

        --Characteristic是一个包含三个项目的枚举:

            UNORDERED:归约结果不受流中项目的遍历和累积顺序的影响。

            CONCURRENT:accumulator函数能够从多个线程同时调用,且该收集器能够并行归约流。若是收集器没有标为UNORDERED,那它仅在用于无序数据源时才能够并行归约。

            INDENTITY_FINISH:代表完成器方法返回的函数是一个恒等函数,能够跳过。这种状况下,累加器对象会直接用做归约过程的最终结果。这也意味着,累加器A不加检查地转换为结果R是安全的。

        eg:

/**
* ToListCollector是IDENTITY_FINISH的,由于用来累积流元素中的List已是咱们要的最终结果,用不着进一步转换,
* 但它并非UNORDERED,由于用在有序流上的时候,咱们仍是但愿顺序可以保留在获得的List中。
* 最后,他是CONCURRENT的,但它仅仅在背后的数据源无序时才会并行处理
* @param <T>
*/
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
    /**
    * 建立集合操做点的起始点
    * @return
    */
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }
    /**
    * 累积遍历过的项目,原位修改累加器
    * @return
    */
    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }
    /**
    * 恒等函数
    * @return
    */
    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }
    /**
    *修改第一个累加器,
    * 将其与第二个累加器的内容合并
    * 返回修改后的第一个累加器
    * @return
    */
    @Override
    public BinaryOperator<List<T>> combiner() {
        return (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
    }
    /**
    *为收集器添加IDENTITY_FINISH, CONCURRENT标记
    * @return
    */
    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT));
    }
}



五. 并行数据处理与性能

    正确高效地使用并行流

    1.并行流

        并行流:是一个把内容分红多个数据块,并用不一样的线程分别处理每一个数据块的流。

        能够经过对收集源调用parallelStream方法来把集合转换为并行流。

        并行流从哪儿来,有多少个,如何自定义:

            并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你处理器的数量,这个值是由Runtime.getRuntime().availableProcessors()获得的。

            能够经过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小。这是一个所有设置。

        并行化的代价:

            并行化过程自己须要对流作递归划分,把每一个子流的概括操做分配到不一样的线程,而后把这些操做的结果合并成一个值。

            在多个内核之间移动数据的代价也可能比你想象的要大。

            因此要保证在内核中并行执行工做的时间比在内核之间传输数据时间长。必须用对并行Stream,不要采用不易并行haunted的操做如iterate。

        使用并行流时要避免共享可变状态。

        高效使用并行流:

            -- 测试;

            -- 装箱。自动装箱和拆箱操做会大大下降性能。java8中有原始流类型(IntStream,LongStream,DoubleStream)来避免这种操做,单反有可能都应该使用这些流。

            -- 有些操做自己在并行流上的性能就比顺序流差。limit,findFirst等依赖元素顺序的操做,能够用findAny代替findFirst。能够调用unordered方法来把有序流变成无序流。

            -- 流的操做流水线的总计算成本。一个元素经过流水线的大体处理成本较高就意味着使用并行流时性能好的可能性较大;

            -- 对于比较小的数据量,选择并行流不是一个好的额决定;

            -- 要考虑背后的数据结构是否易于分解。使用range工厂方法建立的原始类型流也能够快速分解;

            -- 流自身的特色,以及流水线中的中间操做修改流的方式,均可能会改变分解过程的性能;

            -- 要考虑终端操做中合并步骤的代价是大是小。

        流的数据源和可分解性:

可分解性

ArrayList

极佳

LinkedList

IntStream.range

极佳

Stream.iterate

HashSet

TreeSet

 

   2.分支/合并框架

        分支/合并框架的目的是以递归的方式将能够并行的任务拆分红更小的任务,而后将每一个子任务的结果合并起来生成总体结果。是ExecutorService接口的一个实现,他把子任务分配给线程池ForkJoinPool中的工做线程。

        要把任务交到这个池,必须建立RecursiveTask<R>的一个子类,其中R是并行化任务(一级全部子任务)产生的结果类型,或者入股任务不返回结果,则是RecursiveActive类型。定义RecursiveTask只需实现compute抽象方法。

            该方法的实现相似于:  

if (任务足够小或不可分) {
    顺序计算该任务
} else {       
    将任务分红两个子任务
    递归调用本方法,拆分每一个子任务,等待全部子任务完成
    合并每一个子任务的结果
}


    3.Spliterator

        可分迭代器。

        和Iterator同样,Spliterator也用于遍历数据源中的元素。但他是为了并行执行而设计的。

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> var1);    //相似于普通的Iterator,它会按顺序一个一个使用Spliterator中的元素,若是还有其余元素要遍历就返回true
    Spliterator<T> trySplit();                       //专门为Spliterator接口设计的,它能够把一些元素划分出去给第二个Spliterator(由该方法返回),让它们两个并行处理
    long estimateSize();                             //估计还剩多少元素要遍历
    int characteristics();                           //返回一个int,表明Spliterator自己特性集的编码
}

        Spliterator的特性:  

特性

含义

ORDERED

元素既定的顺序(例如List),所以Spliterator在遍历和划分时也会遵循这一顺序

DISTINCT

对于任意一对遍历过的元素x和y,x.equals(y)返回false

SORTED

遍历的元素按照一个预约义的顺序排序

SIZED

该Spliterator由一个已知大小的源创建(例如Set),所以estimateSize()返回的是准确值

NONNULL

保证遍历的元素不会为null

IMMUTABLE

Spliterator的数据源不能修改。这意味着在遍历时不能添加、删除或修改任何元素

CONCURRENT

该Spliterator的数据源能够被其余线程同时修改而无需同步

SUBSIZED

该Spliterator和全部从它拆分出来的Spliterator都是SIZED

相关文章
相关标签/搜索