本文基于Flink1.9版本简述如何链接Kafka。java
咱们知道能够本身来开发Source 和 Sink ,可是一些比较基本的 Source 和 Sink 已经内置在 Flink 里。git
预约义的source支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。github
预约义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。数据库
链接器能够和多种多样的第三方系统进行交互。目前支持如下系统:apache
请记住,在使用一种链接器时,一般须要额外的第三方组件,好比:数据存储服务器或者消息队列。bootstrap
Apache Bahir 中定义了其余一些链接器api
使用connector并非惟一能够使数据进入或者流出Flink的方式。一种常见的模式是从外部数据库或者 Web 服务查询数据获得初始数据流,而后经过 Map
或者 FlatMap
对初始数据流进行丰富和加强,这里要使用Flink的异步IO。服务器
而向外部存储推送大量数据时会致使 I/O 瓶颈问题出现。在这种场景下,若是对数据的读操做远少于写操做,可让外部应用从 Flink 拉取所需的数据,须要用到Flink的可查询状态接口。异步
本文重点介绍Apache Kafka Connectorsocket
此链接器提供对Apache Kafka提供的事件流的访问。
Flink提供特殊的Kafka链接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不彻底依赖Kafka 的消费者组的偏移量,而是在内部跟踪和检查这些偏移。
下表为不一样版本的kafka与Flink Kafka Consumer的对应关系。
Maven Dependency |
Supported since | Consumer and Producer Class name |
Kafka version |
---|---|---|---|
flink-connector-kafka-0.8_2.11 |
1.0.0 |
FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.x |
flink-connector-kafka-0.9_2.11 |
1.0.0 |
FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x |
flink-connector-kafka-0.10_2.11 | 1.2.0 |
FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x |
flink-connector-kafka-0.11_2.11 | 1.4.0 |
FlinkKafkaConsumer011 FlinkKafkaProducer011 | 0.11.x |
flink-connector-kafka_2.11 |
1.7.0 |
FlinkKafkaConsumer FlinkKafkaProducer | >= 1.0.0 |
而从最新的Flink1.9.0版本开始,使用Kafka 2.2.0客户端。
下面简述使用步骤。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>复制代码
能够参照 Kafka入门宝典(详细截图版)
从Flink 1.7开始,它不跟踪特定的Kafka主要版本。相反,它在Flink发布时跟踪最新版本的Kafka。若是您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka链接器。若是使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的链接器。
升级Connect要注意Flink升级做业,同时
uid
)。 stop --withSavepoint
)。 引入依赖后,实例化新的source(FlinkKafkaConsumer
)和sink(FlinkKafkaProducer
)。
先分步骤介绍构建过程,文末附Flink1.9链接Kafka完整代码。
Kafka consumer 根据版本分别叫作FlinkKafkaConsumer08 FlinkKafkaConsumer09等等Kafka >= 1.0.0 的版本就叫FlinkKafkaConsumer。
java示例代码以下:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));复制代码
scala:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
.print()复制代码
必须有的:
1.topic名称
2.用于反序列化Kafka数据的DeserializationSchema / KafkaDeserializationSchema
3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还须要 “zookeeper.connect”)
java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour
//指定位置
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream<String> stream = env.addSource(myConsumer);复制代码
scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviour
//指定位置
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
val stream = env.addSource(myConsumer)复制代码
启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式按期检查其全部Kafka偏移以及其余操做的状态。若是做业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始从新使用Kafka的记录。
若是禁用了检查点,则Flink Kafka Consumer依赖于内部使用的Kafka客户端的自动按期偏移提交功能。
若是启用了检查点,则Flink Kafka Consumer将在检查点完成时提交存储在检查点状态中的偏移量。
java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs复制代码
scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs复制代码
Flink Kafka Consumer支持发现动态建立的Kafka分区,并使用一次性保证消费它们。
还能够使用正则:
java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer);
...复制代码
scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String](
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema,
properties)
val stream = env.addSource(myConsumer)
...复制代码
在许多状况下,记录的时间戳(显式或隐式)嵌入记录自己。另外,用户可能想要周期性地或以不规则的方式发出水印。
咱们能够定义好Timestamp Extractors / Watermark Emitters,经过如下方式将其传递给您的消费者:
java
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env
.addSource(myConsumer)
.print();复制代码
scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
.addSource(myConsumer)
.print()复制代码
Kafka Producer 根据版本分别叫作FlinkProducer011 FlinkKafkaProducer010等等Kafka >= 1.0.0 的版本就叫FlinkKafkaProducer 。
java
DataStream<String> stream = ...;
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema()); // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);
stream.addSink(myProducer);复制代码
scala
val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer011[String](
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema) // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)
stream.addSink(myProducer)复制代码
须要指定broker list , topic,序列化类。
自定义分区:默认状况下,将使用FlinkFixedPartitioner
将每一个Flink Kafka Producer并行子任务映射到单个Kafka分区。
能够实现FlinkKafkaPartitioner类自定义分区。
Flink1.9消费Kafka完整代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumer {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
//构建FlinkKafkaConsumer
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
//指定偏移量
myConsumer.setStartFromEarliest();
DataStream<String> stream = env
.addSource(myConsumer);
env.enableCheckpointing(5000);
stream.print();
env.execute("Flink Streaming Java API Skeleton");
}复制代码