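GroupBy and reduce are common operators in big data processing, but many readers may not know much about the mechanisms behind them. This article starts from the source code to explain how GroupBy and reduce work in Flink and what they actually do under the hood.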
This investigation was prompted by a few questions:
To make things easier to follow, here is the summary up front. For a GroupBy + Reduce operation, Flink does the following:
This essentially answers the questions raised earlier.
MapReduce is a programming model for parallel computation over large datasets. Its core ideas, "Map" and "Reduce", are borrowed from functional and vector programming languages.
Flink and Spark, which we use today, both build on ideas from MapReduce. So it is worth going back to the source and looking at how MapReduce divides its stages.
Broken down in detail, MapReduce consists of the following main stages:
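Combine deserves special attention. In MapReduce there are many map tasks and few reduce tasks. Because the reduce side would otherwise receive a large volume of data, each map task first groups and pre-aggregates its own data on the map side, which lightens the load by the time the data reaches reduce.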
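Combine can be understood as a reduce performed on the map side: it merges the output of a single map task. Combine works on the output of one map, whereas reduce merges the output of many maps.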
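The key-value pairs produced by the map function become the input of the combine function, and the combine output is then sent to the reduce function. This reduces the amount of data written to disk as well as the number of key-value pairs sent over the network. On the map side, the user-defined Combiner class runs on the same node that executes the map task, effectively performing one reduce pass over the map function's output.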
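Available bandwidth in a cluster is usually limited, and a large volume of intermediate data quickly becomes a performance bottleneck, so large transfers between map tasks and reduce tasks should be avoided wherever possible. The point of Combine is to make the map output more compact, so that less data is written to local disk and sent to the reduce side.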
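To make the combine idea concrete, here is a minimal plain-Java sketch for word count. It has no Hadoop dependency, and the class and method names are hypothetical. It pre-aggregates one map task's (word, 1) pairs locally, so fewer pairs need to be written and shuffled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of map-side combining; this is not the Hadoop Combiner API.
class MapSideCombine {

    // Map phase: emit a (word, 1) pair for every word in the line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            out.add(Map.entry(word, 1));
        }
        return out;
    }

    // Combine phase: sum the counts per word within ONE map task's output only.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : mapOutput) {
            combined.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> raw = map("I think I hear them I");
        Map<String, Integer> combined = combine(raw);
        // 6 pairs before combining, 4 after
        System.out.println(raw.size() + " -> " + combined.size() + " " + combined); // 6 -> 4 {I=3, think=1, hear=1, them=1}
    }
}
```

The reducer still has to merge the combined maps coming from different map tasks. Combining only shrinks what each task sends.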
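Partition splits the output of each map node and assigns it, by key, to different reducers. By default MapReduce does this classification for us with the hash-based HashPartitioner, and it can also be customized.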
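This is essentially classification: sorting out data that is all jumbled together. Picture a zoo where cattle, sheep, chickens, ducks and geese all roam together; at night the cattle go back to the cowshed, the sheep to the sheepfold and the chickens to the coop. Partition plays that role for data.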
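After the mapper runs, suppose its output is a key/value pair whose key is "aaa" and whose value is 1. The map side only emits a count of 1 per occurrence, and results are merged only in the reduce task. If this job has 3 reduce tasks, which one should handle the current "aaa"? That has to be decided right away.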
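MapReduce provides the Partitioner interface for this. Based on the key or value and the number of reduce tasks, it decides which reduce task should ultimately process the current output pair. By default the key is hashed and the hash is taken modulo the number of reduce tasks. This default scheme only aims to spread work evenly across reducers. Users with other requirements can implement a custom Partitioner and set it on the job.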
In our example, suppose the Partitioner returns 0 for "aaa", meaning this pair should be handled by the first reducer.
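The default rule fits in a few lines of Java. As far as we know, Hadoop's HashPartitioner masks the hash code with `Integer.MAX_VALUE` so that negative hash codes cannot produce a negative index. The class below is a standalone illustration of that rule, not Hadoop code, and `HashPartitionSketch` is a hypothetical name.

```java
// Hypothetical standalone sketch of hash-modulo partitioning; not the Hadoop class itself.
class HashPartitionSketch {

    // Clear the sign bit so the result is never negative, then take the modulo.
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // "aaa".hashCode() is 96321, which is divisible by 3
        System.out.println(getPartition("aaa", 3)); // 0
    }
}
```

With 3 reduce tasks, "aaa" happens to land in partition 0 under this rule, which matches the assumption made in the example.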
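Shuffle is the process between map and reduce, and it includes combine and partition. It is hard to grasp because we can neither see nor touch it: it belongs to the MapReduce framework, and we never call it directly when writing programs.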
Broadly, Shuffle is about how to move the output of map tasks to the reduce side efficiently. Put another way, Shuffle describes the path data takes from map task output to reduce task input.
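Simply put, before a reduce task runs, its job is to keep pulling the final output of every map task in the current job and to merge the data pulled from different places. The result is a single file that serves as the reduce task's input.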
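The "keep merging pulled data" step is essentially a k-way merge of sorted map outputs. The sketch below is a simplified in-memory illustration with a min-heap. `KWayMerge` and `Cursor` are hypothetical names, and real MapReduce works on spilled files, not in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: merge several sorted map outputs into one sorted reduce input.
class KWayMerge {

    // Position of the next unread element in one sorted run.
    private static final class Cursor {
        final int run;
        final int pos;
        final String value;

        Cursor(int run, int pos, String value) {
            this.run = run;
            this.pos = pos;
            this.value = value;
        }
    }

    static List<String> merge(List<List<String>> sortedRuns) {
        // Min-heap ordered by the current head value of each run.
        PriorityQueue<Cursor> heap = new PriorityQueue<>((a, b) -> a.value.compareTo(b.value));
        for (int r = 0; r < sortedRuns.size(); r++) {
            if (!sortedRuns.get(r).isEmpty()) {
                heap.add(new Cursor(r, 0, sortedRuns.get(r).get(0)));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();                 // smallest head among all runs
            merged.add(c.value);
            List<String> run = sortedRuns.get(c.run);
            if (c.pos + 1 < run.size()) {           // advance within the same run
                heap.add(new Cursor(c.run, c.pos + 1, run.get(c.pos + 1)));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> mapOutputs = List.of(
                List.of("aaa", "ccc"),
                List.of("bbb", "ccc", "ddd"),
                List.of("aaa"));
        System.out.println(merge(mapOutputs)); // [aaa, aaa, bbb, ccc, ccc, ddd]
    }
}
```

Because the merged stream is sorted, equal keys end up adjacent. That is what lets a reducer process one key group at a time.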
We use a word-count program as our example. Here is the excerpt:
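```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExampleReduce {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build a DataSet from strings
        DataSet<String> text = env.fromElements(
                "Who‘s there?",
                "I think I hear them. Stand, ho! Who‘s there?");

        // Split the strings, group by key, and count the occurrences of each key
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });

        // Print the result (for DataSet programs, print() also triggers execution)
        wordCounts.print();
    }

    // Splits a line into words and emits (word, 1) for each word
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```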
The output is:
```
(hear,1)
(ho!,1)
(them.,1)
(I,2)
(Stand,,1)
(Who‘s,2)
(there?,2)
(think,1)
```
First, let's start digging from Flink's basic Java API.
Note that GroupBy has no corresponding Operator. GroupBy is only an intermediate, auxiliary step in building a DataSet transformation.
The base class behind GroupBy is Grouping, which is merely an intermediate step in a DataSet transformation. Its main members are:
```java
// Grouping is an intermediate step for a transformation on a grouped DataSet.
public abstract class Grouping<T> {
    protected final DataSet<T> inputDataSet;
    protected final Keys<T> keys;
    protected Partitioner<?> customPartitioner;
}
```
Grouping itself exposes no application-level API. The concrete APIs live in its subclasses, such as UnsortedGrouping.
In our code, the corresponding class is UnsortedGrouping. It provides many APIs, such as sum, max, min, reduce, aggregate, reduceGroup, combineGroup, and so on.
Back to our example, groupBy is used as follows:

```java
.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() { ... })
```
This returns a ReduceOperator. That matches what we said above: groupBy is only an intermediate step, and only reduce returns an Operator.

```java
// In DataSet: groupBy returns an UnsortedGrouping
public UnsortedGrouping<T> groupBy(int... fields) {
    return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}

// In UnsortedGrouping: reduce returns a ReduceOperator
public class UnsortedGrouping<T> extends Grouping<T> {
    public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
        return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName());
    }
}
```
From the application's point of view, reduce is the logical operator that actually does the work.
The calls and the ReduceOperator definition above show that `.groupBy(0).reduce()` produces a ReduceOperator. The UnsortedGrouping is stored in the ReduceOperator's `grouper` member variable as an auxiliary.
```java
public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {

    private final ReduceFunction<IN> function;

    // The UnsortedGrouping is stored here and used later when the reduce is translated.
    private final Grouping<IN> grouper;

    private CombineHint hint;

    public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
        // ... (super(...) call and other setup elided)
        this.function = function;
        this.grouper = input;                        // the UnsortedGrouping becomes the grouper
        this.hint = CombineHint.OPTIMIZER_CHOOSES;   // used later by the optimizer
    }
}
```
Let's follow the execution stages of a Flink program and see what the system does.
Step one: when the program runs, Flink first generates an execution Plan from the result of the Java API calls.
```java
public JobClient executeAsync(String jobName) throws Exception {
    final Plan plan = createProgramPlan(jobName);
    // ...
}
```
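The key function here is translateToDataFlow. It converts operators from the operators package of the batch Java API module (org.apache.flink.api.java.operators) into operators from the operators package of the core module (org.apache.flink.api.common.operators).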
For our example program, translateToDataFlow generates a SingleInputOperator while building the graph, and the runtime uses it later. Here is an abridged version of the code:
```java
protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
    // ......
    // The keys of the UnsortedGrouping are taken out here
    else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
        // reduce with field positions
        ReduceOperatorBase<IN, ReduceFunction<IN>> po =
                new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
        po.setCustomPartitioner(grouper.getCustomPartitioner());
        po.setInput(input);
        po.setParallelism(getParallelism());   // parallelism is unchanged
        // The resulting SingleInputOperator is what the runtime uses later
        return po;
    }
    // ......
}
```
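The execution plan our code finally produces is shown below. It basically matches expectations: a straight path from input to output, where the only meaningful operator in between is Reduce.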
GenericDataSourceBase ——> FlatMapOperatorBase ——> ReduceOperatorBase ——> GenericDataSinkBase
In the debugger, the plan looks like this:
```
plan = {Plan@1296}
  sinks = {ArrayList@1309} size = 1
    0 = {GenericDataSinkBase@1313} "collect()"
      formatWrapper = {UserCodeObjectWrapper@1315}
      input = {ReduceOperatorBase@1316} "ReduceOperatorBase - Reduce at main(WordCountExampleReduceCsv.java:25)"
        hint = {ReduceOperatorBase$CombineHint@1325} "OPTIMIZER_CHOOSES"
        customPartitioner = null
        input = {FlatMapOperatorBase@1326} "FlatMapOperatorBase - FlatMap at main(WordCountExampleReduceCsv.java:23)"
          input = {GenericDataSourceBase@1339} "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)"
```
Step two: Flink optimizes the Plan further to produce an Optimized Plan. The core code is in the PlanTranslator.compilePlan function, which is where the Optimized Plan is obtained.
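The subsequent compilation makes no decisions or assumptions of its own. How the job will be executed has already been settled by the optimizer, and compilation is just a deterministic mapping on top of that. So we will focus on how the plan is optimized.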
```java
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
    Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
    OptimizedPlan optimizedPlan = optimizer.compile(plan);

    JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
    return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}
```
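Internally, the optimizer traverses the plan by calling the plan's accept method. accept calls accept on each sink in turn, and the traversal then walks from each sink toward the sources, calling preVisit on every node first and postVisit afterwards.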
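The traversal order can be shown with a small standalone sketch. The `Node` and `Visitor` types below are hypothetical stand-ins. They are loosely modeled on the preVisit/postVisit shape described above, with preVisit returning false to stop descending, but they are not Flink's classes. A node is pre-visited, then its inputs are visited, then it is post-visited, so postVisit reaches the sources before the sink. The sketch does not deduplicate shared inputs, which a visitor over a real DAG would need to do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plan-node and visitor types; they only mimic the shape of a plan traversal.
class PlanVisitSketch {

    interface Visitor {
        boolean preVisit(Node node);   // return false to skip this node's inputs
        void postVisit(Node node);
    }

    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = List.of(inputs);
        }

        // Depth-first: preVisit, then all inputs, then postVisit.
        void accept(Visitor v) {
            if (v.preVisit(this)) {
                for (Node in : inputs) {
                    in.accept(v);
                }
                v.postVisit(this);
            }
        }
    }

    // Records the order in which postVisit is called, starting from a sink.
    static List<String> postOrder(Node sink) {
        List<String> order = new ArrayList<>();
        sink.accept(new Visitor() {
            public boolean preVisit(Node node) { return true; }
            public void postVisit(Node node) { order.add(node.name); }
        });
        return order;
    }

    public static void main(String[] args) {
        Node source = new Node("DataSource");
        Node flatMap = new Node("FlatMap", source);
        Node reduce = new Node("Reduce", flatMap);
        Node sink = new Node("DataSink", reduce);
        System.out.println(postOrder(sink)); // [DataSource, FlatMap, Reduce, DataSink]
    }
}
```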
A few points about this optimization step are worth noting:
In GraphCreatingVisitor.preVisit, when the operator turns out to be a ReduceOperatorBase, a ReduceNode is created.
```java
else if (c instanceof ReduceOperatorBase) {
    n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
}
```
ReduceNode is the optimizer's representation of a Reduce operator.
```java
public class ReduceNode extends SingleInputNode {
    private final List<OperatorDescriptorSingle> possibleProperties;
    private ReduceNode preReduceUtilityNode;
}
```
When the ReduceNode is created, the hint mentioned earlier determines the combiner strategy. With OPTIMIZER_CHOOSES, it is set to `combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE`.
```java
public ReduceNode(ReduceOperatorBase<?, ?> operator) {
    // ...
    DriverStrategy combinerStrategy;
    switch (operator.getCombineHint()) {
        case OPTIMIZER_CHOOSES:
            combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
            break;
        // ... other hints elided
    }
    // ...
}
```
The resulting optimized plan is shown below. At this point the parallelism has been set, and the reduce has been split into two stages: SORTED_PARTIAL_REDUCE and SORTED_REDUCE.
Data Source ——> FlatMap ——> Reduce(SORTED_PARTIAL_REDUCE) ——> Reduce(SORTED_REDUCE) ——> Data Sink
In the debugger, the optimized plan looks like this:
```
optimizedPlan = {OptimizedPlan@1506}
  allNodes = {HashSet@1510} size = 5
    0 = {SourcePlanNode@1512} "Data Source "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
      parallelism = 4
    1 = {SingleInputPlanNode@1513} "FlatMap "FlatMap at main(WordCountExampleReduceCsv.java:23)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
      parallelism = 4
    2 = {SingleInputPlanNode@1514} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
      parallelism = 4
    3 = {SinkPlanNode@1515} "Data Sink "collect()" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
      parallelism = 4
    4 = {SingleInputPlanNode@1516} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_PARTIAL_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
      parallelism = 4
```
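Why is it safe to split one reduce into a partial reduce and a final reduce? Summing counts is associative and commutative. So pre-aggregating inside each parallel partition and then aggregating the partial results per key gives the same answer as a single global reduce, as long as all partial results for a key reach the same final reducer. The plain-Java sketch below checks this on a small input. It uses hypothetical names and no Flink classes, and it routes keys with a simple non-negative hash modulo rather than Flink's actual channel-selection hashing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: partial reduce per partition, hash routing, then final reduce.
class TwoPhaseReduce {

    // Phase 1 (in the spirit of SORTED_PARTIAL_REDUCE): count words inside one partition only.
    static Map<String, Integer> partialReduce(List<String> words) {
        Map<String, Integer> partial = new HashMap<>();
        for (String w : words) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial;
    }

    // Phase 2 (in the spirit of SORTED_REDUCE): route partial results by key hash, then sum per key.
    static Map<String, Integer> twoPhase(List<List<String>> partitions, int numChannels) {
        List<Map<String, Integer>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new HashMap<>());
        }
        for (List<String> partition : partitions) {
            for (Map.Entry<String, Integer> e : partialReduce(partition).entrySet()) {
                int channel = (e.getKey().hashCode() & Integer.MAX_VALUE) % numChannels;
                channels.get(channel).merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        // Every key lives in exactly one channel, so putAll never overwrites a count.
        Map<String, Integer> result = new TreeMap<>();
        for (Map<String, Integer> ch : channels) {
            result.putAll(ch);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(
                List.of("who", "there"),
                List.of("I", "think", "I", "hear", "them", "who", "there"));
        System.out.println(twoPhase(partitions, 4)); // {I=2, hear=1, them=1, there=2, think=1, who=2}
    }
}
```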
Step three: build the JobGraph. LocalExecutor.execute generates it from the Optimized Plan. The JobGraph is the data structure submitted to the JobManager.
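The main optimization at this stage is chaining several eligible nodes together into a single node. This avoids the serialization, deserialization and transfer costs of moving data between those nodes.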
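The JobGraph is the only job representation that Flink's dataflow engine understands. This shared abstraction is what unifies stream and batch processing at runtime.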
```java
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    final JobGraph jobGraph = getJobGraph(pipeline, configuration);
    // ...
}
```
This step forms an operator chain:
CHAIN DataSource -> FlatMap -> Combine (Reduce)
So Reduce(SORTED_PARTIAL_REDUCE) has been chained together with its upstream operators.
Printed from the program, it looks like this:
```
jobGraph = {JobGraph@1739} "JobGraph(jobId: 30421d78d7eedee6be2c5de39d416eb7)"
  taskVertices = {LinkedHashMap@1742} size = 3
    {JobVertexID@1762} "e2c43ec0df647ea6735b2421fb7330fb" -> {InputOutputFormatVertex@1763} "CHAIN DataSource (at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCountExampleReduceCsv.java:23)) -> Combine (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.DataSourceTask)"
    {JobVertexID@1764} "2de11f497e827e48dda1d63b458dead7" -> {JobVertex@1765} "Reduce (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.BatchTask)"
    {JobVertexID@1766} "2bee17f2c86aa1e9439e3dedea58007b" -> {InputOutputFormatVertex@1767} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
```
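Chaining works because each chained stage hands records to the next one with a direct method call on a collector, instead of serializing them into a network channel. The sketch below imitates that push-based style. Its `Collector` interface is hypothetical: it resembles the collect/close shape of Flink's collectors but is not `org.apache.flink.util.Collector`. A flat-map stage pushes words straight into a combine stage, which buffers them and emits partial counts only when `close()` propagates down the chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical chaining sketch; names do not correspond to Flink classes.
class ChainSketch {

    interface Collector<T> {
        void collect(T record);
        void close();
    }

    // Chained flat-map: splits lines and forwards each word directly to the next stage.
    static final class FlatMapStage implements Collector<String> {
        private final Collector<String> next;

        FlatMapStage(Collector<String> next) { this.next = next; }

        public void collect(String line) {
            for (String word : line.split(" ")) {
                next.collect(word);   // plain method call, no serialization
            }
        }

        public void close() { next.close(); }   // propagate close down the chain
    }

    // Chained combine: buffers records and only produces partial counts on close().
    static final class CombineStage implements Collector<String> {
        private final List<String> buffer = new ArrayList<>();
        final Map<String, Integer> emitted = new TreeMap<>();

        public void collect(String word) { buffer.add(word); }

        public void close() {
            for (String w : buffer) {
                emitted.merge(w, 1, Integer::sum);
            }
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        CombineStage combine = new CombineStage();
        Collector<String> chain = new FlatMapStage(combine);
        chain.collect("to be or not to be");
        chain.close();
        System.out.println(combine.emitted); // {be=2, not=1, or=1, to=2}
    }
}
```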
Once the job has been submitted, the program actually runs. Three sorts are involved along the way.
This is the first sort.
When a batch of data has been processed, ChainedFlatMapDriver's close function is called, which sends the data downstream.
```java
public void close() {
    this.outputCollector.close();
}
```
Through the operator chain, this in turn calls ChainedReduceCombineDriver.close:
```java
public void close() {
    // send the final batch
    try {
        switch (strategy) {
            case SORTED_PARTIAL_REDUCE:
                sortAndCombine(); // our example takes this branch
                break;
            case HASHED_PARTIAL_REDUCE:
                reduceFacade.emit();
                break;
        }
    } catch (Exception ex2) {
        throw new ExceptionInChainedStubException(taskName, ex2);
    }

    outputCollector.close();
    dispose(false);
}
```
sortAndCombine first sorts, then combines, and emits the combined records as it goes.
```java
private void sortAndCombine() throws Exception {
    final InMemorySorter<T> sorter = this.sorter;

    if (!sorter.isEmpty()) {
        sortAlgo.sort(sorter); // sort first

        final TypeSerializer<T> serializer = this.serializer;
        final TypeComparator<T> comparator = this.comparator;
        final ReduceFunction<T> function = this.reducer;
        final Collector<T> output = this.outputCollector;
        final MutableObjectIterator<T> input = sorter.getIterator();

        if (objectReuseEnabled) {
            // ......
        } else {
            T value = input.next();

            // this is the combine
            // iterate over key groups
            while (running && value != null) {
                comparator.setReference(value);
                T res = value;

                // iterate within a key group
                while ((value = input.next()) != null) {
                    if (comparator.equalToReference(value)) {
                        // same group, reduce
                        res = function.reduce(res, value);
                    } else {
                        // new key group
                        break;
                    }
                }

                output.collect(res); // emit the combined record
            }
        }
    }
}
```
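The control flow of sortAndCombine is: sort the buffer, walk it once, and reduce each run of equal keys. It can be reproduced on a plain Java list. The sketch below is a hypothetical standalone version for (word, count) pairs. `List.sort` stands in for Flink's `sortAlgo`/`InMemorySorter`, and a key-equality check stands in for `TypeComparator.setReference`/`equalToReference`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical standalone version of the sort-then-combine loop.
class SortAndCombine {

    static List<Map.Entry<String, Integer>> sortAndCombine(
            List<Map.Entry<String, Integer>> buffer,
            BinaryOperator<Map.Entry<String, Integer>> reducer) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(buffer);
        sorted.sort(Map.Entry.comparingByKey());          // step 1: sort by key
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {                        // iterate over key groups
            Map.Entry<String, Integer> res = sorted.get(i);
            String reference = res.getKey();               // plays the role of setReference(value)
            i++;
            while (i < sorted.size() && sorted.get(i).getKey().equals(reference)) {
                res = reducer.apply(res, sorted.get(i));   // same group: reduce
                i++;
            }
            out.add(res);                                  // plays the role of output.collect(res)
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> buffer = List.of(
                Map.entry("there?", 1), Map.entry("I", 1), Map.entry("there?", 1),
                Map.entry("I", 1), Map.entry("ho!", 1));
        BinaryOperator<Map.Entry<String, Integer>> sum =
                (a, b) -> Map.entry(a.getKey(), a.getValue() + b.getValue());
        System.out.println(sortAndCombine(buffer, sum)); // [I=2, ho!=1, there?=2]
    }
}
```

Sorting first means each key is touched in one contiguous pass, so the combine needs no hash table. That is the trade-off behind SORTED_PARTIAL_REDUCE versus HASHED_PARTIAL_REDUCE.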
Which downstream task receives a record is decided by OutputEmitter.selectChannel. The available strategies are:
hash partitioning, broadcasting, round-robin, and custom partition functions. Here PARTITION_HASH is used.
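Every task sends the partial count for a given word to the same downstream ReduceDriver. Because all partial counts for a word meet at one reducer, the final counts are correct.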
```java
public final int selectChannel(SerializationDelegate<T> record) {
    switch (strategy) {
        // ...
        case PARTITION_HASH:
            return hashPartitionDefault(record.getInstance(), numberOfChannels);
        // ...
    }
}

private int hashPartitionDefault(T record, int numberOfChannels) {
    int hash = this.comparator.hash(record);
    return MathUtils.murmurHash(hash) % numberOfChannels;
}
```
The call stack:
```
hash:50, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hash:30, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hashPartitionDefault:187, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:147, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
sortAndCombine:254, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:266, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:40, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
close:88, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
invoke:215, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
```
This is the second sort.
In BatchTask, the input is first sorted and merged, and only then handed to the Reduce driver for the actual reduction. The sort and merge happen in the UnilateralSortMerger class.
```
getIterator:646, UnilateralSortMerger (org.apache.flink.runtime.operators.sort)
getInput:1110, BatchTask (org.apache.flink.runtime.operators)
prepare:95, ReduceDriver (org.apache.flink.runtime.operators)
run:474, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
```
UnilateralSortMerger is a full-fledged sorter that implements a multi-way merge sort. Its internal logic is split across three threads (read, sort and spill), which are connected into a closed loop by a series of blocking queues.
Its memory is allocated through the MemoryManager, so the component never uses more memory than it has been assigned.
Its main fields are excerpted below:
```java
public class UnilateralSortMerger<E> implements Sorter<E> {

    // ------------------------------------------------------------------------
    //                                  Threads
    // ------------------------------------------------------------------------

    /** The thread that reads the input channels into buffers and passes them on to the merger. */
    private final ThreadBase<E> readThread;

    /** The thread that merges the buffer handed from the reading thread. */
    private final ThreadBase<E> sortThread;

    /** The thread that handles spilling to secondary storage. */
    private final ThreadBase<E> spillThread;

    // ------------------------------------------------------------------------
    //                                   Memory
    // ------------------------------------------------------------------------

    /** The memory segments used first for sorting and later for reading/pre-fetching
     * during the external merge. */
    protected final List<MemorySegment> sortReadMemory;

    /** The memory segments used to stage data to be written. */
    protected final List<MemorySegment> writeMemory;

    /** The memory manager through which memory is allocated and released. */
    protected final MemoryManager memoryManager;

    // ------------------------------------------------------------------------
    //                            Miscellaneous Fields
    // ------------------------------------------------------------------------

    /**
     * Collection of all currently open channels, to be closed and deleted during cleanup.
     */
    private final HashSet<FileIOChannel> openChannels;

    /**
     * The monitor which guards the iterator field.
     */
    protected final Object iteratorLock = new Object();

    /**
     * The iterator to be returned by the sort-merger. This variable is null, while receiving and merging is still in
     * progress and it will be set once we have < merge factor sorted sub-streams that will then be streamed sorted.
     */
    // If you debug drivers often, you will find that their input is this object.
    protected volatile MutableObjectIterator<E> iterator;

    private final Collection<InMemorySorter<?>> inMemorySorters;
}
```
ReadingThread: continuously consumes the input data and puts it into a buffer that will be sorted.
SortingThread: sorts the buffers filled by the reading thread.
SpillingThread: handles the spilling of intermediate results and sets up the merging. It also merges the channels until sufficiently few channels remain to perform the final streamed merge.
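The hand-off between the reading and sorting threads can be imitated with `java.util.concurrent` blocking queues. The sketch below is a heavily simplified, hypothetical model, and `ReadSortPipeline` is not a Flink class. A reader thread cuts the input into fixed-size buffers, and a sorter thread sorts each buffer into a run. It leaves out the spilling thread, memory management, the recycling of empty buffers back to the reader (the real class circulates buffers in a loop), and the final multi-way merge of the runs. An otherwise unused list instance serves as the end-of-input marker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, heavily simplified model of a read -> sort thread hand-off; not Flink code.
class ReadSortPipeline {

    // Sentinel telling the sorter there is no more input (compared by reference).
    private static final List<Integer> END = new ArrayList<>();

    static List<List<Integer>> sortRuns(List<Integer> input, int bufferSize) {
        BlockingQueue<List<Integer>> filled = new ArrayBlockingQueue<>(2);  // reader -> sorter
        List<List<Integer>> sortedRuns = Collections.synchronizedList(new ArrayList<>());

        // Reading thread: cut the input into fixed-size buffers and hand them over.
        Thread reader = new Thread(() -> {
            try {
                for (int start = 0; start < input.size(); start += bufferSize) {
                    int end = Math.min(start + bufferSize, input.size());
                    filled.put(new ArrayList<>(input.subList(start, end)));  // blocks if the sorter falls behind
                }
                filled.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Sorting thread: sort each filled buffer, producing one sorted run per buffer.
        Thread sorter = new Thread(() -> {
            try {
                List<Integer> buffer;
                while ((buffer = filled.take()) != END) {
                    Collections.sort(buffer);
                    sortedRuns.add(buffer);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        sorter.start();
        try {
            reader.join();
            sorter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for pipeline threads", e);
        }
        return new ArrayList<>(sortedRuns);
    }

    public static void main(String[] args) {
        System.out.println(sortRuns(List.of(5, 3, 9, 1, 7, 2, 8, 6, 4), 4)); // [[1, 3, 5, 9], [2, 6, 7, 8], [4]]
    }
}
```

The bounded queue gives back-pressure: when the sorter falls behind, `put` blocks the reader instead of letting buffered data grow without limit. This is loosely analogous to the fixed memory budget described above.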
UnilateralSortMerger has one special field:
```java
protected volatile MutableObjectIterator<E> iterator;
```
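This field is the final output of the sort-merger. If you have debugged operators, you will recognize it as the type of the operator's input: the operator ultimately reads its input from here.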
This is the third sort, and it shows how reduce works together with groupBy.
For `.groupBy(0)`, ReduceDriver simply takes the first value from the input with `T value = input.next();` and assigns it to the comparator with `comparator.setReference(value);`. groupBy only specifies that records are compared on the first field and does not name a concrete key value, so this first value effectively becomes the current key. We call the outer loop while (1); it is marked in the code. Inside while (2), the driver keeps reading values: while the next value has the same key, it is reduced into the result, and as soon as a different key appears the inner loop breaks. After leaving while (2), the code is still inside while (1), and value now holds the new record. So while (1) continues: the value is again assigned to the comparator with `comparator.setReference(value);`, and comparison starts for the new key.

```java
public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {

    @Override
    public void run() throws Exception {
        final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();

        // cache references on the stack
        final MutableObjectIterator<T> input = this.input;
        final TypeSerializer<T> serializer = this.serializer;
        final TypeComparator<T> comparator = this.comparator;
        final ReduceFunction<T> function = this.taskContext.getStub();
        final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);

        if (objectReuseEnabled) {
            // ......
        } else {
            // For `.groupBy(0)`, ReduceDriver simply takes the first value from the input
            T value = input.next();

            // while (1): iterate over key groups
            while (this.running && value != null) {
                numRecordsIn.inc();
                // Assign the value to the comparator; this value acts as the current key
                comparator.setReference(value);
                T res = value;

                // while (2): iterate within a key group, comparing against the current key
                while ((value = input.next()) != null) {
                    numRecordsIn.inc();
                    if (comparator.equalToReference(value)) {
                        // same group: the next value has the same key, so reduce it
                        res = function.reduce(res, value);
                    } else {
                        // new key group: the next value has a different key, so stop comparing
                        break;
                    }
                }

                // emit the reduced result
                output.collect(res);
            }
        }
    }
}
```
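This loop only works because its input arrives sorted. It closes a group as soon as the key changes, so equal keys must be adjacent. The hypothetical sketch below runs the same loop structure over plain Java collections, with a key-equality check standing in for `comparator.equalToReference`, on both unsorted and sorted input. On unsorted input the same key appears in several output records, which is why the sort-merge step has to run before ReduceDriver.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical re-implementation of the ReduceDriver loop shape on Java collections.
class GroupedReduceLoop {

    static List<Map.Entry<String, Integer>> reduceSortedStream(List<Map.Entry<String, Integer>> records) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        Iterator<Map.Entry<String, Integer>> input = records.iterator();
        Map.Entry<String, Integer> value = input.hasNext() ? input.next() : null;
        while (value != null) {                                   // while (1): one pass per key group
            String reference = value.getKey();                    // like comparator.setReference(value)
            Map.Entry<String, Integer> res = value;
            value = null;
            while (input.hasNext()) {                             // while (2): stay inside the key group
                Map.Entry<String, Integer> next = input.next();
                if (next.getKey().equals(reference)) {
                    res = Map.entry(reference, res.getValue() + next.getValue());  // same group: reduce
                } else {
                    value = next;                                 // new key group: leave while (2)
                    break;
                }
            }
            out.add(res);                                         // like output.collect(res)
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> unsorted = List.of(
                Map.entry("I", 1), Map.entry("hear", 1), Map.entry("I", 1));
        List<Map.Entry<String, Integer>> sorted = List.of(
                Map.entry("I", 1), Map.entry("I", 1), Map.entry("hear", 1));
        System.out.println(reduceSortedStream(unsorted)); // [I=1, hear=1, I=1]
        System.out.println(reduceSortedStream(sorted));   // [I=2, hear=1]
    }
}
```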