This article takes a look at Flink's checkpoint configuration.
```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
```
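The checkpoint interval passed to enableCheckpointing is given in milliseconds. CheckpointingMode defaults to CheckpointingMode.EXACTLY_ONCE and can also be set to CheckpointingMode.AT_LEAST_ONCE. Applications with very strict latency requirements (on the order of a few milliseconds) can use CheckpointingMode.AT_LEAST_ONCE. For most other applications, CheckpointingMode.EXACTLY_ONCE is sufficient.

The checkpoint timeout is also given in milliseconds. A checkpoint that has not completed within this time is aborted. If minPauseBetweenCheckpoints is set, at most one checkpoint can be in progress at a time, so values of maxConcurrentCheckpoints greater than 1 have no effect.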
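The following simplified Java model shows how the interval, timeout, minimum pause and concurrency limit interact. It is a hypothetical sketch, not Flink's CheckpointCoordinator. The class and method names are invented for illustration. It assumes that the next checkpoint waits for two things: the full interval since the last trigger, and the minimum pause since the last completion.

```java
/**
 * Hypothetical, simplified model of how the CheckpointConfig timing options interact.
 * This is NOT Flink's CheckpointCoordinator; names and rules are illustrative only.
 */
public class CheckpointTriggerModel {

    /** With a non-zero minimum pause, at most one checkpoint can be in flight. */
    static int effectiveMaxConcurrent(int maxConcurrent, long minPauseMs) {
        return minPauseMs > 0 ? 1 : maxConcurrent;
    }

    /**
     * Earliest time the next checkpoint may start: the interval must have elapsed since the
     * last trigger AND the minimum pause must have elapsed since the last completion.
     */
    static long earliestNextTrigger(long lastTriggerMs, long lastCompletionMs,
                                    long intervalMs, long minPauseMs) {
        return Math.max(lastTriggerMs + intervalMs, lastCompletionMs + minPauseMs);
    }

    /** A checkpoint still running timeoutMs after it started is considered expired (aborted). */
    static boolean isExpired(long startMs, long nowMs, long timeoutMs) {
        return nowMs - startMs >= timeoutMs;
    }

    public static void main(String[] args) {
        long interval = 1000, minPause = 500, timeout = 60000; // values from the example above
        // Checkpoint triggered at t=0 completes at t=900: the pause pushes the next one to 1400.
        System.out.println(earliestNextTrigger(0, 900, interval, minPause)); // 1400
        // Checkpoint completes quickly at t=200: the interval dominates, next one at 1000.
        System.out.println(earliestNextTrigger(0, 200, interval, minPause)); // 1000
        // maxConcurrentCheckpoints = 3 is capped to 1 because minPause > 0.
        System.out.println(effectiveMaxConcurrent(3, minPause)); // 1
        // Still running one minute after it started: aborted.
        System.out.println(isExpired(0, 60000, timeout)); // true
    }
}
```

The model treats the minimum pause as a lower bound measured from the *completion* of the previous checkpoint. That is why a slow checkpoint delays the next one, while a fast one does not.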
The cluster-wide defaults are in the fault-tolerance section of flink-conf.yaml:

```yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
```
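Every checkpoint setting in the default section is commented out, so none takes effect until its line is uncommented. The following hypothetical Java sketch is a simplified reader for `key: value` lines that treats everything after `#` as a comment. The class and method names are invented, and Flink ships its own configuration loader. The values in `main` are sample values only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified reader for flink-conf.yaml-style "key: value" lines.
 * Not Flink's loader; it only illustrates that '#'-prefixed settings are inactive.
 */
public class FlinkConfSketch {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : lines) {
            // Drop everything after '#' (a comment), then surrounding whitespace.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            // Split on the first ": " so values such as "hdfs://host:port/..." stay intact.
            int sep = line.indexOf(": ");
            if (sep < 0) {
                continue; // not a "key: value" line; this sketch just skips it
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    /** Unset means false, matching the commented default shown in the file. */
    static boolean incrementalEnabled(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault("state.backend.incremental", "false"));
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "# state.backend: filesystem",          // still commented out: ignored
                "state.backend: rocksdb",
                "state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints",
                "state.backend.incremental: true");
        Map<String, String> conf = parse(lines);
        System.out.println(conf.get("state.backend"));         // rocksdb
        System.out.println(conf.get("state.checkpoints.dir")); // hdfs://namenode-host:port/flink-checkpoints
        System.out.println(incrementalEnabled(conf));          // true
    }
}
```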
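The asynchronous-snapshot setting (state.backend.async) defaults to true. State backends that do not support asynchronous snapshots, or that support only asynchronous snapshots, may ignore it.

In summary:

- checkpointTimeout (set with setCheckpointTimeout) is the maximum time a checkpoint may run, in milliseconds.
- minPauseBetweenCheckpoints is the minimum time the checkpoint coordinator waits after one checkpoint completes before it may trigger another.
- maxConcurrentCheckpoints is the maximum number of checkpoints that may be in progress at once. If minPauseBetweenCheckpoints is set, values greater than 1 have no effect.
- enableExternalizedCheckpoints turns on externalized (persisted) checkpoints. Externalized checkpoint state is not cleaned up automatically when a job fails. When a job is canceled, you can configure whether the state is deleted or retained.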
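The retention rules for externalized checkpoints fit in a small decision table. The following sketch is hypothetical. Its Cleanup enum constants reuse the names of Flink's ExternalizedCheckpointCleanup values for readability. The class, the TerminalState enum and isRetained are invented for illustration and are not part of Flink's API.

```java
/**
 * Hypothetical model of the externalized-checkpoint retention rules described above.
 * Enum constant names mirror Flink's ExternalizedCheckpointCleanup; this is not Flink code.
 */
public class ExternalizedRetentionModel {

    enum Cleanup { RETAIN_ON_CANCELLATION, DELETE_ON_CANCELLATION }

    enum TerminalState { FAILED, CANCELED }

    /** Returns true if the externalized checkpoint state is kept after the job terminates. */
    static boolean isRetained(TerminalState state, Cleanup cleanup) {
        switch (state) {
            case FAILED:
                // On failure the state is not cleaned up automatically, whatever the policy.
                return true;
            case CANCELED:
                // On cancellation the configured policy decides.
                return cleanup == Cleanup.RETAIN_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("unknown state: " + state);
        }
    }

    public static void main(String[] args) {
        // Print the full decision table.
        for (TerminalState s : TerminalState.values()) {
            for (Cleanup c : Cleanup.values()) {
                System.out.println(s + " + " + c + " -> retained=" + isRetained(s, c));
            }
        }
    }
}
```

Only the CANCELED row depends on the policy. Retained state from a failed job has to be removed manually.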