import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by hadoop on 2016/4/18.
 * Background:
 *
 * 1. DT Big Data Dream Factory WeChat public account: DT_Spark
 * 2. IMF 8 pm big-data hands-on YY live channel: 68917580
 * 3. Sina Weibo: http://www.weibo.com/ilovepains
 *
 * Streaming "hot search words" example: reads lines of the form `name item`
 * from a socket, counts each `item` over a sliding window and prints the most
 * frequently searched items of every window.
 */
object HostSerachTop {

  // The window (60 s) and the slide (20 s) must both be multiples of the batch
  // interval, otherwise Spark Streaming rejects the windowed stream.
  private val BatchInterval = Seconds(20)
  private val WindowLength = Seconds(60)
  private val SlideInterval = Seconds(20)

  // Number of entries printed for every window.
  private val TopN = 5

  // reduceByKeyAndWindow with an inverse function requires checkpointing.
  // NOTE(review): placeholder location - point this at a reliable (e.g. HDFS)
  // directory when running on a cluster.
  private val CheckpointDir = "/tmp/HostSerachTop-checkpoint"

  /**
   * Entry point. An explicit `main` is used instead of `extends App` to avoid
   * the delayed-initialisation pitfalls of the `App` trait.
   *
   * @param args when empty, the job runs with a local master (`local[2]`);
   *             otherwise the master is expected to be supplied externally
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreamingOnHDFS")
    if (args.length == 0) conf.setMaster("local[2]")

    val ssc = new StreamingContext(conf, BatchInterval)
    ssc.checkpoint(CheckpointDir)

    val hosttedStream =
      ssc.socketTextStream("192.168.74.132", 9000, StorageLevel.MEMORY_AND_DISK_SER_2)

    // User search records are simplified to the format: name item.
    // Lines without a second field are dropped instead of failing the batch.
    val searchWord = hosttedStream
      .map(_.split(" "))
      .filter(_.length > 1)
      .map(fields => (fields(1), 1))

    // Incremental window aggregation: add the counts of the batches entering
    // the window and subtract the counts of the batches leaving it.
    val hostedWords = searchWord.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => v1 + v2,
      (v1: Int, v2: Int) => v1 - v2,
      WindowLength,
      SlideInterval)

    // Sort by count (descending) and swap back to (word, count) for printing.
    hostedWords.transform { counts =>
      counts
        .map(_.swap)
        .sortByKey(ascending = false)
        .map(_.swap)
    }.print(TopN)

    ssc.start()
    try {
      ssc.awaitTermination()
    } finally {
      // Release the streaming context and the underlying SparkContext even
      // when awaitTermination rethrows an error from the streaming job.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}
我们以print方法为入口,从源码出发:
/**
 * Output operator: for every RDD generated in this DStream, print its first
 * `num` elements, followed by "..." when the RDD holds more than `num`.
 * The stream is registered as an output stream via `foreachRDD`.
 */
def print(num: Int): Unit = ssc.withScope {
  // Function applied to each generated RDD together with its batch time.
  def foreachFunc: (RDD[T], Time) => Unit = (rdd, time) => {
    // Fetch one extra element so we can tell whether the output was truncated.
    val sampled = rdd.take(num + 1)
    val separator = "-------------------------------------------"
    // scalastyle:off println
    println(separator)
    println("Time: " + time)
    println(separator)
    sampled.take(num).foreach(println)
    if (sampled.length > num) println("...")
    println()
    // scalastyle:on println
  }
  // Create the ForEachDStream and register it as an output stream.
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
继续跟踪源码:
/**
 * Wraps `foreachFunc` in a ForEachDStream whose parent is this stream and
 * registers that stream.
 *
 * @param foreachFunc        function applied to every generated RDD and its time
 *                           (e.g. the `foreachFunc` built by `print`)
 * @param displayInnerRDDOps forwarded unchanged to the ForEachDStream
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  val cleanedFunc = context.sparkContext.clean(foreachFunc, false)
  val outputStream = new ForEachDStream(this, cleanedFunc, displayInnerRDDOps)
  outputStream.register()
}
我们继续看ForEachDStream中的generateJob方法,可以看到foreachFunc是作用于RDD之上的,对RDD调用action算子就会触发job的提交。到此我们有了初步的印象:ForEachDStream是最后一个DStream,我们传入的行动算子会作用于它的父DStream所生成的RDD。下面我们深入探讨这个RDD是怎么生成的。
从上面的探讨我们知道,DStream最终生成了RDD,用来触发作业的执行:
/**
 * Builds the job for the given batch time: when the parent stream yields an
 * RDD for `time`, returns a Job whose function applies `foreachFunc` to that
 * RDD; otherwise returns None.
 */
override def generateJob(time: Time): Option[Job] =
  parent.getOrCompute(time).map { rdd =>
    // The function applied to the RDD is only wrapped into a Job here; it is
    // not run until the Job's function is invoked.
    val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
      foreachFunc(rdd, time)
    }
    new Job(time, jobFunc)
  }
前面我们探讨了generateJob会被JobScheduler调用并执行Job中的函数,也就是jobFunc这个函数;也就是说生成Job的时候RDD已经生成了,并且是最后一个RDD。我们关注parent.getOrCompute这个方法:
/**
 * Returns the RDD of this stream for `time`: the entry already stored in
 * `generatedRDDs` if there is one, otherwise the result of `compute(time)`,
 * which is then stored in `generatedRDDs`. Yields None when `time` is not
 * valid for this stream or when `compute` produces nothing.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  generatedRDDs.get(time).orElse {
    if (!isTimeValid(time)) {
      // Not a valid generation time (e.g. off the sliding-window grid).
      None
    } else {
      // Walk back up the stream by calling this stream's compute method.
      val computed = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      computed.foreach { rdd =>
        // Persist when a storage level has been configured for this stream.
        if (storageLevel != StorageLevel.NONE) {
          rdd.persist(storageLevel)
          logDebug(s"Persisting RDD ${rdd.id} for time $time to $storageLevel")
        }
        // Checkpoint when the elapsed time is a multiple of the checkpoint interval.
        val checkpointDue =
          checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)
        if (checkpointDue) {
          rdd.checkpoint()
          logInfo(s"Marking RDD ${rdd.id} for time $time for checkpointing")
        }
        // Remember the RDD for this time; every DStream keeps its own generatedRDDs.
        generatedRDDs.put(time, rdd)
      }

      computed
    }
  }
}
以我们的代码为例,最后一个DStream是ForEachDStream,而它依赖的DStream是ShuffledDStream……如图:
最终落到SocketInputDStream,而每个DStream调用compute方法时都是调用父DStream的getOrCompute,这样一层层回溯DStream;也就是每个DStream都有自己的generatedRDDs。也就是说,我们对DStream上的操作最终生成了RDD:从最开始的RDD开始,作用一个一个算子,例如map、flatMap等函数,生成了对应的RDD,而我们最终只需要最后一个RDD就能计算了,因为RDD记录了Lineage。下面我们研究generatedRDDs究竟是怎么被实例化的?
由于数据来源的DStream是SocketInputDStream(ReceiverInputDStream的子类),我们从ReceiverInputDStream开始:当我们对SocketInputDStream调用compute方法时,会调用它的父类ReceiverInputDStream中的compute方法:
/**
 * Produces the RDD for `validTime` from the blocks allocated to this stream
 * for that batch. Always returns Some: an empty BlockRDD when `validTime` is
 * before the graph's start time, otherwise the result of `createBlockRDD`.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD: RDD[T] =
    if (validTime < graph.startTime) {
      // Called for a time before the context started (this may happen when
      // recovering from a driver failure without a write ahead log).
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Ask the tracker for the blocks allocated to this stream for this batch.
      val tracker = ssc.scheduler.receiverTracker
      val batchBlocks = tracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Report the number of input records of this batch to the InputInfoTracker.
      val recordCount = batchBlocks.flatMap(_.numRecords).sum
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, StreamInputInfo(id, recordCount))

      // Build the RDD from the block infos obtained from the receiver tracker.
      createBlockRDD(validTime, batchBlocks)
    }
  Some(blockRDD)
}
createBlockRDD方法:
/**
 * Builds the RDD backing one batch from the given block infos.
 *
 *  - no blocks: an empty WriteAheadLogBackedBlockRDD when the receiver write
 *    ahead log is enabled, otherwise an empty BlockRDD;
 *  - every block has a WAL record handle: a WriteAheadLogBackedBlockRDD;
 *  - otherwise: a BlockRDD over the block ids the block manager master still
 *    knows about, with a log message when WAL info is only partially present
 *    or when some blocks are missing.
 */
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.isEmpty) {
    // No block is ready: the RDD type follows the configuration.
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  } else {
    val blockIds = blockInfos.map(info => info.blockId.asInstanceOf[BlockId]).toArray
    val allHaveWalHandles = blockInfos.forall(_.walRecordHandleOption.nonEmpty)

    if (allHaveWalHandles) {
      val isBlockIdValid = blockInfos.map(_.isBlockIdValid()).toArray
      // Safe: the forall above guarantees every handle is defined.
      val walRecordHandles = blockInfos.map(_.walRecordHandleOption.get).toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Only some blocks carry WAL info: unexpected, so log accordingly.
      val someHaveWalHandles = blockInfos.exists(_.walRecordHandleOption.nonEmpty)
      if (someHaveWalHandles) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }

      // Keep only the blocks the block manager master still reports.
      val blockMaster = ssc.sparkContext.env.blockManager.master
      val validBlockIds = blockIds.filter(blockId => blockMaster.contains(blockId))
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  }
}
上面就是最开始的RDD了。我们思考一个问题:算子是怎么作用到最开始的RDD之上的?以MappedDStream为例:
/**
 * DStream obtained by applying `mapFunc` to every element of `parent`.
 *
 * @param parent  the stream whose RDDs are transformed
 * @param mapFunc the element-wise function applied to each parent RDD
 */
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  /** The parent stream is the only dependency. */
  override def dependencies: List[DStream[_]] = parent :: Nil

  /** Same slide duration as the parent stream. */
  override def slideDuration: Duration = parent.slideDuration

  /** Maps the parent's RDD for `validTime`, if the parent produced one. */
  override def compute(validTime: Time): Option[RDD[U]] =
    for (parentRDD <- parent.getOrCompute(validTime)) yield parentRDD.map[U](mapFunc)
}
也就是说,每次调用compute函数的时候,都会先通过父DStream的getOrCompute得到父RDD,再把我们传入的算子作用于它,生成当前DStream的RDD并返回给子DStream;最终我们对最后一个RDD调用行动算子,触发作业的提交!