【4.分布式计算】flink

flink有批处理和流处理的计算功能,其中批处理是用流计算来模拟,更多数据处理见:https://segmentfault.com/a/11...,分布式部署;计算相关的并行模式,流处理时间窗口,容错处理,增量计算等。
官方:https://flink.apache.orghtml

逻辑架构

clipboard.png

clipboard.png

部署架构

  • Standalone
    job manager(master)+wokers
  • 基于yarn的部署
    clipboard.png
  • HA:job manager单点
    Standalone : Zookeeper
    对于 Yarn Cluaster 模式来讲,Flink 就要依靠 Yarn 自己来对 JobManager 作 HA 了。其实这里彻底是 Yarn 的机制。对于 Yarn Cluster 模式来讲,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就彻底依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 同样)。

运行架构

clipboard.png

  • client
    当用户提交一个Flink程序时,会首先建立一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,因此Client须要从用户提交的Flink程序配置中获取JobManager的地址,并创建到JobManager的链接,将Flink Job提交给JobManager。Flink程序=》JobGraph(Flink Dataflow:多个JobVertex组成的DAG,一个JobGraph包含了一个Flink程序的以下信息:JobID、Job名称、配置信息、一组JobVertex等)。
  • jobmanager
    它负责接收Flink Job,调度组成Job的多个Task的执行。同时,JobManager还负责收集Job的状态信息,并管理Flink集群中从节点TaskManager。JobManager所负责的各项管理功能,它接收到并处理的事件主要包括:RegisterTaskManager,SubmitJob,CancelJob,UpdateTaskExecutionState,RequestNextInputSplit,JobStatusChanged
  • worker
    JVM进程多线程,task slot内存隔离资源单位,一个job的的多讴歌subtask能够共享slot,

计算模式

在 Hadoop 中 Map 和 Reduce 是两个独立调度的 Task,而且都会去占用计算资源。对 Flink 来讲 MapReduce 是一个 Pipeline 的 Task,只占用一个计算资源web

clipboard.png

https://ci.apache.org/project...
以上有6个源,6个map,6个reduce。在2个TM(每一个3个slots)的并行执行方式以下算法

clipboard.png

clipboard.png
其中每一个可并行的有一个JV和并行的EV.好比source会在一个JV中保含6个EV,ExecutionGraph还包含IntermediateResult和IntermediateResultPartition。前者跟踪IntermediateDataSet的状态,后者是每一个分区的状态。apache

clipboard.png

窗口与时间

支持窗口

1)倾斜窗口(Tumbling Windows,记录没有重叠,固定窗口大小时间间隔)
2)滑动窗口(Slide Windows,记录有重叠,固定窗口大小和窗口间隔)
3)会话窗口(Session Windows,在内部,会话窗口操做员为每一个到达的记录建立一个新窗口,若是它们彼此之间的距离比定义的间隙更接近,则将窗口合并在一块儿。为了可合并的,会话窗口操做者须要一个合并触发器和一个合并 的窗函数)
4)全局窗口 全局窗口自动以触发器,自定义聚合方式等,
能够基于时间或数据计数(https://flink.apache.org/news...segmentfault

支持时间

事件时间,到达时间,处理时间
基于事件时间(事件建立时间)的水位线watermark算法(延后固定或推理出的关系式个时长,以便排除事件发生处处理的时长,来收集此刻建立的事件流):
clipboard.pngwindows

当一、watermark时间 >= window_end_time(对于out-of-order以及正常的数据而言)&& 二、在[window_start_time,window_end_time)中有数据存在 时窗口关闭开始计算
以下图:设定的maxOutOfOrderness=10000L(10s),窗口3s
clipboard.png缓存

  • 按期水位线
    用户定义maxOutOfOrderness,两次水位线之间的数据能够用来调用方法生成下一次的时间,再日后推迟maxOutOfOrderness的时间便可。好比多线程

    class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
        val maxOutOfOrderness = 3500L; // 3.5 seconds
        var currentMaxTimestamp: Long;
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
            val timestamp = element.getCreationTime()
            currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
            timestamp;
        }
        override def getCurrentWatermark(): Watermark = {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
  • 标点水位线
    数据流中有标记事件才调用extractTimestamp生成新的wartermark
  • 对于map等在graph中交叉的流事件时间,是取输入流事件时间的最小时间
  • 迟到事件:
    从新激活已经关闭的窗口并从新计算以修正结果。要保存上次结果从新计算,可能每一个迟到事件都要触发。
    将迟到事件收集起来另外处理。直接返回收集结果
    将迟到事件视为错误消息并丢弃。

容错

  • 流障碍注入
    快照:https://arxiv.org/pdf/1506.08...
    源于Chandy-Lamport算法https://lamport.azurewebsites...
    https://ci.apache.org/project...
    过程:
    将流障碍被注入流源的并行数据流中,检查点协调器会将快照n的屏障发送到其全部输出流中。
    一旦接收器操做员(流式DAG的末端)从其全部输入流接收到障碍n,本身状态持久化,向快照n确认检查点协调器。
    在全部接收器确认快照后,它被视为已完成。
    一旦完成了快照n,做业将永远再也不向源请求来自Sn以前的记录,由于此时这些记录(及其后代记录)将经过整个数据流拓扑。

    clipboard.png

  • 彻底一次调用保证
    对齐(google的millwheel用的每一个数据生成惟一编号,dedup去重实现exactly-once(milwheel)) 接收到一个流的n后,这个流的数据暂存,直到其余流也到n,对其发出快照。

    clipboard.png

  • 存储
    状态也要存储(转换函数,系统窗口数据缓冲区等等),信息很大,单独state backend存储,可存储在HDFS中(选项有内存,rocksdb等)

    clipboard.png

内部优化

避免特定状况下Shuffle、排序等昂贵操做,中间结果有必要进行缓存架构

批处理

Flink执行批处理程序做为流程序的特殊状况,其中流是有界的(有限数量的元素)。所以,上述概念以相同的方式应用于批处理程序,而且它们适用于流程序,除了少数例外:
批处理程序的容错不使用检查点。经过彻底重放流来进行恢复。成本更低。
支持迭代计算。机器学习

相关文章
相关标签/搜索