阿里巴巴大规模应用Flink的踩坑经验:如何大幅下降 HDFS 压力?

众所周知 Flink 是当前普遍使用的计算引擎,Flink 使用 checkpoint 机制进行容错处理[1],Flink 的 checkpoint 会将状态快照备份到分布式存储系统,供后续恢复使用。在 Alibaba 内部咱们使用的存储主要是 HDFS,当同一个集群的 Job 到达必定数量后,会对 HDFS 形成很是大的压力,本文将介绍一种大幅度下降 HDFS 压力的方法 -- 小文件合并。node

背景

无论使用 FsStateBackend、RocksDBStateBackend 仍是 NiagaraStateBackend,Flink 在进行 checkpoint 的时候,TM 会将状态快照写到分布式文件系统中,而后将文件句柄发给 JM,JM 完成全局 checkpoint 快照的存储,以下图所示。安全

1.png

对于全量 checkpoint 来讲,TM 将每一个 checkpoint 内部的数据都写到同一个文件,而对于 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]来讲,则会将每一个 sst 文件写到一个分布式系统的文件内。看成业量很大,且做业的并发很大时,则会对底层 HDFS 造成很是大的压力:1)大量的 RPC 请求会影响 RPC 的响应时间(以下图所示);2)大量文件对 NameNode 内存形成很大压力。多线程

qps_1.jpg

qps_2.jpg

在 Flink 中曾经尝试使用 ByteStreamStateHandle 来解决小文件多的问题[3],将小于必定阈值的 state 直接发送到 JM,由 JM 统一写到分布式文件中,从而避免在 TM 端生成小文件。可是这个方案有必定的局限性,阈值设置过小,还会有不少小文件生成,阈值设置太大,则会致使 JM 内存消耗太多有 OOM 的风险。并发

1 小文件合并方案

针对上面的问题咱们提出一种解决方案 -- 小文件合并。
在原来的实现中,每一个 sst 文件会打开一个
CheckpointOutputStream,每一个 CheckpointOutputStream 对应一个 FSDataOutputStream,将本地文件写往一个分布式文件,而后关闭 FSDataOutputStream,生成一个 StateHandle。以下图所示:异步

2.png

小文件合并则会重用打开的 FSDataOutputStream,直至文件大小达到预设的阈值为止,换句话说多个 sst 文件会重用同一个 DFS 上的文件,每一个 sst 文件占用 DFS 文件中的一部分,最终多个 StateHandle 共用一个物理文件,以下图所示。分布式

3.png

在接下来的章节中咱们会描述实现的细节,其中须要重点考虑的地方包括:工具

  1. 并发 checkpoint 的支持
    Flink 天生支持并发 checkpoint,小文件合并方案则会将多个文件写往同一个分布式存储文件中,若是考虑不当,数据会写串或者损坏,所以咱们须要有一种机制保证该方案的正确性,详细描述参考 2.1 节
  2. 防止误删文件
    咱们使用引用计数来记录文件的使用状况,仅经过文件引用计数是否降为 0 进行判断删除,则可能误删文件,如何保证文件不会被错误删除,咱们将会在 2.2 节进行阐述
  3. 下降空间放大
    使用小文件合并以后,只要文件中还有一个 statehandle 被使用,整个分布式文件就不能被删除,所以会占用更多的空间,咱们在 2.3 节描述了解决该问题的详细方案
  4. 异常处理
    咱们将在 2.4 节阐述如何处理异常状况,包括 JM 异常和 TM 异常的状况
  5. 2.5 节中会详细描述在 Checkpoint 被取消或者失败后,如何取消 TM 端的 Snapshot,若是不取消 TM 端的 Snapshot,则会致使 TM 端实际运行的 Snapshot 比正常的多

在第 3 节中阐述了小文件合并方案与现有方案的兼容性;第 4 节则会描述小文件合并方案的优点和不足;最后在第 5 节咱们展现在生产环境下取得的效果。优化

2 设计实现

本节中咱们会详细描述整个小文件合并的细节,以及其中的设计要点。
这里咱们大体回忆一下 TM 端 Snapshot 的过程阿里云

  1. TM 端 barrier 对齐
  2. TM Snapshot 同步操做
  3. TM Snapshot 异步操做

其中上传 sst 文件到分布式存储系统在上面的第三步,同一个 checkpoint 内的文件顺序上传,多个 checkpoint 的文件上传可能同时进行。spa

2.1 并发 checkpoint 支持

Flink 天生支持并发 checkpoint,所以小文件合并方案也须要可以支持并发 checkpoint,若是不一样 checkpoint 的 sst 文件同时写往一个分布式文件,则会致使文件内容损坏,后续没法从该文件进行 restore。

在 FLINK-11937[4] 的提案中,咱们会将每一个 checkpoint 的 state 文件写到同一个 HDFS 文件,不一样 checkpoint 的 state 写到不一样的 HDFS 文件 -- 换句话说,HDFS 文件不跨 Checkpoint 共用,从而避免了多个客户端同时写入同一个文件的状况。

后续咱们会继续推动跨 Checkpoint 共用文件的方案,固然在跨 Checkpoint 共用文件的方案中,并行的 Checkpoint 也会写往不一样的 HDFS 文件。

2.2 防止误删文件

复用底层文件以后,咱们使用引用计数追踪文件的使用状况,在文件引用数降为 0 的状况下删除文件。可是在某些状况下,文件引用数为 0 的时候,并不表明文件不会被继续使用,可能致使文件误删。下面咱们会详>细描述开启并发 checkpoint 后可能致使文件误删的状况,以及解决方案。

咱们如下图为例,maxConcurrentlyCheckpoint = 2

concurrent.png

上图中共有 3 个 checkpoint,其中 chk-1 已经完成,chk-2 和 chk-3 都基于 chk-1 进行,chk-2 在 chk-3 前完成,chk-3 在注册4.sst的时候发现,发现4.sst在 chk-2 中已经注册过,会重用 chk-2 中4.sst对应的 stateHandle,而后取消 chk-3 中的4.sst的注册,而且删除 stateHandle,在处理完 chk-3 中4.sst以后,该 stateHandle 对应的分布式文件的引用计数为 0,若是咱们这个时候删除分布式文件,则会同时删除5.sst对应的内容,致使后续没法从 chk-3 恢复。

这里的问题是如何在stateHandle对应的分布式文件引用计数降为 0 的时候正确判断是否还会继续引用该文件,所以在整个 checkpoint 完成处理以后再判断某个分布式文件可否删除,若是真个 checkpoint 完成发现文件没有被引用,则能够安全删除,不然不进行删除。

2.3 下降空间放大

使用小文件合并方案后,每一个 sst 文件对应分布式文件中的一个 segment,以下图所示

segment.png

文件仅能在全部 segment 都再也不使用时进行删除,上图中有 4 个 segment,仅 segment-4 被使用,可是整个文件都不能删除,其中 segment[1-3] 的空间被浪费掉了,从实际生产环境中的数据可知,总体的空间放大率(实际占用的空间 / 真实有用的空间)在 1.3 - 1.6 之间。

为了解决空间放大的问题,在 TM 端起异步线程对放大率超过阈值的文件进行压缩。并且仅对已经关闭的文件进行压缩。

整个压缩的流程以下所示:

  1. 计算每一个文件的放大率
  2. 若是放大率较小则直接跳到步骤 7
  3. 若是文件 A 的放大率超过阈值,则生成一个对应的新文件 A‘(若是这个过程当中建立文件失败,则由 TM 负责清理工做)
  4. 记录 A 与 A’ 的映射关系
  5. 在下一次 checkpoint X 往 JM 发送落在文件 A 中的 StateHandle 时,则使用 A` 中的信息生成一个新的 StateHandle 发送给 JM
  6. checkpoint X 完成后,咱们增长 A‘ 的引用计数,减小 A 的引用计数,在引用计数降为 0 后将文件 A 删除(若是 JM 增长了 A’ 的引用,而后出现异常,则会从上次成功的 checkpoint 从新构建整个引用计数器)
  7. 文件压缩完成

2.4 异常状况处理

在 checkpoint 的过程当中,主要有两种异常:JM 异常和 TM 异常,咱们将分状况阐述。

2.4.1 JM 异常

JM 端主要记录 StateHandle 以及文件的引用计数,引用计数相关数据不须要持久化到外存中,所以不须要特殊的处理,也不须要考虑 transaction 等相关操做,若是 JM 发送 failover,则能够直接从最近一次 complete checkpoint 恢复,并重建引用计数便可。

2.4.2 TM 异常

TM 异常能够分为两种:1)该文件在以前 checkpoint 中已经汇报过给 JM;2)文件还没有汇报过给 JM,咱们会分状况阐述。

  1. 文件已经汇报过给 JM
    文件汇报过给 JM,所以在 JM 端有文件的引用计数,文件的删除由 JM 控制,当文件的引用计数变为 0 以后,JM 将删除该文件。
  2. 文件还没有汇报给 JM
    该文件暂时还没有汇报过给 JM,该文件再也不被使用,也不会被 JM 感知,成为孤儿文件。这种状况暂时有外围工具统一进行清理。

2.5 取消 TM 端 snapshot

像前面章节所说,咱们须要在 checkpoint 超时/失败时,取消 TM 端的 snapshot,而 Flink 则没有相应的通知机制,如今 FLINK-8871[5] 在追踪相应的优化,咱们在内部增长了相关实现,当 checkpoint 失败时会发送 RPC 数据给 TM,TM 端接受到相应的 RPC 消息后,会取消相应的 snapshot。

3 兼容性

小文件合并功能支持从以前的版本无缝迁移过来。从以前的 checkpoint restore 的的步骤以下:

  1. 每一个 TM 分到本身须要 restore 的 state handle
  2. TM 从远程下载 state handle 对应的数据
  3. 从本地进行恢复

小文件合并主要影响的是第 2 步,从远程下载对应数据的时候对不一样的 StateHandle 进行适配,所以不影响总体的兼容性。

4 优点和不足

  • 优点:大幅度下降 HDFS 的
  • 压力:包括 RPC 压力以及 NameNode 内存的压力
  • 不足:不支持 State 多线程上传的功能(State 上传暂时不是 checkpoint 的瓶颈)

5 线上环境的结果

在该方案上线后,对 Namenode 的压力大幅下降,下面的截图来自线上生产集群,从数据来看,文件建立和关闭的 RPC 有明显降低,RPC 的响应时间也有大幅度下降,确保顺利度过双十一。

rpc_all.png
rpc_close.png
rpc_create.png
rpc_holdlock.png

上云就看云栖号,点此查看更多

本文为阿里云原创内容,未经容许不得转载。

相关文章
相关标签/搜索