https://blog.csdn.net/rlnLo2pNEfx9c/article/details/79648890 css
SparkStreaming与kafka010整合html
读本文以前。请先阅读以前文章:java
必读:再讲Spark与kafka 0.8.2.1+整合
apache
Spark Streaming与kafka 0.10的整合,和0.8版本号的direct Stream方式很是像。Kafka的分区和spark的分区是一一相应的,可以获取offsets和元数据。bootstrap
API使用起来没有显著的差异。这个整合版本号标记为experimental。因此API有可能改变。api
project依赖缓存
首先,加入依赖。安全
groupId = org.apache.sparksession
artifactId = spark-streaming-kafka-0-10_2.11app
version = 2.2.1
不要手动加入org.apache.kafka相关的依赖。如kafka-clients。
spark-streaming-kafka-0-10已经包括相关的依赖了,不一样的版本号会有不一样程度的不兼容。
代码案例
首先导入包正确的包org.apache.spark.streaming.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
ssc = new StreamingContext(sparkConf, Milliseconds(1000))
val preferredHosts = LocationStrategies.PreferConsistent
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
kafka的參数,请參考kafka官网。假设。你的spark批次时间超过了kafka的心跳时间(30s),需要添加heartbeat.interval.ms和session.timeout.ms。好比。批处理时间是5min,那么就需要调整group.max.session.timeout.ms。注意。样例中是将enable.auto.commit设置为了false。
LocationStrategies(本地策略)
新版本号的消费者API会预取消息入buffer。
所以,为了提高性能,在Executor端缓存消费者(而不是每个批次又一次建立)是很是有必要的,优先调度那些分区到已经有了合适消费者主机上。
在很是多状况下,你需要像上文同样使用LocationStrategies.PreferConsistent,这个參数会将分区尽可能均匀地分配到所有的可以Executor上去。
假设。你的Executor和kafka broker在同一台机器上,可以用PreferBrokers。这将优先将分区调度到kafka分区leader所在的主机上。最后,分区间负荷有明显的倾斜,可以用PreferFixed。这个赞成你指定一个明白的分区到主机的映射(没有指定的分区将会使用连续的地址)。
消费者缓存的数目默认最大值是64。假设你但愿处理超过(64*excutor数目)kafka分区。spark.streaming.kafka.consumer.cache.maxCapacity这个參数可以帮助你改动这个值。
假设你想禁止kafka消费者缓存,可以将spark.streaming.kafka.consumer.cache.enabled改动为false。
禁止缓存缓存可能需要解决SPARK-19185描写叙述的问题。一旦这个bug解决。这个属性将会在后期的spark版本号中移除。
Cache是依照topicpartition和groupid进行分组的,因此每次调用creaDirectStream的时候要单独设置group.id。
ConsumerStrategies(消费策略)
新的kafka消费者api有多个不一样的方法去指定消费者,当中有些方法需要考虑post-object-instantiation设置。
ConsumerStrategies提供了一个抽象,它赞成spark可以得到正确配置的消费者。即便从Checkpoint从新启动以后。
ConsumerStrategies.Subscribe,如上面展现的同样,赞成你订阅一组固定的集合的主题。
SubscribePattern赞成你使用正则来指定本身感兴趣的主题。注意,跟0.8整合不一样的是,使用subscribe或者subscribepattern在执行stream期间应相应到加入分区。
事实上,Assign执行你指定固定分区的集合。这三种策略都有重载构造函数。赞成您指定特定分区的起始偏移量。
ConsumerStrategy是一个public类。赞成你进行本身定义策略。
建立kafkaRDD
类似于spark streaming的批处理,现在你可以经过指定本身定义偏移范围本身建立kafkaRDD。
def getKafkaParams(extra: (String, Object)*): JHashMap[String, Object] = {
val kp = new JHashMap[String, Object]()
kp.put("bootstrap.servers", kafkaTestUtils.brokerAddress)
kp.put("key.deserializer", classOf[StringDeserializer])
kp.put("value.deserializer", classOf[StringDeserializer])
kp.put("group.id", s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
extra.foreach(e => kp.put(e._1, e._2))
kp
}
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
// Import dependencies and create kafka params as in Create Direct Stream above
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
注意。在这里是不能使用PreferBrokers的。因为不是流处理的话就没有driver端的消费者帮助你寻找元数据。必须使用PreferFixed,而后本身指定元数据
你们可以进入createRDD里面。看其源代码。事实上就是依据你的參数封装成了RDD,跟流式批处理是一致的。
def createRDD[K, V](
sc: SparkContext,
kafkaParams: ju.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]] = {
val preferredHosts = locationStrategy match {
case PreferBrokers =>
throw new AssertionError(
"If you want to prefer brokers, you must provide a mapping using PreferFixed " +
"A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.")
case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
case PreferFixed(hostMap) => hostMap
}
val kp = new ju.HashMap[String, Object](kafkaParams)
fixKafkaParams(kp)
val osr = offsetRanges.clone()
new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
}
获取偏移
Spark Streaming与kafka整合是执行你获取其消费的偏移的,详细方法例如如下:
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
注意。HashOffsetRanges只在spark计算链条的開始才干类型转换成功。要知道kafka分区和spark分区的一一相应关系在Shuffle后就会丧失,比方reduceByKey()或者window()。
存储偏移
Kafka在有可能存在任务失败的状况下的从消息传输语义(至少一次。最多一次,刚好一次)是取决于什么时候存储offset。Spark输出操做是至少一次传输语义。因此,假设你想实现只一次的消费语义,你必需要么在密等输出后存储offset,要么就是offset的存储和结果输出是一次事务。
现在kafka有了3种方式,来提升可靠性(以及代码复杂性),用于存储偏移量。
1, Checkpoint
假设使能了Checkpoint,offset被存储到Checkpoint。
这个尽管很是easy作到,但是也有一些缺点。因为会屡次输出结果,因此结果输出必须是知足幂等性。
同一时候事务性不可选。另外,假设代码变动,你是不可以从Checkpoint恢复的。针对代码升级更新操做,你可以同一时候执行你的新任务和旧任务(因为你的输出结果是幂等性)。对于之外的故障,并且同一时候代码变动了,确定会丢失数据的,除非另有方式来识别启动消费的偏移。
2。 Kafka自身
Kafka提供的有api。可以将offset提交到指定的kafkatopic。默认状况下,新的消费者会周期性的本身主动提交offset到kafka。但是有些状况下,这也会有些问题,因为消息可能已经被消费者从kafka拉去出来。但是spark还没处理,这样的状况下会致使一些错误。
这也是为何样例中stream将enable.auto.commit设置为了false。
然而在已经提交spark输出结果以后。你可以手动提交偏移到kafka。
相对于Checkpoint,offset存储到kafka的优势是:kafka既是一个容错的存储系统,也是可以避免代码变动带来的麻烦。提交offset到kafka和结果输出也不是一次事务,因此也要求你的输出结果是知足幂等性。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
因为带有HasOffsetRanges。到CanCommitOffsets的转换将会在刚执行createDirectStream以后成功,而不是通过各类操做算子后。
commitAsync是线程安全的。必须在结果提交后进行执行。
3。 本身定义存储位置
对于输出解雇支持事务的状况,可以将offset和输出结果在同一个事务内部提交,这样即便在失败的状况下也可以保证二者同步。
假设您关心检測反复或跳过的偏移范围。回滚事务可以防止反复或丢失的消息。
这至关于一次语义。也可以使用这样的策略,甚至是聚合所产生的输出,聚合产生的输出通常是很是难生成幂等的。代码演示样例
// The details depend on your data store, but the general idea looks like this
// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
SSL/TLS配置使用
新的kafka消费者支持SSL。只需要在执行createDirectStream / createRDD以前设置kafkaParams。
注意。这只应用与Spark和kafkabroker之间的通信。仍然负责分别确保节点间通讯的安全。
val kafkaParams = Map[String, Object](
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
"ssl.truststore.password" -> "test1234",
"ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
"ssl.keystore.password" -> "test1234",
"ssl.key.password" -> "test1234"
)
Spark相关书籍,请进入浪尖微店。
推荐阅读: