Kafka 0.10的Spark Streaming集成设计与0.8 Direct Stream方法相似。 它提供了简单的并行性,Kafka分区和Spark分区之间的1:1对应关系,以及对偏移量和元数据的访问。 可是,因为较新的集成使用新的Kafka消费者API而不是简单的API,因此在使用上存在显着差别。 这个版本的集成被标记为实验,因此API可能会有变化。java
对于使用SBT / Maven项目定义的Scala / Java应用程序,请将您的流应用程序与如下工件连接起来。正则表达式
1 groupId = org.apache.spark 2 artifactId = spark-streaming-kafka-0-10_2.11 3 version = 2.2.0
不要在org.apache.kafka构件上手动添加依赖项(例如kafka-clients)。 spark-streaming-kafka-0-10已经具备适当的传递依赖性,不一样的版本可能在诊断的方式上不兼容。apache
请注意,导入的名称空间包含版本org.apache.spark.streaming.kafka010编程
1 import org.apache.kafka.clients.consumer.ConsumerRecord 2 import org.apache.kafka.common.serialization.StringDeserializer 3 import org.apache.spark.streaming.kafka010._ 4 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 5 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 6 7 val kafkaParams = Map[String, Object]( 8 "bootstrap.servers" -> "localhost:9092,anotherhost:9092", 9 "key.deserializer" -> classOf[StringDeserializer], 10 "value.deserializer" -> classOf[StringDeserializer], 11 "group.id" -> "use_a_separate_group_id_for_each_stream", 12 "auto.offset.reset" -> "latest", 13 "enable.auto.commit" -> (false: java.lang.Boolean) 14 ) 15 16 val topics = Array("topicA", "topicB") 17 val stream = KafkaUtils.createDirectStream[String, String]( 18 streamingContext, 19 PreferConsistent, 20 Subscribe[String, String](topics, kafkaParams) 21 ) 22 23 stream.map(record => (record.key, record.value))
流中的每一个项目都是一个ConsumerRecord(消费者记录)bootstrap
有关可能的kafkaParams,请参阅Kafka使用者配置文档。 若是您的Spark批处理持续时间大于默认Kafka心跳会话超时(30秒),请适当增长heartbeat.interval.ms和session.timeout.ms。 对于大于5分钟的批次,这将须要更改代理上的group.max.session.timeout.ms。 请注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅下面的“存储偏移量”。缓存
新的Kafka消费者API将预取消息到缓冲区中。所以,性能方面的缘由是Spark集成将缓存的消费者保留在执行者上(而不是为每一个批次从新建立它们),而且倾向于在具备相应使用者的主机位置上调度分区。安全
在大多数状况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这会将分区平均分配给可用的执行者。若是您的执行者(executors )与您的Kafka经纪人(brokers)位于同一个主机上,请使用PreferBrokers,它将优先为该分区的Kafka领导安排分区。最后,若是分区间负载存在明显偏移,请使用PreferFixed。这容许您指定分区到主机的明确映射(任何未指定的分区将使用一致的位置)。session
消费者缓存的默认最大大小为64.若是您但愿处理多于(执行者数量为64 *)的Kafka分区,则能够经过spark.streaming.kafka.consumer.cache.maxCapacity更改此设置。ide
若是您想禁用Kafka使用者的缓存,则能够将spark.streaming.kafka.consumer.cache.enabled设置为false。可能须要禁用缓存来解决SPARK-19185中描述的问题。 SPARK-19185解决后,该属性可能会在更高版本的Spark中删除。函数
缓存由topicpartition和group.id键入,所以每次调用createDirectStream时都要使用一个单独的group.id。
新的Kafka消费者API有许多不一样的方式来指定主题,其中一些须要大量的后对象实例化设置。 ConsumerStrategies提供了一个抽象,容许Spark从检查点从新启动后便可得到正确配置的使用者。
ConsumerStrategies.Subscribe,如上所示,容许您订阅固定的主题集合。 SubscribePattern容许您使用正则表达式来指定感兴趣的主题。 请注意,与0.8集成不一样,使用Subscribe或SubscribePattern应该在正在运行的流中添加分区。 最后,Assign容许你指定一个固定的分区集合。 全部这三种策略都有重载的构造函数,容许您指定特定分区的起始偏移量。
若是您具备以上选项不能知足的特定消费者设置需求,则ConsumerStrategy是能够扩展的公共类。
若是您有一个更适合批处理的用例,则能够为已定义的偏移范围建立一个RDD。
1 // Import dependencies and create kafka params as in Create Direct Stream above 2 3 val offsetRanges = Array( 4 // topic, partition, inclusive starting offset, exclusive ending offset 5 OffsetRange("test", 0, 0, 100), 6 OffsetRange("test", 1, 0, 100) 7 ) 8 9 val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
请注意,您不能使用PreferBrokers,由于若是没有流,则不存在驱动程序方面的消费者为您自动查找代理元数据。 若有必要,请使用PreferFixed与您本身的元数据查找。
1 stream.foreachRDD { rdd => 2 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 3 rdd.foreachPartition { iter => 4 val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 5 println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 6 } 7 }
请注意,HasOffsetRanges的类型转换只会在createDirectStream的结果中调用的第一个方法中完成,而不是稍后向下的一系列方法。 请注意,RDD分区与Kafka分区之间的一对一映射在任何混洗或从新分区的方法(例如, reduceByKey()或window()。
Kafka交付语义在失败的状况下取决于如何以及什么时候存储偏移量。 spark输出操做至少一次。 因此,若是你想要至关于一次的语义,你必须在幂等输出以后存储偏移量,或者在输出中存储原子事务的偏移量。 经过这种集成,您能够选择3个选项,以提升可靠性(和代码复杂度),以便如何存储偏移量。
若是启用了Spark检查点,偏移量将被存储在检查点中。 这很容易启用,但也有缺点。 你的输出操做必须是幂等的,由于你将获得重复的输出; 转换不是一种选择。 此外,若是应用程序代码已更改,则没法从检查点恢复。 对于计划升级,能够经过在旧代码的同时运行新代码来缓解这种状况(由于不管如何输出须要是幂等的,它们不该该发生冲突)。 可是对于须要更改代码的意外故障,除非您有另外的方法来识别已知的良好起始偏移量,不然将会丢失数据。
Kafka有一个偏移提交API,用于在特殊的Kafka主题中存储偏移量。 默认状况下,新的使用者将按期自动提交偏移量。 这几乎确定不是你想要的,由于由消费者成功轮询的消息可能尚未致使Spark输出操做,致使未定义的语义。 这就是为何上面的流示例将“enable.auto.commit”设置为false的缘由。 可是,在知道输出已存储以后,可使用commitAsync API将偏移量提交给Kafka。 与检查点相比,好处在于,不管应用程序代码如何变化,Kafka都是耐用的商店。 然而,Kafka不是转换型的,因此你的输出仍然是幂等的。
1 stream.foreachRDD { rdd => 2 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 3 4 // some time later, after outputs have completed 5 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) 6 }
和HasOffsetRanges同样,若是调用createDirectStream的结果,而不是在转换以后,转换为CanCommitOffsets将会成功。 commitAsync调用是线程安全的,可是若是你想要有意义的语义,则必须在输出以后进行。
对于支持事务的数据存储,即便在出现故障的状况下,也能够在同一个事务中保存偏移量,以保持二者同步。 若是仔细检测重复或跳过的偏移量范围,回滚事务将防止重复或丢失的消息影响结果。 这给出了至关于一次的语义。 甚至可使用这种策略,即便对于一般难以产生幂等性的聚合产生的输出也是如此。
1 // The details depend on your data store, but the general idea looks like this 2 3 // begin from the the offsets committed to the database 4 val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet => 5 new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset") 6 }.toMap 7 8 val stream = KafkaUtils.createDirectStream[String, String]( 9 streamingContext, 10 PreferConsistent, 11 Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets) 12 ) 13 14 stream.foreachRDD { rdd => 15 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 16 17 val results = yourCalculation(rdd) 18 19 // begin your transaction 20 21 // update results 22 // update offsets where the end of existing offsets matches the beginning of this batch of offsets 23 // assert that offsets were updated correctly 24 25 // end your transaction 26 }
新的Kafka使用者支持SSL。 要启用它,请在传递给createDirectStream / createRDD以前适当地设置kafkaParams。 请注意,这仅适用于Spark和Kafkabroker之间的沟通; 您仍负责单独确保Spark节点间通讯。
1 val kafkaParams = Map[String, Object]( 2 // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS 3 "security.protocol" -> "SSL", 4 "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks", 5 "ssl.truststore.password" -> "test1234", 6 "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks", 7 "ssl.keystore.password" -> "test1234", 8 "ssl.key.password" -> "test1234" 9 )
与任何Spark应用程序同样,spark-submit用于启动您的应用程序。
对于Scala和Java应用程序,若是您使用SBT或Maven进行项目管理,请将spark-streaming-kafka-0-10_2.11及其依赖项打包到应用程序JAR中。 确保spark-core_2.11和spark-streaming_2.11被标记为提供的依赖关系,由于这些已经存在于Spark安装中。 而后使用spark-submit启动您的应用程序(请参阅主编程指南中的部署)。