Flink Checkpoint 问题排查实用指南

做者:邱从贤(山智)ios

在 Flink 中,状态可靠性保证由 Checkpoint 支持,看成业出现 failover 的状况下,Flink 会从最近成功的 Checkpoint 恢复。在实际状况中,咱们可能会遇到 Checkpoint 失败,或者 Checkpoint 慢的状况,本文会统一聊一聊 Flink 中 Checkpoint 异常的状况(包括失败和慢),以及可能的缘由和排查思路。git

1. Checkpoint 流程简介

首先咱们须要了解 Flink 中 Checkpoint 的整个流程是怎样的,在了解整个流程以后,咱们才能在出问题的时候,更好的进行定位分析。github

flink_checkpoint.jpg

从上图咱们能够知道,Flink 的 Checkpoint 包括以下几个部分:apache

  • JM trigger checkpoint
  • Source 收到 trigger checkpoint 的 PRC,本身开始作 snapshot,并往下游发送 barrier
  • 下游接收 barrier(须要 barrier 都到齐才会开始作 checkpoint)
  • Task 开始同步阶段 snapshot
  • Task 开始异步阶段 snapshot
  • Task snapshot 完成,汇报给 JM

上面的任何一个步骤不成功,整个 checkpoint 都会失败。性能优化

2 Checkpoint 异常状况排查

2.1 Checkpoint 失败

能够在 Checkpoint 界面看到以下图所示,下图中 Checkpoint 10423 失败了。网络

checkpoint_failure.jpg

点击 Checkpoint 10423 的详情,咱们能够看到类系下图所示的表格(下图中将 operator 名字截取掉了)。多线程

checkpoint_ack_buffer.jpg

上图中咱们看到三行,表示三个 operator,其中每一列的含义分别以下:app

  • 其中 Acknowledged 一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack,从图中咱们能够知道第三个 operator 总共有 5 个 subtask,可是只有 4 个进行了 ack;
  • 第二列 Latest Acknowledgement 表示该 operator 的全部 subtask 最后 ack 的时间;
  • End to End Duration 表示整个 operator 的全部 subtask 中完成 snapshot 的最长时间;
  • State Size 表示当前 Checkpoint 的 state 大小 -- 主要这里若是是增量 checkpoint 的话,则表示增量大小;
  • Buffered During Alignment 表示在 barrier 对齐阶段积攒了多少数据,若是这个数据过大也间接表示对齐比较慢);

Checkpoint 失败大体分为两种状况:Checkpoint Decline 和 Checkpoint Expire。机器学习

2.1.1 Checkpoint Decline

咱们能从 jobmanager.log 中看到相似下面的日志
Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178. 其中
10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1 是 execution id,85d268e6fbc19411185f7e4868a44178 是 job id,咱们能够在 jobmanager.log 中查找 execution id,找到被调度到哪一个 taskmanager 上,相似以下所示:异步

2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - XXXXXXXXXXX (100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING.
2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

从上面的日志咱们知道该 execution 被调度到 hostnameABCDEcontainer_e24_1566836790522_8088_04_013155_1 slot 上,接下来咱们就能够到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失败的具体缘由了。

另外对于 Checkpoint Decline 的状况,有一种状况咱们在这里单独抽取出来进行介绍:Checkpoint Cancel。

当前 Flink 中若是较小的 Checkpoint 尚未对齐的状况下,收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消掉。咱们能够看到相似下面的日志:

$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

这个日志表示,当前 Checkpoint 19 还在对齐阶段,咱们收到了 Checkpoint 20 的 barrier。而后会逐级通知到下游的 task checkpoint 19 被取消了,同时也会通知 JM 当前 Checkpoint 被 decline 掉了。

在下游 task 收到被 cancelBarrier 的时候,会打印相似以下的日志:

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.

或者

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.

或者

WARN
$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

上面三种日志都表示当前 task 接收到上游发送过来的 barrierCancel 消息,从而取消了对应的 Checkpoint。

2.1.2 Checkpoint Expire

若是 Checkpoint 作的很是慢,超过了 timeout 尚未完成,则整个 Checkpoint 也会失败。当一个 Checkpoint 因为超时而失败是,会在 jobmanager.log 中看到以下的日志:

Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178  expired before completing.

表示 Chekpoint 1 因为超时而失败,这个时候能够能够看这个日志后面是否有相似下面的日志:

Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

能够按照 2.1.1 中的方法找到对应的 taskmanager.log 查看具体信息。

下面的日志若是是 DEBUG 的话,咱们会在开始处标记 DEBUG

咱们按照下面的日志把 TM 端的 snapshot 分为三个阶段,开始作 snapshot 前,同步阶段,异步阶段:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

这个日志表示 TM 端 barrier 对齐后,准备开始作 Checkpoint。

DEBUG
2019-08-06 13:43:02,613 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx
_source -> Filter (27/70),5,Flink Task Threads] took 0 ms.

上面的日志表示当前这个 backend 的同步阶段完成,共使用了 0 ms。

DEBUG
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, taskOwnedStateDirectory=xxxxx,  metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms

上面的日志表示异步阶段完成,异步阶段使用了 369 ms

在现有的日志状况下,咱们经过上面三个日志,定位 snapshot 是开始晚,同步阶段作的慢,仍是异步阶段作的慢。而后再按照状况继续进一步排查问题。

2.2 Checkpoint 慢

在 2.1 节中,咱们介绍了 Checkpoint 失败的排查思路,本节会分状况介绍 Checkpoint 慢的状况。

Checkpoint 慢的状况以下:好比 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 常常须要作 9 分钟(咱们但愿 1 分钟左右就可以作完),并且咱们预期 state size 不是很是大。

对于 Checkpoint 慢的状况,咱们能够按照下面的顺序逐一检查。

2.2.0 Source Trigger Checkpoint 慢

这个通常发生较少,可是也有可能,由于 source 作 snapshot 并往下游发送 barrier 的时候,须要抢锁(这个如今社区正在进行用 mailBox 的方式替代当前抢锁的方式,详情参考[1])。若是一直抢不到锁的话,则可能致使 Checkpoint 一直得不到机会进行。若是在 Source 所在的 taskmanager.log 中找不到开始作 Checkpoint 的 log,则能够考虑是否属于这种状况,能够经过 jstack 进行进一步确认锁的持有状况。

2.2.1 使用增量 Checkpoint

如今 Flink 中 Checkpoint 有两种模式,全量 Checkpoint 和 增量 Checkpoint,其中全量 Checkpoint 会把当前的 state 所有备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,所以增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优点。

如今 Flink 中仅在 RocksDBStateBackend 中支持增量 Checkpoint,若是你已经使用 RocksDBStateBackend,能够经过开启增量 Checkpoint 来加速,具体的能够参考 [2]。

2.2.2 做业存在反压或者数据倾斜

咱们知道 task 仅在接受到全部的 barrier 以后才会进行 snapshot,若是做业存在反压,或者有数据倾斜,则会致使所有的 channel 或者某些 channel 的 barrier 发送慢,从而总体影响 Checkpoint 的时间,这两个能够经过以下的页面进行检查:

backpressure_high.jpg

上图中咱们选择了一个 task,查看全部 subtask 的反压状况,发现都是 high,表示反压状况严重,这种状况下会致使下游接收 barrier 比较晚。

hotKey.png

上图中咱们选择其中一个 operator,点击全部的 subtask,而后按照 Records Received/Bytes Received/TPS 从大到小进行排序,能看到前面几个 subtask 会比其余的 subtask 要处理的数据多。

若是存在反压或者数据倾斜的状况,咱们须要首先解决反压或者数据倾斜问题以后,再查看 Checkpoint 的时间是否符合预期。

2.2.2 Barrier 对齐慢

从前面咱们知道 Checkpoint 在 task 端分为 barrier 对齐(收齐全部上游发送过来的 barrier),而后开始同步阶段,再作异步阶段。若是 barrier 一直对不齐的话,就不会开始作 snapshot。

barrier 对齐以后会有以下日志打印:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

若是 taskmanager.log 中没有这个日志,则表示 barrier 一直没有对齐,接下来咱们须要了解哪些上游的 barrier 没有发送下来,若是你使用 At Least Once 的话,能够观察下面的日志:

DEBUG
Received barrier for checkpoint 96508 from channel 5

表示该 task 收到了 channel 5 来的 barrier,而后看对应 Checkpoint,再查看还剩哪些上游的 barrier 没有接受到,对于 ExactlyOnce 暂时没有相似的日志,能够考虑本身添加,或者 jmap 查看。

2.2.3 主线程太忙,致使没机会作 snapshot

在 task 端,全部的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,若是主线程在处理太慢(好比使用 RocksDBBackend,state 操做慢致使总体处理慢),致使 barrier 处理的慢,也会影响总体 Checkpoint 的进度,在这一步咱们须要可以查看某个 PID 对应 hotmethod,这里推荐两个方法:

  1. 屡次连续 jstack,查看一直处于 RUNNABLE 状态的线程有哪些;
  2. 使用工具 AsyncProfile dump 一份火焰图,查看占用 CPU 最多的栈;

若是有其余更方便的方法固然更好,也欢迎推荐。

2.2.4 同步阶段作的慢

同步阶段通常不会太慢,可是若是咱们经过日志发现同步阶段比较慢的话,对于非 RocksDBBackend 咱们能够考虑查看是否开启了异步 snapshot,若是开启了异步 snapshot 仍是慢,须要看整个 JVM 在干吗,也可使用前一节中的工具。对于 RocksDBBackend 来讲,咱们能够用 iostate 查看磁盘的压力如何,另外能够查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的时间总共开销多少。

RocksDB 开始 snapshot 的日志以下:

2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

snapshot 结束的日志以下:

2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good

2.2.6 异步阶段作的慢

对于异步阶段来讲,tm 端主要将 state 备份到持久化存储上,对于非 RocksDBBackend 来讲,主要瓶颈来自于网络,这个阶段能够考虑观察网络的 metric,或者对应机器上可以观察到网络流量的状况(好比 iftop)。

对于 RocksDB 来讲,则须要从本地读取文件,写入到远程的持久化存储上,因此不只须要考虑网络的瓶颈,还须要考虑本地磁盘的性能。另外对于 RocksDBBackend 来讲,若是以为网络流量不是瓶颈,可是上传比较慢的话,还能够尝试考虑开启多线程上传功能[3]。

3 总结

在第二部份内容中,咱们介绍了官方编译的包的状况下排查一些 Checkpoint 异常状况的主要场景,以及相应的排查方法,若是排查了上面全部的状况,仍是没有发现瓶颈所在,则能够考虑添加更详细的日志,逐步将范围缩小,而后最终定位缘由。

上文提到的一些 DEBUG 日志,若是 flink dist 包是本身编译的话,则建议将 Checkpoint 整个步骤内的一些 DEBUG 改成 INFO,可以经过日志了解整个 Checkpoint 的总体阶段,何时完成了什么阶段,也在 Checkpoint 异常的时候,快速知道每一个阶段都消耗了多少时间。

参考内容

[1] Change threading-model in StreamTask to a mailbox-based approach
[2] 增量 checkpoint 原理介绍
[3] RocksDBStateBackend 多线程上传 State


▼ Apache Flink 社区推荐 ▼

Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:

https://developer.aliyun.com/...

首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击:

https://tianchi.aliyun.com/ma...

相关文章
相关标签/搜索