本文由Flink 博客 翻译而来,为了叙述的可读性和流畅性,笔者作了少许的修改。html
Apache Flink是为了“有状态”的处理流式数据创建的。那么,在流式计算程序中,状态的含义是什么? 我在前面的博客中作了“状态”以及 “有状态的流式处理”的定义。这里回顾一下,状态指的是,在程序中,Operator将过去处理过的event信息保存在内存中, 这样能够在以后的处理中使用。git
“状态”是一个基础的功能,使得在流式计算中复杂的用户使用场景成为可能。在Flink 文档中列举 了一些例子。github
可是,只有“状态”拥有容错能力,这样才能在生产环境使用。“容错性”意味着,即便有软件或者机器的故障,最终的计算结果也是精确的,没有数据丢失也没有重复处理。apache
Flink的容错特性很是强大,它不只对软件和机器的负载很小,而且也提供了“端到端仅一次”的消息传递保证。缓存
Flink程序容错机制的核心是检查点。Flink的检查点是一个全局的、异步的程序快照,它周期性的生成并送到持久化存储(通常使用分布式系统)。 当发生故障时,Flink使用最新的检查点进行重启。一些Flink的用户在程序“状态”中保存了GB甚至TB的数据。这些用户反馈在大量 的状态下,建立检查点一般很慢而且耗资源,这也是为何Flink在 1.3版本开始引入“增量式的检查点”。并发
在引入“增量式的检查点”以前,每个Flink的检查点都保存了程序完整的状态。后来咱们意识到在大部分状况下这是没必要要的,由于上一次和此次的检查点以前 ,状态发生了很大的变化,因此咱们建立了“增量式的检查点”。增量式的检查点仅保存过去和如今状态的差别部分。异步
增量式的检查点能够为拥有大量状态的程序带来很大的提高。在早期的测试中,一个拥有TB级别“状态”程序将生成检查点的耗时从3分钟以上下降 到了30秒左右。由于增量式的检查点不须要每次把完整的状态发送到存储中。分布式
如今只能经过RocksDB state back-end来获取增量式检查点的功能,Flink使用RocksDB内置的备份机制来合并检查点数据。这样, Flink 增量式检查点的数据不会无限制的增大,它会自动合并老的检查点数据并清理掉。性能
想要在程序中使用增量式的检查点,我建议详细的阅读Flink 检查点的官方文档 总的来讲,要启用这个机制,能够以下设置:测试
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new RocksDBStateBackend(filebackend, true));//第二个参数为true
默认的,Flink保留一个完整的检查点,若是你须要保留更多,能够经过以下的配置设置:
state.checkpoints.num-retained
Flink 增量式的检查点以“RocksDB”为基础,RocksDB是一个基于 LSM树的KV存储,新的数据保存在内存中,称为memtable。若是Key相同,后到的数据将覆盖以前的数据,一旦memtable写满了,RocksDB将数据压缩并写入到磁盘。memtable的数据持久化到磁盘后,他们就变成了不可变的sstable。
RocksDB会在后台执行compaction,合并sstable并删除其中重复的数据。以后RocksDB删除原来的sstable,替换成新合成的ssttable,这个sstable包含了以前的sstable中的信息。
在这个基础之上,Flink跟踪前一个checkpoint建立和删除的RocksDB sstable文件,由于sstable是不可变的,Flink能够所以计算出 状态有哪些改变。为了达到这个目标,Flink在RocksDB上触发了一个刷新操做,强制将memtable刷新到磁盘上。这个操做在Flink中是同步的,其余的操做是异步的,不会阻塞数据处理。
Flink 的checkpoint会将新的sstable发送到持久化存储(例如HDFS,S3)中,同时保留引用。Flink不会发送全部的sstable, 一些数据在以前的checkpoint存在而且写入到持久化存储中了,这样只须要增长引用次数就能够了。由于compaction的做用,一些sstable会合并成一个sstable并删除这些sstable,这也是为何Flink能够减小checkpoint的历史文件。
为了分析checkpoint的数据变动,而上传整理过的sstable是多余的(下文会有描述,这里的意思是以前已经上传过的,不须要再次上传)。Flink处理这种状况,仅带来一点点开销。这个过程很重要,由于在任务须要重启的时候,Flink只须要保留较少的历史文件。
假设有一个子任务,拥有一个keyed state的operator,checkpoint最多保留2个。上面的图片描述了每一个checkpoint对应的RocksDB 的状态,它引用到的文件,以及在checkpoint完成后共享状态中的count值。
checkpoint ‘CP2’,本地的RocksDB目录有两个sstable文件,这些文件是新生成的,因而Flink将它们传到了checkpoint 对应的存储目录。当checkpoint完成后,Flink在共享状态中建立两个实体,并将count设为1。在这个共享状态中,这个key 由operator、subtask,原始的sstable名字组成,value为sstable实际存储目录。
checkpoint‘CP2’,RocksDB有2个老的sstable文件,又建立了2个新的sstable文件。Flink将这两个新的sstable传到 持久化存储中,而后引用他们。当checkpoint完成后,Flink将全部的引用的相应计数加1。
checkpoint‘CP3’,RocksDB的compaction将sstable-(1), sstable-(2), sstable-(3) 合并成 sstable-(1,2,3),而后删除 原始的sstable。这个合并后的文件包含了和以前源文件同样的信息,而且清理掉了重复的部分。sstable-(4)还保留着,而后有一个 新生成的sstable-(5)。Flink将新的 sstable-(1,2,3)以及 sstable-(5)传到持久化存储中, sstable-(4)仍被‘CP2’引用,因此 将计数增长1。如今有了3个checkpoint,'CP1','CP2','CP3',超过了预设的保留数目2,因此CP1被删除。做为删除的一部分, CP1对应的文件(sstable-(1)、sstable-(2)) 的引用计数减1。
checkpoint‘CP4’,RocksDB将sstable-(4), sstable-(5), 新的 sstable-(6) 合并成 sstable-(4,5,6)。Flink将新合并 的 sstable-(4,5,6)发送到持久化存储中,sstable-(1,2,3)、sstable-(4,5,6) 的引用计数增长1。因为再次到达了checkpoint的 保留数目,‘CP2’将被删除,‘CP2’对应的文件(sstable-(1)、sstable-(2)、sstable(3) )的引用计数减1。因为‘CP2’对应 的文件的引用计数达到0,这些文件将被删除。
因为Flink能够并行的执行多个checkpoint,有时候前面的checkpoint尚未完成,后面的新的checkpoint就启动了。所以,在 使用增量式的checkpoint的时候,你须要考虑使用哪个checkpoint启动。Flink在使用checkpoint以前须要checkpoint协调器的确认, 因此不会使用那些被删除的checkpoint。
若是使用增量式的checkpoint,那么在错误恢复的时候,不须要考虑不少的配置项。一旦发生了错误,Flink的JobManager会告诉 task须要从最新的checkpoint中恢复,它能够是全量的或者是增量的。以后TaskManager从分布式系统中下载checkpoint文件, 而后从中恢复状态。
增量式的checkpoint能为拥有大量状态的程序带来较大的提高,但还有一些trade-off须要考虑。总的来讲,增量式减小了checkpoint操做的时间,可是相对的,从checkpoint中恢复可能更耗时,具体状况须要根据应用程序包含的状态大小而定。相对的,若是程序只是部分失败,Flink TaskManager须要从多个checkpoint中读取数据,这时候使用全量的checkpoint来恢复数据可能更加耗时。同时,因为新的checkpoint可能引用到老的checkpoint,这样老的checkpoint就不能被删除,这样下去,历史的版本数据会愈来愈大。须要考虑使用分布式来存储checkpoint,另外还须要考虑读取带来的带宽消耗。
还有一些便利性和性能的trade-off,能够经过阅读Flink 文档 了解更多。