做者:个推数据研发工程师 学长网络
随着大数据的快速发展,业务场景愈来愈复杂,离线式的批处理框架MapReduce已经不能知足业务,大量的场景须要实时的数据处理结果来进行分析、决策。Spark Streaming是一种分布式的大数据实时计算框架,他提供了动态的,高吞吐量的,可容错的流式数据处理,不只能够实现用户行为分析,还能在金融、舆情分析、网络监控等方面发挥做用。个推开发者服务——消息推送“应景推送”正是应用了Spark Streaming技术,基于大数据分析人群属性,同时利用LBS地理围栏技术,实时触发精准消息推送,实现用户的精细化运营。此外,个推在应用Spark Streaming作实时处理kafka数据时,采用Direct模式代替Receiver模式的手段,实现了资源优化和程序稳定性提高。架构
本文将从Spark Streaming获取kafka数据的两种模式入手,结合个推实践,带你解读Receiver和Direct模式的原理和特色,以及从Receiver模式到Direct模式的优化对比。 框架
该模式下:分布式
与receiver模式相似,不一样在于executor中没有receiver组件,从kafka拉去数据的方式不一样。性能
该模式下:测试
个推使用Spark Streaming作实时处理kafka数据,先前使用的是receiver模式;大数据
receiver有如下特色:优化
因为以上特色,receiver模式下会形成必定的资源浪费;使用checkpoint保存状态, 若是须要升级程序,则会致使checkpoint没法使用;第3点receiver模式下会致使程序不太稳定;而且若是设置receiver数量不合理也会形成性能瓶颈在receiver。为了优化资源和程序稳定性,应将receiver模式改形成direct模式。spa
1. 修改InputDStream的建立 code
将receiver的:
val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
改为direct的:
val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])
2. 手动维护offset
receiver模式代码:
(receiver模式不须要手动维护offset,而是内部经过kafka consumer high level API 提交到kafka/zk保存)
kafkaStream.map { ... }.foreachRDD { rdd => // 数据处理 doCompute(rdd) }
direct模式代码:
directKafkaStream.map { ... }.foreachRDD { rdd => // 获取当前rdd数据对应的offset val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 数据处理 doCompute(rdd) // 本身实现保存offset commitOffsets(offsetRanges) }
1. 在receiver模式下:
spark.streaming.backpressure.enabled 含义: 是否启用 SparkStreaming内部的backpressure机制, 默认值:false ,表示禁用 spark.streaming.backpressure.initialRate 含义: receiver 为第一个batch接收数据时的比率 spark.streaming.receiver.maxRate 含义: receiver接收数据的最大比率,若是设置值<=0, 则receiver接收数据比率不受限制 spark.streaming.kafka.maxRatePerPartition 含义: 从每一个kafka partition中读取数据的最大比率
8. speculation机制
spark内置speculation机制,推测job中的运行特别慢的task,将这些task kill,并从新调度这些task执行。 默认speculation机制是关闭的,经过如下配置参数开启:
spark.speculation=true
注意:在有些状况下,开启speculation反而效果很差,好比:streaming程序消费多个topic时,从kafka读取数据直接处理,没有从新分区,这时若是多个topic的partition的数据量相差较大那么可能会致使正常执行更大数据量的task会被认为执行缓慢,而被中途kill掉,这种状况下可能致使batch的处理时间反而变长;能够经过repartition来解决这个问题,可是要衡量repartition的时间;而在streaming程序中由于batch时间特别短,因此数据量通常较小,因此repartition的时间短,不像spark_batch一次处理大量数据一旦repartition则会特别久,因此最终仍是要根据具体状况测试来决定。
将Receiver模式改为Direct模式,实现了资源优化,提高了程序的稳定性,缺点是须要本身管理offset,操做相对复杂。将来,个推将不断探索和优化Spark Streaming技术,发挥其强大的数据处理能力,为建设实时数仓提供保障。