Flink Kafa Connector是Flink内置的Kafka链接器,它包含了从Kafka Topic读入数据的Flink Kafka Consumer
以及向Kafka Topic写出数据的Flink Kafka Producer
,除此以外Flink Kafa Connector基于Flink Checkpoint机制提供了完善的容错能力。本文从Flink Kafka Connector的基本使用到Kafka在Flink中端到端的容错原理展开讨论。html
在Flink中使用Kafka Connector时须要依赖Kafka的版本,Flink针对不一样的Kafka版本提供了对应的Connector实现。java
既然Flink对不一样版本的Kafka有不一样实现,在使用时须要注意区分,根据使用环境引入正确的依赖关系。算法
1<dependency>
2 <groupId>org.apache.flink</groupId>
3 <artifactId>${flink_kafka_connector_version}</artifactId>
4 <version>${flink_version}</version>
5</dependency>复制代码
在上面的依赖配置中${flink_version}指使用Flink的版本,${flink_connector_kafka_version}
指依赖的Kafka connector版本对应的artifactId。下表描述了截止目前为止Kafka服务版本与Flink Connector之间的对应关系。
Flink官网内容Apache Kafka Connector(ci.apache.org/projects/fl…)中也有详细的说明。apache
从Flink 1.7版本开始为Kafka 1.0.0及以上版本提供了全新的的Kafka Connector支持,若是使用的Kafka版本在1.0.0及以上能够忽略因Kafka版本差别带来的依赖变化。bootstrap
明确了使用的Kafka版本后就能够编写一个基于Flink Kafka读/写的应用程序「本文讨论内容所有基于Flink 1.7版本和Kafka 1.1.0版本」。根据上面描述的对应关系在工程中添加Kafka Connector依赖。
后端
1<dependency>
2 <groupId>org.apache.flink</groupId>
3 <artifactId>flink-connector-kafka_2.11</artifactId>
4 <version>1.7.0</version>
5</dependency>复制代码
下面的代码片断是从Kafka Topic「flink_kafka_poc_input」中消费数据,再写入Kafka Topic「flink_kafka_poc_output」的简单示例。示例中除了读/写Kafka Topic外,没有作其余的逻辑处理。bash
1public static void main(String[] args) {
2 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3
4 /** 初始化Consumer配置 */
5 Properties consumerConfig = new Properties();
6 consumerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
7 consumerConfig.setProperty("group.id", "flink_poc_k110_consumer");
8
9 /** 初始化Kafka Consumer */
10 FlinkKafkaConsumer<String> flinkKafkaConsumer =
11 new FlinkKafkaConsumer<String>(
12 "flink_kafka_poc_input",
13 new SimpleStringSchema(),
14 consumerConfig
15 );
16 /** 将Kafka Consumer加入到流处理 */
17 DataStream<String> stream = env.addSource(flinkKafkaConsumer);
18
19 /** 初始化Producer配置 */
20 Properties producerConfig = new Properties();
21 producerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
22
23 /** 初始化Kafka Producer */
24 FlinkKafkaProducer<String> myProducer =
25 new FlinkKafkaProducer<String>(
26 "flink_kafka_poc_output",
27 new MapSerialization(),
28 producerConfig
29 );
30 /** 将Kafka Producer加入到流处理 */
31 stream.addSink(myProducer);
32
33 /** 执行 */
34 env.execute();
35}
36
37class MapSerialization implements SerializationSchema<String> {
38 public byte[] serialize(String element) {
39 return element.getBytes();
40 }
41}
复制代码
Flink API使用起来确实很是简单,调用addSource
方法和addSink
方法就能够将初始化好的FlinkKafkaConsumer
和FlinkKafkaProducer
加入到流处理中。execute
执行后,KafkaConsumer和KafkaProducer就能够开始正常工做了。网络
众所周知,Flink支持Exactly-once semantics。什么意思呢?翻译过来就是「刚好一次语义」。流处理系统中,数据源源不断的流入到系统、被处理、最后输出结果。咱们都不但愿系统因人为或外部因素产生任何意想不到的结果。对于Exactly-once语义达到的目的是指即便系统被人为中止、因故障shutdown、无端关机等任何因素中止运行状态时,对于系统中的每条数据不会被重复处理也不会少处理。
数据结构
Flink宣称支持Exactly-once其针对的是Flink应用内部的数据流处理。但Flink应用内部要想处理数据首先要有数据流入到Flink应用,其次Flink应用对数据处理完毕后也理应对数据作后续的输出。在Flink中数据的流入称为Source,数据的后续输出称为Sink,对于Source和Sink彻底依靠外部系统支撑(好比Kafka)。分布式
Flink自身是没法保证外部系统的Exactly-once语义。但这样一来其实并不能称为完整的Exactly-once,或者说Flink并不能保证端到端Exactly-once。而对于数据精准性要求极高的系统必需要保证端到端的Exactly-once,所谓端到端是指Flink应用从Source一端开始到Sink一端结束,数据必经的起始和结束两个端点。
那么如何实现端到端的Exactly-once呢?Flink应用所依赖的外部系统须要提供Exactly-once支撑,并结合Flink提供的Checkpoint机制和Two Phase Commit才能实现Flink端到端的Exactly-once。对于Source和Sink的容错保障,Flink官方给出了具体说明:
Fault Tolerance Guarantees of Data Sources and Sinks(ci.apache.org/projects/fl…)
在讨论基于Kafka端到端的Exactly-once以前先简单了解一下Flink Checkpoint,详细内容在《Flink Checkpoint原理》中有作讨论。Flink Checkpoint是Flink用来实现应用一致性快照的核心机制,当Flink因故障或其余缘由重启后能够经过最后一次成功的Checkpoint将应用恢复到当时的状态。若是在应用中启用了Checkpoint,会由JobManager按指定时间间隔触发Checkpoint,Flink应用内全部带状态的Operator会处理每一轮Checkpoint生命周期内的几个状态。
CheckpointedFunction
接口定义。Task启动时获取应用中全部实现了CheckpointedFunction
的Operator,并触发执行initializeState
方法。在方法的实现中通常都是从状态后端将快照状态恢复。CheckpointedFunction
接口定义。JobManager会按期发起Checkpoint,Task接收到Checkpoint后获取应用中全部实现了CheckpointedFunction
的Operator并触发执行对应的snapshotState
方法。1public interface CheckpointedFunction {
2 void snapshotState(FunctionSnapshotContext context) throws Exception;
3 void initializeState(FunctionInitializationContext context) throws Exception;
4}复制代码
CheckpointListener
接口定义。当基于同一个轮次(checkpointId相同)的Checkpoint快照所有处理成功后获取应用中全部实现了CheckpointListener
的Operator并触发执行notifyCheckpointComplete
方法。触发notifyCheckpointComplete
方法时携带的checkpointId参数用来告诉Operator哪一轮Checkpoint已经完成。1public interface CheckpointListener {
2 void notifyCheckpointComplete(long checkpointId) throws Exception;
3}复制代码
Kafka是很是收欢迎的分布式消息系统,在Flink中它能够做为Source,同时也能够做为Sink。Kafka 0.11.0及以上版本提供了对事务的支持,这让Flink应用搭载Kafka实现端到端的exactly-once成为了可能。下面咱们就来深刻了解提供了事务支持的Kafka是如何与Flink结合实现端到端exactly-once的。
本文忽略了Barrier机制,因此示例和图中都以单线程为例。Barrier在《Flink Checkpoint原理》有较多讨论。
Flink Kafka Consumer
Kafka自身提供了可重复消费消息的能力,Flink结合Kafka的这个特性以及自身Checkpoint机制,得以实现Flink Kafka Consumer的容错。
Flink Kafka Consumer是Flink应用从Kafka获取数据流消息的一个实现。除了数据流获取、数据发送下游算子这些基本功能外它还提供了完善的容错机制。这些特性依赖了其内部的一些组件以及内置的数据结构协同处理完成。这里,咱们先简单了解这些组件和内置数据结构的职责,再结合Flink 运行时 和 故障恢复时 两个不一样的处理时机来看一看它们之间是如何协同工做的。
AbstractPartitionDiscoverer
负责得到指定topic的元数据信息,并将获取到的topic元数据信息封装成KafkaTopicPartition
集合。String topic
和int partition
两个主要属性。假设topic A有2个分区,经过组件AbstractPartitionDiscoverer
处理后将获得由两个KafkaTopicPartition
对象组成的集合:KafkaTopicPartition(topic:A, partition:0)
和KafkaTopicPartition(topic:A, partition:1)
AbstractFetcher
组件负责完成这部分功能。除此以外Fetcher还负责offset的提交、KafkaTopicPartitionState
结构的数据维护。KafkaTopicPartitionState
是一个很是核心的数据结构,基于内部的4个基本属性,Flink Kafka Consumer维护了topic、partition、已消费offset、待提交offset的关联关系。Flink Kafka Consumer的容错机制依赖了这些数据。KafkaTopicPartitionState
还有两个子类,一个是支持PunctuatedWatermark
的实现,另外一个是支持PeriodicWatermark
的实现,这两个子类在原有基础上扩展了对水印的支持,咱们这里不作过多讨论。ListState<Tuple2<KafkaTopicPartition, Long>>
定义了状态存储结构,在这里Long表示的是offset类型,因此实际上就是使用KafkaTopicPartition
和offset组成了一个对儿,再添加到状态后端集合。KafkaTopicPartition
,value是对应已消费的offset。恢复成功后,应用恢复到故障前Flink Kafka Consumer消费的offset,并继续执行任务,就好像什么都没发生同样。咱们假设Flink应用正常运行,Flink Kafka Consumer消费topic为Topic-A
,Topic-A
只有一个partition。在运行期间,主要作了这么几件事
接下来咱们再结合消息真正开始处理后,KafkaTopicPartitionState结构中的数据变化。
能够看到,随着应用的运行,KafkaTopicPartitionState
中的offset属性值发生了变化,它记录了已经发送到下游算子消息在Kafka中的offset。在这里因为消息P0-C
已经发送到下游算子,因此KafkaTopicPartitionState.offset
变动为2。
FlinkKafkaConsumer
实现了CheckpointedFunction
,因此它具有快照状态(snapshotState)的能力。在实现中,snapshotState具体干了这么两件事下图描述当一轮Checkpoint开始时FlinkKafkaConsumer
的处理过程。在例子中,FlinkKafkaConsumer已经将offset=3的P0-D
消息发送到下游,当checkpoint触发时将topic=Topic-A;partition=0;offset=3做为最后的状态持久化到外部存储。
待提交offset
的Map集合,其中key是CheckpointId。FlinkKafkaConsumer
当前运行状态持久化,即将topic、partition、offset持久化。一旦出现故障,就能够根据最新持久化的快照进行恢复。下图描述当一轮Checkpoint开始时FlinkKafkaConsumer
的处理过程。在例子中,FlinkKafkaConsumer已经将offset=3的P0-D
消息发送到下游,当checkpoint触发时将topic=Topic-A;partition=0;offset=3做为最后的状态持久化到外部存储。
CheckpointListener.notifyCheckpointComplete(checkpointId)
通知算子Checkpoint完成,参数checkpointId指明了本次通知是基于哪一轮Checkpoint。在FlinkKafkaConsumer
的实现中,接到Checkpoint完成通知后会变动KafkaTopicPartitionState.commitedOffset
属性值。最后再将变动后的commitedOffset提交到Kafka brokers或Zookeeper。topic=Topic-A;partition=0;offset=3
的状态作了快照,在真正提交offset时是将快照的offset + 1
做为结果提交的。「源代码KafkaFetcher.java 207行
doCommitInternalOffsetsToKafka方法」Flink应用崩溃后,开始进入恢复模式。假设Flink Kafka Consumer最后一次成功的快照状态是topic=Topic-A;partition=0;offset=3
,在恢复期间按照下面的前后顺序执行处理。
CheckpointedFunction.initializeState
接口定义。在FlinkKafkaConsumer
的实现中,从状态后端得到快照并写入到内部存储结构TreeMap,其中key是由KafkaTopicPartition
表示的topic与partition,value为offset。下图描述的是故障恢复的第一个阶段,从状态后端得到快照,并恢复到内部存储。待消费信息
。若是应用须要从快照恢复状态,则从待恢复状态
中初始化这个Map结构。下图是该阶段从快照恢复的处理过程。function初始化阶段兼容了正常启动和状态恢复时offset的初始化。对于正常启动过程,StartupMode
的设置决定待消费信息
中的结果。该模式共有5种,默认为StartupMode.GROUP_OFFSETS
。
FlinkKafkaConsumer
运行起来,下图描述了这个阶段的处理流程这里对图中两个步骤作个描述
topic=Topic-A;partition=0;offset=3
初始化Flink Kafka Consumer
内部维护的Kafka处理状态。由于是恢复流程,因此这个内部维护的处理状态也应该随着快照恢复。状态后端offset + 1
。在例子中,消费Kafka数据前将offset重置为4,因此状态恢复后KafkaConsumer是从offset=4位置开始消费。「源代码KafkaConsumerThread.java 428行
」总结
上述的3个步骤是恢复期间主要的处理流程,一旦恢复逻辑执行成功,后续处理流程与正常运行期间一致。最后对FlinkKafkaConsumer用一句话作个总结。
「将offset提交权交给FlinkKafkaConsumer,其内部维护Kafka消费及提交的状态。基于Kafka可重复消费能力并配合Checkpoint机制和状态后端存储能力,就能实现FlinkKafkaConsumer容错性,即Source端的Exactly-once语义」。
Flink Kafka Producer
Flink Kafka Producer是Flink应用向Kafka写出数据的一个实现。在Kafka 0.11.0及以上版本中提供了事务支持,这让Flink搭载Kafka的事务特性能够轻松实现Sink端的Exactly-once语义。关于Kafka事务特性在《Kafka幂等与事务》中作了详细讨论。
在Flink Kafka Producer中,有一个很是重要的组件FlinkKafkaInternalProducer
,这个组件代理了Kafka客户端org.apache.kafka.clients.producer.KafkaProducer
,它为Flink Kafka Producer操做Kafka提供了强有力的支撑。在这个组件内部,除了代理方法外,还提供了一些关键操做。我的认为,Flink Kafka Sink可以实现Exactly-once语义除了须要Kafka支持事务特性外,同时也离不开FlinkKafkaInternalProducer
组件提供的支持,尤为是下面这些关键操做:
FlinkKafkaInternalProducer
组件中最关键的处理当属事务重置,事务重置由resumeTransaction方法实现「源代码FlinkKafkaInternalProducer.java
144行」。因为Kafka客户端未暴露针对事务操做的API,因此在这个方法内部,大量的使用了反射。方法中使用反射得到KafkaProducer依赖的transactionManager对象,并将状态后端快照的属性值恢复到transactionManager对象中,这样以达到让Flink Kafka Producer应用恢复到重启前的状态。下面咱们结合Flink 运行时 和 故障恢复 两个不一样的处理时机来了解Flink Kafka Producer内部如何工做。
运行时
咱们假设Flink应用正常运行,Flink Kafka Producer正常接收上游数据并写到Topic-B的Topic中,Topic-B只有一个partition。在运行期间,主要作如下几件事:
FlinkKafkaProducer
,FlinkKafkaProducer
接到数据后封装ProducerRecord
对象并调用Kafka客户端KafkaProducer.send
方法将ProducerRecord
对象写入缓冲「源代码FlinkKafkaProducer.java 616行」。下图是该阶段的描述:FlinkKafkaProducer
做为Kafka Sink,它继承抽象类TwoPhaseCommitSinkFunction
,根据名字就能知道,这个抽象类主要实现两阶段提交
。为了集成Flink Checkpoint机制,抽象类实现了CheckpointedFunction
和CheckpointListener
,所以它具有快照状态(snapshotState)能力。状态快照处理具体作了下面三件事:isolation.level=read_committed
,那么此时这个消费端还没法poll到flush的数据,由于这些数据还没有commit。何时commit呢?在快照结束处理
阶段进行commit,后面会提到。TransactionHolder
表示的事务对象。TransactionHolder
对象内部记录了transactionalId、producerId、epoch以及Kafka客户端kafkaProducer的引用。下图是状态快照处理阶段处理过程
TwoPhaseCommitSinkFunction
实现了CheckpointListener
,应用中全部算子的快照处理成功后会收到基于某轮Checkpoint完成的通知。当FlinkKafkaProducer
收到通知后,主要任务就是提交上一阶段产生的事务,而具体要提交哪些事务是从上一阶段生成的待提交事务集合中获取的。图中第4步执行成功后,flush到Kafka的数据从UNCOMMITTED变动为COMMITTED,这意味着此时消费端能够poll到这批数据了。
2PC(两阶段提交)理论的两个阶段分别对应了FlinkKafkaProducer的状态快照处理
阶段和快照结束处理
阶段,前者是经过Kafka的事务初始化、事务开启、flush等操做预提交事务,后者是经过Kafka的commit操做真正执行事务提交。
Flink应用崩溃后,FlinkKafkaProducer
开始进入恢复模式。下图为应用崩溃前的状态描述:
在恢复期间主要的处理在状态初始化阶段。当Flink任务重启时会触发状态初始化,此时应用与Kafka已经断开了链接。但在运行期间可能存在数据flush还没有提交的状况。
若是想从新提交这些数据须要从状态后端恢复当时KafkaProducer持有的事务对象,具体一点就是恢复当时事务的transactionalId、producerId、epoch。这个时候就用到了FlinkKafkaInternalProducer
组件中的事务重置,在状态初始化时从状态后端得到这些事务信息,并重置到当前KafkaProducer中,再执行commit操做。这样就能够恢复任务重启前的状态,Topic-B的消费端依然能够poll到应用恢复后提交的数据。
须要注意的是:若是这个重置并提交的动做失败了,可能会形成数据丢失。
下图描述的是状态初始化阶段的处理流程:
总结
FlinkKafkaProducer
故障恢复期间,状态初始化是比较重要的处理阶段。这个阶段在Kafka事务特性的强有力支撑下,实现了事务状态的恢复,而且使得状态存储占用空间最小。依赖Flink提供的TwoPhaseCommitSinkFunction
实现类,咱们本身也能够对Sink作更多的扩展。
本文做者:TalkingData 史天舒 封面来源于网络,若有侵权请联系删除