flink版本:1.12java
阿里云kafka: 2.2.0正则表达式
端到端(End to End)的精确一次,它指的是 Flink 应用从 Source 端开始到 Sink 端结束,数据必须通过的起始点和结束点。Flink 自身是没法保证外部系统“精确一次”语义的,flink只能保证计算的精确一次性,存储的一致性必须由外部保证,外部系统必须支持“精确一次”语义;而后借助 Flink 提供的分布式快照和两阶段提交才能实现。apache
Flink 经过 CheckPoint 机制来按期保存计算任务的快照,这个快照中主要包含两个重要的数据: 1.整个计算任务的状态。这个状态主要是计算任务中,每一个子任务在计算过程当中须要保存的临时状态数据。 2.数据源的位置信息。这个信息记录了在数据源的这个流中已经计算了哪些数据。若是数据源是 Kafka 的主题,这个位置信息就是 Kafka 主题中的消费位置。bootstrap
有了 CheckPoint,当计算任务失败重启的时候,能够从最近的一个 CheckPoint 恢复计算任务。具体的作法是,每一个子任务先从 CheckPoint 中读取并恢复本身的状态,而后整个计算任务从 CheckPoint 中记录的数据源位置开始消费数据,只要这个恢复位置和 CheckPoint 中每一个子任务的状态是彻底对应的。api
每一个 Flink 的 CheckPoint 对应一个 Kafka 事务。Flink 在建立一个 CheckPoint 的时候,同时开启一个 Kafka 的事务,完成 CheckPoint 同时提交 Kafka 的事务。当计算任务重启的时候,在 Flink 中计算任务会恢复到上一个 CheckPoint,这个 CheckPoint 正好对应 Kafka 上一个成功提交的事务。未完成的 CheckPoint 和未提交的事务中的消息都会被丢弃,这样就实现了端到端的 Exactly Once。markdown
package org.example
import java.time.Duration
import java.util.Properties
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
object KafkaDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Exactly-Once
env.enableCheckpointing(10 * 1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val backend: FsStateBackend = new FsStateBackend("file:///tmp/checkpoint",true)
env.setStateBackend(backend)
// Kafka 配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxxx:9092")
properties.setProperty("group.id", "dev-miaozhen-metric-group")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("enable.auto.commit", "false")
// 这个配置只是对于后序的consumer有做用,让它只消费已commited的数据
properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
val myCosumer = new FlinkKafkaConsumer[String](
// 主题发现 按照正则表达式自动发现topic
java.util.regex.Pattern.compile("miaozhen_raw_new[0-9]{0,}"), new SimpleStringSchema(), properties)
// 这种状态下提交到kafka的 offset能够忽略,不起做用
myCosumer.setCommitOffsetsOnCheckpoints(true)
// 从数据源生成watermark,这样的watermark更精准
val watermark: WatermarkStrategy[String] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
myCosumer.assignTimestampsAndWatermarks(watermark)
val producerProperty = new Properties()
producerProperty.setProperty("bootstrap.servers", "xxxx:9092")
producerProperty.setProperty("group.id", "dev-miaozhen-metric-group")
producerProperty.setProperty("transaction.timeout.ms", "60000")
//producerProperty.put(ProducerConfig.ACKS_CONFIG, "all") // 设置producer的ack传输配置
// 保证kafka的全局事务性,而不单单是分区事务性
producerProperty.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
producerProperty.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-0")
//producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000")
val myProducer = new FlinkKafkaProducer[String](
"exactly_once", // target topic
new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), // serialization schema
producerProperty,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)
val stream:DataStream[String] = env.addSource(myCosumer)
stream.map { text =>
val arr: Array[String] = text.split(",")
SensorReading(arr(0).trim, arr(1).toLong, arr(2).toDouble).toString
}.addSink(myProducer)
env.execute()
}
}
复制代码
kafka自身的exacly-once保证能够参考developer.aliyun.com/article/768… 分为幂等性producer和事务性producer.其中幂等性producer指的是分区的幂等性,经过sequence nums的递增来保证,事务性producer指的是topic'的幂等性,经过PID的惟一性来保证。app