The previous posts analyzed, from the source code, how the data received by a Receiver is handed over to the BlockManager, so the whole data-receiving flow is now up and running. Let's return to the post that analyzes the JobScheduler.
```scala
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```
Several earlier posts branched off from receiverTracker.start(). With that covered, we continue with the next step.
```scala
// JobScheduler.scala line 83
jobGenerator.start()
```
The instantiation of jobGenerator has already been analyzed. Let's dig into its source to see what start() does.
```scala
// JobGenerator.scala line 78
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
```scala
// JobGenerator.scala line 189
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
```
This first starts the DStreamGraph via DStreamGraph.start:
```scala
// DStreamGraph.scala line 39
def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}
```
So far this is only some simple initialization; data processing has not actually been kicked off yet.
Back to JobGenerator. At this point the recurring timer is started:
```scala
// JobGenerator.scala line 193
timer.start(startTime.milliseconds)
```
The recurring timer starts running. Does this look familiar? Haven't we seen this recurring timer somewhere before?
Indeed: in BlockGenerator.scala lines 105 and 109 there are two threads, one of which is a recurring timer that periodically moves received data into the queue of blocks waiting to be pushed.
```scala
// RecurringTimer.scala line 59
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}
```
The actual work is done by the function passed in at construction time: longTime => eventLoop.post(GenerateJobs(new Time(longTime))). Its input is a Long, and its body is eventLoop.post(GenerateJobs(new Time(longTime))).
```scala
// JobGenerator.scala line 58
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
```
As long as the timer has not been stopped, it keeps looping.
```scala
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // RecurringTimer.scala line 27
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  // RecurringTimer.scala line 56
  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  // RecurringTimer.scala line 92
  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // RecurringTimer.scala line 100
  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }

  // ... other members omitted
}
```
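To see the mechanism in isolation, here is a self-contained sketch of the same pattern. It is simplified and not Spark's RecurringTimer: it uses Thread.sleep instead of a pluggable Clock and omits the stop-time bookkeeping. A daemon thread waits until the next tick, invokes the callback with the tick time, then advances nextTime by one period.

```scala
// Simplified sketch of the recurring-timer pattern (not Spark's RecurringTimer).
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false
  private var nextTime = 0L

  private val thread = new Thread("SimpleRecurringTimer - " + name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val sleepMs = nextTime - System.currentTimeMillis()
          if (sleepMs > 0) Thread.sleep(sleepMs) // wait until the next tick
          callback(nextTime)                     // fire the callback with the tick time
          nextTime += periodMs                   // schedule the next tick
        }
      } catch {
        case _: InterruptedException =>          // exit quietly on stop()
      }
    }
  }

  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    nextTime
  }

  def stop(): Unit = { stopped = true; thread.interrupt() }
}
```

With period set to the batch duration and the callback posting a GenerateJobs event, this is essentially what drives job generation.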
So a GenerateJobs event message is posted on every tick; eventLoop.post simply adds the event to the eventQueue.
```scala
// EventLoop.scala line 102
def post(event: E): Unit = {
  eventQueue.put(event)
}
```
Meanwhile, another member of this EventLoop, the eventThread, keeps taking event messages off the queue and calls onReceive with each event as the argument. And onReceive was overridden when the EventLoop was instantiated:
```scala
// JobGenerator.scala line 86
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()
```
What onReceive calls is:
```scala
// JobGenerator.scala line 177
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    // other case classes omitted
  }
}
```
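Putting the pieces together, the EventLoop is a classic single-consumer producer/consumer: post() enqueues into a blocking queue, and a daemon eventThread drains the queue and dispatches each event to the overridden onReceive. Below is a self-contained sketch of that pattern (simplified, not Spark's actual EventLoop.scala):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Simplified sketch of the EventLoop pattern (not Spark's EventLoop implementation).
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // blocks until an event is posted
          try onReceive(event) catch { case e: Throwable => onError(e) }
        }
      } catch {
        case _: InterruptedException => // exit on stop()
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event) // producer side: enqueue the event

  protected def onReceive(event: E): Unit          // consumer side: overridden by subclasses
  protected def onError(e: Throwable): Unit
}
```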
The GenerateJobs case class is matched and handled by generateJobs(time: Time):
```scala
// JobGenerator.scala line 240
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
The code above is not entirely obvious, so let's break it down. At first glance it looks like try {} catch { case ... }, but on closer inspection it is Try {} match {}. Tracing the code, Try with a capital T turns out to be a companion object whose apply takes a by-name block and returns a Try instance. The code in scala.util.Try.scala is as follows:
```scala
// scala.util.Try.scala line 155
object Try {
  /** Constructs a `Try` using the by-name parameter. This
   * method will ensure any non-fatal exception is caught and a
   * `Failure` object is returned.
   */
  def apply[T](r: => T): Try[T] =
    try Success(r) catch {
      case NonFatal(e) => Failure(e)
    }
}
```
Try has two subclasses, both case classes: Success and Failure (as shown in the figure).
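A minimal example of the Try {} match {} pattern, unrelated to Spark and purely to illustrate the Scala construct:

```scala
import scala.util.{Failure, Success, Try}

// Try.apply runs the block; a non-fatal exception yields Failure, otherwise Success.
val parsed = Try {
  "42".toInt // replace with any computation that may throw
} match {
  case Success(n) => s"parsed $n"
  case Failure(e) => s"failed: ${e.getMessage}"
}
println(parsed) // prints "parsed 42"
```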
Back at the call site, the last expression executed inside the Try block is graph.generateJobs(time). Tracing into it, it returns the jobs produced by outputStream.generateJob(time) for each output stream:
```scala
// DStreamGraph.scala line 111
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
As explained earlier, the output streams here are all ForEachDStream instances, and ForEachDStream overrides generateJob:
```scala
// ForEachDStream.scala line 46
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
So what is the parent of this ForEachDStream? Let's look at our example:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077")
      .setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // collect data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce
    words.print() // print the results
    ssc.start() // start the streaming context
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
```
As described earlier, the DStream lineage in this example is: SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream.
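To make that lineage concrete, here is the example annotated with the DStream subclass produced at each step (reusing the ssc from the example above; class names as in the version of the source discussed here):

```scala
// The word-count pipeline annotated with the DStream subclass each call returns.
val lines  = ssc.socketTextStream("localhost", 9999) // SocketInputDStream[String]
val words  = lines.flatMap(_.split(" "))             // FlatMappedDStream[String, String]
val pairs  = words.map((_, 1))                       // MappedDStream[String, (String, Int)]
val counts = pairs.reduceByKey(_ + _)                // ShuffledDStream[String, Int, Int]
counts.print()                                       // registers a ForEachDStream as the output
```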
Scanning DStream and all of its subclasses shows that only DStream itself defines getOrCompute; no subclass overrides it. So the call here is ShuffledDStream's inherited getOrCompute. In the normal case the RDD for this batch does not exist yet, so the orElse block is executed:
```scala
// DStream.scala line 338
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time) // line 352
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
```
ShuffledDStream.compute in turn calls parent.getOrCompute:
```scala
// ShuffledDStream.scala line 40
override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) => Some(rdd.combineByKey[C](
      createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}
```
MappedDStream's compute again calls its parent's getOrCompute, which again ends up in compute, and so the recursion continues:
```scala
// MappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
```
FlatMappedDStream's compute likewise calls its parent's getOrCompute, which again ends up in compute, continuing the recursion:
```scala
// FlatMappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
```
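The pattern is identical at every level: each DStream asks its parent for the RDD of the same batch time and wraps it with its own transformation, until an input DStream terminates the recursion. A stripped-down sketch of the idea (not Spark code; Seq stands in for RDD, and caching is omitted):

```scala
// Stripped-down sketch of the recursive getOrCompute/compute pattern.
abstract class MiniDStream[T] {
  // No caching here; the real getOrCompute also memoizes results into generatedRDDs.
  final def getOrCompute(time: Long): Option[Seq[T]] = compute(time)
  def compute(time: Long): Option[Seq[T]]
}

// The input stream terminates the recursion by producing the batch's data directly.
class MiniInputDStream[T](batchData: Long => Seq[T]) extends MiniDStream[T] {
  def compute(time: Long): Option[Seq[T]] = Some(batchData(time))
}

// A transformed stream delegates to its parent, then applies its own function.
class MiniMappedDStream[T, U](parent: MiniDStream[T], f: T => U) extends MiniDStream[U] {
  def compute(time: Long): Option[Seq[U]] = parent.getOrCompute(time).map(_.map(f))
}
```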
This continues until the DStream is the SocketInputDStream, i.e. the input stream, whose compute is inherited from its parent class ReceiverInputDStream. Ignore the logic in the if branch for now and go straight to the else block, which ends up in createBlockRDD:
```scala
// ReceiverInputDStream.scala line 69
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
```
At new BlockRDD[T](ssc.sc, validBlockIds) (line 127), the RDD is instantiated.
```scala
// ReceiverInputDStream.scala line 94
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds) // line 127
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
```
This BlockRDD is a subclass of Spark Core's RDD with no parent dependencies (its deps is Nil). At this point the RDD instantiation is complete.
```scala
// BlockRDD.scala line 30
private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil)

// RDD.scala line 74
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging
```
So the RDD that is ultimately reconstructed is:
```scala
new BlockRDD[T](ssc.sc, validBlockIds)
  .map(_.flatMap(flatMapFunc))
  .map(_.map[U](mapFunc))
  .combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
```
In this example, that becomes:
```scala
new BlockRDD[T](ssc.sc, validBlockIds)
  .map(_.flatMap(t => t.split(" ")))
  .map(_.map[U](t => (t, 1)))
  .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true)
```
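Viewed purely at the Spark Core level, one batch of this example therefore boils down to roughly the following (a sketch; batchRDD stands for the BlockRDD holding that batch's lines, and reduceByKey is itself expressed via combineByKey as shown above):

```scala
import org.apache.spark.rdd.RDD

// Sketch: what one batch of the word count amounts to at the RDD level.
def countOneBatch(batchRDD: RDD[String]): RDD[(String, Int)] =
  batchRDD
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
```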
And the final print boils down to:
```scala
() => foreachFunc(
  new BlockRDD[T](ssc.sc, validBlockIds)
    .map(_.flatMap(t => t.split(" ")))
    .map(_.map[U](t => (t, 1)))
    .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true),
  time)
```
where foreachFunc is defined at DStream.scala line 766.
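For reference, the foreachFunc registered by print() essentially takes the first few elements of the batch RDD and prints them under a time header. A paraphrased sketch (not copied verbatim from DStream.scala) for the default of 10 elements:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Paraphrased sketch of what print()'s foreachFunc does for each batch (default num = 10).
val printFunc: (RDD[String], Time) => Unit = (rdd, time) => {
  val firstNum = rdd.take(11)              // fetch at most num + 1 elements
  println("-------------------------------------------")
  println(s"Time: $time")
  println("-------------------------------------------")
  firstNum.take(10).foreach(println)       // print the first num elements
  if (firstNum.length > 10) println("...") // hint that the batch had more records
  println()
}
```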
At this point the RDD has been fully instantiated through the DStream. Looking back now, it should be easier to see why a DStream is described as a template for RDDs.
But we are not done yet: back in ForEachDStream.scala line 46, the function above is passed as a constructor argument to create a Job.
-------------------------------------------
Supplement: a flow chart of Job creation, from a fellow student's blog in the customized-edition course, slightly modified.
Supplement: a flow chart of building the RDD DAG by tracing the lineage back from the OutputDStream, from a fellow student's blog in the customized-edition course.
Supplement: the same RDD DAG lineage flow chart for the example in this post, from a fellow student's blog in the customized-edition course.
The next post will analyze Job submission from the source code. Stay tuned.