前文有提到过Spark Streaming事务是如何保证exactly once的语义的。app
从spark core程序来说,读取固定数据来源好比hdfs中,spark只是作为一个计算框架。负载均衡
而在流处理中,只是多了一个时间维度。框架
若在某一时刻,知道所需处理数据的来源,直接读取,而不用被动的接收(Receiver),那就是和普通的Spark 程序没什么差异了。函数
本文将着重Kafka中direct方式的读取,以案例切入,跟踪源码分析。源码分析
入口是KafkaUtils,先建立了一个回调函数定义,再获取到kafka集群,并获取到起始偏移量,最后建立一个DirectKafkaInputDStream,用于建立RDD。spa
// KafkaUtils.scala line 473 def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] ): InputDStream[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val fromOffsets = getFromOffsets(kc, kafkaParams, topics) new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler) }
KafkaCluster 在实例化时没有任何动做,只是单纯的建立对象。.net
接下来获取每一个partition的偏移量,scala
DStream建立以后,整个DAG回溯的lineage以下:code
DirectKafkaInputDStream -> MappedDStream > FlatMappedDStream -> MappedDStream -> ShuffledDStream -> ForEachDStream对象
当DAG回溯到DirectKafkaInputDStream时,会调用compute。建立KafkaRDD,而且将最新的偏移量保存,以便下次计算新的偏移量。
从当RDD进入计算时,会调用compute。,此处的offsetRanges就是Kafka的TopicAndPartition和对应的偏移量。最后结果就是kafka有多少个partition,spark就会有多少个partition与之对应。
直接抓Kafka数据的方式与Receiver的方式的对比: