flink系列-九、flink的状态与容错

一、理解State(状态)

1.一、State

  • 对象的状态
    • Flink中的状态:通常指一个具体的task/operator某时刻在内存中的状态(例如某属性的值)
    • 注意:State和Checkpointing 不要搞混 
    • checkpoint则表示了一个Flink Job,在一个特定时刻的一份全状态快照,即包含一个job下全部task/operator 某时刻的状态 
  • 状态的做用
    • 增量计算 
    • 聚合操做 
    • 机器学习训练模式 
    • 等等 
    • 容错 
    • Job故障重启 
    • 升级

1.二、状态的分类

一、Operator Statejava

  • 绑定到特定operator并行实例,每一个operator的并行实例维护一个状态
  • 与key无关
  • 例如:一个并行度为3的source,若是只考虑一个算子须要一个逻辑状态的情形,那么他就有3operator个状态
  • 支持的数据类型
    • ListState

二、Keyed Statenode

  • 基于KeyedStream之上的状态,dataStream.keyBy(),只能在做用于KeyedStrem上的function/Operator里使用
  • KeyBy以后的Operator State,可理解为分区过的Operator State
  • 每一个并行keyed Operator的每一个实例的每一个key有一个Keyed State:即就是 一个惟一的状态,因为每一个key属于一个keyed operator的并行实例,所以咱们能够将其简单地理解为 <operator,key>

  

  • 支持的数据结构 
    • ValueState:保留一个能够更新和检索的值 
      • update(T) 
      • value() 
    • ListState<T>:保存一个元素列表 
      • add(T) 
      • addAll(List) 
      • get(T) 
      • clear() 
    • ReducingState<T>:保存一个值,该值表示添加到该状态全部值的聚合。 
      • add(T) 
    • AggregatingState<IN,OUT><in,out>:保存一个值,该值表示添加到该状态的全部值的聚合。(与ReducingState 相反,聚合类型添加到该状态的元素能够有不同类型) 
      • add(T) 
    • FoldingState<T,ACC><t,acc>:不推荐使用 
      • add(T) 
    • MapState<UK,UV><uk,uv>:保存一个映射列表 
      • put(UK,UV) 
      • putAll(Map<uk,uv>) 
      • get(UK) 

 三、状态的表现形式web

  • Keyed State和Operator State,能够以两种形式存在:原始状态和托管状态。
    • managed(托管状态):
      • 托管状态是指Flink框架管理的状态,如ValueState,ListState,MapState等。 
      • 经过框架提供的接口来更新和管理状态的值 
      • 不须要序列化 
    • raw(原始状态) 原始状态是由用户自行管理的具体的数据结构,Flink在作checkpoint的时候,使用byte[]来读写状态内 容,对其内部数据结构一无所知
      • 须要序列化
      • 一般在DataStream上的状态推荐使用托管的状态,当用户自定义operator时,会使用到原始状态。
      • 大多数都是托管状态,除非自定义实现。

四、Operator State与Keyed State的Redistribute(从新分配)数据库

1)、Operator State Redistribute Redistributeapache

  • 当Operator改变并发度的时候(Rescale),会触发状态的Redistribute,即Operator State里的 数据会从新分配到Operator的Task实例
  • 例如:某Operator的并行度由3改成2

  • 不同数据结构的动态扩展方式不同样:
    • ListState:并发度在改变的时候,会将并发上的每一个List都取出,而后把这些List合并到一个新的List,然 后根据元素的个数在均匀分配给新的Task
    • UnionListState:相比于ListState更加灵活,把划分的方式交给用户去作,当改变并发的时候,会将原来 的List拼接起来。而后不作划分,直接交给用户(每一个Task给全量的状态,用户本身划分)
    • BroadcastState:如大表和小表作Join时,小表能够直接广播给大表的分区,在每一个并发上的数据都是完 全一致的。作的更新也相同,当改变并发的时候,把这些数据COPY到新的Task便可。
    • 以上是Flink Operator States提供的3种扩展方式,用户能够根据本身的需求作选择。

2)、Keyed State的Redistributewindows

  • Keyed State Redistribute
    • Key被Redistribute哪一个task,他对应的Keyed State就被Redistribute到哪一个Task
    • Keyed State Redistribute是基于Key Group来作分配的:
      • 将key分为group
      • 每一个key分配到惟一的group 
      • 将group分配给task实例 
      • KeyGroup由最大并行度的大小所决定的 
      • Keyed State最终分配到哪一个Task:group ID和taskID是从0开始算的 
        • hash=hash(key) 
        • KG=hash % numOfKeyGroups 
        • Subtask=KG* taskNum / numOfKeyGroups

2、CheckPoint

 2.一、状态容错

  • 有了状态天然须要状态容错,不然状态就失去意义了
  • Flink状态容错的机制就是checkpoint

概念后端

  • 所谓checkpoint,就是在某一时刻,将全部task的状态作一个快照(snapshot),而后存储到State Backend (有全量 和 增量)
  • 一种连续性绘制数据流状态的机制(周期性的),该机制确保即便出现故障,程序的状态最终也将为数据流中的每一条记录提供exactly once(只处理一次)的语意保证(只能保证flink系统内,对于sink和source须要依赖的外部的组件一同保证)
  • 全局快照,持久化保存全部的task / operator的State

特色: api

  • 轻量级容错机制
  • 可异步
  • 全量 vs 增量
  • Barrier机制(保证exactly-once 语义)
  • 失败状况可回滚至最近一次成功的checkpoint(自动)
  • 周期性(无需人工干预)

基本原理:数据结构

  • 经过往source 注入barrier
  • barrier做为checkpoint的标志
  • barrier
    • 全局异步化是snapshot的核心机制 并发

    • Flink分布式快照的核心概念之一就是数据栅栏(barrier)。这些barrier被插入到数据流中,做为数据流的一部分和数据一块儿向下流动。Barrier不会干扰正常数据,数据严格有序。一个barrier把数据流分割成两部分:一部 分进入到当前快照,另外一部分进入下一个快照。每个barrier都带有快照ID,而且barrier以前的数据都进入了 此快照。Barrier不会干扰数据流处理,因此很是轻量。多个不同快照的多个barrier会在流中同时出现,即多个 快照可能同时建立。

使用Checkpointing的前提条件:

  • 在必定时间内可回溯的datasource(故障时能够回溯数据),常见的:
    • 通常是可持久化的消息队列:例如Kafka、RabbitMQ、Amazon Kinesis、Google PubSub
    • 也能够是文件系统:HDFS、S三、GFS、NFS、Ceph
  • 可持久化存储State的存储系统,一般使用分布式文件系统(Checkpointing就是把job的全部状态都周期性持 久化到存储里)
    • 通常是HDFS、S三、GFS、NFS、Ceph

2.二、状态容错示意图

checkpoint:

 

Restore:

  • 恢复全部状态
  • 设置source的位置(例如:Kafka的offset)

 2.三、使用CheckPoint

 一、开启checkPoint

  • checkPoint默认是禁用的
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//start a checkpoint every 1000 ms   1000-checkpoint时间间隔
env.enableCheckpointing(1000);
//advanced options: checkpoint保证形式
//set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//make sure 500 ms of progress happen between checkpoints 两次间隔最小时间,若是上次没有完成会等待完成在执行下一次
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoints have to complete within one minute,or are discarded ;超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
//allow only one checkpoint to be in progress at the same time; checkpoint 并行度
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//enable externalized checkpoints which are retained after job cancellation;任务结束,checkpoint是否保留
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

二、CheckpointConfig设置说明

  • checkpointMode 
    • //set mode to exactly-once (this is the default)
      env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • 保留策略 
    • 默认状况下,检查点不被保留,仅用于从故障中恢复做业。能够启用外部持久化检查点,同时指定保留策略

    • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在做业取消时保留检查点。注意,在这种状况下,必须在取消后手动清理检查点状态。

    • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION看成业被cancel时,删除检查点。检查点状态仅在做业失败时可用。

  • checkpointing的超时时间:超过期间没有完成则会被终止
    • //checkpoints have to complete within one minute, or are discarded
      env.getCheckpointConfig().setCheckpointTimeout(60000);
  • checkpointing最小间隔:用于指定上一个checkpoint完成以后最小等多久能够出发另外一个checkpoint,当指 定这个参数时,maxConcurrentCheckpoints的值为1
    • //make sure 500 ms of progress happen between checkpoints
      env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • maxConcurrentCheckpoints:指定运行中的checkpoint最多能够有多少个(设定checkpointing最小间隔时本 参数即为1) 
    • //allow only one checkpoint to be in progress at the same time
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,若是设 置为false,则task会拒绝checkpoint而后继续运行
    • env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

三、选择State Backend 

  • State Backend就是用来保存快照的地方
  • 用来在Checkpointing机制中持久化全部状态的一致性快照,这些状态包括:
    • 非用户定义的状态:例如,timers、非用户自定义的stateful operators(connectors,windows)
    • 用户定义的状态:就是前面讲的用户自定义的stateful operato所使用的Keyed State and Operator State

目前Flink自带三个开箱即用State Backend: 

  • MemoryStateBackend(默认)
    • MemoryStateBackend在Java堆上维护状态。Key/value状态和窗口运算符使用哈希表存储值和计时器等
    • Checkpoint时,MemoryStateBackend对State作一次快照,并在向JobManager发送Checkpoint确认完 成的消息中带上此快照数据,而后快照就会存储在JobManager的堆内存中 
    • MemoryStateBackend可使用异步的方式进行快照(默认开启),推荐使用异步的方式避免阻塞。若是 不但愿异步,能够在构造的时候传入false(也能够经过全局配置文件指定),以下
      • StateBackend backend = new MemoryStateBackend(10*1024*1024,false);
        env.setStateBackend(backend);
    • 限制

      • 单个State的大小默认限制为5MB,能够在MemoryStateBackend的构造函数中增长

      • 不论如何配置,State大小都没法大于akka.framesize(JobManager和TaskManager之间发送的最大消息 的大小默认是10MB)

      • JobManager必须有足够的内存大小

    •  适用场景 
      • 本地开发和调试 
      • 小状态job,如只使用Map、FlatMap、Filter...或Kafka Consumer
  • FsStateBackend

    • FsStateBackend须要配置一个文件系统的URL, 如 "hdfs://namenode:40010/flink/checkpoint"。

    • FsStateBackend在TaskManager的内存中持有正在处理的数据。Checkpoint时将state snapshot 写入文件系 统目录下的文件中。文件的路径等元数据会传递给JobManager,存在其内存中。

    • FsStateBackend可使用异步的方式进行快照(默认开启),推荐使用异步的方式避免阻塞。若是不但愿异 步能够在构造的时候传入false(也能够经过全局配置文件指定),以下:

      • StateBackend backend = new FsStateBackend("hdfs://namenode:40010/flink/checkpoints",false);
        env.setStateBackend(backend);
    • 适用场景 
      • 大状态、长窗口、大键/值状态的job 
      • 全部高可用性的状况 
  • RocksDBStateBackend

    • RocksDBStateBackend须要配置一个文件系统的URL来, 如"hdfs://namenode:40010/flink/checkpoint"

    • RocksDBStateBackend将运行中的数据保存在RocksDB数据库中,(默认状况下)存储在TaskManager数据 目录中,在Checkpoint时,整个RocksDB数据库将被Checkpointed到配置的文件系统和目录中。文件的路径 等元数据会传递给JobManager,存在其内存中。

    • RocksDBStateBackend老是执行异步快照

    • 限制

      • RocksDB JNI API是基于byte[],所以key和value最大支持大小为2^31个字节(2GB)。RocksDB自身在 支持较大value时候有问题

    • 适用场景
      • 超大状态,超长窗口、大键/值状态的job 
      • 全部高可用性的状况 
    • 与前两种状态后端对比: 
      • 目前只有RocksDBStateBackend支持增量checkpoint(默认全量) 
      • 状态保存在数据库中,即便用RockDB能够保存的状态量仅受可用磁盘空间量的限制,相比其余的状态后 端可保存更大的状态,但开销更大(读/写须要反序列化/序列化去检索/存储状态),吞吐受到限制 
  • 三种StateBackend总结以下:

  • 配置StateBackend 
    • 全局配置(配置文件conf/flink-conf.yaml),设置集群保存checkpoint类型和存储路径
    • # The backend that will be used to store operator state checkpoints
      state.backend: filesystem
      #Directory.for storing checkpoints
      state.checkpoints.dir: hdfs:namenode:40010/flink/checkpoints
    • 每一个job单独配置State Backend(可覆盖全局配置) ,设置计算任务的
    • StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

四、配置恢复策略

Flink支持不同的重启策略,这些策略控制在出现故障时如何从新启动job

  • 若是没有启用checkpointing,则使用无重启(no restart)策略。
  • 若是启用了checkpointing,但没有配置重启策略,则使用固定延迟(fixed-delay)策略,其中尝试重启 次数是Integer > MAX_VALUE
  • 重启策略能够在flink-conf.yaml中配置,表示全局的配置。也能够在应用代码中动态指定,会覆盖全局配 置

 2.四、checkpoint demo

一、operatorState的checkPoint容错案例:

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
  * OperatorState的checkPoint容错恢复
  * 想知道两次事件 xxd 之间,一共发生多少次其余事件,分别是什么事件
  * 事件流:xxd a a a a a f d d xxd ad d s s d xxd…
  * 当事件流中出现字母e时触发容错
  * 输出:
  * (8,a a a a a f d d)
  * (6,ad d s s d)
  */
object OperatorStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    //生成配置对象
    val config = new Configuration()
    //开启spark-webui
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件,不然打印日志到控制台
    config.setString("web.log.path", "/tmp/logs/flink_log")
    //配置taskManager的日志文件,不然打印日志到控制台
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    //配置tm有多少个slot
    config.setString("taskmanager.numberOfTaskSlots", "4")
    // 获取local运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    //设置全局并行度为1,好让全部数据都跑到一个task中,以方便测试
    env.setParallelism(1)
    //隔多长时间执行一次ck  毫秒
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    //保存EXACTLY_ONCE
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //每次ck之间的间隔,不会重叠
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    //每次ck的超时时间
    checkpointConfig.setCheckpointTimeout(10L)
    //若是ck执行失败,程序是否中止
    checkpointConfig.setFailOnCheckpointingErrors(true)
    //job在执行CANCE的时候是否删除ck数据
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    //指定保存ck的存储模式
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true) //异步同步
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024,false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints",true)
    env.setStateBackend(stateBackend)
    //恢复策略,恢复三次,间隔0秒
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(0, TimeUnit.SECONDS) // delay
      )
    )
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    input
      .flatMap(new OperatorStateRecoveryRichFunction)
      .print()
    env.execute()
  }
}

//因为使用了本地状态因此须要checkpoint的snapshotState方法把本地状态放到托管状态中
class OperatorStateRecoveryRichFunction extends RichFlatMapFunction[String, (Int, String)] with CheckpointedFunction {

  //托管状态
  @transient 
  private var checkPointCountList: ListState[String] = _
  //原始状态
  private var list: ListBuffer[String] = new ListBuffer[String]

  //flatMap函数处理逻辑
  override def flatMap(value: String, out: Collector[(Int, String)]): Unit = {
    if (value == "xxd") {
      if (list.size > 0) {
        val outString: String = list.foldLeft("")(_ + " " + _)
        out.collect((list.size, outString))
        list.clear()
      }
    } else if (value == "e") {
      1 / 0
    } else {
      list += value
    }
  }

  //再checkpoint时存储,把正在处理的原始状态的数据保存到托管状态中
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkPointCountList.clear()
    list.foreach(f => checkPointCountList.add(f))
    println(s"snapshotState:${
      list
    }, Time=${System.currentTimeMillis()}")
  }

  //从statebackend中恢复保存的托管状态,并未来数据放到程序处理的原始状态中
  // 出错一次就调用一次这里,能调用几回是根据setRestartStrategy设置的
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val lsd: ListStateDescriptor[String] = new ListStateDescriptor[String]("xxdListState", TypeInformation.of(new TypeHint[String] {}))
    checkPointCountList = context.getOperatorStateStore.getListState(lsd)
    if (context.isRestored) {// 出错恢复
      import scala.collection.convert.wrapAll._
      for (e <- checkPointCountList.get()) {
        list += e
      }
    }
    println(s"initializeState:${list},Time=${System.currentTimeMillis()}")
  }
}

二、Keyed State容错实现方法

  • Keyed State之过时超时策略
    • 因为Keyed State太多,因此flink提供了针对Keyed State TTL的设置
    • 任何类型的keyed State均可以设置TTL。若是TTL已配置,且状态已过时,则将以最佳方式处理
    • 全部State collection都支持条目级别的TTL,即list、map中的条目独立expire
    • 用法
      • StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class)
        stateDescriptor.enableTimeToLive(ttlConfig);
    • Refresh策略(默认是OnCreateAndWrite):设置如何更新keyedState的最后访问时间

      • StateTtlConfig.UpdateType.Disabled - 禁用TTL,永不过时

      • StateTtlConfig.UpdateType.OnCreateAndWrite - 每次写操做均更新State的最后访问时间(Create、 Update)

      • StateTtlConfig.UpdateType.OnReadAndWrite - 每次读写操做均更新State的最后访问时间

    • 状态可见性(默认是NeverReturnExpired):设置是否返回过时的值(过时还没有清理,此时正好被访问)
      • StateTtlConfig.StateVisibility.NeverReturnExpired - 永不返回过时状态 
      • StateTtlConfig.StateVisibility.ReturnExpiredlfNotCleanedUp - 能够返回过时但还没有清理的状态值 
    • TTL time等级
      • setTimeCharacteristic(TimeCharacteristic timeCharacteristic) 
      • 目前只支持ProcessingTime
  • Keyed State之过时状态清理
    • 清理策略
    • 默认:已通过期的数据被显示读取时才会清理(可能会致使状态愈来愈大) 
    • FULL_STATE_SCAN_SNAPSHOT:在checkpoint时清理full snapshot中的expired state 
      • CleanupFullSnapshot() 
      • 不适用于在RocksDB state backend上的incremental checkpointing

KeyedState的checkPoint容错恢复 :

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

import scala.collection.mutable


/**
  * KeyedState的checkPoint容错恢复
  * 将输入格式为"字符串 数字"的字符串转换成(字符串,数字)的元组类型
  * 事件流:xxd 666
  * 当事件流中出现"任意字符串 888"时触发容错
  * 输出:
  * (xxd,666)
  */
object KeyedStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    //生成配置对象
    val config = new Configuration()
    //开启spark-webui
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件,不然打印日志到控制台
    config.setString("web.log.path", "/tmp/logs/flink_log")
    //配置taskManager的日志文件,不然打印日志到控制台
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    //配置tm有多少个slot
    config.setString("taskmanager.numberOfTaskSlots", "4")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    //并行度设置为1,是想让全部的key都跑到一个task中,以方便测试
    env.setParallelism(1)
    //隔多长时间执行一次ck
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    //保存EXACTLY_ONCE
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //每次ck之间的间隔,不会重叠
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    //每次ck的超时时间
    checkpointConfig.setCheckpointTimeout(10L)
    //若是ck执行失败,程序是否中止
    checkpointConfig.setFailOnCheckpointingErrors(true)
    //job在执行CANCE的时候是否删除ck数据
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    //指定保存ck的存储模式
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true)
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024,false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints",true)
    env.setStateBackend(stateBackend)
    //恢复策略
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(3, TimeUnit.SECONDS) // delay
      )
    )
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    //由于KeyedStateRichFunctionString中使用了keyState,因此它必须在keyBy算子的后面
    input
      .map(f => {
        val strings: mutable.ArrayOps[String] = f.split(" ")
        (strings(0), strings(1).toInt)
      })
      .keyBy(0)
      .flatMap(new KeyedStateRecoveryRichFunctionString)
      .print()
    env.execute()
  }
}

//因为没有使用本地的状态因此不须要实现checkpoint接口
class KeyedStateRecoveryRichFunctionString extends RichFlatMapFunction[(String, Int), (String, Int)] {
  //ValueState是Key的state类型,是只能存在于KeyedStream的operator中
  @transient private var sum: ValueState[(String, Int)] = null

  override def flatMap(value: (String, Int), out: Collector[(String, Int)]): Unit = {
    println(s"state value:${sum.value()}")
    //当value值为888时,触发异常
    if (value._2 != 888) {
      sum.clear()
      sum.update(value)
      out.collect(value)
    } else {
      1 / 0
    }
  }

  //在operator启动时执行一次
  //若是operator出现异常,在恢复operator时会被再次执行
  override def open(parameters: Configuration): Unit = {
    //keyState的TTL策略
    val ttlConfig = StateTtlConfig
      //keyState的超时时间为10秒
      .newBuilder(Time.seconds(10))
      //当建立和更新时,从新计时超时时间
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      //失败时不返回keyState的值
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      //失败时返回keyState的值
//      .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
      //ttl的时间处理等级目前只支持ProcessingTime
      .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
      .build
    //从runtimeContext中得到ck时保存的状态
    val descriptor = new ValueStateDescriptor[(String, Int)]("xxdValueState", TypeInformation.of(new TypeHint[(String, Int)] {}))
    descriptor.enableTimeToLive(ttlConfig)
    sum = getRuntimeContext.getState(descriptor)
  }
}

3、SavePoint

概念:

  • savepoint能够理解为是一种特殊的checkpoint,savepoint就是指向checkpoint的一个指针,实际上也是 使用经过checkpointing机制建立的streaming job的一致性快照,能够保存数据源的offset、并行操做状态 也就是流处理过程当中的状态历史版本。须要手动触发,并且不会过时,不会被覆盖,除非手动删除。正常 状况下的线上环境是不须要设置savepoint的。除非对job或集群作出重大改动的时候, 须要进行测试运 行。
  • 能够从应用在过去的任意作了savepoint的时刻开始继续消费,具备能够replay的功能

Savepoint由两部分组成:

  • 数据目录:稳定存储上的目录,里面的二进制文件是streaming job状态的快照
  • 元数据文件:指向数据目录中属于当前Savepoint的数据文件的指针(绝对路径)

与Checkpoint的区别:

  • Savepoint至关于备份(类比数据库备份)、Checkpoint至关于recovery log
  • Checkpoint是Flink自动建立的"recovery log"用于故障自动恢复,由Flink建立,不须要用户交互。用户 cancel做业时就删除,除非启动了保留机制(External Checkpoint)
  • Savepoint由用户建立,拥有和删除,保存点在做业终止后仍然存在。

做用:

  • job开发新版本(更改job graph、更改并行度等等),应用从新发布
  • Flink版本的更新
  • 业务迁移,集群须要迁移,不允许数据丢失
相关文章
相关标签/搜索