java8 Collector 接口

java8的Stream中的collect方法,用于对流中的数据进行归集操做,collect方法接受的参数是一个Collector,忽略掉静态方法后,Collector接口内容以下:java

public interface Collector<T, A, R>
    // 用于生成空的累加器实例,这个累加器的类型是A
    Supplier<A> supplier();
    // 生成一个用于执行归约操做的BiConsumer<A,T>,A是supplier生成的累加器,T是数据流中的每一个元素的数据类型,能够看做,把T累加到A
    BiConsumer<A, T> accumulator();
    // 并行归集时,须要对多个累加器进行合并操做
    BinaryOperator<A> combiner();
    // 做用是把A转换为R作为最终的返回值。
    Function<A, R> finisher();
    // 特性列表,
    Set<Characteristics> characteristics();

    /** 特性:如下注释基本有道翻译自原始文档注释[手动笑哭脸] */
    enum Characteristics {
        /** 支持并行归集*/
	/** 若是收集器不一样时是无序的(UNORDERED),那么只在应用于无序数据源时,才应该并行归集 */
        CONCURRENT,
        /** 无序的,收集器并不按照Stream中的元素输入顺序执行 */
        UNORDERED,
	/** 表示完成器finisher方法返回的函数是一个恒等函数,这时,累加器对象就是归约操做的最终结果*/
	/** 若是设置,则必须是这样一种状况:从A到R的未检查强制转换将会成功。 */
        IDENTITY_FINISH
    }
}

这个接口仍是有一点点复杂的,3个泛型,5个方法,其中一个characteristics方法用于提供特性列表,其中最重要的就是CONCURRENT,是否容许并行归集; 而另外4个方法,能够返回4个函数,这4个函数各有用途;app

咱们梳理一下归集操做的主要流程,看看在这个过程当中如何使用Collector函数

  1. 对一个数据集进行归集,首先要进行遍历,遍历的过程当中,对数据集的每个元素,进行某种操做,这个操做动做,由accumulator方法提供,该方法返回一个BiConsumer<A, T>函数,此函数消费两个参数,不返回值;该函数中<A, T>两个泛型,其中 T 表示数据集中的元素,而 A 表示对元素操做过程当中产生的中间值进行存储的临时变量,即上面说的累加器;
  2. 上一步中提到了对数据操做的过程当中须要对计算结果进行临时存储,那就须要一个存储的容器,该容器由supplier方法提供,该方法返回一个Supplier<A>函数,可产生一个类型为A的对象;
  3. 若是是并行归集的话,须要一个方法把各个子任务的归集结果进行合并,combiner方法派上用场;
  4. 最后,计算完成后,还要返回一个归集结果,finisher方法可获得一个Function函数,把累加器A转换成最终的响应结果R并返回;

看一下Stream.collect的源码:工具


public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        // 声明一个存储中间值的容器变量(累加器)
	A container;
	// 若是是并行流,而且知足:[1. collector特性中包含CONCURRENT(并行操做);2. 流是无序的,或者collector特性中包含了UNORDERED(无需按顺序进行归集);],执行下面的代码进行并行归集,由于这种状况下,不须要专门提供一个将各个子任务进行合并的方法
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
	    // 首先从supplier方法中得到一个容器
            container = collector.supplier().get();
	    // 拿到执行归集操做的BiConsumer函数
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
	    // 遍历集合,调用BiConsumer函数的accept方法,而且传入存储临时值的累加器和当前遍历到的元素,并行流中,forEach遍历是并行的
            forEach(u -> accumulator.accept(container, u));
        }
        else {
	    // 不知足上述条件,就调用evaluate方法进行归集
	    // evaluate的细节先不说了,由于说来话长
            container = evaluate(ReduceOps.makeRef(collector));
        }
	// 返回值,若是collector特性列表中包含了IDENTITY_FINISH,就返回容器自身,不然, collector.finisher()得到完成器,并传入累加器进行完成操做,最终的结果做为返回值
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

另外,Stream中的collect方法还有另外一个重载方法:ui

public final <R> R collect(Supplier<R> supplier,
                                   BiConsumer<R, ? super P_OUT> accumulator,
                                   BiConsumer<R, R> combiner) {
        	return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
    }

省去了Collector,入参直接接受三个函数,正是上面讲到的三个函数,省去了对结果进行转换的finisher,而实现方法更是直接调用了evaluate,省去了是否并行流和Collector的特征判断;lua


下面看一下Collector的工具类Collectors中提供的几个经常使用实现;翻译


Collectors.toList()code

public static <T> Collector<T, ?, List<T>> toList() {
            return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new,
					List::add,
                    			(left, right) -> {left.addAll(right); return left;},
					CH_ID);
    	}
		
	static final Set<Collector.Characteristics> CH_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
  1. ArrayList::new,建立一个ArrayList做为累加器;
  2. List::add,对流中元素的操做就是直接添加到累加器中;
  3. 若是是并行归集,对子任务归集结果进行全并的方法是 addAll,后一个子任务的结果直接所有添加到前一个子任务结果中;
  4. CH_ID是一个预约义好的特性列表,只有一个,IDENTITY_FINISH,表示累加器就是最终要返回的结果,不须要转换;

也能够看出Collectors.toList()归集器的归集结果是一个ArrayList,若是想转换为其它List的实现,还须要本身操做,或者定制一个归集器;对象


Collectors.toMap接口

public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                    Function<? super T, ? extends U> valueMapper) {
	// 调用另外一个toMap方法,最后一个参数表示supplier,生成累加器,一个HashMap
        return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
    }
	
	public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction,
                                Supplier<M> mapSupplier) {
	// 这个是要对元素进行的操做,调用俩个映射函数分别生成k和v,而后加入到累加器中
	// map.merge:若是给定key没绑定值或值为null,则绑定给定值,不然,执行重映射方法替换原来值或者删除原来的值。
        BiConsumer<M, T> accumulator
                = (map, element) -> map.merge(keyMapper.apply(element),
                                              valueMapper.apply(element), mergeFunction);
	// 最后特性表仍然是CH_ID,直接返回累加器
        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
    }

很少说,看注释就好


分组,这个比较复杂,是组合归集器:Collectors.groupingBy

// 传入一个映射函数,用于分红key,也就是咱们的分组依据
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier) {
	// 这里除了分组依据,还传入一个Collector,咱们称之为子归集器,用于对分组后的数据进行归集
        return groupingBy(classifier, toList());
    }
	public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
	// 这里HashMap::new就是supplier,用于生成累加器
        return groupingBy(classifier, HashMap::new, downstream);
    }

	public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
		// 子归集器的累加器
		Supplier<A> downstreamSupplier = downstream.supplier();
		// 子归集器要进行的操做
		BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
		// 父归集器的操做m表示累加器,t表示要操做的数据
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
			// 把数据映射成Key
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
			// 从累加器中取出子累加器,若是没有,用后面的函数生成一个并绑定
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
			// 子归集器对子累加器和数据进行操做
            downstreamAccumulator.accept(container, t);
        };
	// 并行归集的合并函数
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
	// 累加器
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

	// 若是子归集器特性中包含IDENTITY_FINISH,默认状况下子归集器是一个toList()的结果,自己是IDENTITY_FINISH的
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
	    // 最终仍是建立一个CollectorImpl
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
	    // 子归集器的完成函数
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
	    // 父归集器的完成函数
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }

比较复杂,大概的过程,不过结合注释应该仍是能看的懂;

相关文章
相关标签/搜索