Flink 原理与实现:数据流上的类型和操做

Flink 为流处理和批处理分别提供了 DataStream API 和 DataSet API。正是这种高层的抽象和 flunent API 极大地便利了用户编写大数据应用。不过不少初学者在看到官方 Streaming 文档中那一大坨的转换时,经常会蒙了圈,文档中那些只言片语也很难讲清它们之间的关系。因此本文将介绍几种关键的数据流类型,它们之间是如何经过转换关联起来的。下图展现了 Flink 中目前支持的主要几种流的类型,以及它们之间的转换关系。数据库

DataStream

DataStream 是 Flink 流处理 API 中最核心的数据结构。它表明了一个运行在多个分区上的并行流。一个 DataStream 能够从 StreamExecutionEnvironment 经过env.addSource(SourceFunction) 得到。缓存

DataStream 上的转换操做都是逐条的,好比 map()flatMap()filter()。DataStream 也能够执行 rebalance(再平衡,用来减轻数据倾斜)和 broadcaseted(广播)等分区转换。数据结构

val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...))
val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
val str2: DataStream[(String, MyType)] = stream.rebalance()
val str3: DataStream[AnotherType] = stream.map { ... }

上述 DataStream 上的转换在运行时会转换成以下的执行图:app

如上图的执行图所示,DataStream 各个算子会并行运行,算子之间是数据流分区。如 Source 的第一个并行实例(S1)和 flatMap() 的第一个并行实例(m1)之间就是一个数据流分区。而在 flatMap() 和 map() 之间因为加了 rebalance(),它们之间的数据流分区就有3个子分区(m1的数据流向3个map()实例)。这与 Apache Kafka 是很相似的,把流想象成 Kafka Topic,而一个流分区就表示一个 Topic Partition,流的目标并行算子实例就是 Kafka Consumers。ide

KeyedStream

KeyedStream用来表示根据指定的key进行分组的数据流。一个KeyedStream能够经过调用DataStream.keyBy()来得到。而在KeyedStream上进行任何transformation都将转变回DataStream。在实现中,KeyedStream是把key的信息写入到了transformation中。每条记录只能访问所属key的状态,其上的聚合函数能够方便地操做和保存对应key的状态。函数

WindowedStream & AllWindowedStream

WindowedStream表明了根据key分组,而且基于WindowAssigner切分窗口的数据流。因此WindowedStream都是从KeyedStream衍生而来的。而在WindowedStream上进行任何transformation也都将转变回DataStream学习

val stream: DataStream[MyType] = ...
val windowed: WindowedDataStream[MyType] = stream
        .keyBy("userId")
        .window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
val result: DataStream[ResultType] = windowed.reduce(myReducer)

上述 WindowedStream 的样例代码在运行时会转换成以下的执行图:大数据

Flink 的窗口实现中会将到达的数据缓存在对应的窗口buffer中(一个数据可能会对应多个窗口)。当到达窗口发送的条件时(由Trigger控制),Flink 会对整个窗口中的数据进行处理。Flink 在聚合类窗口有必定的优化,即不会保存窗口中的全部值,而是每到一个元素执行一次聚合函数,最终只保存一份数据便可。优化

在key分组的流上进行窗口切分是比较经常使用的场景,也可以很好地并行化(不一样的key上的窗口聚合能够分配到不一样的task去处理)。不过有时候咱们也须要在普通流上进行窗口的操做,这就是 AllWindowedStreamAllWindowedStream是直接在DataStream上进行windowAll(...)操做。AllWindowedStream 的实现是基于 WindowedStream 的(Flink 1.1.x 开始)。Flink 不推荐使用AllWindowedStream,由于在普通流上进行窗口操做,就势必须要将全部分区的流都聚集到单个的Task中,而这个单个的Task很显然就会成为整个Job的瓶颈。spa

JoinedStreams & CoGroupedStreams

双流 Join 也是一个很是常见的应用场景。深刻源码你能够发现,JoinedStreams 和 CoGroupedStreams 的代码实现有80%是如出一辙的,JoinedStreams 在底层又调用了 CoGroupedStreams 来实现 Join 功能。除了名字不同,一开始很难将它们区分开来,并且为何要提供两个功能相似的接口呢??

实际上这二者仍是很点区别的。首先 co-group 侧重的是group,是对同一个key上的两组集合进行操做,而 join 侧重的是pair,是对同一个key上的每对元素进行操做。co-group 比 join 更通用一些,由于 join 只是 co-group 的一个特例,因此 join 是能够基于 co-group 来实现的(固然有优化的空间)。而在 co-group 以外又提供了 join 接口是由于用户更熟悉 join(源于数据库吧),并且可以跟 DataSet API 保持一致,下降用户的学习成本。

JoinedStreams 和 CoGroupedStreams 是基于 Window 上实现的,因此 CoGroupedStreams 最终又调用了 WindowedStream 来实现。

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
 
val result: DataStream[(MyType, AnotherType)] = firstInput.join(secondInput)
    .where("userId").equalTo("id")
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

上述 JoinedStreams 的样例代码在运行时会转换成以下的执行图:

双流上的数据在同一个key的会被分别分配到同一个window窗口的左右两个篮子里,当window结束的时候,会对左右篮子进行笛卡尔积从而获得每一对pair,对每一对pair应用 JoinFunction。不过目前(Flink 1.1.x)JoinedStreams 只是简单地实现了流上的join操做而已,距离真正的生产使用仍是有些距离。由于目前 join 窗口的双流数据都是被缓存在内存中的,也就是说若是某个key上的窗口数据太多就会致使 JVM OOM(然而数据倾斜是常态)。双流join的难点也正是在这里,这也是社区后面对 join 操做的优化方向,例如能够借鉴Flink在批处理join中的优化方案,也能够用ManagedMemory来管理窗口中的数据,并当数据超过阈值时能spill到硬盘。

ConnectedStreams

在 DataStream 上有一个 union 的转换 dataStream.union(otherStream1, otherStream2, ...),用来合并多个流,新的流会包含全部流中的数据。union 有一个限制,就是全部合并的流的类型必须是一致的。ConnectedStreams 提供了和 union 相似的功能,用来链接两个流,可是与 union 转换有如下几个区别:

  1. ConnectedStreams 只能链接两个流,而 union 能够链接多于两个流。
  2. ConnectedStreams 链接的两个流类型能够不一致,而 union 链接的流的类型必须一致。
  3. ConnectedStreams 会对两个流的数据应用不一样的处理方法,而且双流之间能够共享状态。这在第一个流的输入会影响第二个流时, 会很是有用。

以下 ConnectedStreams 的样例,链接 input 和 other 流,并在input流上应用map1方法,在other上应用map2方法,双流能够共享状态(好比计数)。

val input: DataStream[MyType] = ...
val other: DataStream[AnotherType] = ...
 
val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other)
 
val result: DataStream[ResultType] = 
        connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() {
            override def map1(value: MyType): ResultType = { ... }
            override def map2(value: AnotherType): ResultType = { ... }
        })

当并行度为2时,其执行图以下所示:

总结

本文介绍经过不一样数据流类型的转换图来解释每一种数据流的含义、转换关系。后面的文章会深刻讲解 Window 机制的实现,双流 Join 的实现等。

相关文章
相关标签/搜索