以前文章 《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用到了 Flink 自带的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一种状况,那么若是咱们有多个地方须要这份经过 Flink 转换后的数据,是否是又要咱们继续写个 sink 的插件呢?确实,因此 Flink 里面就默认支持了很多 sink,好比也支持 Kafka sink connector(FlinkKafkaProducer),那么这篇文章咱们就讲讲如何将数据写入到 Kafka。java
Flink 里面支持 Kafka 0.八、0.九、0.十、0.11 ,之后有时间能够分析下源码的实现。git
这里咱们须要安装下 Kafka,请对应添加对应的 Flink Kafka connector 依赖的版本,这里咱们使用的是 0.11 版本:github
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>${flink.version}</version> </dependency>
这里就不写这块内容了,能够参考我之前的文章 Kafka 安装及快速入门。apache
这里咱们演示把其余 Kafka 集群中 topic 数据原样写入到本身本地起的 Kafka 中去。微信
kafka.brokers=xxx:9092,xxx:9092,xxx:9092 kafka.group.id=metrics-group-test kafka.zookeeper.connect=xxx:2181 metrics.topic=xxx stream.parallelism=5 kafka.sink.brokers=localhost:9092 kafka.sink.topic=metric-test stream.checkpoint.interval=1000 stream.checkpoint.enable=false stream.sink.parallelism=5
目前咱们先看下本地 Kafka 是否有这个 metric-test topic 呢?须要执行下这个命令:学习
bin/kafka-topics.sh --list --zookeeper localhost:2181
能够看到本地的 Kafka 是没有任何 topic 的,若是等下咱们的程序运行起来后,再次执行这个命令出现 metric-test topic,那么证实个人程序确实起做用了,已经将其余集群的 Kafka 数据写入到本地 Kafka 了。ui
Main.javaspa
public class Main { public static void main(String[] args) throws Exception{ final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env); data.addSink(new FlinkKafkaProducer011<Metrics>( parameterTool.get("kafka.sink.brokers"), parameterTool.get("kafka.sink.topic"), new MetricSchema() )).name("flink-connectors-kafka") .setParallelism(parameterTool.getInt("stream.sink.parallelism")); env.execute("flink learning connectors kafka"); } }
启动程序,查看运行结果,不段执行上面命令,查看是否有新的 topic 出来:插件
执行命令能够查看该 topic 的信息:code
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test
上面代码咱们使用 Flink Kafka Producer 只传了三个参数:brokerList、topicId、serializationSchema(序列化)
其实也能够传入多个参数进去,如今有的参数用的是默认参数,由于这个内容比较多,后面能够抽出一篇文章单独来说。
本篇文章写了 Flink 读取其余 Kafka 集群的数据,而后写入到本地的 Kafka 上。我在 Flink 这层没作什么数据转换,只是原样的将数据转发了下,若是大家有什么其余的需求,是能够在 Flink 这层将数据进行各类转换操做,好比这篇文章中的一些转换:《从0到1学习Flink》—— Flink Data transformation(转换),而后将转换后的数据发到 Kafka 上去。
本文原创地址是: http://www.54tianzhisheng.cn/2019/01/06/Flink-Kafka-sink/ , 未经容许禁止转载。
微信公众号:zhisheng
另外我本身整理了些 Flink 的学习资料,目前已经所有放到微信公众号了。你能够加个人微信:zhisheng_tian,而后回复关键字:Flink 便可无条件获取到。
https://github.com/zhisheng17/flink-learning/
之后这个项目的全部代码都将放在这个仓库里,包含了本身学习 flink 的一些 demo 和博客
一、《从0到1学习Flink》—— Apache Flink 介绍
二、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
三、《从0到1学习Flink》—— Flink 配置文件详解
四、《从0到1学习Flink》—— Data Source 介绍
五、《从0到1学习Flink》—— 如何自定义 Data Source ?
六、《从0到1学习Flink》—— Data Sink 介绍
七、《从0到1学习Flink》—— 如何自定义 Data Sink ?
八、《从0到1学习Flink》—— Flink Data transformation(转换)
九、《从0到1学习Flink》—— 介绍Flink中的Stream Windows
十、《从0到1学习Flink》—— Flink 中的几种 Time 详解
十一、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch