Checkpoint 与 state 的关系
Checkpoint 是从 source 触发到下游全部节点完成的一次全局操做。下图能够有一个对 Checkpoint 的直观感觉,红框里面能够看到一共触发了 569K 次 Checkpoint,而后所有都成功完成,没有 fail 的。java
state 其实就是 Checkpoint 所作的主要持久化备份的主要数据,看下图的具体数据统计,其 state 也就 9kb 大小 。算法
什么是 state缓存
咱们接下来看什么是 state。先看一个很是经典的 word count 代码,这段代码会去监控本地的 9000 端口的数据并对网络端口输入进行词频统计,咱们本地行动 netcat,而后在终端输入 hello world,执行程序会输出什么?网络
答案很明显,(hello, 1) 和 (word,1)并发
那么问题来了,若是再次在终端输入 hello world,程序会输入什么?框架
答案其实也很明显,(hello, 2) 和 (world, 2)。为何 Flink 知道以前已经处理过一次 hello world,这就是 state 发挥做用了,这里是被称为 keyed state 存储了以前须要统计的数据,因此帮助 Flink 知道 hello 和 world 分别出现过一次。异步
回顾一下刚才这段 word count 代码。keyby 接口的调用会建立 keyed stream 对 key 进行划分,这是使用 keyed state 的前提。在此以后,sum 方法会调用内置的 StreamGroupedReduce 实现。分布式
什么是 keyed state函数
对于 keyed state,有两个特色:spa
对于如何理解已经分区的概念,咱们须要看一下 keyby 的语义,你们能够看到下图左边有三个并发,右边也是三个并发,左边的词进来以后,经过 keyby 会进行相应的分发。例如对于 hello word,hello 这个词经过 hash 运算永远只会到右下方并发的 task 上面去。
什么是 operator state
再看一段使用 operator state 的 word count 代码:
这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为 list state 的 operator state。根据 state 类型作一个分类以下图:
除了从这种分类的角度,还有一种分类的角度是从 Flink 是否直接接管:
在实际生产中,都只推荐使用 managed state,本文将围绕该话题进行讨论。
如何在 Flink 中使用 state
下图就前文 word count 的 sum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state:
下图则对 word count 示例中的FromElementsFunction类进行详解并分享如何在代码中使用 operator state:
Checkpoint 的执行机制
在介绍 Checkpoint 的执行机制前,咱们须要了解一下 state 的存储,由于 state 是 Checkpoint 进行持久化备份的主要角色。
Statebackend 的分类
下图阐释了目前 Flink 内置的三类 state backend,其中MemoryStateBackend和FsStateBackend在运行时都是存储在 java heap 中的,只有在执行 Checkpoint 时,FsStateBackend才会将数据以文件格式持久化到远程存储上。而RocksDBStateBackend则借用了 RocksDB(内存磁盘混合的 LSM DB)对 state 进行存储。
对于HeapKeyedStateBackend,有两种实现:
特别在 MemoryStateBackend 内使用HeapKeyedStateBackend时,Checkpoint 序列化数据阶段默认有最大 5 MB 数据的限制
对于RocksDBKeyedStateBackend,每一个 state 都存储在一个单独的 column family 内,其中 keyGroup,Key 和 Namespace 进行序列化存储在 DB 做为 key。
Checkpoint 执行机制详解
本小节将对 Checkpoint 的执行流程逐步拆解进行讲解,下图左侧是 Checkpoint Coordinator,是整个 Checkpoint 的发起者,中间是由两个 source,一个 sink 组成的 Flink 做业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS。
Checkpoint 的 EXACTLY_ONCE 语义
为了实现 EXACTLY ONCE 语义,Flink 经过一个 input buffer 将在对齐阶段收到的数据缓存起来,等对齐完成以后再进行处理。而对于 AT LEAST ONCE 语义,无需缓存收集到的数据,会对后续直接处理,因此致使 restore 时,数据可能会被屡次处理。下图是官网文档里面就 Checkpoint align 的示意图:
须要特别注意的是,Flink 的 Checkpoint 机制只能保证 Flink 的计算过程能够作到 EXACTLY ONCE,端到端的 EXACTLY ONCE 须要 source 和 sink 支持。
Savepoint 与 Checkpoint 的区别
做业恢复时,两者都可以使用,主要区别以下:
Savepoint | Externalized |
---|---|
用户经过命令触发,由用户管理其建立与删除 | Checkpoint 完成时,在用户给定的外部持久化存储保存 |
标准化格式存储,容许做业升级或者配置变动 | 看成业 FAILED(或者 CANCELED)时,外部存储的 Checkpoint 会保留下来 |
用户在恢复时须要提供用于恢复做业状态的 savepoint 路径 | 用户在恢复时须要提供用于恢复的做业状态的 Checkpoint 路径 |
本文为云栖社区原创内容,未经容许不得转载。