The main production use case for Structured Streaming is real-time processing together with Kafka. Note that Structured Streaming's Kafka version requirement is a bit higher: only Kafka 0.10 and above is supported. Just last month we upgraded from 0.9 to 0.10, so we can finally try out many Structured Streaming features, which is exciting~
For a Maven project, just add the corresponding Kafka connector dependency:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
```
When reading, you can subscribe to a single topic, to multiple topics, or to a topic wildcard pattern:
```scala
// Subscribe to a single topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
```scala
// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
```scala
// Subscribe to a topic pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
For Kafka offsets, Structured Streaming provides several options out of the box: you can pin exact offsets per partition, or simply use earliest/latest, as the batch examples below show:
```scala
// Batch read with explicit starting and ending offsets per partition
// (-2 means earliest, -1 means latest)
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
```scala
// Batch read from the earliest to the latest available offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
The schema of the data you read back is fixed and contains the following columns (a short sketch of working with them follows the table):
Column | Type | Description |
---|---|---|
key | binary | message key |
value | binary | message value (our own payload) |
topic | string | topic |
partition | int | partition |
offset | long | offset |
timestamp | long | timestamp |
timestampType | int | timestamp type |
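Since `value` arrives as binary, a common next step is to cast it to a string and parse it. The sketch below assumes the value is JSON; the schema and field names (`userSchema`, `userId`, `eventTime`) are placeholders for illustration, not part of the connector:

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// Hypothetical schema of the JSON payload carried in the Kafka value
val userSchema = new StructType()
  .add("userId", StringType)
  .add("eventTime", TimestampType)

val parsed = df
  .selectExpr("CAST(value AS STRING) AS json", "topic", "partition", "offset")
  .select(from_json(col("json"), userSchema).as("data"),
          col("topic"), col("partition"), col("offset"))
  .select("data.*", "topic", "partition", "offset")
```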
Whether you read as a stream or as a batch, a few options are always required: `kafka.bootstrap.servers`, plus exactly one of `subscribe`, `subscribePattern`, or `assign` to say what to consume.
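`subscribe` and `subscribePattern` are shown above; as a sketch of the third option, `assign` pins specific partitions via a JSON string (the topic names and partition numbers below are just placeholders):

```scala
val assigned = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  // assign takes a JSON string mapping each topic to the partitions to consume
  .option("assign", """{"topic1":[0,1],"topic2":[2]}""")
  .load()
```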
Beyond the required options, there are other options worth knowing about; two common ones are shown in the sketch below.
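A minimal sketch (the values are made up): `maxOffsetsPerTrigger` caps how many offsets are consumed per micro-batch, and `failOnDataLoss` controls whether the query fails when data may have been lost (for example after a topic is deleted or offsets are aged out):

```scala
val throttled = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  // rate-limit each micro-batch (the value is illustrative)
  .option("maxOffsetsPerTrigger", "10000")
  // do not fail the query when offsets have been deleted on the broker
  .option("failOnDataLoss", "false")
  .load()
```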
Apache Kafka only supports at-least-once semantics, so whether you do stream or batch processing, data may be duplicated. For example, when a failure occurs, Structured Streaming retries, but it cannot be sure whether the broker has already processed and persisted the record. What it can guarantee is that if the query succeeds, the data was written at least once. A common practice is to deduplicate again when later consuming the Kafka data; Structured Streaming actually has a dedicated solution for this.
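As a hedged sketch of that deduplication idea: assuming every record carries a unique id (the `eventId` and `eventTime` fields below are made up), `dropDuplicates` combined with a watermark removes duplicates while keeping the dedup state bounded:

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// Hypothetical payload: an eventId used for dedup plus an event-time column
val eventSchema = new StructType()
  .add("eventId", StringType)
  .add("eventTime", TimestampType)

val deduped = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), eventSchema).as("e"))
  .select("e.*")
  .withWatermark("eventTime", "10 minutes")      // bound the dedup state
  .dropDuplicates("eventId", "eventTime")
```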
The schema used when writing data: the `value` column is required (string or binary), `key` is optional, and `topic` is optional as long as the `topic` option is set on the writer.
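A minimal sketch of shaping a DataFrame into that schema, assuming a hypothetical `events` DataFrame with made-up `userId` and `payload` columns that get serialized into the value:

```scala
import org.apache.spark.sql.functions.{col, struct, to_json}

// key/value must end up as string or binary columns; the source columns are illustrative
val forKafka = events
  .select(
    col("userId").cast("string").as("key"),
    to_json(struct(col("userId"), col("payload"))).as("value")
  )
```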
The options a Kafka sink requires are `kafka.bootstrap.servers` plus the target topic, given either as the `topic` option or as a `topic` column in the data:
```scala
// Specify the topic via an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Take the topic from a column in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
```
Writing in batch mode works the same way as the streaming case:
```scala
// Specify the topic via an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Take the topic from a column in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
```
Kafka-specific client settings can be passed through DataStreamReader.option by prefixing the Kafka config key with `kafka.`.
For detailed Kafka configuration, see the [official consumer documentation](http://kafka.apache.org/documentation.html#newconsumerconfigs).
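For example (the values are illustrative, not recommendations), anything prefixed with `kafka.` is handed straight to the underlying Kafka consumer:

```scala
val tuned = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  // passed through to the Kafka consumer as max.poll.records / session.timeout.ms
  .option("kafka.max.poll.records", "500")
  .option("kafka.session.timeout.ms", "30000")
  .load()
```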
Note that the following Kafka parameters cannot be set, otherwise an exception is thrown: `group.id`, `auto.offset.reset`, `key.deserializer`, `value.deserializer`, `key.serializer`, `value.serializer`, `enable.auto.commit`, `interceptor.classes`.