「Flink」Flink的状态管理与容错

在Flink中的每一个函数和运算符都是有状态的。在处理过程当中能够用状态来存储数据,这样能够利用状态来构建复杂操做。为了让状态容错,Flink须要设置checkpoint状态。Flink程序是经过checkpoint来保证容错,经过checkpoint机制,Flink可恢复做业的状态和计算位置。html

checkpoint检查点

前提条件

Flink的checkpoin机制须要与流和状态的持久化存储交互,通常它要求:redis

  • 一个持久化的数据源
    • 当Flink程序出现问题时,能够经过checkpoint持久化存储中恢复,而后从出错的地方开始从新消费数据
    • 该数据源能够在必定时间内重跑数据,例如:Kafka、RabbitMQ或者文件系统HDFS、S三、…
  • 状态的持久存储
    • 状态须要永久的保存下来,一般是分布式文件系统(例如:HDFS、S三、GFS、…)

启用和配置检查点

默认状况,Flink是禁用检查点。要启用检查点,调用数据库

// 启用检查点
// 单位:毫秒
env.enableCheckpointing(1000);

在启用检查点时,还能够配置检查点的其余参数。apache

  • exactly-one or at-least-once(仅一次或者至少一次)
    • 大多数程序都是设置为exactly-once,只有在某些超低延迟的应用(例如:始终要求是毫秒级的应用)
    • 经过查看源码,咱们看到,Flink默认是 exactly-once
      • public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
  • checkpoint timeout(检查点超时时间)
    • 检查点超过规定的时间就会自动终止
  • minimum time between checkpoints
    • 检查点之间的最小时间
    • 下一个检查点将在上一个检查点完成后5秒钟启动
    • 检查点最小间隔时间不会受检查点间隔更容易配置
  • number of concureent checkpoint
    • 检查点的并发数目。默认状况一个检查点在运行时不会触发另外一个检查点,这样能够确保Flink不会花太多时间在checkpoint上,并确保流能够有效进行。
    • 能够设置多个重叠的checkpoint,这对允许有必定延迟,并但愿较频繁的检查(100ms)来从新处理故障是有用的
  • externalized checkpoint
    • 外部检查点
    • 能够将检查点设置为外部持久化,这样检查点的元数据将写入持久存储,而且但做业运行失败是不会自动清理
    • 这样能够作双重保险
  • fail/continue task on checkpoint errors
    • 检查点执行发生错误,是否执行任务。
    • 默认状况,若是checkpoint失败,任务也将失败
  • perfer checkpoint for recovery
    • 即时最近有更多的savepoint可用于恢复,flink依然会选择使用最近一次的checkpoint来进行错误恢复

参考配置:后端

        // --------
        // 配置checkpoint
        // 启用检查点
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

选择状态后端存储

Flink的checkpoint机制能够存储计时器和有状态operation的全部快照,包括:链接器、窗口或者用户自定义状态。具体checkpoint存储在哪儿(例如:是JobManager内存、文件系统或者数据库),依赖于状态后端的配置。数据结构

默认状况,状态保存在TaskManager的内存中,检查点存储在TM的内存中。为了适当地保存大状态,Flink支持其余的存储。咱们能够经过:并发

StreamExecutionEnvironment.setStateBackend(…)机器学习

来指定存储方式分布式

Flink状态管理

状态的应用场景:函数

  • 当应用程序想要按照某种模式搜索某些事件时,状态能够保存迄今全部的事件序列
  • 当每分钟/小时/天须要对流数据进行聚合,状态能够保存挂起的聚合
  • 当在数据流上训练机器学习模型时,状态能够用来保存某一类参数的版本
  • 当须要管理历史数据时,状态容许访问过去历史数据

Flink状态能够保存在堆内、或者是堆外。Flink也能够管理应用程序的状态,必要时也能够溢出到磁盘,若是应用要保持很是大的状态,能够不修改程序逻辑状况下配置状态后端存储。

Flink状态分类

Flink中有两种基本的状态:

  • Keyed State
  • Operator State

Keyed State

Keyed State一般和key相关,仅仅在KeyedStream的方法和算子中使用。能够把 Keyed State看做是分区,并且每个key仅出如今一个分区内。逻辑上每一个 keyed-state和惟一元组<算子并发实例, key>绑定,因为每一个key仅属于算子的一个并发,所以能够简化为<算子, key>

Operator State

对于 Operator State来讲,每一个Operator State和一个并发实例绑定。Kafka connector是Flink中使用operator state的一个很好的示例。每一个Kafka消费者的并发在Operator State中维护一个 topic partition到offset的映射关系。

Operator state在Flink做业的并发改变后,会从新分发状态,分发的策略和keyed stated不同。

Raw State与Managed State

Keyed Stated和Operator State分别有两种形式:managed 和 raw

Managed State是由Flink运行时管理的数据结构来表示的,例如:内部的Hash Table或者RocksDB。例如:ValueState、ListState等。Flink运行时会对这些状态进行编码并写入Checkpoint。

Raw State则保存在本身的数据结构中。checkpoint的时候,Flink并不知道状态里面具体的内容,仅仅写入一串字节序列到checkpoint中。

全部的DataStream的function均可以使用managed state,但raw state只能在实现算子时使用。因为Flink能够在修改并发时更好的分发状态数据,而且可以更好的管理内存,由于讲义使用 managed state.

使用Managed Keyed State

Managed keyed state接口提供不一样类型的状态访问接口,这些状态都做用在当前输入数据的key下。这些状态仅可在KeyedStream上使用,能够经过 stream.keyBy(…)获得KeyedStream。

全部支持的状态类型以下:

  • ValueState<T>
    • 保存一个能够更新和获取的值,算子接收到的每一个key均可能对应一个值
    • 能够经过update(T)进行更新,经过value()获取
  • ListState<T>
    • 保存一个元素的列表,能够往这个列表中追加数据,并在当前列表上检索
    • 能够经过 add(T)或者addAll(List<T>)进行追加元素
    • 经过get()获取整个列表
    • 经过 update(List<T>)覆盖当前列表
  • ReducingState<T>
    • 保存一个单值,表示添加到状态的全部值的聚合。接口与ListState相似
  • AggregatingState<IN, OUT>
    • 保存一个单值,表示添加到状态的全部值的聚合
    • 与ReducingState相反的是,聚合类型可能与添加到状态的元素类型不一样。接口与ListState相似
  • FoldingState<T, ACC>(后续将过时)
    • 保存一个单值,白搜狐添加到状态的全部值的集合
    • 与ReducingState相反的是,聚合类型可能与添加到状态的元素类型不一样。接口与ListState相似
  • MapState<UK, UV>
    • 维护一个映射列表,能够添加键值到状态中,能够获取当前映射的迭代器
    • 使用put、putAll添加映射,使用 get检索特定key

注意:

  • 这些状态对象仅用于状态交互。状态自己不必定存储在内存中,还有可能保存在磁盘或者其余位置
  • 从状态中获取的值取决于输入元素说表明的key,所以,在不一样key上调用同一个接口,可能获得不一样的值

使用Managed Operator State

能够经过实现 CheckpointedFunction 或者 ListCheckpointed<T extends Serialized>接口来使用Managed Operator State。

CheckpointedFunction接口:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

在Flink进行checkpoint时,会调用snapshotstate(),用户自定义函数初始化时会调用 initializeState。初始化包括第一次自定义函数初始化和从以前的 checkpoint 回复。所以,initializeState 中应该也包括状态恢复的逻辑。

Managed Operator State以list的形式存在,这些状态是一个可序列化对象的集合List,彼此独立,方便在改变并发后进行状态的从新分派。换句话说,这些对象是从新分配 non-keyed state的最细粒度。根据状态的不一样访问方式,有如下两种分配模式:

  • Even-split redistribution
    • 每一个算子都存储一个列表形式的状态集合,整个状态由全部的列表拼接而成
    • 但做业恢复或者从新分配时,整个状态按照算子的并行度均匀分配
  • Union redistribution
    • 每一个算子保存一个列表形式的状态集合,整个状态由全部的列表拼接而成
    • 但做业恢复或者从新分配时,每一个算子都将得到全部的状态数据

ListCheckpointed接口:

ListCheckpointed接口是CheckpointedFunction接口的精简版,仅支持 even-split redistribution的list state

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;

snapshotState()须要返回一个将写入到checkpoint的对象列表, restoreState则须要处理恢复回来的对象列表。


参考文献:

Flink官方文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/state/checkpoints.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html

相关文章
相关标签/搜索