众所周知 Flink 是当前普遍使用的计算引擎,Flink 使用 checkpoint 机制进行容错处理[1],Flink 的 checkpoint 会将状态快照备份到分布式存储系统,供后续恢复使用。在 Alibaba 内部咱们使用的存储主要是 HDFS,当同一个集群的 Job 到达必定数量后,会对 HDFS 形成很是大的压力,本文将介绍一种大幅度下降 HDFS 压力的方法 -- 小文件合并。node
无论使用 FsStateBackend、RocksDBStateBackend 仍是 NiagaraStateBackend,Flink 在进行 checkpoint 的时候,TM 会将状态快照写到分布式文件系统中,而后将文件句柄发给 JM,JM 完成全局 checkpoint 快照的存储,以下图所示。安全
对于全量 checkpoint 来讲,TM 将每一个 checkpoint 内部的数据都写到同一个文件,而对于 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]来讲,则会将每一个 sst 文件写到一个分布式系统的文件内。看成业量很大,且做业的并发很大时,则会对底层 HDFS 造成很是大的压力:1)大量的 RPC 请求会影响 RPC 的响应时间(以下图所示);2)大量文件对 NameNode 内存形成很大压力。多线程
在 Flink 中曾经尝试使用 ByteStreamStateHandle 来解决小文件多的问题[3],将小于必定阈值的 state 直接发送到 JM,由 JM 统一写到分布式文件中,从而避免在 TM 端生成小文件。可是这个方案有必定的局限性,阈值设置过小,还会有不少小文件生成,阈值设置太大,则会致使 JM 内存消耗太多有 OOM 的风险。并发
针对上面的问题咱们提出一种解决方案 -- 小文件合并。
在原来的实现中,每一个 sst 文件会打开一个
CheckpointOutputStream,每一个 CheckpointOutputStream 对应一个 FSDataOutputStream,将本地文件写往一个分布式文件,而后关闭 FSDataOutputStream,生成一个 StateHandle。以下图所示:异步
小文件合并则会重用打开的 FSDataOutputStream,直至文件大小达到预设的阈值为止,换句话说多个 sst 文件会重用同一个 DFS 上的文件,每一个 sst 文件占用 DFS 文件中的一部分,最终多个 StateHandle 共用一个物理文件,以下图所示。分布式
在接下来的章节中咱们会描述实现的细节,其中须要重点考虑的地方包括:工具
在第 3 节中阐述了小文件合并方案与现有方案的兼容性;第 4 节则会描述小文件合并方案的优点和不足;最后在第 5 节咱们展现在生产环境下取得的效果。优化
本节中咱们会详细描述整个小文件合并的细节,以及其中的设计要点。
这里咱们大体回忆一下 TM 端 Snapshot 的过程阿里云
其中上传 sst 文件到分布式存储系统在上面的第三步,同一个 checkpoint 内的文件顺序上传,多个 checkpoint 的文件上传可能同时进行。spa
Flink 天生支持并发 checkpoint,所以小文件合并方案也须要可以支持并发 checkpoint,若是不一样 checkpoint 的 sst 文件同时写往一个分布式文件,则会致使文件内容损坏,后续没法从该文件进行 restore。
在 FLINK-11937[4] 的提案中,咱们会将每一个 checkpoint 的 state 文件写到同一个 HDFS 文件,不一样 checkpoint 的 state 写到不一样的 HDFS 文件 -- 换句话说,HDFS 文件不跨 Checkpoint 共用,从而避免了多个客户端同时写入同一个文件的状况。
后续咱们会继续推动跨 Checkpoint 共用文件的方案,固然在跨 Checkpoint 共用文件的方案中,并行的 Checkpoint 也会写往不一样的 HDFS 文件。
复用底层文件以后,咱们使用引用计数追踪文件的使用状况,在文件引用数降为 0 的状况下删除文件。可是在某些状况下,文件引用数为 0 的时候,并不表明文件不会被继续使用,可能致使文件误删。下面咱们会详>细描述开启并发 checkpoint 后可能致使文件误删的状况,以及解决方案。
咱们如下图为例,maxConcurrentlyCheckpoint = 2
上图中共有 3 个 checkpoint,其中 chk-1 已经完成,chk-2 和 chk-3 都基于 chk-1 进行,chk-2 在 chk-3 前完成,chk-3 在注册4.sst
的时候发现,发现4.sst
在 chk-2 中已经注册过,会重用 chk-2 中4.sst
对应的 stateHandle,而后取消 chk-3 中的4.sst
的注册,而且删除 stateHandle,在处理完 chk-3 中4.sst
以后,该 stateHandle 对应的分布式文件的引用计数为 0,若是咱们这个时候删除分布式文件,则会同时删除5.sst
对应的内容,致使后续没法从 chk-3 恢复。
这里的问题是如何在stateHandle
对应的分布式文件引用计数降为 0 的时候正确判断是否还会继续引用该文件,所以在整个 checkpoint 完成处理以后再判断某个分布式文件可否删除,若是真个 checkpoint 完成发现文件没有被引用,则能够安全删除,不然不进行删除。
使用小文件合并方案后,每一个 sst 文件对应分布式文件中的一个 segment,以下图所示
文件仅能在全部 segment 都再也不使用时进行删除,上图中有 4 个 segment,仅 segment-4 被使用,可是整个文件都不能删除,其中 segment[1-3] 的空间被浪费掉了,从实际生产环境中的数据可知,总体的空间放大率(实际占用的空间 / 真实有用的空间)在 1.3 - 1.6 之间。
为了解决空间放大的问题,在 TM 端起异步线程对放大率超过阈值的文件进行压缩。并且仅对已经关闭的文件进行压缩。
整个压缩的流程以下所示:
在 checkpoint 的过程当中,主要有两种异常:JM 异常和 TM 异常,咱们将分状况阐述。
JM 端主要记录 StateHandle 以及文件的引用计数,引用计数相关数据不须要持久化到外存中,所以不须要特殊的处理,也不须要考虑 transaction 等相关操做,若是 JM 发送 failover,则能够直接从最近一次 complete checkpoint 恢复,并重建引用计数便可。
TM 异常能够分为两种:1)该文件在以前 checkpoint 中已经汇报过给 JM;2)文件还没有汇报过给 JM,咱们会分状况阐述。
像前面章节所说,咱们须要在 checkpoint 超时/失败时,取消 TM 端的 snapshot,而 Flink 则没有相应的通知机制,如今 FLINK-8871[5] 在追踪相应的优化,咱们在内部增长了相关实现,当 checkpoint 失败时会发送 RPC 数据给 TM,TM 端接受到相应的 RPC 消息后,会取消相应的 snapshot。
小文件合并功能支持从以前的版本无缝迁移过来。从以前的 checkpoint restore 的的步骤以下:
小文件合并主要影响的是第 2 步,从远程下载对应数据的时候对不一样的 StateHandle 进行适配,所以不影响总体的兼容性。
在该方案上线后,对 Namenode 的压力大幅下降,下面的截图来自线上生产集群,从数据来看,文件建立和关闭的 RPC 有明显降低,RPC 的响应时间也有大幅度下降,确保顺利度过双十一。
上云就看云栖号,点此查看更多!
本文为阿里云原创内容,未经容许不得转载。