1. Operator State
2. Keyed State
3. How state is represented
4. Redistribution of Operator State and Keyed State
1) Operator State redistribution
2) Keyed State redistribution
Concept
Characteristics:
Basic principle:
Global asynchrony is the core mechanism of snapshots
One of the core concepts of Flink's distributed snapshots is the barrier. Barriers are injected into the data stream and flow downstream with it as part of the stream. A barrier never overtakes records, so the data stays strictly ordered. Each barrier splits the stream into two parts: records before it belong to the current snapshot, and records after it belong to the next snapshot. Every barrier carries a snapshot ID, and all records preceding the barrier are included in that snapshot. Barriers do not interrupt the flow of records, which makes them very lightweight. Barriers from several different snapshots can be in flight in the stream at the same time, meaning several snapshots may be created concurrently.
Prerequisites for using checkpointing:
A data source that can replay records for a certain amount of time (e.g. Kafka, RabbitMQ, HDFS)
Persistent storage for state, typically a distributed file system (e.g. HDFS, S3)
checkpoint:
Restore:
1. Enabling checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms (the checkpoint interval)
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints; if the previous
// checkpoint has not finished, the next one waits for it to complete
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded (timeout)
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
2. CheckpointConfig settings explained
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
By default, checkpoints are not retained; they are only used to recover a job after a failure. You can enable externalized (persistent) checkpoints and choose a retention policy; a retained checkpoint can later be resumed like a savepoint (see the sketch after this list):
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep the checkpoint when the job is cancelled. Note that in this case you must clean up the checkpoint state manually after cancellation.
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint when the job is cancelled. The checkpoint state is then only available if the job fails.
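A retained checkpoint can be used to resume a job the same way a savepoint is, via the Flink CLI. A minimal sketch; the checkpoint path and job jar are placeholders:

# resume a job from the metadata of a retained checkpoint
bin/flink run -s hdfs://namenode:40010/flink/checkpoints/<job-id>/chk-42 your-job.jar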
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// fail the task (triggering the restart strategy) when a checkpoint error occurs
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
3. Choosing a state backend
Flink currently ships with three out-of-the-box state backends: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend.
MemoryStateBackend
StateBackend backend = new MemoryStateBackend(10 * 1024 * 1024, false);
env.setStateBackend(backend);
Limitations
The size of an individual state is limited to 5 MB by default; this limit can be raised via the MemoryStateBackend constructor.
Regardless of configuration, state can never be larger than akka.framesize (the maximum size of a message sent between the JobManager and a TaskManager, 10 MB by default).
The JobManager must have enough memory to hold the state.
FsStateBackend
FsStateBackend requires a file system URL, e.g. "hdfs://namenode:40010/flink/checkpoint".
FsStateBackend holds in-flight data in the TaskManager's memory. On a checkpoint, it writes the state snapshot into files under the configured file system directory. Metadata such as the file paths is passed to the JobManager and kept in its memory.
FsStateBackend can take snapshots asynchronously (the default), which is recommended because it avoids blocking. If you do not want asynchronous snapshots, pass false to the constructor (or set it in the global configuration file), as shown below:
StateBackend backend = new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false);
env.setStateBackend(backend);
RocksDBStateBackend
RocksDBStateBackend also requires a file system URL, e.g. "hdfs://namenode:40010/flink/checkpoint".
RocksDBStateBackend keeps in-flight data in a RocksDB database, which is stored (by default) in the TaskManager data directories. On a checkpoint, the whole RocksDB database is checkpointed into the configured file system and directory. Metadata such as the file paths is passed to the JobManager and kept in its memory.
RocksDBStateBackend always performs asynchronous snapshots.
Limitations
The RocksDB JNI API is byte[]-based, so the maximum supported size per key and per value is 2^31 bytes (2 GB). RocksDB itself has known issues with very large values.
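Besides the global configuration shown next, the RocksDB backend can also be chosen per job in code. A minimal sketch in Scala, assuming the flink-statebackend-rocksdb dependency is on the classpath (the HDFS URL is the example path used above):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the second argument enables incremental checkpoints: only changes since the
// last completed checkpoint are uploaded rather than the full database
val backend = new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints", true)
env.setStateBackend(backend)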
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
4. Configuring the restart strategy
Flink supports several restart strategies, which control how a job is restarted when a failure occurs.
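For example, a fixed-delay strategy restarts the job a bounded number of times and pauses between attempts; the recovery examples below use this same call (env is a StreamExecutionEnvironment as in the snippets above):

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time

// restart at most 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))
)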
1. Checkpoint fault-tolerance example with Operator State:
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
 * Checkpoint fault-tolerance recovery for Operator State.
 * Goal: between two "xxd" events, count how many other events occurred and what they were.
 * Event stream: xxd a a a a a f d d xxd ad d s s d xxd ...
 * A failure is triggered when the letter "e" appears in the stream.
 * Output:
 * (8,a a a a a f d d)
 * (6,ad d s s d)
 */
object OperatorStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    // build the configuration object
    val config = new Configuration()
    // start the local Flink web UI
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // web UI log file; otherwise logs are printed to the console
    config.setString("web.log.path", "/tmp/logs/flink_log")
    // TaskManager log file; otherwise logs are printed to the console
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    // number of slots per TaskManager
    config.setString("taskmanager.numberOfTaskSlots", "4")
    // get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    // global parallelism 1 so that all data runs in a single task, for easier testing
    env.setParallelism(1)
    // checkpoint interval in milliseconds
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    // EXACTLY_ONCE guarantee
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // minimum pause between two checkpoints, so they never overlap
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    // checkpoint timeout in milliseconds (the original 10L would discard nearly every checkpoint)
    checkpointConfig.setCheckpointTimeout(10000L)
    // whether the job fails when a checkpoint error occurs
    checkpointConfig.setFailOnCheckpointingErrors(true)
    // whether checkpoint data is deleted when the job is cancelled
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    // state backend used to store checkpoints (true = asynchronous snapshots)
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true)
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints", true)
    env.setStateBackend(stateBackend)
    // restart strategy: up to 3 attempts, 0 seconds apart
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(0, TimeUnit.SECONDS) // delay
      )
    )
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    input
      .flatMap(new OperatorStateRecoveryRichFunction)
      .print()
    env.execute()
  }
}

// Raw local state is used here, so snapshotState (from CheckpointedFunction)
// must copy the local state into managed state at checkpoint time
class OperatorStateRecoveryRichFunction extends RichFlatMapFunction[String, (Int, String)] with CheckpointedFunction {

  // managed state
  @transient private var checkPointCountList: ListState[String] = _
  // raw (local) state
  private var list: ListBuffer[String] = new ListBuffer[String]

  // flatMap processing logic
  override def flatMap(value: String, out: Collector[(Int, String)]): Unit = {
    if (value == "xxd") {
      if (list.size > 0) {
        val outString: String = list.foldLeft("")(_ + " " + _)
        out.collect((list.size, outString))
        list.clear()
      }
    } else if (value == "e") {
      1 / 0 // deliberately fail
    } else {
      list += value
    }
  }

  // called at checkpoint time: copy the raw in-flight state into managed state
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkPointCountList.clear()
    list.foreach(f => checkPointCountList.add(f))
    println(s"snapshotState:${list}, Time=${System.currentTimeMillis()}")
  }

  // restore the managed state saved in the state backend and copy it back into
  // the raw local state; called once per recovery attempt, up to the limit set
  // by setRestartStrategy
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val lsd: ListStateDescriptor[String] =
      new ListStateDescriptor[String]("xxdListState", TypeInformation.of(new TypeHint[String] {}))
    checkPointCountList = context.getOperatorStateStore.getListState(lsd)
    if (context.isRestored) { // recovering after a failure
      import scala.collection.JavaConverters._
      for (e <- checkPointCountList.get().asScala) {
        list += e
      }
    }
    println(s"initializeState:${list},Time=${System.currentTimeMillis()}")
  }
}
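To try the example locally: run nc -lk 6666 as a plain netcat listener serving as the socket source, type the events one per line, and send e to force the ArithmeticException. Each restart calls initializeState again, which repopulates the local ListBuffer from the checkpointed ListState, so the counts between xxd markers survive the failure.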
2. Implementing fault tolerance for Keyed State
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
Refresh strategy (default: OnCreateAndWrite): controls when the keyed state's last-access timestamp is refreshed.
StateTtlConfig.UpdateType.Disabled - TTL disabled; the state never expires
StateTtlConfig.UpdateType.OnCreateAndWrite - the last-access timestamp is refreshed on every write (create and update)
StateTtlConfig.UpdateType.OnReadAndWrite - the last-access timestamp is refreshed on every read and write
Checkpoint fault-tolerance recovery for Keyed State:
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/**
 * Checkpoint fault-tolerance recovery for Keyed State.
 * Converts input of the form "word number" into a (word, number) tuple.
 * Event stream: xxd 666
 * A failure is triggered when "any-word 888" appears in the stream.
 * Output:
 * (xxd,666)
 */
object KeyedStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    // build the configuration object
    val config = new Configuration()
    // start the local Flink web UI
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // web UI log file; otherwise logs are printed to the console
    config.setString("web.log.path", "/tmp/logs/flink_log")
    // TaskManager log file; otherwise logs are printed to the console
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    // number of slots per TaskManager
    config.setString("taskmanager.numberOfTaskSlots", "4")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    // parallelism 1 so that all keys run in a single task, for easier testing
    env.setParallelism(1)
    // checkpoint interval in milliseconds
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    // EXACTLY_ONCE guarantee
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // minimum pause between two checkpoints, so they never overlap
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    // checkpoint timeout in milliseconds (the original 10L would discard nearly every checkpoint)
    checkpointConfig.setCheckpointTimeout(10000L)
    // whether the job fails when a checkpoint error occurs
    checkpointConfig.setFailOnCheckpointingErrors(true)
    // whether checkpoint data is deleted when the job is cancelled
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    // state backend used to store checkpoints (true = asynchronous snapshots)
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true)
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints", true)
    env.setStateBackend(stateBackend)
    // restart strategy: up to 3 attempts, 3 seconds apart
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(3, TimeUnit.SECONDS) // delay
      )
    )
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    // KeyedStateRecoveryRichFunctionString uses keyed state, so it must come after keyBy
    input
      .map(f => {
        val strings: Array[String] = f.split(" ")
        (strings(0), strings(1).toInt)
      })
      .keyBy(0)
      .flatMap(new KeyedStateRecoveryRichFunctionString)
      .print()
    env.execute()
  }
}

// No raw local state is used here, so the CheckpointedFunction interface is not needed
class KeyedStateRecoveryRichFunctionString extends RichFlatMapFunction[(String, Int), (String, Int)] {

  // ValueState is a per-key state type; it can only be used in operators on a KeyedStream
  @transient private var sum: ValueState[(String, Int)] = _

  override def flatMap(value: (String, Int), out: Collector[(String, Int)]): Unit = {
    println(s"state value:${sum.value()}")
    // a value of 888 triggers the failure
    if (value._2 != 888) {
      sum.clear()
      sum.update(value)
      out.collect(value)
    } else {
      1 / 0 // deliberately fail
    }
  }

  // executed once when the operator starts;
  // if the operator fails, it runs again on each recovery attempt
  override def open(parameters: Configuration): Unit = {
    // TTL policy for the keyed state
    val ttlConfig = StateTtlConfig
      // the keyed state expires after 10 seconds
      .newBuilder(Time.seconds(10))
      // refresh the expiration timer on create and update
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      // never return the value once it has expired
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      // alternatively, return the expired value if it has not been cleaned up yet
      // .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
      // TTL currently only supports the ProcessingTime characteristic
      .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
      .build
    // obtain the state saved at checkpoint time from the runtime context
    val descriptor = new ValueStateDescriptor[(String, Int)]("xxdValueState",
      TypeInformation.of(new TypeHint[(String, Int)] {}))
    descriptor.enableTimeToLive(ttlConfig)
    sum = getRuntimeContext.getState(descriptor)
  }
}
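As with the operator-state example, this can be tested with nc -lk 6666: send lines such as xxd 666, then xxd 888 to force the failure. After each restart open() runs again, re-registering the TTL-enabled state, and the printed state value shows what was recovered (subject to the 10-second TTL).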
Concept:
A savepoint consists of two parts:
Differences from a checkpoint:
Purpose:
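In practice, savepoints are triggered and consumed through the Flink CLI. A minimal sketch; the job ID, target directory, and job jar are placeholders:

# trigger a savepoint for a running job
bin/flink savepoint <jobId> hdfs://namenode:40010/flink/savepoints

# cancel the job while taking a savepoint
bin/flink cancel -s hdfs://namenode:40010/flink/savepoints <jobId>

# resume from a savepoint
bin/flink run -s <savepointPath> your-job.jar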