Flink提供了Kafka connector用于消费/生产Apache Kafka topic的数据。Flink的Kafka consumer集成了checkpoint机制以提供精确一次的处理语义。在具体的实现过程当中,Flink不依赖于Kafka内置的消费组位移管理,而是在内部自行记录和维护consumer的位移。html
用户在使用时须要根据Kafka版原本选择相应的connector,以下表所示:java
Maven依赖 | 支持的最低Flink版本 | Kafka客户端类名 | 说明 |
flink-connector-kafka-0.8_2.10 | 1.0.0 | FlinkKafkaConsumer08apache FlinkKafkaProducer08bootstrap |
使用的是Kafka老版本low-level consumer,即SimpleConsumer. Flink在内部会提交位移到Zookeeper |
flink-connector-kafka-0.9_2.10 | 1.0.0 | FlinkKafkaConsumer09api FlinkKafkaProducer09数组 |
使用Kafka新版本consumer |
flink-connector-kafka-0.10_2.10 | 1.2.0 | FlinkKafkaConsumer010maven FlinkKafkaProducer010函数 |
支持使用Kafka 0.10.0.0版本新引入的内置时间戳信息 |
而后,将上面对应的connector依赖加入到maven项目中,好比:性能
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.3.2</version>
</dependency>ui
Kafka Consumer
Flink kafka connector使用的consumer取决于用户使用的是老版本consumer仍是新版本consumer,新旧两个版本对应的connector类名是不一样的,分别是:FlinkKafkaConsumer09(或FlinkKafkaConsumer010)以及FlinkKafkaConsumer08。它们都支持同时消费多个topic。
该Connector的构造函数包含如下几个字段:
下面给出一个实例:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
DeserializationSchema
Flink的Kafka consumer须要依靠用户指定的解序列化器来将二进制的数据转换成Java对象。DeserializationSchema接口就是作这件事情的,该接口中的deserialize方法做用于每条Kafka消息上,并把转换的结果发往Flink的下游operator。
一般状况下,用户直接继承AbstractDeserializationSchema来建立新的deserializer,也能够实现DeserializationSchema接口,只不过要自行实现getProducedType方法。
若是要同时解序列化Kafka消息的key和value,则须要实现KeyedDeserializationSchema接口,由于该接口的deserialize方法同时包含了key和value的字节数组。
Flink默认提供了几种deserializer:
一旦在解序列化过程当中出现错误,Flink提供了两个应对方法——1. 在deserialize方法中抛出异常,使得整个做业失败并重启;2. 返回null告诉Flink Kafka connector跳过这条异常消息。值得注意的是,因为consumer是高度容错的,若是采用第一种方式会让consumer再次尝试deserialize这条有问题的消息。所以假若deserializer再次失败,程序可能陷入一个死循环并不断进行错误重试。
Kafka consumer起始位移配置
Flink的Kafka consumer容许用户配置Kafka consumer的起始读取位移,以下列代码所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...); myConsumer.setStartFromEarliest(); // start from the earliest record possible myConsumer.setStartFromLatest(); // start from the latest record myConsumer.setStartFromGroupOffsets(); // the default behaviour DataStream<String> stream = env.addSource(myConsumer); ...
全部版本的Flink Kafka consumer均可以使用上面的方法来设定起始位移。
Flink也支持用户自行指定位移,方法以下:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
上面的例子中,consumer将从用户指定的位移处开始读取消息。这里的位移记录的是下一条待消费消息的位移,而不是最新的已消费消息的位移。值得注意的是,若是待消费分区的位移不在保存的位移映射中,Flink Kafka connector会使用默认的组位移策略(即setStartFromGroupOffsets())。
另外,当任务自动地从失败中恢复或手动地从savepoint中恢复时,上述这些设置位移的方法是不生效的。在恢复时,每一个Kafka分区的起始位移都是由保存在savepoint或checkpoint中的位移来决定的。
Kafka consumer容错性
一旦启用了Flink的检查点机制(checkpointing),Flink Kafka消费者会按期地对其消费的topic作checkpoint以保存它消费的位移以及其余操做的状态。一旦出现失败,Flink将会恢复streaming程序到最新的checkpoint状态,而后从新从Kafka消费数据,从新读取的位置就是保存在checkpoint中的位移。
checkpoint的间隔决定了程序容错性的程度,它直接肯定了在程序崩溃时,程序回溯到的最久状态。
若是要使用启动容错性的Kafka消费者,按期对拓扑进行checkpoint就是很是必要的,实现方法以下面代码所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 每5秒作一次checkpoint
须要注意的是,只有槽位(slot)充足Flink才会重启拓扑,所以一旦拓扑因没法链接TaskManager而崩溃,仍然须要有足够的slot才能重启拓扑。若是使用YARN的话,Flink可以自动地重启丢失的YARN容器。
若是没有启用checkpoint,那么Kafka consumer会按期地向Zookeeper提交位移。
Kafka consumer位移提交
Flink Kafka consumer能够自行设置位移提交的行为。固然,它不依赖于这些已提交的位移来实现容错性。这些提交位移只是供监控使用。
配置位移提交的方法各异,主要依赖因而否启用了checkpointing机制:
Kafka consumer时间戳提取/水位生成
一般,事件或记录的时间戳信息是封装在消息体中。至于水位,用户能够选择按期地发生水位,也能够基于某些特定的Kafka消息来生成水位——这分别就是AssignerWithPeriodicWatermaks以及AssignerWithPunctuatedWatermarks接口的使用场景。
用户也可以自定义时间戳提取器/水位生成器,具体方法参见这里,而后按照下面的方式传递给consumer:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties); myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); DataStream<String> stream = env .addSource(myConsumer) .print();
在内部,Flink会为每一个Kafka分区都执行一个对应的assigner实例。一旦指定了这样的assigner,对于每条Kafka中的消息,extractTimestamp(T element, long previousElementTimestamp)方法会被调用来给消息分配时间戳,而getCurrentWatermark()方法(定时生成水位)或checkAndGetNextWatermark(T lastElement, long extractedTimestamp)方法(基于特定条件)会被调用以肯定是否发送新的水位值。