有赞实时任务优化:Flink Checkpoint 异常解析与应用实践

做者:沈磊(有赞大数据)缓存

有赞实时任务主要以 Flink 为主,为了保证明时任务的容错恢复以及中止重启时的状态恢复,几乎全部的实时任务都会开启 Checkpoint 或者触发 Savepoint 进行状态保存。因为 Savepoint 底层原理的实现和 Checkpoint 几乎一致,本文结合 Flink 1.9 版本,重点讲述 Flink Checkpoint 原理流程以及常见缘由分析,让用户可以更好的理解 Flink Checkpoint,从而开发出更健壮的实时任务。网络

1、 什么是 Flink Checkpoint 和状态

1.1 Flink Checkpoint 是什么

Flink Checkpoint 是一种容错恢复机制。这种机制保证了实时程序运行时,即便忽然遇到异常或者机器问题时也可以进行自我恢复。Flink Checkpoint 对于用户层面来讲,是透明的,用户会感受实时任务一直在运行。架构

Flink Checkpoint 是 Flink 自身的系统行为,用户没法对其进行交互,用户能够在程序启动以前,设置好实时任务 Checkpoint 相关的参数,当任务启动以后,剩下的就全交给 Flink 自行管理。并发

1.2 为何要开启 Checkpoint

实时任务不一样于批处理任务,除非用户主动中止,通常会一直运行,运行的过程当中可能存在机器故障、网络问题、外界存储问题等等,要想实时任务一直可以稳定运行,实时任务要有自动容错恢复的功能。而批处理任务在遇到异常状况时,在从新计算一遍便可。实时任务由于会一直运行的特性,若是在从头开始计算,成本会很大,尤为是对于那种运行时间好久的实时任务来讲。框架

实时任务开启 Checkpoint 功能,也可以减小容错恢复的时间。由于每次都是从最新的 Chekpoint 点位开始状态恢复,而不是从程序启动的状态开始恢复。举个列子,若是你有一个运行一年的实时任务,若是容错恢复是从一年前启动时的状态恢复,实时任务可能须要运行好久才能恢复到如今状态,这通常是业务方所不容许的。函数

1.3 Flink 任务状态是什么

渠道文章宣传内页.png

Flink Checkpoint 会将实时任务的状态存储到远端存储,好比 HDFS ,亚马逊的 S3 等等。Flink 任务状态能够理解为实时任务计算过程当中,中间产生的数据结果,同时这些计算结果会在后续实时任务处理时,可以继续进行使用。实时任务的状态能够是一个聚合结果值,好比 WordCount 统计的每一个单词的数量,也能够是消息流中的明细数据。大数据

Flink 任务状态总体能够划分两种:Operator 状态和 KeyedState。常见的 Operator 状态,好比 Kafka Topic 每一个分区的偏移量。KeyedState 是基于 KeyedStream 来使用的,因此在使用前,你须要对你的流经过 keyby 来进行分区,常见的状态好比有 MapState、ListState、ValueState 等等。优化

下面是一个实时计算奇数和偶数的任务的示例:spa

640.jpeg

在上图中,假如输入的流来自于 Kafka ,那么 Kafka Topic 分区的偏移量是状态,全部奇数的和、全部偶数的和也都是状态。线程

2、 Flink Checkpoint 流程和原理

2.1 开启 Checkpoint 功能

想要使用 Flink Checkpoint 功能,首先是要在实时任务开启 Checkpoint。Flink 默认状况下是关闭 Checkpoint 功能,下面代码是开启 Checkpoint :

640-2.jpeg

上述代码中,设置了 Flink Checkpoint 的间隔 3 秒,设置的 Checkpoint 的语义为 EXACTLY_ONCE。Flink 默认的 Checkpoint 语义为 EXACTLY_ONCE。上述代码也使用 RocksDBStateBackend 进行状态存储。用户也能够本身设置 Flink Checkpoint 的参数,经过 CheckpointConfig 这个类进行设置,代码以下:

CheckpointConfig chkConfig = env.getCheckpointConfig();
/** 调用 CheckpointConfig 各类 set 方法 */
chkConfig.setX

2.2 Flink 一次 Checkpoint 的参与者

Flink 总体做业采用主从架构,Master 为 JobManager,Slave 为 TaskManager,Client 则是负责提交用户实时任务的代码逻辑 ,Flink 总体框架图以下图所示:

640-3.jpeg

JobManager 主要负责实时任务的调度以及对 Checkpoint 的触发,TaskManager 负责真正用户的代码执行逻辑,具体表现形式则是 Task 在 TaskManager上面进行运行,一个 Task 对应一个线程,它可能运行一个算子的 SubTask,也多是运行多个 Chain 起来的算子的 SubTask。

Flink 实时任务一次 Checkpoint 的参与者主要包括三块:JobManager、TaskManager以及 Zookeeper。JobManager 定时会触发执行 Checkpoint,具体则是在 JobManager 中运行的 CheckpointCoordinator 中触发全部 Source 的 SubTask 向下游广播 CheckpointBarrier。

TaskManager 收到 CheckpointBarrier 后,根据 Checkpoint 的语义,决定是否在进行 CheckpointBarrier 对齐时,缓冲后续的数据记录,当收到全部上游输入的 CheckpointBarrier 后,开始作 Checkpoint。TaskManager Checkpoint 完成后,会向 JobManager 发送确认完成的消息。只有当全部 Sink 算子完成 Checkpoint 且发送确认消息后,该次 Checkpoint 才算完成。

在高可用模式下,ZooKeeper 主要存储最新一次 Checkpoint 成功的目录,当Flink 任务容错恢复时,会从最新成功的 Checkpoint 恢复。Zookeeper 同时也存储着 Flink 做业的元数据信息。好比在高可用模式下,Flink 会将 JobGraph 以及相关 Jar 包存储在 HDFS 上面,Zookeeper 记录着该信息。再次容错重启时,读取这些信息,进行任务启动。

下图是一次 Checkpoint 的参与者:

640-4.jpeg

2.3 Checkpoint 协调者:CheckpointCoordinator

CheckpointCoordinator,是 Checkpoint 中最重要的类,协调着实时任务整个 Checkpoint 的执行。下图是 CheckpointCoordinator 中的方法:

640-5.jpeg

Flink CheckpointCoordinator 中有几个比较重要的方法:

  1. triggerCheckpoint,触发 Flink 任务进行 Checkpoint 的方法
  2. triggerSavepoint,触发 Flink 任务 Savepoint 的方法
  3. restoreSavepoint,Flink 任务从 Savepoint 状态恢复
  4. restoreLatestCheckpointedState,从最新一次 Checkpoint 点位状态恢复
  5. receiveAcknowledgeMessage,接受 Operator SubTask Checkpoint 完成的消息并处理

Flink CheckpointCoordinator 类是在 ExecutionGraph 造成时进行初始化的,具体则是在 ExecutionGraph 建立以后,调用 enableCheckpointing 方法,而后在该方法中,CheckpointCoordinator 进行建立。如下是 Flink Checkpoint 触发的时序图:

640-6.jpeg

当 Flink 做业状态由建立到运行时,CheckpointCoordinator 中的 ScheduledThreadPoolExecutor 会定时执行 ScheduledTrigger 中的逻辑。ScheduledTrigger 本质就是一个 Runnable,run 方法中执行 triggerCheckpoint 方法。

2.4 Flink Checkpoint 流程与原理

一次 Flink Checkpoint 的流程是从 CheckpointCoordinator 的 triggerCheckpoint 方法开始,下面来看看一次 Flink Checkpoint 涉及到的主要内容:

  1. Checkpoint 开始以前先进行预检查,好比检查最大并发的 Checkpoint 数,最小的 Checkpoint 之间的时间间隔。默认状况下,最大并发的 Checkpoint 数为 1,最小的 Checkpoint 之间的时间间隔为 0.
  2. 判断全部 Source 算子的 Subtask (Execution) 是否都处于运行状态,有则直接报错。同时检查全部待确认的算子的 SubTask(Execution)是不是运行状态,有则直接报错。
  3. 建立 PendingCheckpoint,同时为该次 Checkpoint 建立一个 Runnable,即超时取消线程,默认 Checkpoint 十分钟超时。
  4. 循环遍历全部 Source 算子的 Subtask(Execution),最底层调用 Task 的triggerCheckpointBarrier, 广播 CheckBarrier 到下游 ,同时 Checkpoint 其状态。
  5. 下游的输入中有 CheckpointBarrierHandler 类来处理 CheckpoinBarrier,而后会调用 notifyCheckpoint 方法,通知 Operator SubTask 进行 Checkpoint。
  6. 每当 Operator SubTask 完成 Checkpoint 时,都会向 CheckpointCoordoritor 发送确认消息。CheckpointCoordinator 的 receiveAcknowledgeMessage 方法会进行处理。
  7. 在一次 Checkpoint 过程当中,当全部从 Source 端到 Sink 端的算子 SubTask 都完成以后,CheckpointCoordoritor 会通知算子进行 notifyCheckpointCompleted 方法,前提是算子的函数实现 CheckpointListener 接口。

Flink 会定时在任务的 Source 算子的 SubTask 触发 CheckpointBarrier,CheckpointBarrier 是一种特殊的消息事件,会随着消息通道流入到下游的算子中。只有当最后 Sink 端的算子接收到 CheckpointBarrier 并确认该次 Checkpoint 完成时,该次 Checkpoint 才算完成。因此在某些算子的 Task 有多个输入时,会存在 Barrier 对齐时间,咱们能够在 Flink Web UI上面看到各个 Task 的 CheckpointBarrier 对齐时间。

下图是一次 Flink Checkpoint 实例流程示意图:

640-7.jpeg

Flink Checkpoint 保存的任务状态在程序取消中止时,默认会进行清除。Checkpoint 状态保留策略主要有两种:

DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 表示当程序取消时,删除 Checkpoint 存储的状态文件。RETAIN_ON_CANCELLATION 表示当程序取消时,保存以前的 Checkpoint 存储的状态文件 用户能够结合业务状况,设置 Checkpoint 保留模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 开启 checkpoint */
env.enableCheckpointing(10000);
/** 设置 checkpoint 保留策略,取消程序时,保留 checkpoint 状态文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.5 Flink Checkpoint 语义

Flink Checkpoint 支持两种语义:Exactly_Once 和 At_least_Once,默认的 Checkpoint 语义是 Exactly_Once。具体语义含义以下:

Exactly_Once 含义是:保证每条数据对于 Flink 任务的状态结果只影响一次。打个比方,好比 WordCount 程序,目前实时统计的 "hello" 这个单词数为 5,同时这个结果在此次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 以前, 又来 2 个 "hello" 单词,忽然程序遇到外部异常自动容错恢复,会从最近的 Checkpoint 点开始恢复,那么会从单词数为 5 的这个状态点开始恢复,Kafka 消费的数据点位也是状态为 5 这个点位开始计算,因此即便程序遇到外部异常自动恢复时,也不会影响到 Flink 状态的结果计算。

At_Least_Once 含义是:每条数据对于 Flink 任务的状态计算至少影响一次。好比在 WordCount 程序中,你统计到的某个单词的单词数可能会比真实的单词数要大,由于同一条消息,当 Flink 任务容错恢复后,可能将其计算屡次。

Flink 中 Exactly_Once 和 At_Least_Once 具体是针对 Flink 任务状态而言的,并非 Flink 程序对消息记录只处理一次。举个例子,当前 Flink 任务正在作 Checkpoint,该次 Checkpoint 尚未完成,此次 Checkpoint 时间段的数据其实已经进入 Flink 程序处理,只是程序状态没有最终存储到远程存储。当程序忽然遇到异常,进行容错恢复时,那么就会从最新的 Checkpoint 进行状态恢复重启,上一次 Checkpoint 成功到此次 Checkpoint 失败的数据还会进入 Flink 系统从新处理,具体实例以下图:

640.png

上图中表示一个 WordCount 实时任务的 Checkpoint,在进行 chk-5 Checkpoint 时,忽然遇到程序异常,那么实时任务会从 chk-4 进行恢复,那么以前 chk-5 处理的数据,Flink 系统会再次进行处理。不过这些数据的状态没有 Checkpoint 成功,因此 Flink 任务容错恢复再次运行时,对于状态的影响仍是只有一次。

Exactly_Once 和 At_Least_Once 具体在底层实现大体相同,具体差别表如今 CheckpointBarrier 对齐方式的处理:

640-8.jpeg

若是是 Exactly_Once 模式,某个算子的 Task 有多个输入通道时,当其中一个输入通道收到 CheckpointBarrier 时,Flink Task 会阻塞该通道,其不会处理该通道后续数据,可是会将这些数据缓存起来,一旦完成了全部输入通道的 CheckpointBarrier 对齐,才会继续对这些数据进行消费处理。

对于 At_least_Once,一样针对某个算子的 Task 有多个输入通道的状况下,当某个输入通道接收到 CheckpointBarrier 时,它不一样于 Exactly Once,即便没有完成全部输入通道 CheckpointBarrier 对齐,At Least Once 也会继续处理后续接收到的数据。因此使用 At Least Once 不能保证数据对于状态计算只有一次的计算影响。

3、 Flink Checkpoint 常见失败缘由和注意点

3.1 Flink Checkpoint 常见失败缘由分析

Flink Checkpoint 失败有不少种缘由,常见的失败缘由以下:

  1. 用户代码逻辑没有对于异常处理,让其直接在运行中抛出。好比解析 Json 异常,没有捕获,致使 Checkpoint失败,或者调用 Dubbo 超时异常等等。
  2. 依赖外部存储系统,在进行数据交互时,出错,异常没有处理。好比输出数据到 Kafka、Redis、HBase等,客户端抛出了超时异常,没有进行捕获,Flink 任务容错机制会再次重启。
  3. 内存不足,频繁GC,超出了 GC 负载的限制。好比 OOM 异常
  4. 网络问题、机器不可用问题等等。

从目前的具体实践状况来看,Flink Checkpoint 异常觉大多数仍是用户代码逻辑的问题,对于程序异常没有正确的处理致使。因此在编写 Flink 实时任务时,必定要注意处理程序可能出现的各类异常。这样,也会让实时任务的逻辑更加的健壮。

当本身的 Flink 实时任务 Checkpoint 失败时,用户能够先经过 Flink Web UI 进行快速定位 Checkpoint 失败的缘由,若是在 Flink Web UI 上面没有看到异常信息,能够去看任务的具体日志进行定位,以下是 Flink Web UI 查看错误缘由示意图:

640-9.jpeg

3.2 Flink Checkpoint 参数配置及注意点

下面是设置 Flink Checkpoint 参数配置的建议及注意点:

  1. 当 Checkpoint 时间比设置的 Checkpoint 间隔时间要长时,能够设置 Checkpoint 间最小时间间隔。这样在上次 Checkpoint 完成时,不会立马进行下一次 Checkpoint,而是会等待一个最小时间间隔,以后再进行 Checkpoint。不然,每次 Checkpoint 完成时,就会立马开始下一次 Checkpoint,系统会有不少资源消耗 Checkpoint 方面,而真正任务计算的资源就会变少。
  2. 若是Flink状态很大,在进行恢复时,须要从远程存储上读取状态进行恢复,若是状态文件过大,此时可能致使任务恢复很慢,大量的时间浪费在网络传输方面。此时能够设置 Flink Task 本地状态恢复,任务状态本地恢复默认没有开启,能够设置参数 state.backend.local-recovery 值为 true 进行激活。
  3. Checkpoint 保存数,Checkpoint 保存数默认是1,也就是只保存最新的 Checkpoint 的状态文件,当进行状态恢复时,若是最新的 Checkpoint 文件不可用时(好比 HDFS 文件全部副本都损坏或者其余缘由),那么状态恢复就会失败,若是设置 Checkpoint 保存数 2,即便最新的Checkpoint恢复失败,那么Flink 会回滚到以前那一次 Checkpoint 的状态文件进行恢复。考虑到这种状况,用户能够增长 Checkpoint 保存数。
  4. 建议设置的 Checkpoint 的间隔时间最好大于 Checkpoint 的完成时间。

下图是不设置 Checkpoint 最小时间间隔示例图,能够看到,系统一致在进行 Checkpoint,大量的资源使用在 Flink Chekpoint 上,可能对运行的任务产生必定影响:

640-2.png

还有一种特殊的状况,Flink 端到端 Sink 的 EXACTLYONCE 的问题,也就是数据从 Flink 端到外部消息系统的消息一致性。打个比方,Flink 输出数据到 Kafka 消息系统中,若是使用 Kafka 0.10 的版本,Flink 不支持端到端的 EXACTLYONCE,可能存在消息重复输入到 Kafka。

640-3.png

如上图所示,当作 chk-5 Checkpoint 的时候,chk-5 失败,而后从 chk-4 来进行恢复,可是 chk-5 的部分数据在 Chekpoint 失败以前就有部分进入到 Kafka 消息系统,再次恢复时,该部分数据可能再次重放到 Kafka 消息系统中。
Flink 中解决端到端的一致性有两种方法:作幂等以及事务写,幂等的话,可使用 KV 存储系统来作幂等,由于 KV 存储系统的屡次操做结果都是相同的。Flink 内部目前支持二阶段事务提交,Kafka 0.11 以上版本支持事务写,因此支持 Flink 端到 Kafka 端的 EXACTLY_ONCE。

4、 有赞的优化实践

有赞实时计算对于 Flink 任务的 Checkpoint 和 Savepoint 作了两个方面工做,第一个工做是对于 Flink Checkpoint 失败的状况,若是 Checkpoint 失败过于频繁,同时 Flink Checkpoint 失败次数若是达到平台默认的失败阈值,平台会及时给用户报警提示。咱们会每 5 分钟检查一次实时任务,统计实时任务近 15 分钟内,Flink Checkpoint 失败次数的最大值和最小值的差值达到平台默认的阈值,则会立马给用户报警,让用户可以及时的处理问题。

固然,并非全部的 Flink 实时任务 Checkpoint 失败平台都能发现,由于 Checkpoint 失败次数的检查,首先与用户配置的 Checkpoint 的时间间隔有关。举个例子,若是用户配置的 Checkpoint 间隔为 1 小时,其实平台默认 Checkpoint 逻辑检查根本就没法发现实时任务 Checkpoint 失败。

针对这种状况,实时平台也支持用户自定义设置 Checkpoint 失败阈值,目前支持两种 Checkpoint 失败逻辑检查,一个是 实时任务的 Checkpoint 失败次数的总和达到阈值,另外一个则是近 10 分钟内,Flink Checkpoint 次数的最大值和最小值的差值的计算逻辑,用户能够根据实时任务的敏感度,设置具体的参数。

第二个方面则是针对 Flink 任务的状态恢复,为了防止实时任务的状态丢失,实时计算平台会按期的对实时任务进行 Savepoint 触发,当任务因为外界因素致使任务失败时,这种失败是任务直接挂掉,Yarn 任务的状态直接为 Killed,这种状况下,若是用户开启自动拉起功能,实时平台自动拉起实时任务,同时从最新的 Savepoint 进行状态恢复,以致于状态不丢失。同时,实时计算平台也支持用户中止任务时,触发 Savepoint,再次重启实时任务时,仍是从中止时的任务状态进行恢复。

5、 总结

目前,有赞在实时计算方面,还有很长的路要走。在知足业务的同时,可能也会有不少的坑须要踩。后面有赞实时计算会重点在实时数仓方面进行投入,同时会基于 Flink SQL 进行功能扩展和开发。

为了用户开发实时任务的便利性,后面有赞会开始进行在线实时计算平台的设计开发。将来也会将实时任务迁移到 K8S 上面,这样在大促场景下,可以更方便的进行资源的扩容和缩容。

将来,有赞实时计算平台会为用户带来更好的开发体验,下降用户开发实时任务的难度,让咱们一块儿拭目以待。

相关文章
相关标签/搜索