Apache Flink流式处理

花了四小时,看完Flink的内容,基本了解了原理。 挖个坑,待总结后填一下。html

2019-06-02 01:22:57等欧冠决赛中,填坑。redis

1、概述

storm最大的特色是快,它的实时性很是好(毫秒级延迟)。为了低延迟它牺牲了高吞吐,而且不能保证exactly once语义。sql

在低延迟和高吞吐的流处理中,维持良好的容错是很是困难的,但为了获得有保障的准确状态,人们想到一种替代方法:将连续时间中的流数据分割成一系列微小的批量做业(微批次处理)。若是分割得足够小,计算几乎能够实现真正的流处理。由于存在延迟,因此不可能作到彻底实时,可是每一个简单的应用程序均可以实现仅有几秒甚至几亚秒的延迟。这就是Spark Streaming所使用的方法。apache

为了实现高吞吐和exactly once语义,storm推出了storm trident,也是使用了微批次处理的方法。编程

微批次处理缺点:架构

  1. 数据只能按固定时间分割,没有办法根据实际数据状况,进行不一样批次或每一个批次不一样大小的分割app

  2. 知足不了对数据实时性要求很是高的数据框架

初识flink

flink与storm,spark streaming的比较分布式

 

Apache flink主页在其顶部展现了该项目的理念:“Apache Flink是为分布式,高性能,随时可用以及准确的流处理应用程序打造的开源流处理框架”ide

流处理与批处理:

批处理的特色是有界、持久(数据已经落地)、大量,批处理很是适合须要访问全套记录才能完成的计算工做,通常用于离线统计。典型的是Hadoop,它只能用于批处理。

流处理的特色是无界、实时,流处理方式无需针对整个数据集执行操做,而是对经过系统传输的每一个数据项执行操做,通常用于实时统计。典型的是storm,它只能进行流处理。

 

有没有即能实现批处理,也能实现流处理?

spark即能进行流处理,也能实现批处理,但它并非在同一架构体系下,spark的批处理是经过spark core和spark sql实现,流处理是经过spark streaming实现。

flink什么特色呢?不论是批处理仍是流处理,它可以同时进行处理,由于它底层是不区分流批的。flink将批处理(即处理有限的静态数据)视做一种特殊的流处理。

2、flink基本架构

JobManager与TaskManager

若是粗化一点看的话,flink就是由2部分组成,即JobManager和TaskManager,是两个JVM进程。

JobManager:也称为master(对应spark里的driver),用于协调分布式执行,它们用来调度task,协调检查点,协调失败回复等。flink运行时至少存在一个master处理器,若是配置高可用模式则会存在多个master,它们其中一个是leader,而其它都是standby.

TaskManager:也称为worker(对应spark里面的executor),用于执行一个dataflow的task,数据缓冲和datastream的交换,flink运行时至少会存在一个worker处理器。

flink的编程模型

 

Stateful Stream Processing,是数据接入,计算,输出都是本身来实现,是最灵活也是最麻烦的编程接口。

DataStream/DataSet API是针对流和批处理的封装API,绝大多数的编程是在这一层。

Table API,是将数据抽像成一张表,提供select, group_by等API接口供调用。

SQL是最高级的接口,支持直接写SQL查询数据。

3、flink运行架构

任务提交流程

当启动新的Flink YARN会话时,客户端首先检查请求的资源(容器和内存)是否可用。以后,它将包含flink的jar和配置上传到HDFS(步骤1)。

客户端的下一步是请求(步骤2)YARN容器以启动ApplicationMaster(步骤3)。因为客户端将配置和jar文件注册为容器的资源,所以在该特定机器上运行的YARN的NodeManager将负责准备容器(例如,下载文件)。一旦完成,ApplicationMaster(AM)就会启动。

该JobManager和AM在同一容器中运行。成功启动后,AM就很容易知道JobManager的地址(它本身的主机)。它为TaskManagers生成一个新的Flink配置文件(以便它们能够链接到JobManager)。该文件也被上传到HDFS。此外,AM容器还提供Flink的Web界面。YARN代码分配的全部端口都是临时端口。这容许用户并行执行多个Flink YARN会话。

以后,AM开始为Flink的TaskManagers分配容器,它将从HDFS下载jar文件和修改后的配置。完成这些步骤后,Flink即会设置并准备接受做业。

这个提交Yarn Session的整个过程,Yarn Session提交完成后,JobManager和TaskManager就启动完毕,等待用户任务的提交(jar包)

任务调度原理

Task Slot只平均分配内存,不分CPU!Slot指的是TaskManager可以并行执行的task最大数。

客户端将程序抽像成Datafow Graph,并能过Actor System通讯,将程序提交到JobManager,JobManager根据Dataflow Graph(相似Spark中的DAG),两个相临任务间的并行度变化,来划分任务,并将任务提交到TaskManager的Task Slot去执行。

TaskManager是一个独立的JVM进程,TaskManager和Slot能够看做worker pool模型,Slot是一个Worker,若是TaskManager里有Slot,才能被分配任务。

一个Slot里的Task,可能包含多个算子。Task按distributed进行划分,也就算子是否产生shuffle(spark里shuffle==flink的distributed).

若是有3个TaskManager,第个TaskManager中有3个Slot,那么最高支持的并行度是9,parallelism.default=9.

程序与数据流

flink程序的基础构架模块是流(streams)和转换(transformation)

fink经过source将流接进来,经过transformation算子对流进行转换,再经过sink将数据输出,这是一个flink程序的完整过程。

 

并行数据流

 

flink程序的执行具备并行、分布式的特性。在执行过程当中,一个stream包含一个或多个stream partition,而每一个operator包含一个或多个operator subtask,这些operator subtask在不一样的线程、不一样的物理机或不一样的容器中彼此互不依赖的执行。

一个特定的operator的subtask的个数被称之为其parallelism(并行度)。一个程序中,不一样的operator可能具备不一样的并行度。

stream在operator之间的传输数据的形式能够是one-to-one(forwarding)的模式,也能够是redistributing的模式,具体哪一种形式取决于operator的种类。

如上图map是one-to-one模式,而keyBy,window,apply是redistributing模式

one-to-one的算子,会被组合在一块儿成为operator-chain,一个operator-chain被分红一个task去执行。

 

4、DataStream API

flink程序结构

每一个flink程序都包含如下的若干个流程:

  1. 获取一个执行环境:execution enviroment

  2. 加载/建立初始数据:source

  3. 指定转换这些数据:transformation

  4. 指定放置计算结果的位置:sink

  5. 触发程序执行

Transformation

  • Map操做

遍历一个集合的全部元素,并对每一个元素作转换

输入一个参数,产生一个输出。

steam.map(item => item * 2)

  • FlatMap操做

输入一个参数,产生0个、1个或多个输出。

stream.flatMap(item => item.split(“ ”))

  • Filter操做

结算每一个元素的布尔值,并返回布尔值为true的元素。

stream.filter(item => item == 1)

  • Connect操做

DataStream1, DataStream2 -> ConnectedStreams

在ConnectedStream的内部,stream仍是分开的,也就是说,想对ConnectedStream执行一个Map/Filter等操做,要传入2个函数。第1个对DataStream1操做,第2个对DataStream2操做。

streamConnect  = stream1.connect(stream2)

streamConnect.map(item => item * 2, item => (item, 1L))

  • coMap, coFlatMap操做

ConnectedStreams -> DataStream

stream  = streamConnect.map(item => item * 2, item => (item, 1L))

输入是一个ConnectedStream,输出是一个普通的DataStream

  • Split + Select操做

DataStream -> SplitStream

val streamSplit = stream.split(word => (“haddoop”.equals(word)) match {

case true => List(“hadoop”)

case false => List(“other”)

}

)

上面split将流划分红2个流

val streamSelect001 = streamSplit.select(“hadoop”)

select 将指定的一个流取出来。

  • union操做

对两个或者两个以上的DataStream进行union操做,产生一个包含全部DataStream元素的新的DataStream。

  • KeyBy

DataStream -> KeyedStream,输入必须是Tuple类型,逻辑地将一个流拆分红不相交的分区,每一个分区包含具备相同Key的元素,在内部以Hash的形式实现。

val env  = StreamExecutionEnvironment.getExecutionEnvironment

var stream = env.readTextFile(“test.txt”)

val streamMap = stream.flaMap(item => item.split(“ “)).map(item => (item, 1L))

val streamKeyBy = streamMap.keyBy(0) //keyBy能够根据Tuple中的第一个元素,也能够根据第二个元素,进行partition。0表明第一个元素。

  • Reduce操做

KeyedStream -> DataStream:一个分组数据流的聚合操做,合并当前元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val streamReduce = streamKeyBy.reduce(

(item1, item2) => (item1._1, item1._2 + item2._2)

)

  • Fold操做

KeyedStream -> DataStream:一个有初始值的分组数据流的滚动折叠操做。给窗口赋一个fold功能的函数,并返回一个fold后结果

  • Aggregation操做

5、Time与Window

时间

flink中有3个时间,

EventTime:事件生成的时间

IngestionTime:事件进入flink的时间

WindowProcessingTime:事件被处理的系统时间(默认使用)

窗口

不能直接对无界的流进行聚合,要先将流划分为window,再对window进行聚合。

window分为两类:

CountWindow:按照指定的数据条数,生成一个window,与时间无关

TimeWindow:按照时间生成window

对于TimeWindow,能够根据窗口实现原理的不一样分红三类:

滚动窗口(Tumbing Window)—— 没有重叠

滑动窗口(Sliding Window)—— 重叠,有窗口长度和滑动步长两个属性

会话窗口(Sessionn Window)—— 若是相临的两条数据,间隔时间超过会话窗口时间大小,则前面的数据生成一个窗口。

每知足滑动步长,会针对window执行一次计算

val streanWindow = streamKeyBy.timeWindow(Time.Seconds(10), Time.Seconds(2)).reduce(

(item1, item2) => (item1._1, item1._2 + item2._2)

)

6、EventTime和Window

引入EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

WaterMark

若是用EventTime来决定窗口的运行,一旦出现乱序,咱们不能明确数据是否已经所有到位,但又不能无限期的等下去,此时必须有个机制来保证一个特定的时间后,必须触发window的计了,这个特别的机制就是watermark.


WaterMark是一种衡量EventTime进展的机制,它是数据自己的一个隐藏属性,数据自己携带着对应的WaterMark。

WaterMark是用于处理乱序事件的,而正确的处理乱序事件,一般用WaterMark机制结合window来实现。


数据流中的WaterMark用于表示eventTime小于watermark的数据,都已经到达了,所以,window的执行也是由WaterMark触发的。


WaterMark能够理解成一个延迟触发机制,咱们能够设置WaterMark的延时时长为t,每次系统会校验已经到达的数据中最大的maxEventTime,而后认定eventTime小于maxEventTime-t的全部数据都已经到达,若是有窗口的中止时间等于maxEventTime-t,那么这个窗口被触发。


当Flink接收到每一条数据时,都会计算产生一条watermark,watermark = 当前全部到达数据中的maxEventTime - 延迟时长,也就是说,watermark是由数据携带的,一量数据携带的watermark比当前未触发的窗口的中止时间要晚,那么就会触发相应的窗口的执行。因为watermark是由数据携带的,所以,若是运行过程当中没法获取新的数据,那么没有被触发的窗口将永远不被触发。


EventTime的窗口与Time里的窗口区别:

窗口大小设置为5s,Time窗口每5秒执行一次,无论有没有数据。

EventTime每5s生成一个窗口,但不执行。当触发条件知足后,才会执行窗口。

附录

https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html

相关文章
相关标签/搜索