Flink自己为了保证其高可用的特性,以及保证做用的Exactly Once的快速恢复,进而提供了一套强大的Checkpoint机制。Checkpoint机制是Flink可靠性的基石,能够保证Flink集群在某个算子由于某些缘由(如异常退出)出现故障时,可以将整个应用流图的状态恢复到故障以前的某一状态,保 证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamport algorithm”算法 (分布式快照算法)。node
每一个须要checkpoint的应用在启动时,Flink的JobManager为其建立一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制做。面试
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
复制代码
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
复制代码
Flink的检查点是一个全局的、异步的程序快照,它周期性的生成并送到持久化存储(通常使用分布式系统)。当发生故障时,Flink使用最新的检查点进行重启。一些Flink的用户在程序“状态”中保存了GB甚至TB的数据。这些用户反馈在大量 的状态下,建立检查点一般很慢而且耗资源,这也是为何Flink在 1.3版本开始引入“增量式的检查点”。算法
在引入“增量式的检查点”以前,每个Flink的检查点都保存了程序完整的状态。后来咱们意识到在大部分状况下这是没必要要的,由于上一次和此次的检查点以前 ,状态发生了很大的变化,因此咱们建立了“增量式的检查点”。增量式的检查点仅保存过去和如今状态的差别部分。架构
增量式的检查点能够为拥有大量状态的程序带来很大的提高。在早期的测试中,一个拥有TB级别“状态”程序将生成检查点的耗时从3分钟以上下降 到了30秒左右。由于增量式的检查点不须要每次把完整的状态发送到存储中。app
如今只能经过RocksDB state back-end来获取增量式检查点的功能,Flink使用RocksDB内置的备份机制来合并检查点数据。这样Flink增量式检查点的数据不会无限制的增大,它会自动合并老的检查点数据并清理掉。异步
要启用这个机制,能够以下设置:RocksDBStateBackend backend =new RocksDBStateBackend(filebackend, true);async
Flink 增量式的检查点以“RocksDB”为基础,RocksDB是一个基于 LSM树的KV存储,新的数据保存在内存中,称为memtable。若是Key相同,后到的数据将覆盖以前的数据,一旦memtable写满了,RocksDB将数据压缩并写入到磁盘。memtable的数据持久化到磁盘后,他们就变成了不可变的sstable。分布式
RocksDB会在后台执行compaction,合并sstable并删除其中重复的数据。以后RocksDB删除原来的sstable,替换成新合成的ssttable,这个sstable包含了以前的sstable中的信息。测试
在这个基础之上,Flink跟踪前一个checkpoint建立和删除的RocksDB sstable文件,由于sstable是不可变的,Flink能够所以计算出 状态有哪些改变。为了达到这个目标,Flink在RocksDB上触发了一个刷新操做,强制将memtable刷新到磁盘上。这个操做在Flink中是同步的,其余的操做是异步的,不会阻塞数据处理。大数据
Flink 的checkpoint会将新的sstable发送到持久化存储(例如HDFS,S3)中,同时保留引用。Flink不会发送全部的sstable, 一些数据在以前的checkpoint存在而且写入到持久化存储中了,这样只须要增长引用次数就能够了。由于compaction的做用,一些sstable会合并成一个sstable并删除这些sstable,这也是为何Flink能够减小checkpoint的历史文件。
为了分析checkpoint的数据变动,而上传整理过的sstable是多余的(这里的意思是以前已经上传过的,不须要再次上传)。Flink处理这种状况,仅带来一点点开销。这个过程很重要,由于在任务须要重启的时候,Flink只须要保留较少的历史文件。
假设有一个子任务,拥有一个keyed state的operator,checkpoint最多保留2个。上面的图片描述了每一个checkpoint对应的RocksDB 的状态,它引用到的文件,以及在checkpoint完成后共享状态中的count值。
checkpoint ‘CP2’,本地的RocksDB目录有两个sstable文件,这些文件是新生成的,因而Flink将它们传到了checkpoint 对应的存储目录。当checkpoint完成后,Flink在共享状态中建立两个实体,并将count设为1。在这个共享状态中,这个key 由operator、subtask,原始的sstable名字组成,value为sstable实际存储目录。
checkpoint‘CP2’,RocksDB有2个老的sstable文件,又建立了2个新的sstable文件。Flink将这两个新的sstable传到 持久化存储中,而后引用他们。当checkpoint完成后,Flink将全部的引用的相应计数加1。
checkpoint‘CP3’,RocksDB的compaction将sstable-(1), sstable-(2), sstable-(3) 合并成 sstable-(1,2,3),而后删除 原始的sstable。这个合并后的文件包含了和以前源文件同样的信息,而且清理掉了重复的部分。sstable-(4)还保留着,而后有一个 新生成的sstable-(5)。Flink将新的 sstable-(1,2,3)以及 sstable-(5)传到持久化存储中, sstable-(4)仍被‘CP2’引用,因此 将计数增长1。如今有了3个checkpoint,'CP1','CP2','CP3',超过了预设的保留数目2,因此CP1被删除。做为删除的一部分, CP1对应的文件(sstable-(1)、sstable-(2)) 的引用计数减1。
checkpoint‘CP4’,RocksDB将sstable-(4), sstable-(5), 新的 sstable-(6) 合并成 sstable-(4,5,6)。Flink将新合并 的 sstable-(4,5,6)发送到持久化存储中,sstable-(1,2,3)、sstable-(4,5,6) 的引用计数增长1。因为再次到达了checkpoint的 保留数目,‘CP2’将被删除,‘CP2’对应的文件(sstable-(1)、sstable-(2)、sstable(3) )的引用计数减1。因为‘CP2’对应 的文件的引用计数达到0,这些文件将被删除。
若是使用增量式的checkpoint,那么在错误恢复的时候,不须要考虑不少的配置项。一旦发生了错误,Flink的JobManager会告诉 task须要从最新的checkpoint中恢复,它能够是全量的或者是增量的。以后TaskManager从分布式系统中下载checkpoint文件, 而后从中恢复状态。
增量式的checkpoint能为拥有大量状态的程序带来较大的提高,但还有一些trade-off须要考虑。总的来讲,增量式减小了checkpoint操做的时间,可是相对的,从checkpoint中恢复可能更耗时,具体状况须要根据应用程序包含的状态大小而定。相对的,若是程序只是部分失败,Flink TaskManager须要从多个checkpoint中读取数据,这时候使用全量的checkpoint来恢复数据可能更加耗时。同时,因为新的checkpoint可能引用到老的checkpoint,这样老的checkpoint就不能被删除,这样下去,历史的版本数据会愈来愈大。须要考虑使用分布式来存储checkpoint,另外还须要考虑读取带来的带宽消耗。
声明:本号全部文章除特殊注明,都为原创,公众号读者拥有优先阅读权,未经做者本人容许不得转载,不然追究侵权责任。
关注个人公众号,后台回复【JAVAPDF】获取200页面试题!5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,肯定真的不来了解一下吗?
备注:全部内容首发公众号,这里不保证明时性和完整性,你们扫描文末二维码关注哦~