static class CollectorImpl<T, A, R> implements Collector<T, A, R>
设计上,Collectors 本身就是一个辅助类,是一个工厂。作用是给开发者提供常见的收集器实现。提供的方法都是静态方法,可以直接调用。
函数式编程最大的特点:表示做什么,而不是如何做。开发者更关注做什么,至于如何做则交给底层实现。
/** * Implementations of {@link Collector} that implement various useful reduction * operations, such as accumulating elements into collections, summarizing * elements according to various criteria, etc. 没有实现的方法,能够本身去编写收集器。 * <p>The following are examples of using the predefined collectors to perform * common mutable reduction tasks: * 举例: * <pre>{@code * // Accumulate names into a List 名字加入到一个集合。 * List<String> list = people.stream().map(Person::getName).collect(Collectors.toList()); * * // Accumulate names into a TreeSet 名字加入到一个Set。 待排序的集合。 * Set<String> set = people.stream().map(Person::getName).collect(Collectors.toCollection(TreeSet::new)); * * // Convert elements to strings and concatenate them, separated by commas * String joined = things.stream() * .map(Object::toString) * .collect(Collectors.joining(", ")); * * // Compute sum of salaries of employee 计算员工工资的总数。 * int total = employees.stream() * .collect(Collectors.summingInt(Employee::getSalary))); * * // Group employees by department 对员工进行分组。 * Map<Department, List<Employee>> byDept * = employees.stream() * .collect(Collectors.groupingBy(Employee::getDepartment)); * * // Compute sum of salaries by department 根据部门计算工资的总数。 * Map<Department, Integer> totalByDept * = employees.stream() * .collect(Collectors.groupingBy(Employee::getDepartment, * Collectors.summingInt(Employee::getSalary))); * * // Partition students into passing and failing 将学生进行分区。 * Map<Boolean, List<Student>> passingFailing = * students.stream() * .collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD)); * * }</pre> * * @since 1.8 提供了常见的方法。没有的话能够去自定义。 */ public final class Collectors {
public static void main(String[] args) { Student student1 = new Student("zhangsan", 80); Student student2 = new Student("lisi", 90); Student student3 = new Student("wangwu", 100); Student student4 = new Student("zhaoliu", 90); Student student5 = new Student("zhaoliu", 90); List<Student> students = Arrays.asList(student1, student2, student3, student4, student5); //list 转换成一个流,再转换成一个集合. List<Student> students1 = students.stream().collect(Collectors.toList()); students1.forEach(System.out::println); System.out.println("- - - - - - -"); // collect 方法底层原理介绍. //有多种方法能够实现同一个功能.什么方式更好呢? 越具体的方法越好. 减小自动装箱拆箱操做. System.out.println("count:" + students.stream().collect(Collectors.counting())); System.out.println("count:" + (Long) students.stream().count()); System.out.println("- - - - - - - -"); //举例练习 // 找出集合中分数最低的学生,打印出来. students.stream().collect(minBy(Comparator.comparingInt(Student::getScore))).ifPresent(System.out::println); // 找出集合中分数最大成绩 students.stream().collect(maxBy(Comparator.comparingInt(Student::getScore))).ifPresent(System.out::println); // 求平均值 System.out.println(students.stream().collect(averagingInt(Student::getScore))); // 求分数的综合 System.out.println(students.stream().collect(summingInt(Student::getScore))); // 求各类汇总信息 结果为IntSummaryStatistics{count=5, sum=450, min=80, average=90.000000, max=100} System.out.println(students.stream().collect(summarizingInt(Student::getScore))); System.out.println(" - - - - - "); // 字符串的拼接 结果为:zhangsanlisiwangwuzhaoliuzhaoliu System.out.println(students.stream().map(Student::getName).collect(joining())); //拼接加分隔符 结果为:zhangsan,lisi,wangwu,zhaoliu,zhaoliu System.out.println(students.stream().map(Student::getName).collect(joining(","))); // 拼接加先后缀 结果为:hello zhangsan,lisi,wangwu,zhaoliu,zhaoliu world System.out.println(students.stream().map(Student::getName).collect(joining(",", "hello ", " world"))); System.out.println("- - - - - - "); // group by 多层分组 // 根据分数和名字进行分组 输出结果为: // {80={zhangsan=[Student{name='zhangsan', score=80}]}, // 
100={wangwu=[Student{name='wangwu', score=100}]}, // 90={lisi=[Student{name='lisi', score=90}], zhaoliu=[Student{name='zhaoliu', score=90}, Student{name='zhaoliu', score=90}]}} Map<Integer, Map<String, List<Student>>> collect = students.stream().collect(groupingBy(Student::getScore, groupingBy(Student::getName))); System.out.println(collect); System.out.println("- - - - - - - "); // partitioningBy 多级分区 输出结果为:{false=[Student{name='zhangsan', score=80}], true=[Student{name='lisi', score=90}, Student{name='wangwu', score=100}, Student{name='zhaoliu', score=90}, Student{name='zhaoliu', score=90}]} Map<Boolean, List<Student>> collect1 = students.stream().collect(partitioningBy(student -> student.getScore() > 80)); System.out.println(collect1); // 按照大于80分区,再按照90分区 //输出结果为:{false={false=[Student{name='zhangsan', score=80}], true=[]}, true={false=[Student{name='lisi', score=90}, Student{name='zhaoliu', score=90}, Student{name='zhaoliu', score=90}], true=[Student{name='wangwu', score=100}]}} Map<Boolean, Map<Boolean, List<Student>>> collect2 = students.stream().collect(partitioningBy(student -> student.getScore() > 80, partitioningBy(student -> student.getScore() > 90))); System.out.println(collect2); //分区, 而后求出每一个分组中的个数. 结果为:{false=1, true=4} Map<Boolean, Long> collect3 = students.stream().collect(partitioningBy(student -> student.getScore() > 80, counting())); System.out.println(collect3); System.out.println("- - - - - - - "); //根据名字分组,获得学生的分数 --, 使用collectingAndThen 求最小值,而后整合起来. 最后Optional.get()必定有值. students.stream().collect(groupingBy(Student::getName,collectingAndThen(minBy(Comparator.comparingInt(Student::getScore)), Optional::get))); }
Comparator 比较器。引入了多个 default 方法。
完成一个功能时有多个方法可选时,优先使用特化的方法,因为效率会更高:减少了装箱拆箱的操作,从而减少性能损耗。
public static void main(String[] args) { List<String> list = Arrays.asList("nihao", "hello", "world", "welcome"); //对list按照字母的升序排序 // list.stream().sorted().forEach(System.out::println); //按照字符串的长度排序 // Collections.sort(list, (item1, item2) -> item1.length() - item2.length()); // Collections.sort(list, Comparator.comparingInt(String::length)); //字符串的降序排序 // list.sort(Comparator.comparingInt(String::length).reversed()); // 下边的形式会报错 item识别成了(Obejct). //lambda表达式的类型推断. 若是没法推断类型,须要本身制定类型 // list.sort(Comparator.comparingInt(item-> item.length()).reversed()); //这样写就成功了. list.sort(Comparator.comparingInt((String item )-> item.length()).reversed()); //为何这个地方没法推断类型? // 能推断出的 : list.stream().... Strean<T> 传递的有参数. 精确的类型能够进行类型推断. //这个地方没有明确具体是什么类型.ToIntFunction<? super T> .能够是String 或者在往上的父类 这个地方当作了Object类了. // list.sort(Comparator.comparingInt((Boolean item)-> 1).reversed()); //这种Boolean 就会报错.编译不经过. System.out.println(list); }
thenComparing() 多级排序的练习。
List<String> list = Arrays.asList("nihao", "hello", "world", "welcome");
// Two-level ordering: first by length ascending; for equal lengths, alphabetically while
// ignoring case. The next three statements are alternative ways of expressing this. They
// give the same order for plain ASCII words like these.
list.sort(Comparator.comparingInt(String::length).thenComparing(String.CASE_INSENSITIVE_ORDER));
list.sort(Comparator.comparingInt(String::length).thenComparing((item1,item2) -> item1.toUpperCase().compareTo(item2.toUpperCase())));
list.sort(Comparator.comparingInt(String::length).thenComparing(Comparator.comparing(String::toUpperCase)));
// Reverse only the secondary key: length ascending, then the lowercase form in descending
// order via Comparator.reverseOrder().
list.sort(Comparator.comparingInt(String::length).thenComparing(String::toLowerCase,Comparator.reverseOrder()));
// Length descending, then the lowercase form descending.
list.sort(Comparator.comparingInt(String::length).reversed()
        .thenComparing(String::toLowerCase,Comparator.reverseOrder()));
// Three-level ordering. The last thenComparing(...) never takes effect for this list. It is
// only consulted when all earlier keys tie, and no two words here share both length and
// lowercase form.
list.sort(Comparator.comparingInt(String::length).reversed()
        .thenComparing(String::toLowerCase, Comparator.reverseOrder())
        .thenComparing(Comparator.reverseOrder()));
jdk 提供了 Collector 接口。
/**
 * A hand-written {@link Collector} that gathers stream elements into a {@link Set}.
 *
 * <p>The intermediate container and the final result are the same {@code Set<T>}. The
 * collector declares {@link Characteristics#IDENTITY_FINISH}, so {@code Stream.collect} casts
 * the container directly instead of calling the finisher. Each factory method prints a line
 * when it is called, which makes the call sequence of {@code Stream.collect} visible.
 *
 * @param <T> the type of the stream elements
 */
public class MySetCollector<T> implements Collector<T, Set<T>, Set<T>> {

    /** Returns a supplier of new, empty result containers. */
    @Override
    public Supplier<Set<T>> supplier() {
        System.out.println("supplier invoked! ");
        return HashSet::new; // takes no argument and returns a new HashSet
    }

    /**
     * Returns the accumulator, which adds one element to the container.
     *
     * <p>The method reference is written against {@code Set<T>}, not {@code HashSet<T>}. The
     * accumulator's first parameter has the intermediate type {@code Set<T>}, and
     * {@code HashSet<T>::add} cannot accept an arbitrary {@code Set<T>} receiver, so that form
     * does not compile.
     */
    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        System.out.println("accumulator invoked! ");
        return Set<T>::add;
    }

    /**
     * Returns the combiner, which merges two partial containers (as produced by a parallel
     * stream) by adding the second into the first and returning the first.
     */
    @Override
    public BinaryOperator<Set<T>> combiner() {
        System.out.println("combiner invoked! ");
        return (set1, set2) -> {
            set1.addAll(set2);
            return set1;
        };
    }

    /**
     * Returns the finisher, which is the identity function ({@code t -> t} would be
     * equivalent).
     *
     * <p>{@link #characteristics()} reports {@code IDENTITY_FINISH}, so {@code Stream.collect}
     * does not use this method. That is why "finisher invoked!" is absent from the sample
     * output at the end of this class. Matching intermediate and result types alone would
     * not cause this; the flag does.
     */
    @Override
    public Function<Set<T>, Set<T>> finisher() {
        System.out.println("finisher invoked! ");
        return Function.identity(); // always returns its argument
    }

    /**
     * Declares this collector as IDENTITY_FINISH and UNORDERED. Without IDENTITY_FINISH,
     * {@code Stream.collect} would call {@link #finisher()} and apply its result.
     */
    @Override
    public Set<Characteristics> characteristics() {
        System.out.println("characterstics invoked! ");
        return Collections.unmodifiableSet(
                EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world");
        Set<String> collect = list.stream().collect(new MySetCollector<>());
        System.out.println(collect);
    }

    /*
     * Sample output of main (previously pasted as raw text inside the class body, which did
     * not compile):
     *
     * supplier invoked!
     * accumulator invoked!
     * combiner invoked!
     * characterstics invoked!
     * characterstics invoked!
     * [world, hello]
     */
}
接下来跟踪源码,看一下程序的调用过程。
/**
 * Terminal {@code collect} operation (excerpt of the JDK stream pipeline source).
 *
 * <p>Two strategies are used:
 * <ul>
 *   <li>If the stream is parallel, the collector is {@code CONCURRENT}, and either the stream
 *       is unordered or the collector is {@code UNORDERED}, one result container is created
 *       and every element is accumulated into it through {@code forEach}. The combiner is
 *       never requested on this path.</li>
 *   <li>Otherwise the work is delegated to the reduce operation built by
 *       {@code ReduceOps.makeRef(collector)}.</li>
 * </ul>
 * Finally, if the collector is {@code IDENTITY_FINISH}, the container is returned through an
 * unchecked cast. Otherwise the collector's finisher is applied to it.
 *
 * <p>{@code collector.characteristics()} is called again for the final check, and possibly
 * also inside ReduceOps (not shown). A collector that logs from {@code characteristics()}
 * can therefore print that line more than once.
 */
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    // Concurrent path: all threads share a single container.
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        // Default path: a reduce operation built from the collector. For parallel streams this
        // presumably uses one container per subtask merged with the combiner (ReduceOps is not
        // shown here).
        container = evaluate(ReduceOps.makeRef(collector));
    }
    // IDENTITY_FINISH: skip the finisher and return the container via an unchecked cast A -> R.
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
           ? (R) container
           : collector.finisher().apply(container);
}
// 举例: 需求:将一个 Set 进行收集,对结果进行增强,封装在一个 Map 当中。
// 输入:Set<String>
// 输出:Map<String,String>
// 示例输入: [hello, world, hello world]
// 示例输出: {hello=hello, world=world, hello world=hello world}
/**
 * A {@link Collector} that gathers elements into a {@link Set} and then "enhances" the result
 * into a {@link Map} in which every element maps to itself.
 *
 * <p>Example: the input {@code [hello, world, hello world]} produces
 * {@code {hello=hello, world=world, hello world=hello world}}. The iteration order of the
 * resulting {@code HashMap} is unspecified.
 *
 * <p>Each factory method, and every accumulation step, prints a line so that the behavior of
 * sequential and parallel streams can be observed.
 *
 * @param <T> the type of the stream elements
 */
public class MySetCollector2<T> implements Collector<T, Set<T>, Map<T, T>> {

    @Override
    public Supplier<Set<T>> supplier() {
        System.out.println("supplier invoked!");
        return HashSet::new;
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        System.out.println("accumlator invoked!");
        return (set, item) -> {
            set.add(item);
            // Print the container and the current thread on every call (once per element).
            // Building the message iterates `set`. That is safe here because, without
            // CONCURRENT, each thread accumulates into its own container. If CONCURRENT were
            // declared, several threads would share one HashSet, and this iteration could race
            // with another thread's add(). That race is what produced the
            // ConcurrentModificationException. In parallel code, keep the accumulator to the
            // accumulation itself.
            System.out.println("accunlator : " + set + ", " + Thread.currentThread().getName());
        };
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        System.out.println("combiner invoked!");
        // Merges the partial containers produced by a parallel stream. The merged container
        // (set1, which now holds both halves) must be returned. Returning set2 would silently
        // drop every element that was only in set1.
        return (set1, set2) -> {
            set1.addAll(set2);
            return set1;
        };
    }

    @Override
    public Function<Set<T>, Map<T, T>> finisher() {
        System.out.println("finisher invoked!");
        // The result type (Map) differs from the intermediate type (Set), and IDENTITY_FINISH
        // is not declared, so collect() applies this finisher.
        return set -> {
            Map<T, T> map = new HashMap<>(); // a TreeMap here would give a key-sorted result
            set.forEach(item -> map.put(item, item));
            return map;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        System.out.println(" characteristics invoked");
        // Declare only characteristics the implementation really satisfies:
        // - CONCURRENT would let parallel threads share one HashSet, which is not thread-safe;
        //   together with the printing above this led to ConcurrentModificationException.
        // - IDENTITY_FINISH would make collect() skip the finisher and cast the Set directly to
        //   the Map result type; that cast cannot succeed, so the program fails at runtime.
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "hello", "world", "helloworld", "1", "4", "j");
        Set<String> set = new HashSet<>(list);
        System.out.println("set" + set);
        // Sequential variant: set.stream().collect(new MySetCollector2<>());
        Map<String, String> collect = set.parallelStream().collect(new MySetCollector2<>()); // parallel stream
        System.out.println(collect);
    }
}
并行(声明了 CONCURRENT:多个线程共享同一个结果容器,容器内容在 main 与工作线程之间持续累积): accumlator invoked! accunlator : [j], main accunlator : [j, hello], main accunlator : [helloworld, 4, j, hello], ForkJoinPool.commonPool-worker-2 accunlator : [helloworld, 1, 4, j, hello], ForkJoinPool.commonPool-worker-2 accunlator : [helloworld, 1, world, 4, j, hello], ForkJoinPool.commonPool-worker-2
并行(未声明 CONCURRENT:每个线程使用各自的结果容器,最后由 combiner 合并。原文标注为“串行”,但输出中出现了多个 ForkJoinPool 线程,实际是并行流)。 accunlator : [j], main accunlator : [helloworld], ForkJoinPool.commonPool-worker-11 accunlator : [helloworld, 1], ForkJoinPool.commonPool-worker-11 accunlator : [helloworld, 1, world], ForkJoinPool.commonPool-worker-11 accunlator : [4], ForkJoinPool.commonPool-worker-9 accunlator : [j, hello], main
/**
 * Characteristics indicating properties of a {@code Collector}, which can
 * be used to optimize reduction implementations.
 */
enum Characteristics { // the traits a collector declares about itself
    /**
     * Indicates that this collector is <em>concurrent</em>, meaning that
     * the result container can support the accumulator function being
     * called concurrently with the same result container from multiple
     * threads.
     * (Concurrent: one and the same result container may be accumulated into by several
     * threads at once.)
     *
     * <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
     * then it should only be evaluated concurrently if applied to an
     * unordered data source.
     * (In other words, without UNORDERED it may be used concurrently only on an unordered
     * source.)
     *
     * <p>Study notes: without {@code CONCURRENT}, a parallel stream can still be collected.
     * In that case each worker fills its own result container, and the partial containers
     * are merged with the combiner. With {@code CONCURRENT}, and given an unordered source or
     * an {@code UNORDERED} collector, all threads accumulate into a single shared container,
     * so no combiner step is needed. Whether the finisher runs is decided by
     * {@code IDENTITY_FINISH}, not by {@code CONCURRENT}.
     */
    CONCURRENT,

    /**
     * Indicates that the collection operation does not commit to preserving
     * the encounter order of input elements.  (This might be true if the
     * result container has no intrinsic order, such as a {@link Set}.)
     * (The collection does not preserve encounter order.)
     */
    UNORDERED,

    /**
     * Indicates that the finisher function is the identity function and
     * can be elided.  If set, it must be the case that an unchecked cast
     * from A to R will succeed.
     * (When set, the finisher is treated as the identity and is not invoked. The
     * intermediate container is cast directly to the result type, so that cast must
     * succeed.)
     */
    IDENTITY_FINISH
}
一个线程在修改一个集合的同时,另一个线程在迭代(遍历)它,程序就会抛出并发修改异常(ConcurrentModificationException)。
如果是并行操作的话,就不要在累加操作中额外添加其他操作:添加就只添加,别再去打印它。
如果不加 CONCURRENT,仍然可以操作并行流,但操作的不是一个结果容器,而是多个结果容器,因此需要调用 combiner 把它们合并起来。(是否调用 finisher 只取决于是否声明了 IDENTITY_FINISH。)
如果加了 CONCURRENT(并且流是无序的,或收集器声明了 UNORDERED),则是多个线程操作同一个结果容器,因此无需调用 combiner。
超线程(HT, Hyper-Threading)是英特尔研发的一种技术,于2002年发布。超线程技术原先只应用于Xeon 处理器中,当时称为“Super-Threading”。之后陆续应用在Pentium 4 HT中。早期代号为Jackson。 [1]
通过此技术,英特尔实现了在一个实体CPU中提供两个逻辑线程。之后的Pentium D纵使不支持超线程技术,但集成了两个实体核心,所以仍会见到两个线程。超线程的未来发展,是提升处理器的逻辑线程数。英特尔于2016年发布的Core i7-6950X便是将10核心的处理器加上超线程技术,使之成为20个逻辑线程的产品。
Collectors 类中方法的实现练习。收集器总是有中间的容器。有必要总结一下收集器中的方法。
当你具备一些前提知识之后,再去看难的东西就会觉得理所当然。
对于 Collectors 静态工厂类来说,实现一共分为两种情况:
通过 CollectorImpl 来实现。
通过 reducing 方法来实现;reducing 方法本身又是通过 CollectorImpl 实现的。
总的来说,都是通过 CollectorImpl 来实现的。
1. toCollection(collectionFactory) 。 将集合转成指定的集合。
/**
 * Returns a collector that accumulates the input elements into a new collection obtained
 * from {@code collectionFactory}.
 *
 * <p>Each element is added with {@code Collection::add}. When partial results are combined,
 * the right container is added into the left one, and the left one is returned.
 * {@code CH_ID} is defined elsewhere in {@code Collectors}; presumably it is the
 * characteristics set containing IDENTITY_FINISH, meaning the container itself is the result.
 */
public static <T, C extends Collection<T>>
Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
    return new CollectorImpl<>(collectionFactory, Collection<T>::add,
                               (r1, r2) -> { r1.addAll(r2); return r1; },
                               CH_ID);
}
2. toList()是 toCollection()方法的一种具体实现。
/**
 * Returns a collector that accumulates the input elements into a new {@code ArrayList}.
 *
 * <p>Conceptually this is {@code toCollection(ArrayList::new)}, although it builds its own
 * {@code CollectorImpl} rather than calling {@code toCollection}. The cast on
 * {@code ArrayList::new} fixes the container type to {@code List<T>}, so that
 * {@code List::add} matches the accumulator. Partial lists are merged by appending the right
 * list to the left one.
 */
public static <T>
Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_ID);
}
3. toSet() 是toCollection()方法的一种具体实现。
/**
 * Returns a collector that accumulates the input elements into a new {@code HashSet}.
 *
 * <p>Conceptually this is {@code toCollection(HashSet::new)}, but it uses
 * {@code CH_UNORDERED_ID}, which is defined elsewhere and presumably means UNORDERED plus
 * IDENTITY_FINISH. The set itself is the result, and encounter order is not preserved.
 */
public static <T>
Collector<T, ?, Set<T>> toSet() {
    return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_UNORDERED_ID);
}
4. joining(); 融合成一个字符串。还有两个重载的,单参数的和多参数的
/**
 * Concatenates the input elements into one {@code String}, in encounter order.
 *
 * <p>The intermediate container is a {@code StringBuilder}; partial builders are merged by
 * appending the right one to the left one. {@code CH_NOID} (no IDENTITY_FINISH) makes
 * {@code collect} apply the {@code StringBuilder::toString} finisher.
 */
public static Collector<CharSequence, ?, String> joining() {
    return new CollectorImpl<CharSequence, StringBuilder, String>(
            StringBuilder::new, StringBuilder::append,
            (r1, r2) -> { r1.append(r2); return r1; },
            StringBuilder::toString, CH_NOID);
}

/** Concatenates the elements separated by {@code delimiter}, with empty prefix and suffix. */
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter) {
    return joining(delimiter, "", "");
}

/**
 * Concatenates the elements separated by {@code delimiter}, wrapped in {@code prefix} and
 * {@code suffix}. A {@code StringJoiner} is the intermediate container, and
 * {@code StringJoiner::merge} combines partial results.
 */
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
                                                         CharSequence prefix,
                                                         CharSequence suffix) {
    return new CollectorImpl<>(
            () -> new StringJoiner(delimiter, prefix, suffix),
            StringJoiner::add, StringJoiner::merge,
            StringJoiner::toString, CH_NOID);
}
5.mapping(); 将收集器的A 映射成B
/**
 * Adapts a downstream collector that accepts {@code U} into one that accepts {@code T}, by
 * applying {@code mapper} to each element before it is accumulated.
 *
 * <p>The downstream's supplier, combiner, finisher and characteristics are reused unchanged;
 * only the accumulator is wrapped. Note that {@code downstream.accumulator()} is requested
 * once, eagerly, when this collector is built.
 */
public static <T, U, A, R>
Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                           Collector<? super U, A, R> downstream) {
    BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
    return new CollectorImpl<>(downstream.supplier(),
                               (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                               downstream.combiner(), downstream.finisher(),
                               downstream.characteristics());
}
// Example (last names per city):
such as : Map<City, Set<String>> lastNamesByCity = people.stream().collect(groupingBy(Person::getCity, mapping(Person::getLastName, toSet())));
6.collectingAndThen(); 收集处理转换完后, 再去进行一个转换。
/**
 * Wraps {@code downstream} so that an extra finishing transformation {@code finisher} is
 * applied to its result.
 *
 * <p>The new finisher is {@code downstream.finisher().andThen(finisher)}. If the downstream
 * was IDENTITY_FINISH, that flag is removed from the returned characteristics. When it was
 * the only flag, the shared {@code CH_NOID} set is used instead.
 */
public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream,
                                                            Function<R,RR> finisher) {
    Set<Collector.Characteristics> characteristics = downstream.characteristics();
    if (characteristics.contains(Collector.Characteristics.IDENTITY_FINISH)) {
        if (characteristics.size() == 1)
            characteristics = Collectors.CH_NOID;
        else {
            characteristics = EnumSet.copyOf(characteristics);
            characteristics.remove(Collector.Characteristics.IDENTITY_FINISH);
            // Why remove IDENTITY_FINISH? The composed finisher is no longer the identity.
            // If the flag stayed, collect() would skip the finisher and return the
            // intermediate container cast directly, so `finisher` would never run.
            characteristics = Collections.unmodifiableSet(characteristics);
        }
    }
    return new CollectorImpl<>(downstream.supplier(),
                               downstream.accumulator(),
                               downstream.combiner(),
                               downstream.finisher().andThen(finisher),
                               characteristics);
}
// Example. NOTE(review): this example declares `people` and also reads `people` on the
// right-hand side; it presumably meant a different target variable name (e.g. `list`).
such as : List<String> people = people.stream().collect(collectingAndThen(toList(),Collections::unmodifiableList));
7. counting(); 计数。
/**
 * Counts the input elements. Each element is mapped to {@code 1L}, and the values are summed
 * with {@code Long::sum} through the three-argument {@code reducing} overload, which is not
 * shown here. The result type is the boxed {@code Long}. This is why {@code Stream.count()},
 * which returns a primitive {@code long}, is the more specific choice when only a count is
 * needed.
 */
public static <T> Collector<T, ?, Long>
counting() {
    return reducing(0L, e -> 1L, Long::sum);
}
8. 最大值最小值
/**
 * Returns the minimum element according to {@code comparator}, wrapped in an
 * {@code Optional}. It is built from the one-argument {@code reducing} overload (not shown)
 * with {@code BinaryOperator.minBy}. The {@code Optional} presumably is empty for an empty
 * stream.
 */
public static <T> Collector<T, ?, Optional<T>>
minBy(Comparator<? super T> comparator) {
    return reducing(BinaryOperator.minBy(comparator));
}

/**
 * Returns the maximum element according to {@code comparator}, wrapped in an
 * {@code Optional}. This mirrors {@code minBy}, using {@code BinaryOperator.maxBy}.
 */
public static <T> Collector<T, ?, Optional<T>>
maxBy(Comparator<? super T> comparator) {
    return reducing(BinaryOperator.maxBy(comparator));
}
9. summingInt();求和。
/**
 * Sums the {@code int} values produced by {@code mapper}; the result is boxed as
 * {@code Integer}.
 */
public static <T> Collector<T, ?, Integer>
summingInt(ToIntFunction<? super T> mapper) {
    return new CollectorImpl<>(
            () -> new int[1], // Why not use a plain 0 as the intermediate container? The
                              // accumulator is a BiConsumer and returns nothing, so it must
                              // mutate the container in place. An int value cannot be mutated
                              // through a reference; a one-element array is a mutable reference
                              // type that can serve as the container.
            (a, t) -> { a[0] += mapper.applyAsInt(t); },
            (a, b) -> { a[0] += b[0]; return a; },
            a -> a[0], CH_NOID);
}

/** Sums the {@code long} values produced by {@code mapper}, using a one-element array. */
public static <T> Collector<T, ?, Long>
summingLong(ToLongFunction<? super T> mapper) {
    return new CollectorImpl<>(
            () -> new long[1],
            (a, t) -> { a[0] += mapper.applyAsLong(t); },
            (a, b) -> { a[0] += b[0]; return a; },
            a -> a[0], CH_NOID);
}

/**
 * Sums the {@code double} values produced by {@code mapper} using compensated summation
 * ({@code sumWithCompensation} and {@code computeFinalSum} are helpers defined elsewhere).
 * Note that {@code mapper.applyAsDouble(t)} is evaluated twice per element.
 */
public static <T> Collector<T, ?, Double>
summingDouble(ToDoubleFunction<? super T> mapper) {
    /*
     * In the arrays allocated for the collect operation, index 0
     * holds the high-order bits of the running sum, index 1 holds
     * the low-order bits of the sum computed via compensated
     * summation, and index 2 holds the simple sum used to compute
     * the proper result if the stream contains infinite values of
     * the same sign.
     */
    return new CollectorImpl<>(
            () -> new double[3],
            (a, t) -> { sumWithCompensation(a, mapper.applyAsDouble(t));
                        a[2] += mapper.applyAsDouble(t);},
            // NOTE(review): later JDK releases changed this combiner to add -b[1] (subtracting
            // the compensation term); see JDK-8214761 "Bug in parallel Kahan summation
            // implementation". Confirm against the JDK version this excerpt comes from.
            (a, b) -> { sumWithCompensation(a, b[0]);
                        a[2] += b[2];
                        return sumWithCompensation(a, b[1]); },
            a -> computeFinalSum(a),
            CH_NOID);
}
10. averagingInt(); 求平均值。
/**
 * Averages the {@code int} values produced by {@code mapper}. The container is a
 * {@code long[2]}: index 0 holds the running sum (kept as a long) and index 1 holds the
 * element count. Returns {@code 0.0} when no elements were seen.
 */
public static <T> Collector<T, ?, Double>
averagingInt(ToIntFunction<? super T> mapper) {
    return new CollectorImpl<>(
            () -> new long[2],
            (a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; },
            (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
            a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
}

/**
 * Averages the {@code long} values produced by {@code mapper}. Uses the same {sum, count}
 * layout as {@code averagingInt} and returns {@code 0.0} when no elements were seen.
 */
public static <T> Collector<T, ?, Double>
averagingLong(ToLongFunction<? super T> mapper) {
    return new CollectorImpl<>(
            () -> new long[2],
            (a, t) -> { a[0] += mapper.applyAsLong(t); a[1]++; },
            (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
            a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
}

/**
 * Averages the {@code double} values produced by {@code mapper} using compensated summation.
 * Index 3 accumulates the simple (uncompensated) sum; presumably {@code computeFinalSum}
 * (not shown) reads it. Note that {@code mapper.applyAsDouble(t)} is evaluated twice per
 * element. Returns {@code 0.0} when no elements were seen.
 */
public static <T> Collector<T, ?, Double>
averagingDouble(ToDoubleFunction<? super T> mapper) {
    /*
     * In the arrays allocated for the collect operation, index 0
     * holds the high-order bits of the running sum, index 1 holds
     * the low-order bits of the sum computed via compensated
     * summation, and index 2 holds the number of values seen.
     */
    return new CollectorImpl<>(
            () -> new double[4],
            (a, t) -> { sumWithCompensation(a, mapper.applyAsDouble(t)); a[2]++; a[3]+= mapper.applyAsDouble(t);},
            // NOTE(review): later JDK releases subtract the compensation term here
            // (sumWithCompensation(a, -b[1])); see JDK-8214761. Confirm against the JDK version
            // this excerpt comes from.
            (a, b) -> { sumWithCompensation(a, b[0]); sumWithCompensation(a, b[1]); a[2] += b[2]; a[3] += b[3]; return a; },
            a -> (a[2] == 0) ? 0.0d : (computeFinalSum(a) / a[2]),
            CH_NOID);
}
11. reducing() ; 详解。
/**
 * Reduces the input elements with {@code op}, starting from {@code identity}.
 *
 * <p>The container is produced by {@code boxSupplier(identity)}, defined elsewhere. From
 * the {@code a[0]} accesses it is evidently an array whose slot 0 holds the running value. In
 * a parallel stream every partial container starts from {@code identity}, and partial results
 * are combined with {@code op} again, so {@code identity} should be a true identity for
 * {@code op}. Only this two-argument overload is shown; {@code counting} and {@code minBy}
 * call other {@code reducing} overloads.
 */
public static <T> Collector<T, ?, T>
reducing(T identity, BinaryOperator<T> op) {
    return new CollectorImpl<>(
            boxSupplier(identity),
            (a, t) -> { a[0] = op.apply(a[0], t); },
            (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
            a -> a[0],
            CH_NOID);
}
12. groupingBy(); 分组方法详解。 public static <T, K> Collector<T, ?, Map<K, List<T>>> //使用者自己不注重中间类型怎么操做。 groupingBy(Function<? super T, ? extends K> classifier) { return groupingBy(classifier, toList()); //调用两个参数的 groupingBy(); } * @param <T> the type of the input elements //T; 接收的类型。 * @param <K> the type of the keys // K,分类器函数中间返回结果的类型。 * @param <A> the intermediate accumulation type of the downstream collector * @param <D> the result type of the downstream reduction * public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) { return groupingBy(classifier, HashMap::new, downstream); // 调用三参数的 groupingBy() } //功能最彻底的groupingBy(); /** * Returns a {@code Collector} implementing a cascaded "group by" operation * on input elements of type {@code T}, grouping elements according to a * classification function, and then performing a reduction operation on * the values associated with a given key using the specified downstream * {@code Collector}. The {@code Map} produced by the Collector is created * with the supplied factory function. * * <p>The classification function maps elements to some key type {@code K}. * The downstream collector operates on elements of type {@code T} and * produces a result of type {@code D}. The resulting collector produces a * {@code Map<K, D>}. * * <p>For example, to compute the set of last names of people in each city, * where the city names are sorted: * <pre>{@code * Map<City, Set<String>> namesByCity * = people.stream().collect(groupingBy(Person::getCity, TreeMap::new, * mapping(Person::getLastName, toSet()))); * }</pre> * * @implNote * The returned {@code Collector} is not concurrent. For parallel stream * pipelines, the {@code combiner} function operates by merging the keys * from one map into another, which can be an expensive operation. 
If * preservation of the order in which elements are presented to the downstream * collector is not required, using {@link #groupingByConcurrent(Function, Supplier, Collector)} * may offer better parallel performance. * 返回的 并非并发的。若是顺序并非很重要的话, 推荐使用groupingByConcurrent(); 并发的分组函数。 * @param <T> the type of the input elements * @param <K> the type of the keys * @param <A> the intermediate accumulation type of the downstream collector * @param <D> the result type of the downstream reduction * @param <M> the type of the resulting {@code Map} * @param classifier a classifier function mapping input elements to keys * @param downstream a {@code Collector} implementing the downstream reduction * @param mapFactory a function which, when called, produces a new empty * {@code Map} of the desired type * @return a {@code Collector} implementing the cascaded group-by operation * * @see #groupingBy(Function, Collector) * @see #groupingBy(Function) * @see #groupingByConcurrent(Function, Supplier, Collector) */ public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream) { Supplier<A> downstreamSupplier = downstream.supplier(); BiConsumer<A, ? 
super T> downstreamAccumulator = downstream.accumulator(); BiConsumer<Map<K, A>, T> accumulator = (m, t) -> { K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); A container = m.computeIfAbsent(key, k -> downstreamSupplier.get()); downstreamAccumulator.accept(container, t); }; BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner()); //接收两个参数,参会一个结果。 @SuppressWarnings("unchecked") Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory; // 进行一个强制的类型转换。 if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { //若是 IDENTITY_FINISH , 则不用调用finisher方法。 return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID); } else { @SuppressWarnings("unchecked") Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher(); Function<Map<K, A>, M> finisher = intermediate -> { intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v)); @SuppressWarnings("unchecked") M castResult = (M) intermediate; return castResult; }; return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID); } }
13. groupingByConcurrent(); 并发的分组方法。 使用前提是对数据里边的顺序没有要求。
/**
 * Returns a concurrent {@code Collector} implementing a cascaded "group by"
 * operation on input elements of type {@code T}, grouping elements
 * according to a classification function, and then performing a reduction
 * operation on the values associated with a given key using the specified
 * downstream {@code Collector}.
 */
// ConcurrentHashMap is a Map that supports concurrent access; it is the default map factory
// used by the overloads below.
public static <T, K>
Collector<T, ?, ConcurrentMap<K, List<T>>>
groupingByConcurrent(Function<? super T, ? extends K> classifier) {
    return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
}

/** Concurrent grouping into a {@code ConcurrentHashMap}, reducing each group with {@code downstream}. */
public static <T, K, A, D>
Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> classifier,
                                                          Collector<? super T, A, D> downstream) {
    return groupingByConcurrent(classifier, ConcurrentHashMap::new, downstream);
}

/**
 * The most general concurrent grouping overload.
 *
 * <p>All threads accumulate into one shared {@code ConcurrentMap}. If the downstream collector
 * is itself CONCURRENT, its per-key containers are updated without extra locking. Otherwise
 * each update of a per-key container is done while holding that container's monitor. The
 * returned collector uses {@code CH_CONCURRENT_ID} / {@code CH_CONCURRENT_NOID} (defined
 * elsewhere, presumably including CONCURRENT and UNORDERED).
 */
public static <T, K, A, D, M extends ConcurrentMap<K, D>>
Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier,
                                        Supplier<M> mapFactory,
                                        Collector<? super T, A, D> downstream) {
    Supplier<A> downstreamSupplier = downstream.supplier();
    BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
    BinaryOperator<ConcurrentMap<K, A>> merger = Collectors.<K, A, ConcurrentMap<K, A>>mapMerger(downstream.combiner());
    @SuppressWarnings("unchecked")
    Supplier<ConcurrentMap<K, A>> mangledFactory = (Supplier<ConcurrentMap<K, A>>) mapFactory;
    BiConsumer<ConcurrentMap<K, A>, T> accumulator;
    if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
        // Downstream containers tolerate concurrent accumulation: no locking needed.
        accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            downstreamAccumulator.accept(resultContainer, t);
        };
    } else {
        accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            synchronized (resultContainer) { // Synchronized: many threads share the map, but
                                             // only one at a time may update a given key's
                                             // (non-concurrent) downstream container.
                downstreamAccumulator.accept(resultContainer, t);
            }
        };
    }

    if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
        return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID);
    }
    else {
        // Finish each value in place and reuse the same map object as the result M.
        @SuppressWarnings("unchecked")
        Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
        Function<ConcurrentMap<K, A>, M> finisher = intermediate -> {
            intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
            @SuppressWarnings("unchecked")
            M castResult = (M) intermediate;
            return castResult;
        };
        return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID);
    }
}
14. partitioningBy(); 分区详解。
/**
 * Partitions the input elements by {@code predicate} into a {@code Map<Boolean, List<T>>}.
 * Delegates to the two-argument overload with {@code toList()}.
 */
public static <T>
Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
    return partitioningBy(predicate, toList());
}

/**
 * Partitions the input elements by {@code predicate} and reduces each partition with
 * {@code downstream}.
 *
 * <p>The container is a {@code Partition} (defined elsewhere) holding one downstream
 * container for {@code true} and one for {@code false}. Both are created up front by the
 * supplier, so the result always contains both keys, even when a partition is empty.
 * {@code Partition} is returned directly as the {@code Map<Boolean, D>} result, so it
 * presumably implements {@code Map}. Note that {@code downstream.supplier()} and
 * {@code downstream.finisher()} are each requested twice per container.
 */
public static <T, D, A>
Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
                                                Collector<? super T, A, D> downstream) {
    BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
    // Route each element to the true or false container according to the predicate.
    BiConsumer<Partition<A>, T> accumulator = (result, t) ->
            downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
    BinaryOperator<A> op = downstream.combiner();
    // Merge partitions side by side: true with true, false with false.
    BinaryOperator<Partition<A>> merger = (left, right) ->
            new Partition<>(op.apply(left.forTrue, right.forTrue),
                            op.apply(left.forFalse, right.forFalse));
    Supplier<Partition<A>> supplier = () ->
            new Partition<>(downstream.supplier().get(),
                            downstream.supplier().get());
    if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
        return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
    }
    else {
        Function<Partition<A>, Map<Boolean, D>> finisher = par ->
                new Partition<>(downstream.finisher().apply(par.forTrue),
                                downstream.finisher().apply(par.forFalse));
        return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
    }
}
jdk 的代码,就是我们学习的范本。
讲这么细的原因并不是因为要自己去写,而是为了了解内部具体是怎么实现的。这样调用的时候就会信心十足。