浪尖 浪尖聊大数据java
spark streaming消费kafka,你们都知道有两种方式,也是面试考基本功常问的:面试
a.基于receiver的机制。这个是spark streaming最基本的方式,spark streaming的receiver会定时生成block,默认是200ms,而后每一个批次生成blockrdd,分区数就是block数。架构以下:apache
b.direct API。这种api就是spark streaming会每一个批次生成一个kafkardd,而后kafkardd的分区数,由spark streaming消费的kafkatopic分区数决定。过程以下:
kafkardd与消费的kafka分区数的关系以下:bootstrap
2.常见积压问题api
kafka的producer生产数据到kafka,正常状况下,企业中应该是轮询或者随机,以保证kafka分区之间数据是均衡的。数组
在这个前提之下,通常状况下,假如针对你的数据量,kafka分区数设计合理。实时任务,如spark streaming或者flink,有没有长时间的停掉,那么通常不会有有积压。架构
消息积压的场景:框架
a.任务挂掉。好比,周五任务挂了,有没有写自动拉起脚本,周一早上才处理。那么spark streaming消费的数据至关于滞后两天。这个确实新手会遇到。ide
周末不加班,估计会被骂。函数
b.kafka分区数设少了。其实,kafka单分区生产消息的速度qps仍是很高的,可是消费者因为业务逻辑复杂度的不一样,会有不一样的时间消耗,就会出现消费滞后的状况。
c.kafka消息的key不均匀,致使分区间数据不均衡。kafka生产消息支持指定key,用key携带写信息,可是key要均匀,不然会出现kafka的分区间数据不均衡。
上面三种积压状况,企业中很常见,那么如何处理数据积压呢?
通常解决办法,针对性的有如下几种:
a.任务挂掉致使的消费滞后。
任务启动从最新的消费,历史数据采用离线修补。
最重要的是故障拉起脚本要有,还要就是实时框架异常处理能力要强,避免数据不规范致使的不能拉起。
b.任务挂掉致使的消费滞后。
任务启动从上次提交处消费处理,可是要增长任务的处理能力,好比增长资源,让任务能尽量的遇上消费最新数据。
c.kafka分区少了。
假设数据量大,直接增长kafka分区是根本,可是也能够对kafkardd进行repartition,增长一次shuffle。
d.个别分区不均衡。
能够生产者处能够给key加随机后缀,使其均衡。也能够对kafkardd进行repartition。
3.浪尖的骚操做
其实,以上都不是你们想要的,由于spark streaming生产的kafkardd的分区数,彻底能够是大于kakfa分区数的。
其实,常常阅读源码或者星球的看过浪尖的源码视频的朋友应该了解,rdd的分区数,是由rdd的getPartitions函数决定。好比kafkardd的getPartitions方法实现以下:
override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset) }.toArray }
offsetRanges其实就是一个数组:
val offsetRanges: Array[OffsetRange],
OffsetRange存储一个kafka分区元数据及其offset范围,而后进行map操做,转化为KafkaRDDPartition。实际上,咱们能够在这里下手,将map改成flatmap,而后对offsetrange的范围进行拆分,可是这个会引起一个问题,浪尖在这里就不赘述了,你能够测测。
其实,咱们能够在offsetRange生成的时候作下转换。位置是DirectKafkaInputDstream的compute方法。具体实现:
首先,浪尖实现中增长了三个配置,分别是:
是否开启自动重分区分区 sparkConf.set("enable.auto.repartition","true") 避免没必要要的重分区操做,增长个阈值,只有该批次要消费的kafka的分区内数据大于该阈值才进行拆分 sparkConf.set("per.partition.offsetrange.threshold","300") 拆分后,每一个kafkardd 的分区数据量。 sparkConf.set("per.partition.after.partition.size","100")
而后,在DirectKafkaInputDstream里获取着三个配置,方法以下:
val repartitionStep = _ssc.conf.getInt("per.partition.offsetrange.size",1000) val repartitionThreshold = _ssc.conf.getLong("per.partition.offsetrange.threshold",1000) val enableRepartition = _ssc.conf.getBoolean("enable.auto.repartition",false) 对offsetRanges生成的过程进行改造,只须要增长7行源码便可。 val offsetRanges = untilOffsets.flatMap{ case (tp, uo) => val fo = currentOffsets(tp) val delta = uo -fo if(enableRepartition&&(repartitionThreshold < delta)){ val offsets = fo to uo by repartitionStep offsets.map(each =>{ val tmpOffset = each + repartitionStep OffsetRange(tp.topic, tp.partition, each, Math.min(tmpOffset,uo)) }).toList }else{ Array(OffsetRange(tp.topic, tp.partition, fo, uo)) } }
测试的主函数以下:
import bigdata.spark.config.Config import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} /* 1. 直接消费新数据,数据离线修补。 2. repartition(10---->100),给足够多的资源,以便任务逐渐消除滞后的数据。 3. directDstream api 生成的是kafkardd,该rdd与kafka分区一一对应。 */ object kafka010Repartition { def main(args: Array[String]) { // 建立一个批处理时间是2s的context 要增长环境变量 val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]") sparkConf.set("enable.auto.repartition","true") sparkConf.set("per.partition.offsetrange.threshold","300") sparkConf.set("per.partition.offsetrange.step","100") val ssc = new StreamingContext(sparkConf, Seconds(5)) // 使用broker和topic建立DirectStream val topicsSet = "test1".split(",").toSet val kafkaParams = Map[String, Object]("bootstrap.servers" -> Config.kafkaHost, "key.deserializer"->classOf[StringDeserializer], "value.deserializer"-> classOf[StringDeserializer], "group.id"->"test1", "auto.offset.reset" -> "earliest", "enable.auto.commit"->(false: java.lang.Boolean)) val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) messages.transform(rdd=>{ println("partition.size : "+rdd.getNumPartitions) rdd }).foreachRDD(rdd=>{ // rdd.foreachPartition(each=>println(111)) val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges offsetRanges.foreach(o=>{ println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") }) }) ssc.start() ssc.awaitTermination() } }
结果以下:
partition.size : 67 test1 0 447 547 test1 0 547 647 test1 0 647 747 test1 0 747 847 test1 0 847 947 test1 0 947 1047 test1 0 1047 1147 test1 0 1147 1247 test1 0 1247 1347 test1 0 1347 1447 test1 0 1447 1547 test1 0 1547 1647 test1 0 1647 1747 test1 0 1747 1847 test1 0 1847 1947 test1 0 1947 2047 test1 0 2047 2147 test1 0 2147 2247 test1 0 2247 2347 test1 0 2347 2447 test1 0 2447 2547 test1 0 2547 2647 test1 0 2647 2747 test1 0 2747 2847 test1 0 2847 2947 test1 0 2947 3047 test1 0 3047 3147 test1 0 3147 3247 test1 0 3247 3347 test1 0 3347 3447 test1 0 3447 3547 test1 0 3547 3647 test1 0 3647 3747 test1 0 3747 3847 test1 0 3847 3947 test1 0 3947 4047 test1 0 4047 4147 test1 0 4147 4247 test1 0 4247 4347 test1 0 4347 4447 test1 0 4447 4547 test1 0 4547 4647 test1 0 4647 4747 test1 0 4747 4847 test1 0 4847 4947 test1 0 4947 5047 test1 0 5047 5147 test1 0 5147 5247 test1 0 5247 5347 test1 0 5347 5447 test1 0 5447 5547 test1 0 5547 5647 test1 0 5647 5747 test1 0 5747 5847 test1 0 5847 5947 test1 0 5947 6047 test1 0 6047 6147 test1 0 6147 6247 test1 0 6247 6347 test1 0 6347 6447 test1 0 6447 6547 test1 0 6547 6647 test1 0 6647 6747 test1 0 6747 6847 test1 0 6847 6947 test1 0 6947 7047 test1 0 7047 7124
【完】