Flink学习--Checkpoint 的应用实践

文章来自:https://ververica.cn/develope...
做者:唐云(茶干)java

Checkpoint 与 state 的关系

Checkpoint 是从 source 触发到下游全部节点完成的一次全局操做。下图能够有一个对 Checkpoint 的直观感觉,红框里面能够看到一共触发了 569K 次 Checkpoint,而后所有都成功完成,没有 fail 的。node

state 其实就是 Checkpoint 所作的主要持久化备份的主要数据,看下图的具体数据统计,其 state 也就 9kb 大小 。算法

什么是 state

咱们接下来看什么是 state。先看一个很是经典的 word count 代码,这段代码会去监控本地的 9000 端口的数据并对网络端口输入进行词频统计,咱们本地行动 netcat,而后在终端输入 hello world,执行程序会输出什么?数据库

答案很明显,**(hello, 1)** 和 **(word,1)**缓存

那么问题来了,若是再次在终端输入 hello world,程序会输入什么?网络

答案其实也很明显,**(hello, 2)** 和 **(world, 2)**。为何 Flink 知道以前已经处理过一次 hello world,这就是 state 发挥做用了,这里是被称为 keyed state 存储了以前须要统计的数据,因此帮助 Flink 知道 hello 和 world 分别出现过一次。并发

回顾一下刚才这段 word count 代码。keyby 接口的调用会建立 keyed stream 对 key 进行划分,这是使用 keyed state 的前提。在此以后,sum 方法会调用内置的 StreamGroupedReduce 实现。app

什么是 keyed state

对于 keyed state,有两个特色:框架

  • 只能应用于 KeyedStream 的函数与操做中,例如 Keyed UDF, window state
  • keyed state 是已经分区/划分好的,每个 key 只能属于某一个 keyed state

对于如何理解已经分区的概念,咱们须要看一下 keyby 的语义,你们能够看到下图左边有三个并发,右边也是三个并发,左边的词进来以后,经过 keyby 会进行相应的分发。例如对于 hello word,hello 这个词经过 hash 运算永远只会到右下方并发的 task 上面去。异步

什么是 operator state

  • 又称为 non-keyed state,每个 operator state 都仅与一个 operator 的实例绑定。
  • 常见的 operator state 是 source state,例如记录当前 source 的 offset

再看一段使用 operator state 的 word count 代码:

这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为 list state 的 operator state。根据 state 类型作一个分类以下图:

除了从这种分类的角度,还有一种分类的角度是从 Flink 是否直接接管:

  • Managed State:由 Flink 管理的 state,刚才举例的全部 state 均是 managed state
  • Raw State:Flink 仅提供 stream 能够进行存储数据,对 Flink 而言 raw state 只是一些 bytes

在实际生产中,都只推荐使用 managed state,本文将围绕该话题进行讨论。

如何在 Flink 中使用 state

下图就前文 word count 的 sum 所使用的**StreamGroupedReduce**类为例讲解了如何在代码中使用 keyed state:

下图则对 word count 示例中的**FromElementsFunction**类进行详解并分享如何在代码中使用 operator state:

Checkpoint 的执行机制

在介绍 Checkpoint 的执行机制前,咱们须要了解一下 state 的存储,由于 state 是 Checkpoint 进行持久化备份的主要角色。

Statebackend 的分类

下图阐释了目前 Flink 内置的三类 state backend,其中**MemoryStateBackend****FsStateBackend**在运行时都是存储在 java heap 中的,只有在执行 Checkpoint 时,**FsStateBackend**才会将数据以文件格式持久化到远程存储上。

**RocksDBStateBackend**则借用了 RocksDB(内存磁盘混合的 LSM DB)对 state 进行存储。

MemoryStateBackend

MemoryStateBackend 将工做状态数据保存在 taskmanager 的 java 内存中。key/value 状态和 window 算子使用哈希表存储数值和触发器。进行快照时(checkpointing),生成的快照数据将和 checkpoint ACK 消息一块儿发送给 jobmanager,jobmanager 将收到的全部快照保存在 java 内存中。
MemoryStateBackend 如今被默认配置成异步的,这样避免阻塞主线程的 pipline 处理。
MemoryStateBackend 的状态存取的速度都很是快,可是不适合在生产环境中使用。这是由于 MemoryStateBackend 有如下限制:

  • 每一个 state 的默认大小被限制为 5 MB(这个值能够经过 MemoryStateBackend 构造函数设置)
  • 每一个 task 的全部 state 数据 (一个 task 可能包含一个 pipline 中的多个 Operator) 大小不能超过 RPC 系统的帧大小(akka.framesize,默认 10MB)
  • jobmanager 收到的 state 数据总和不能超过 jobmanager 内存

MemoryStateBackend 适合的场景:

  • 本地开发和调试
  • 状态很小的做业

下图表示了 MemoryStateBackend 的数据存储位置:
@MemoryStateBackend state 存储位置 | center
值得说明的是,当触发 savepoint 时,jobmanager 会把快照数据持久化到外部存储。

FsStateBackend

FsStateBackend 须要配置一个 checkpoint 路径,例如“hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”,咱们通常配置为 hdfs 目录
FsStateBackend 将工做状态数据保存在 taskmanager 的 java 内存中。进行快照时,再将快照数据写入上面配置的路径,而后将写入的文件路径告知 jobmanager。jobmanager 中保存全部状态的元数据信息(在 HA 模式下,元数据会写入 checkpoint 目录)。
FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的 pipline 处理。能够经过 FsStateBackend 构造函数取消该模式:

new FsStateBackend(path, false);

FsStateBackend 适合的场景:

  • 大状态、长窗口、大键值(键或者值很大)状态的做业
  • 适合高可用方案

@FsStateBackend state 存储位置 | center

RocksDBStateBackend

RocksDBStateBackend 也须要配置一个 checkpoint 路径,例如:“hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”,通常配置为 hdfs 路径。
RocksDB 是一种可嵌入的持久型的 key-value 存储引擎,提供 ACID 支持。由 Facebook 基于 levelDB 开发,使用 LSM 存储引擎,是内存和磁盘混合存储。
RocksDBStateBackend 将工做状态保存在 taskmanager 的 RocksDB 数据库中;checkpoint 时,RocksDB 中的全部数据会被传输到配置的文件目录,少许元数据信息保存在 jobmanager 内存中( HA 模式下,会保存在 checkpoint 目录)。
RocksDBStateBackend 使用异步方式进行快照。
RocksDBStateBackend 的限制:

  • 因为 RocksDB 的 JNI bridge API 是基于 byte[] 的,RocksDBStateBackend 支持的每一个 key 或者每一个 value 的最大值不超过 2^31 bytes((2GB))。
  • 要注意的是,有 merge 操做的状态(例如 ListState),可能会在运行过程当中超过 2^31 bytes,致使程序失败。

RocksDBStateBackend 适用于如下场景:

  • 超大状态、超长窗口(天)、大键值状态的做业
  • 适合高可用模式

使用 RocksDBStateBackend 时,可以限制状态大小的是 taskmanager 磁盘空间(相对于 FsStateBackend 状态大小限制于 taskmanager 内存 )。这也致使 RocksDBStateBackend 的吞吐比其余两个要低一些。由于 RocksDB 的状态数据的读写都要通过反序列化/序列化。

RocksDBStateBackend 是目前三者中惟一支持增量 checkpoint 的。

@ RocksDBStateBackend 存储位置 | center

statebackend 如何保存 managed keyed/operator state

对于HeapKeyedStateBackend,有两种实现:

  • 支持异步 Checkpoint(默认):存储格式 CopyOnWriteStateMap
  • 仅支持同步 Checkpoint:存储格式 NestedStateMap

特别在 MemoryStateBackend 内使用HeapKeyedStateBackend时,Checkpoint 序列化数据阶段默认有最大 5 MB数据的限制

对于RocksDBKeyedStateBackend,每一个 state 都存储在一个单独的 column family 内,其中 keyGroup,Key 和 Namespace 进行序列化存储在 DB 做为 key。

Checkpoint 执行机制详解

本小节将对 Checkpoint 的执行流程逐步拆解进行讲解,下图左侧是 Checkpoint Coordinator,是整个 Checkpoint 的发起者,中间是由两个 source,一个 sink 组成的 Flink 做业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS。

a. 第一步,Checkpoint Coordinator 向全部 source 节点 trigger Checkpoint;。

b. 第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到全部 input 的 barrier 才会执行相应的 Checkpoint。

c. 第三步,当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。

d. 第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 以后,会执行本地快照,这里特意展现了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),而后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。

e. 一样的,sink 节点在完成本身的 Checkpoint 以后,会将 state handle 返回通知 Coordinator。

f. 最后,当 Checkpoint coordinator 收集齐全部 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

Checkpoint 的 EXACTLY_ONCE 语义

为了实现 EXACTLY ONCE 语义,Flink 经过一个 input buffer 将在对齐阶段收到的数据缓存起来,等对齐完成以后再进行处理。而对于 AT LEAST ONCE 语义,无需缓存收集到的数据,会对后续直接处理,因此致使 restore 时,数据可能会被屡次处理。下图是官网文档里面就 Checkpoint align 的示意图:

须要特别注意的是,Flink 的 Checkpoint 机制只能保证 Flink 的计算过程能够作到 EXACTLY ONCE,端到端的 EXACTLY ONCE 须要 source 和 sink 支持。

Savepoint 与 Checkpoint 的区别

做业恢复时,两者都可以使用,主要区别以下:

Savepoint Externalized Checkpoint
用户经过命令触发,由用户管理其建立与删除 Checkpoint 完成时,在用户给定的外部持久化存储保存
标准化格式存储,容许做业升级或者配置变动 看成业 FAILED(或者CANCELED)时,外部存储的 Checkpoint 会保留下来
用户在恢复时须要提供用于恢复做业状态的 savepoint 路径 用户在恢复时须要提供用于恢复的做业状态的 Checkpoint 路径
相关文章
相关标签/搜索