本文从源码和实例入手,为你们解析 Flink 中 GroupReduce 和 GroupCombine 的用途。也涉及到了 Flink SQL group by 的内部实现。html
在前文[源码解析] Flink的Groupby和reduce究竟作了什么中,咱们剖析了Group和reduce都作了些什么,也对combine有了一些了解。可是总感受意犹未尽,由于在Flink还提出了若干新算子,好比GroupReduce和GroupCombine。这几个算子不搞定,总以为如鲠在喉,但没有找到一个良好的例子来进行剖析说明。java
本文是笔者在探究Flink SQL UDF问题的一个副产品。起初是为了调试一段sql代码,结果发现Flink自己给出了一个GroupReduce和GroupCombine使用的完美例子。因而就拿出来和你们共享,一块儿分析看看究竟如何使用这两个算子。mysql
请注意:这个例子是Flink SQL,因此本文中将涉及Flink SQL goup by内部实现的知识。sql
Flink官方对于这两个算子的使用说明以下:apache
GroupReduce算子应用在一个已经分组了的DataSet上,其会对每一个分组都调用到用户定义的group-reduce函数。它与Reduce的区别在于用户定义的函数会当即得到整个组。api
Flink将在组的全部元素上使用Iterable调用用户自定义函数,而且能够返回任意数量的结果元素。数据结构
GroupCombine转换是可组合GroupReduceFunction中组合步骤的通用形式。它在某种意义上被归纳为容许将输入类型 I 组合到任意输出类型O。与此相对的是,GroupReduce中的组合步骤仅容许从输入类型 I 到输出类型 I 的组合。这是由于GroupReduceFunction的 "reduce步骤" 指望本身的输入类型为 I。app
在一些应用中,咱们指望在执行附加变换(例如,减少数据大小)以前将DataSet组合成中间格式。这能够经过CombineGroup转换能以很是低的成本实现。ide
注意:分组数据集上的GroupCombine在内存中使用贪婪策略执行,该策略可能不会一次处理全部数据,而是以多个步骤处理。它也能够在各个分区上执行,而无需像GroupReduce转换那样进行数据交换。这可能会致使输出的是部分结果,因此GroupCombine是不能替代GroupReduce操做的,尽管它们的操做内容可能看起来都同样。函数
是否是有点晕?仍是直接让代码来讲话吧。如下官方示例演示了如何将CombineGroup和GroupReduce转换用于WordCount实现。即经过combine操做先对单词数目进行初步排序,而后经过reduceGroup对combine产生的结果进行最终排序。由于combine进行了初步排序,因此在算子之间传输的数据量就少多了。
DataSet<String> input = [..] // The words received as input // 这里经过combine操做先对单词数目进行初步排序,其优点在于用户定义的combine函数只调用一次,由于runtime已经把输入数据一次性都提供给了自定义函数。 DataSet<Tuple2<String, Integer>> combinedWords = input .groupBy(0) // group identical words .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() { public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine String key = null; int count = 0; for (String word : words) { key = word; count++; } // emit tuple with word and count out.collect(new Tuple2(key, count)); } }); // 这里对combine的结果进行第二次排序,其优点在于用户定义的reduce函数只调用一次,由于runtime已经把输入数据一次性都提供给了自定义函数。 DataSet<Tuple2<String, Integer>> output = combinedWords .groupBy(0) // group by words again .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) { String key = null; int count = 0; for (Tuple2<String, Integer> word : words) { key = word; count++; } // emit tuple with word and count out.collect(new Tuple2(key, count)); } });
看到这里,有的兄弟已经明白了,这和mapPartition很相似啊,都是runtime作了大量工做。为了让你们这两个算子的使用情形有深入的认识,咱们再经过一个sql的例子,向你们展现Flink内部是怎么应用这两个算子的,也能看出来他们的强大之处。
下面代码主要参考自 flink 使用问题汇总。咱们能够看到这里经过groupby进行了聚合操做。其中collect方法,相似于mysql的group_concat。
public class UdfExample { public static class MapToString extends ScalarFunction { public String eval(Map<String, Integer> map) { if(map==null || map.size()==0) { return ""; } StringBuffer sb=new StringBuffer(); for(Map.Entry<String, Integer> entity : map.entrySet()) { sb.append(entity.getKey()+","); } String result=sb.toString(); return result.substring(0, result.length()-1); } } public static void main(String[] args) throws Exception { MemSourceBatchOp src = new MemSourceBatchOp(new Object[][]{ new Object[]{"1", "a", 1L}, new Object[]{"2", "b33", 2L}, new Object[]{"2", "CCC", 2L}, new Object[]{"2", "xyz", 2L}, new Object[]{"1", "u", 1L} }, new String[]{"f0", "f1", "f2"}); BatchTableEnvironment environment = MLEnvironmentFactory.getDefault().getBatchTableEnvironment(); Table table = environment.fromDataSet(src.getDataSet()); environment.registerTable("myTable", table); BatchOperator.registerFunction("MapToString", new MapToString()); BatchOperator.sqlQuery("select f0, mapToString(collect(f1)) as type from myTable group by f0").print(); } }
程序输出是
f0|type --|---- 1|a,u 2|CCC,b33,xyz
这个SQL语句的重点是group by。这个是程序猿常用的操做。可是你们有没有想过这个group by在真实运行起来时候是怎么操做的呢?针对大数据环境有没有作了什么优化呢?其实,Flink正是使用了GroupReduce和GroupCombine来实现而且优化了group by的功能。优化之处在于:
SQL生成Flink的过程十分错综复杂,因此咱们只能找最关键处。其是在 DataSetAggregate.translateToPlan 完成的。咱们能够看到,对于SQL语句 “select f0, mapToString(collect(f1)) as type from myTable group by f0”
,Flink系统把它翻译成以下阶段,即
从以前的文章咱们能够知道,groupBy这个其实不是一个算子,它只是排序过程当中的一个辅助步骤而已,因此咱们重点仍是要看combineGroup和reduceGroup。这偏偏是咱们想要的完美例子。
input ----> (groupBy + combineGroup) ----> (groupBy + reduceGroup) ----> output
SQL生成的Scala代码以下,其中 combineGroup在后续中将生成GroupCombineOperator,reduceGroup将生成GroupReduceOperator。
override def translateToPlan( tableEnv: BatchTableEnvImpl, queryConfig: BatchQueryConfig): DataSet[Row] = { if (grouping.length > 0) { // grouped aggregation ...... if (preAgg.isDefined) { // 咱们的例子是在这里 inputDS // pre-aggregation .groupBy(grouping: _*) .combineGroup(preAgg.get) // 将生成GroupCombineOperator算子 .returns(preAggType.get) .name(aggOpName) // final aggregation .groupBy(grouping.indices: _*) //将生成GroupReduceOperator算子。 .reduceGroup(finalAgg.right.get) .returns(rowTypeInfo) .name(aggOpName) } else { ...... } } else { ...... } } } // 程序变量打印以下 this = {DataSetAggregate@5207} "Aggregate(groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))" cluster = {RelOptCluster@5220}
LocalExecutor.execute中会生成JobGraph。JobGraph是提交给 JobManager 的数据结构,是惟一被Flink的数据流引擎所识别的表述做业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。
在生成JobGraph时候,系统获得以下JobVertex。
jobGraph = {JobGraph@5652} "JobGraph(jobId: 6aae8b5e5ad32f588136bef26f8b65f6)" taskVertices = {LinkedHashMap@5655} size = 4 {JobVertexID@5677} "c625209bb7fb9a098807551840aeaa99" -> {InputOutputFormatVertex@5678} "CHAIN DataSource (at initializeDataSource(MemSourceBatchOp.java:98) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (select: (f0, f1)) (org.apache.flink.runtime.operators.DataSourceTask)" {JobVertexID@5679} "b56ace4acd7a2f69ea110a9f262ff80a" -> {JobVertex@5680} "CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map (Map at linkFrom(MapBatchOp.java:35)) (org.apache.flink.runtime.operators.BatchTask)" {JobVertexID@5681} "3f5e2a0f700421d80ce85e02a6d9db73" -> {InputOutputFormatVertex@5682} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)" {JobVertexID@5683} "ad29dc5b2e0a39ad2cd1d164b6f859f7" -> {JobVertex@5684} "GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) (org.apache.flink.runtime.operators.BatchTask)"
咱们能够看到,在JobGraph中就生成了对应的两个算子。其中这里的FlatMap就是用户的UDF函数MapToString的映射生成。
GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map
最后,让咱们看看runtime会如何处理这两个算子。
首先,Flink会在ChainedFlatMapDriver.collect中对record进行处理,这是从Table中提取数据所必须经历的,与后续的group by关系不大。
@Override public void collect(IT record) { try { this.numRecordsIn.inc(); this.mapper.flatMap(record, this.outputCollector); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); } } // 这里可以看出来,咱们获取了第一列记录 record = {Row@9317} "1,a,1" fields = {Object[3]@9330} this.taskName = "FlatMap (select: (f0, f1))" // 程序堆栈打印以下 collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) invoke:196, DataSourceTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
其次,GroupReduceCombineDriver.run()中会进行combine操做。
this.sorter.write(value)
把数据写到排序缓冲区。sortAndCombineAndRetryWrite(value)
进行实际的排序,合并。由于是系统实现,因此Combine的用户自定义函数就是由Table API提供的,好比org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate
。
@Override public void run() throws Exception { final MutableObjectIterator<IN> in = this.taskContext.getInput(0); final TypeSerializer<IN> serializer = this.serializer; if (objectReuseEnabled) { ..... } else { IN value; while (running && (value = in.next()) != null) { // try writing to the sorter first if (this.sorter.write(value)) { continue; } // do the actual sorting, combining, and data writing sortAndCombineAndRetryWrite(value); } } // sort, combine, and send the final batch if (running) { sortAndCombine(); } } // 程序变量以下 value = {Row@9494} "1,a" fields = {Object[2]@9503}
sortAndCombine是具体排序/合并的过程。
org.apache.flink.runtime.operators.sort.QuickSort
完成的。org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate
完成的。org.apache.flink.table.runtime.aggregate.DataSetPreAggFunction.combine
调用 out.collect(output)
完成的。private void sortAndCombine() throws Exception { final InMemorySorter<IN> sorter = this.sorter; // 这里进行实际的排序 this.sortAlgo.sort(sorter); final GroupCombineFunction<IN, OUT> combiner = this.combiner; final Collector<OUT> output = this.output; // iterate over key groups if (objectReuseEnabled) { ...... } else { final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator); // 这里是归并操做 while (this.running && keyIter.nextKey()) { // combiner.combiner 是用户定义操做,runtime把某key对应的数据一次性传给它 combiner.combine(keyIter.getValues(), output); } } }
具体调用栈以下:
accumulate:57, CollectAggFunction (org.apache.flink.table.functions.aggfunctions) accumulate:-1, DataSetAggregatePrepareMapHelper$5 combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate) sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators) run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators) run:504, BatchTask (org.apache.flink.runtime.operators) invoke:369, BatchTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
这两个放在一块儿,是由于他们组成了Operator Chain。
GroupReduceDriver.run中完成了reduce。具体reduce 操做是在 org.apache.flink.table.runtime.aggregate.DataSetFinalAggFunction.reduce
完成的,而后在其中直接发送给下游 out.collect(output)
。
@Override public void run() throws Exception { // cache references on the stack final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub(); if (objectReuseEnabled) { ...... } else { final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.comparator); // run stub implementation while (this.running && iter.nextKey()) { // stub.reduce 是用户定义操做,runtime把某key对应的数据一次性传给它 stub.reduce(iter.getValues(), output); } } }
从前文咱们能够,这里已经配置成了Operator Chain,因此out.collect(output)
会调用到CountingCollector。CountingCollector的成员变量collector已经配置成了ChainedFlatMapDriver。
public void collect(OUT record) { this.numRecordsOut.inc(); this.collector.collect(record); } this.collector = {ChainedFlatMapDriver@9643} mapper = {FlatMapRunner@9610} config = {TaskConfig@9655} taskName = "FlatMap (select: (f0, mapToString($f1) AS type))"
因而程序就调用到了 ChainedFlatMapDriver.collect
。
public void collect(IT record) { try { this.numRecordsIn.inc(); this.mapper.flatMap(record, this.outputCollector); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); } }
最终调用栈如以下:
eval:21, UdfExample$MapToString (com.alibaba.alink) flatMap:-1, DataSetCalcRule$14 flatMap:52, FlatMapRunner (org.apache.flink.table.runtime) flatMap:31, FlatMapRunner (org.apache.flink.table.runtime) collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) reduce:80, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate) run:131, GroupReduceDriver (org.apache.flink.runtime.operators) run:504, BatchTask (org.apache.flink.runtime.operators) invoke:369, BatchTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
由此咱们能够看到: