本文是博主阅读Flink官方文档以及《Flink基础教程》后结合本身理解所写,如有表达有误的地方欢迎大伙留言指出。html
流式计算分为有状态和无状态两种状况,所谓状态就是计算过程当中的中间值。对于无状态计算,会独立观察每一个独立事件,并根据最后一个事件输出结果。什么意思?大白话举例:对于一个流式系统,接受到一系列的数字,当数字大于N则输出,这时候在此以前的数字的值、和等状况,压根不关心,只和最后这个大于N的数字相关,这就是无状态计算。什么是有状态计算了?想求过去一分钟内全部数字的和或者平均数等,这种须要保存中间结果的状况是有状态的计算。java
当分布式计系统中引入状态计算时,就无可避免一致性的问题。为何了?由于如果计算过程当中出现故障,中间数据咋办了?如果不保存,那就只能从新从头计算了,否则怎么保证计算结果的正确性。这就是要求系统具备容错性了。算法
谈到容错性,就无法避免一致性这个概念。所谓一致性就是:成功处理故障并恢复以后获得的结果与没有发生任何故障是获得的结果相比,前者的正确性。换句大白话,就是故障的发生是否影响获得的结果。在流处理过程,一致性分为3个级别[1]:apache
Flink的容错机制保证了exactly-once,也能够选择at-least-once。Flink的容错机制是经过对数据流不停的作快照(snapshot)实现的。针对FLink的容错机制须要注意的是:要彻底保证exactly-once,Flink的数据源系统须要有“重放”功能,什么意思了?且听下面慢慢道来。缓存
Flink作快照的过程是基于“轻量级异步快照”的算法,其核心思想就是在计算过程当中保存中间状态和在数据流中对应的位置,至于如何实现的会后续的博客中会详细说明。这些保存的信息(快照)就至关因而系统的检查点(checkpoint)(相似于window系统发生死机等问题时恢复系统到某个时间点的恢复点),作snapshot也是作一个checkpoint。在系统故障恢复时,系统会从最新的一个checkpoint开始从新计算,对应的数据源也会在对应的位置“重放“。这里的“重放”可能会致使数据的二次输出,这点的处理也在后续的博客中说明。数据结构
在Flink作分布式快照过程当中核心一个元素Barriers的使用。这些Barriers是在数据接入到Flink之初就注入到数据流中,并随着数据流向每一个算子(operator,这里所说的算子不是指相似map()等具体意义上个的,指在JobGraph中优化后的“顶点”),这里须要说明的有两点:异步
以下图所示,Barriers将将数据流分红了一个个数据集。值得提醒的是,当barriers流经算子时,会触发与checkpoint相关的行为,保存的barriers的位置和状态(中间计算结果)。async
Update:checkpoint是由JobManager中的CheckpointCoordinator周期性触发,而后在Task侧生成barrier,具体为:在Source task(TaskManager中)中barrier会根据命令周期性的在原始数据中注入barrier,而对非source task则是遇到barrier作checkpoint,即非source task其作checkpoint的时间间隔也许不是周期的,影响因素较多。此外,每一个算子作checkpoint的方式也许不一样。分布式
能够打个比方,在河上有个大坝(至关于算子),接上级通知(Flink中的JobManager)要统计水流量等信息,因此有人在上游按期(source task)放一根木头(barrier)到河中,当第一木头遇到大坝时,大坝就记下经过大坝木头的位置、水流量等相关状况,即作checkpoint(实际生活中不太可能),当第二木头遇到大坝时记下第一个木头和第二根木头之间的水流量等状况,不须要重开始计算。这里先无论故障了,否则就很差解释相同的水的“重放”问题了。 优化
当一个算子有多个数据源时,又如何作checkpoint了?
以下图,从左往右一共4副图。当算子收到其中一个数据源的barriers,而未收到另外一个数据源的barriers时(如左1图),会将先到barriers的数据源中的数据先缓冲起来,等待另外一个barriers(如左2图),当收到两个barriers(如左3图)即接收到所有数据源的barrier时,会作checkpoint,保存barriers位置和状态,发射缓冲中的数据,释放一个对应的barriers。这里须要注意是,当缓存中数据没有被发射完时,是不会处理后续数据的,这样是为了保证数据的有序性。
这里其实有一点须要注意的是,由于系统设置checkpoint的方式是经过时间间隔的形式(enableCheckpointing(intervalTime)
),因此会致使一个问题:当一个checkpoint所需时间远大于两次checkpoint之间的时间间隔时,就颇有可能会致使后续的checkpoint会失败,如果这样状况比较严重时会致使任务失败,这样Flink系统的容错性的优点就等不到保证了,因此须要合理设计checkpoint间隔时间。
以下图所示,在一次snapshot中,算子会在接受到其数据源的全部barriers的之后snapshot它们的状态,而后在发射barriers到输出流中,直到最后全部的sink算子都完成snapshot才算完成一次snapshot。其中,在准备发射的barriers造成以前,state 形式是能够改变的,以后就不能够了。state的存贮方式是能够配置的,如HDFS,默认是在JobManager的内存中。
上述描述中,须要等待算子接收到全部barriers后,开始作snapshot,存储对应的状态后,再进行下一次snapshot,其状态的存储是同步的,这样可能会形成因snapshot引发较大延时。可让算子在存储快照时继续处理数据,让快照存储异步在后台运行。为此,算子必须能生成一个 state 对象,保证后续状态的修改不会改变这个 state 对象。例如 RocksDB 中使用的 copy-on-write(写时复制)类型的数据结构,即异步状态快照。对异步状态快照,其可让算子接受到barriers后开始在后台异步拷贝其状态,而没必要等待全部的barriers的到来。一旦后台的拷贝完成,将会通知JobManager。只有当全部的sink接收到这个barriers,和全部的有状态的算子都确认完成状态的备份时,一次snapshot才算完成。如何实现的,这点后续博客将仔细分析。
Ref:
[1]《Flink基础教程》
[2]https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html