[Flink Internals, Part 4]: An Introduction to Flink Checkpoints and Savepoints

Sources:

http://www.javashuo.com/article/p-ktaazqlj-ks.html

http://www.javashuo.com/article/p-swtbcvco-ge.html

https://www.jianshu.com/p/8e74c7cdd463

https://blog.csdn.net/u013014724/article/details/84800255

Part 1: Flink Checkpoints

1. How Flink Checkpoints Work

Checkpoints are the core of Flink's fault-tolerance mechanism. At a configurable interval, Flink generates a snapshot of the state of every operator in the stream and persists that state durably. If the program crashes unexpectedly, it can be restarted from a chosen snapshot, restoring the program's state as of that point and repairing the interruption caused by the failure. The paragraph below walks through the mechanism, which the official documentation illustrates with a figure of barriers flowing through the operator graph.

Whenever the configured checkpoint interval elapses and a checkpoint is triggered, Flink injects a barrier marker into each of the job's distributed stream sources. These barriers flow downstream through the operators together with the regular data records. Because an operator may have multiple input streams, and each input stream carries its own barrier, the operator must wait until the barrier has arrived on every input; once all barriers have arrived, they logically mark the same point in time (the inputs are said to be aligned). While waiting for alignment, the operator stops processing records from any input whose barrier has already arrived and buffers them, while continuing to process the inputs whose barrier is still in flight. Once aligned, the operator snapshots its state for the checkpoint, emits the barrier downstream, and then emits the records buffered during alignment as input to the downstream operators before resuming normal processing.
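What gets snapshotted when the barrier passes through an operator is, for the most part, the state that user functions have registered with Flink. As a concrete illustration, here is a minimal sketch of a keyed ValueState that a checkpoint would capture (the class and names are illustrative, not from the original article; the function must run on a keyed stream, i.e. after a keyBy):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keeps a running (count, sum) per key; this per-key state is exactly
// what a checkpoint snapshots when a barrier passes through the operator.
public class RunningSum extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> countAndSum;

    @Override
    public void open(Configuration parameters) {
        countAndSum = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count-and-sum",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})));
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> current = countAndSum.value();
        if (current == null) {
            current = Tuple2.of(0L, 0L);
        }
        current.f0 += 1;        // running count for this key
        current.f1 += input.f1; // running sum for this key
        countAndSum.update(current);
        out.collect(Tuple2.of(input.f0, current.f1));
    }
}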

 

2. Basic Checkpoint Configuration

Checkpointing can be enabled in two ways: globally, in conf/flink-conf.yaml; or per job, in the application code. I recommend the second approach, since it configures the current job explicitly, as shown below:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Store checkpoint state in HDFS via the filesystem state backend
env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
CheckpointConfig config = env.getCheckpointConfig();
// Retain checkpoints when the job is cancelled or fails
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Checkpointing mode: exactly-once
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Interval between checkpoints, in milliseconds
config.setCheckpointInterval(60000);

 

Calling enableExternalizedCheckpoints with ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION means that checkpoint data is retained even after the Flink job is cancelled, so the job can later be restored from a chosen checkpoint as needed. The code above also sets the checkpointing interval to one minute.
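CheckpointConfig exposes further tuning knobs beyond the ones used above. Continuing from the same config object, a sketch of a few commonly used settings (the values are illustrative, not from the original article):

// Require at least 30 seconds between the end of one checkpoint
// and the start of the next, so checkpointing cannot starve processing
config.setMinPauseBetweenCheckpoints(30000);
// Abort a checkpoint that has not completed within 10 minutes
config.setCheckpointTimeout(600000);
// Allow only one checkpoint to be in flight at a time
config.setMaxConcurrentCheckpoints(1);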

3. Retaining Multiple Checkpoints
By default, when checkpointing is enabled, Flink keeps only the most recent successful checkpoint, and a failed job is restored from it. It is often more flexible to retain several checkpoints and choose which one to restore from: for example, if we discover that the last four hours of records were processed incorrectly, we may want to roll the whole state back to four hours ago.
Flink supports retaining multiple checkpoints. Add the following setting to conf/flink-conf.yaml to specify the maximum number of checkpoints to keep:

state.checkpoints.num-retained: 20
With this setting in place, run the Flink job and list the corresponding checkpoint directory on HDFS; an example listing follows:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/
Found 22 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:23 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:24 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:25 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:26 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:27 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:28 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:29 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:30 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:31 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:32 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:33 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:34 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:35 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:36 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:37 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:38 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:39 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:40 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:41 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:42 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned

As configured by state.checkpoints.num-retained: 20, only the 20 most recent checkpoints are kept. To roll back to a particular checkpoint, we only need to point the job at that checkpoint's path.

4. Restoring from a Checkpoint
If the Flink job fails unexpectedly, or has been processing data incorrectly for some recent period, we can replay the job from a particular checkpoint, say chk-860, with the following command:

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
Once the job is running again, it continues checkpointing according to its configuration and produces new checkpoint data, as shown below:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e
Found 6 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:56 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:57 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:58 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:59 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned

 

Note that the original Flink job's ID was 582e17d2cc343e6c56255d111bae0191, and all of its checkpoint files live in a directory named after that job ID. When the job is stopped and later restored from a checkpoint (chk-860), a new job ID is generated (here 11bbc5d9933e4ff7e25198a760e9792e), while the checkpoint numbering continues from the run the restore was based on: chk-861, chk-862, chk-863, and so on.

Part 2: Flink Savepoints
1. How Savepoints Work
A savepoint is a self-contained checkpoint stored outside the Flink job. It uses Flink's checkpointing mechanism to create a non-incremental snapshot of the streaming program's state and writes that snapshot to an external storage system.

A Flink program contains two kinds of state. One is user-defined state, created or modified through Flink's transformation functions; the other is system state, the data buffers and similar structures that form part of an operator's computation, such as the records buffered inside a window when a window function is used. So that each operator's state can be uniquely identified when a savepoint is created, Flink provides an API for assigning an ID to every operator in the program. When the program is later updated or upgraded, the state in the savepoint can be matched back to its operator by this ID, which is what makes restoration possible. If we do not assign operator IDs, Flink generates them for us automatically.
Assigning an ID manually to every operator is strongly recommended: even if the application later changes substantially, for example by replacing an operator's implementation, adding new operators, or removing operators, there is at least a chance of matching the surviving operators against the state stored in the savepoint. Bear in mind, though, that savepoint data is generated from the program and its in-memory data structures as they were at the time, so if the program changes drastically, and in particular if the relevant in-memory data structures change, it may be impossible to restore correctly from the old savepoint at all.

The following example, taken from the official Flink documentation, shows how to set operator IDs:

DataStream<String> stream = env
  // Stateful source (e.g. Kafka) with an explicit ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with an explicit ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // auto-generated ID

2. Creating a Savepoint
Creating a savepoint requires a target directory, which can be specified in two ways.
One way is to configure a default savepoint path by adding the following setting to conf/flink-conf.yaml:

state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints
The other is to pass the target directory when triggering the savepoint manually; the command has the following form:

bin/flink savepoint :jobId [:targetDirectory]
For example, for a running Flink job with ID 40dcc6d2ba90f13930abce295de8d038, trigger a savepoint into the directory given by the default state.savepoints.dir setting with:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
The savepoint data for job 40dcc6d2ba90f13930abce295de8d038 is then written under hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0.
To store the savepoint of the running job in an explicitly specified directory instead, run:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
The savepoint data for job 40dcc6d2ba90f13930abce295de8d038 is then written under hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f.
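Since one usually wants a savepoint taken right before stopping a job, the flink CLI can also do both in one step with cancel-with-savepoint. A sketch using the same job ID as above (the target directory can be omitted if state.savepoints.dir is configured):

bin/flink cancel -s hdfs://namenode01.td.com/tmp/flink/savepoints 40dcc6d2ba90f13930abce295de8d038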

3. Restoring from a Savepoint
We can now stop job 40dcc6d2ba90f13930abce295de8d038 and resume it from the savepoint; the command has the following form:

bin/flink run -s :savepointPath [:runArgs]
Using the savepoint saved above, restore the job with the following command:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
This starts a new Flink job, with the new ID cdbae3af1b7441839e7c03bab0d0eefd.
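If the new program version no longer contains an operator whose state is stored in the savepoint, the restore fails by default. Passing --allowNonRestoredState (short form -n) tells Flink to skip such state; for example:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f -n flink-app-jobs.jar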

4. Savepoint Directory Layout
Finally, let's look at what is stored under a savepoint directory:

hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

In the HDFS path above, 11bbc5 is the first six characters of the Flink job ID and bd967f90709b is a randomly generated suffix; together they form savepoint-11bbc5-bd967f90709b, the root directory of this savepoint. Inside it, the _metadata file holds the savepoint's metadata, including serialized pointers to the other files in the directory, which in turn contain the serialized state itself.
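When restoring, the -s argument can point either at the savepoint root directory or directly at its _metadata file, so both of the following forms should work (paths taken from the listing above, jar name as in the earlier examples):

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b flink-app-jobs.jar
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata flink-app-jobs.jar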

References

http://shiyanjun.cn/archives/1855.html
https://www.colabug.com/2259405.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html
