本文将从FlatMap概念和如何使用开始入手,深刻到Flink是如何实现FlatMap。但愿能让你们对这个概念有更深刻的理解。html
首先咱们先从概念入手。前端
自从响应式编程慢慢壮大以来,这两个单词如今愈来愈被你们熟悉了。前端能见到它们的身影,后台也能见到;安卓里面有,iOS也有。不少兄弟刚遇到它们时候是懵圈的,搞不清楚之间的区别。下面我就给你们简单讲解下。java
它把数组流
中的每个值,使用所提供的函数执行一遍,一一对应。获得与元素个数相同的数组流
。而后返回这个新数据流。apache
flat是扁平的意思。因此这个操做是:先映射(map),再拍扁(join)。编程
flatMap输入多是多个子数组流
。因此flatMap先针对 每一个子数组流
的每一个元素进行映射操做。而后进行扁平化处理,最后聚集全部进行扁平化处理的结果集造成一个新的列表(扁平化简而言之就是去除全部的修饰)。api
flatMap与map另一个不同的地方就是传入的函数在处理完后返回值必须是List。数组
好比拿到一个文本文件以后,咱们是按行读取,按行处理。若是要对每一行的单词数进行计数,那么应该选择Map方法,若是是统计词频,就应该选择flatMap方法。数据结构
若是还不清楚,能够看看下面这个例子:app
梁山新进一批好马,准备给每一个马军头领配置一批。因而获得函数以及头领名单以下: 函数 = ( 头领 => 头领 + 好马 ) 五虎将 = List(关胜、林冲、秦明、呼延灼、董平 ) 八骠骑 = List(花荣、徐宁、杨志、索超、张清、朱仝、史进、穆弘 ) // Map函数的例子 利用map函数,咱们能够获得 五虎将马军 五虎将马军 = 五虎将.map( 头领 => 头领 + 好马 ) 结果是 List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马 ) // flatMap函数的例子 可是为了获得统一的马军,则能够用flatMap: 马军头领 = List(五虎将,八骠骑) 马军 = 马军头领.flatMap( 头领 => 头领 + 好马 ) 结果就是:List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马,花荣 + 马、徐宁 + 马、杨志 + 马、索超 + 马、张清 + 马、朱仝 + 马、史进 + 马、穆弘 + 马 )
如今你们应该清楚了吧。接下来看看几个FlatMap的实例。框架
Scala自己对于List类型就有map和flatMap操做。举例以下:
val names = List("Alice","James","Apple") val strings = names.map(x => x.toUpperCase) println(strings) // 输出 List(ALICE, JAMES, APPLE) val chars = names.flatMap(x=> x.toUpperCase()) println(chars) // 输出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)
以上是scala语言层面的实现。下面咱们看看Flink框架是如何使用FlatMap的。
网上常见的一个Flink应用的例子:
//加载数据源 val source = env.fromElements("china is the best country","beijing is the capital of china") //转化处理数据 val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
case class WordWithCount(word: String, count: Long) val text = env.socketTextStream(host, port, '\n') val windowCounts = text.flatMap { w => w.split("\\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") windowCounts.print()
上面提到的都是简单的使用,若是有复杂需求,在Flink中,咱们能够经过继承FlatMapFunction和RichFlatMapFunction来自定义算子。
FlatMapFunction
对于不涉及到状态的使用,能够直接继承 FlatMapFunction,其定义以下:
@Public @FunctionalInterface public interface FlatMapFunction<T, O> extends Function, Serializable { void flatMap(T value, Collector<O> out) throws Exception; }
如何自定义算子呢,这个能够直接看看Flink中的官方例子
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens. public class Tokenizer implements FlatMapFunction<String, String> { @Override public void flatMap(String value, Collector<String> out) { for (String token : value.split("\\W")) { out.collect(token); } } } // [...] DataSet<String> textLines = // [...] DataSet<String> words = textLines.flatMap(new Tokenizer());
RichFlatMapFunction
对于涉及到状态的状况,用户可使用继承 RichFlatMapFunction 类的方式来实现UDF。
RichFlatMapFunction属于Flink的Rich函数类。从名称上来看,这种函数类在普通的函数类上增长了Rich前缀,好比RichMapFunction
、RichFlatMapFunction
或RichReduceFunction
等等。比起普通的函数类,Rich函数类增长了:
open()
方法:Flink在算子调用前会执行这个方法,能够用来进行一些初始化工做。close()
方法:Flink在算子最后一次调用结束后执行这个方法,能够用来释放一些资源。getRuntimeContext
方法:获取运行时上下文。每一个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程当中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,咱们能够从上下文里获取状态数据。FlatMap对应的RichFlatMapFunction以下:
@Public public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> { @Override public abstract void flatMap(IN value, Collector<OUT> out) throws Exception; }
其基类 AbstractRichFunction 以下,能够看到主要是和运行时上下文创建了联系,而且有初始化和退出操做:
@Public public abstract class AbstractRichFunction implements RichFunction, Serializable { private transient RuntimeContext runtimeContext; @Override public void setRuntimeContext(RuntimeContext t) { this.runtimeContext = t; } @Override public RuntimeContext getRuntimeContext() { return this.runtimeContext; } @Override public IterationRuntimeContext getIterationRuntimeContext() { if (this.runtimeContext instanceof IterationRuntimeContext) { return (IterationRuntimeContext) this.runtimeContext; } } @Override public void open(Configuration parameters) throws Exception {} @Override public void close() throws Exception {} }
如何最好的使用? 固然仍是官方文档和例子最靠谱。
由于涉及到状态,因此若是使用,你必须建立一个 StateDescriptor
,才能获得对应的状态句柄。 这保存了状态名称(你能够建立多个状态,而且它们必须具备惟一的名称以即可以引用它们),状态所持有值的类型,而且可能包含用户指定的函数,例如ReduceFunction
。 根据不一样的状态类型,能够建立ValueStateDescriptor
,ListStateDescriptor
, ReducingStateDescriptor
,FoldingStateDescriptor
或 MapStateDescriptor
。
状态经过 RuntimeContext
进行访问,所以只能在 rich functions 中使用。 可是咱们也会看到一个例子。RichFunction
中 RuntimeContext
提供以下方法:
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregatingState getAggregatingState(AggregatingStateDescriptor)
FoldingState getFoldingState(FoldingStateDescriptor)
MapState getMapState(MapStateDescriptor)
下面是一个 FlatMapFunction
的例子,展现了如何将这些部分组合起来:
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] { private var sum: ValueState[(Long, Long)] = _ override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = { // access the state value val tmpCurrentSum = sum.value // If it hasn't been used before, it will be null val currentSum = if (tmpCurrentSum != null) { tmpCurrentSum } else { (0L, 0L) } // update the count val newSum = (currentSum._1 + 1, currentSum._2 + input._2) // update the state sum.update(newSum) // if the count reaches 2, emit the average and clear the state if (newSum._1 >= 2) { out.collect((input._1, newSum._2 / newSum._1)) sum.clear() } } override def open(parameters: Configuration): Unit = { sum = getRuntimeContext.getState( new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]) ) } } object ExampleCountWindowAverage extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(List( (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L) )).keyBy(_._1) .flatMap(new CountWindowAverage()) .print() // the printed output will be (1,4) and (1,5) env.execute("ExampleManagedState") }
这个例子实现了一个简单的计数窗口。 咱们把元组的第一个元素看成 key(在示例中都 key 都是 “1”)。 该函数将出现的次数以及总和存储在 “ValueState” 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态从新开始。 请注意,咱们会为每一个不一样的 key(元组中第一个元素)保存一个单独的值。
FlatMap从Flink编程模型角度讲属于一个算子,用来对数据流或者数据集进行转换。从框架角度说,FlatMap是怎么实现的呢? 或者说FlatMap是怎么从用户代码转换到Flink运行时呢 ?
首先说说 DataSet相关这套系统中FlatMap的实现。
请注意,DataSteam对应的那套系统中,operator名字都是带着Stream的,好比StreamOperator。
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
这段代码调用的就是DataSet中的API。具体以下:
public abstract class DataSet<T> { public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) { String callLocation = Utils.getCallLocationName(); TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true); return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation); } }
能够看出,flatMap @ DataSet
主要就是生成了一个FlatMapOperator,这个能够理解为是逻辑算子。其定义以下:
public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> { protected final FlatMapFunction<IN, OUT> function; protected final String defaultName; public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) { super(input, resultType); this.function = function; this.defaultName = defaultName; } @Override protected FlatMapFunction<IN, OUT> getFunction() { return function; } // 这个translateToDataFlow就是生成计划(Plan)的关键代码 @Override protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? getName() : "FlatMap at " + defaultName; // create operator FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); // set input po.setInput(input); // set parallelism if (this.getParallelism() > 0) { // use specified parallelism po.setParallelism(this.getParallelism()); } else { // if no parallelism has been specified, use parallelism of input operator to enable chaining po.setParallelism(input.getParallelism()); } return po; } }
FlatMapOperator的基类以下:
public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> { } // Base class for operations that operates on a single input data set. public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> { private final DataSet<IN> input; }
DataSet API所编写的批处理程序跟DataStream API所编写的流处理程序在生成做业图(JobGraph)以前的实现差异很大。流处理程序是生成流图(StreamGraph),而批处理程序是生成计划(Plan)并由优化器对其进行优化并生成优化后的计划(OptimizedPlan)。
计划(Plan)以数据流(dataflow)的形式来表示批处理程序,但它只是批处理程序最初的表示,在一个批处理程序生成做业图以前,计划还会被进行优化以产生更高效的方案。Plan不一样于流图(StreamGraph),它以sink为入口,由于一个批处理程序可能存在若干个sink,因此Plan采用集合来存储它。另外Plan还封装了批处理做业的一些基本属性:jobId、jobName以及defaultParallelism等。
生成Plan的核心部件是算子翻译器(OperatorTranslation),createProgramPlan方法经过它来”翻译“出计划,核心代码以下
public class OperatorTranslation { // 接收每一个需遍历的DataSink对象,而后将其转换成GenericDataSinkBase对象 public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) { List<GenericDataSinkBase<?>> planSinks = new ArrayList<>(); //遍历sinks集合 for (DataSink<?> sink : sinks) { //将翻译生成的GenericDataSinkBase加入planSinks集合*,对每一个sink进行”翻译“ planSinks.add(translate(sink)); } //以planSins集合构建Plan对象 Plan p = new Plan(planSinks); p.setJobName(jobName); return p; } private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) { //会调用到 FlatMapOperator 的 translateToDataFlow org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input); } }
FlatMapOperatorBase就是生成的plan中的一员。
public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { @Override protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, parameters); ArrayList<OUT> result = new ArrayList<OUT>(input.size()); TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer); for (IN element : input) { IN inCopy = inSerializer.copy(element); function.flatMap(inCopy, resultCollector); } FunctionUtils.closeFunction(function); return result; } }
而最后优化时候,则FlatMapOperatorBase会被优化成FlatMapNode。
public class GraphCreatingVisitor implements Visitor<Operator<?>> { public boolean preVisit(Operator<?> c) { else if (c instanceof FlatMapOperatorBase) { n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c); } } }
自此,FlatMap就被组合到 DataSet的 OptimizedPlan 中。下一步Flink会依据OptimizedPlan来生成 JobGraph。
做业图(JobGraph)是惟一被Flink的数据流引擎所识别的表述做业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。
在运行状态下,若是上游有数据流入,则FlatMap这个算子就会发挥做用。
对于DataStream,则是另一套体系结构。首先咱们找一个使用DataStream的例子看看。
//获取数据: 从socket中获取 val textDataStream = env.socketTextStream("127.0.0.1", 8888, '\n') val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1)) //groupby: 按照指定的字段聚合 val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1)) windowDstram.sum("count").print()
上面例子中,flatMap 调用的是DataStream中的API,具体以下:
public class DataStream<T> { public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) { //clean函数用来移除FlatMapFunction类对象的外部类部分,这样就能够进行序列化 //getType用来获取类对象的输出类型 TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true); return flatMap(flatMapper, outType); } // 构建了一个StreamFlatMap的Operator public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) { return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper))); } // 依次调用下去 @PublicEvolving public <R> SingleOutputStreamOperator<R> transform( String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator)); } protected <R> SingleOutputStreamOperator<R> doTransform( String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); // 构建Transform对象,Transform对象中包含其上游Transform对象,这样上游下游就串成了一个Transform链。 OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism()); @SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); // 将这Transform对象放入env的transform对象列表。 getExecutionEnvironment().addOperator(resultTransform); // 返回流 return returnStream; } }
上面源码中的几个概念须要澄清。
Transformation:首先,FlatMap在FLink编程模型中是算子API,在DataStream中会生成一个Transformation,即逻辑算子。
逻辑算子Transformation最后会对应到物理算子Operator,这个概念对应的就是StreamOperator。
StreamOperator:DataStream 上的每个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。
processElement()
方法也是UDF的逻辑被调用的地方,例如FlatMapFunction
里的flatMap()
方法。
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> { private transient TimestampedCollector<OUT> collector; public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) { super(flatMapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); } @Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); // 调用用户定义的FlatMap userFunction.flatMap(element.getValue(), collector); } }
咱们能够看到,StreamFlatMap继承了AbstractUdfStreamOperator,从而间接继承了StreamOperator。
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable { }
StreamOperator是根接口。对于 Streaming 来讲全部的算子都继承自 StreamOperator。继承了StreamOperator的扩展接口则有OneInputStreamOperator,TwoInputStreamOperator。实现了StreamOperator的抽象类有AbstractStreamOperator以及它的子类AbstractUdfStreamOperator。
从 API 到 逻辑算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink会依据StreamOperator来生成 JobGraph。
做业图(JobGraph)是惟一被Flink的数据流引擎所识别的表述做业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。
【Flink】Flink基础之实现WordCount程序(Java与Scala版本)