This blog series distills and shares cases taken from real commercial environments, and provides Spark source-code analysis together with guidance for commercial practice; please keep following the series. Copyright notice: this Spark source-code analysis and commercial practice series belongs to its author (秦凯新). Reproduction is prohibited; you are welcome to learn from it.
JobGenerator periodically produces Jobs, and each Job is ultimately executed on the Executors.
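For orientation, that period is simply the batch duration passed to StreamingContext: once per batch interval JobGenerator produces and submits a JobSet. A minimal user-level sketch (local master, the hostname/port and the word count are placeholders, not part of the source being analyzed):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object BatchIntervalDemo {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("BatchIntervalDemo").setMaster("local[2]")
      // Every 5 seconds JobGenerator fires GenerateJobs(time) and builds a JobSet for that batch.
      val ssc = new StreamingContext(conf, Seconds(5))

      val lines = ssc.socketTextStream("localhost", 9999) // placeholder source

      // An output operation registers an output DStream; without one, no Job is generated.
      lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

      ssc.start()
      ssc.awaitTermination()
    }
  }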
We can see that receivedBlockTracker is a member of ReceiverTracker. Most importantly, receivedBlockTracker internally maintains a streamIdToUnallocatedBlockQueues map, which tracks the Blocks reported by the Executors.
  class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

    private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
    private val receiverInputStreamIds = receiverInputStreams.map { _.id }
    private val receivedBlockTracker = new ReceivedBlockTracker(
      ssc.sparkContext.conf,
      ssc.sparkContext.hadoopConfiguration,
      receiverInputStreamIds,
      ssc.scheduler.clock,
      ssc.isCheckpointPresent,
      Option(ssc.checkpointDir)
    )
The important metadata store maintained inside receivedBlockTracker:
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
JobScheduler contains two core, heavyweight members: jobGenerator and receiverTracker. They are initialized as follows:
private val jobGenerator = new JobGenerator(this)
receiverTracker = new ReceiverTracker(ssc)
Inside JobGenerator, a RecurringTimer fires once per batch interval and posts a GenerateJobs event to the JobGenerator event loop:

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
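The design worth noting here is the split between the timer and the event loop: the timer thread only posts GenerateJobs events, and a single event-loop thread processes them, so job generation stays serialized. A rough, self-contained sketch of that pattern using plain java.util.concurrent in place of Spark's RecurringTimer/EventLoop (all names here are invented for illustration):

  import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

  object TimerEventLoopSketch {
    sealed trait Event
    case class GenerateJobs(timeMs: Long) extends Event

    def main(args: Array[String]): Unit = {
      val events = new LinkedBlockingQueue[Event]()

      // Event-loop thread: processes events one at a time, like EventLoop.onReceive.
      val loop = new Thread(() => while (true) {
        events.take() match {
          case GenerateJobs(t) => println(s"generate jobs for batch $t")
        }
      })
      loop.setDaemon(true)
      loop.start()

      // Timer thread: fires every batch interval and does nothing but post an event.
      val batchIntervalMs = 1000L
      val timer = Executors.newSingleThreadScheduledExecutor()
      timer.scheduleAtFixedRate(
        () => events.put(GenerateJobs(System.currentTimeMillis())),
        0L, batchIntervalMs, TimeUnit.MILLISECONDS)

      Thread.sleep(3500)
      timer.shutdownNow()
    }
  }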
JobGenerator.start() builds the event loop, then either restarts from an existing checkpoint or starts for the first time:

  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }
On a first start (no checkpoint present), startFirstTime() starts the DStreamGraph and the timer:

  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }
JobGenerator holds a reference to jobScheduler, and jobScheduler holds a reference to receiverTracker:
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
receiverTracker in turn holds a reference to receivedBlockTracker. allocateBlocksToBatch takes, for every streamId, all of the blocks collected at the block interval (200 ms by default) out of streamIdToUnallocatedBlockQueues and puts them into timeToAllocatedBlocks.
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      // First, for the current batch window, drain every Block from streamIdToUnallocatedBlockQueues.
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap                                                  // <= the key step
      // Then record the just-dequeued, previously unallocated blocks under this batch time
      // in timeToAllocatedBlocks.
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)  // <= the key step
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      //    possibly processed batch job or half-processed batch job need to be processed again,
      //    so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      //    lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
On the consuming side, DStream.getOrCompute is how each batch's RDD reaches the generatedRDDs cache:

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {
        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)  // <= the key step
        }
        rddOption
      } else {
        None
      }
    }
  }
A derived DStream's compute recurses into its parent's getOrCompute; for example, the DStream produced by mapPartitions:

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
  }
The recursion bottoms out at the ReceiverInputDStream, whose compute builds a BlockRDD from the blocks allocated to this batch:

  /** Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch; this is where the data is read back out of timeToAllocatedBlocks.
        val receiverTracker = ssc.scheduler.receiverTracker                                    // <= the key step
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)  // <= the key step

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD from the allocated blocks, so that later steps of the call chain
        // can use it through generatedRDDs.
        createBlockRDD(validTime, blockInfos)                                                  // <= the key step
      }
    }
    Some(blockRDD)
  }
So the purpose of allocateBlocksToBatch is simply to move the Blocks of the corresponding batch window into timeToAllocatedBlocks, where the rest of the call chain can pick them up.
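To see the whole hand-off in one place, here is a small self-contained sketch of that round trip using plain Scala collections and invented names (BlockMeta, Allocated); the real ReceivedBlockTracker additionally writes a BatchAllocationEvent to the WAL:

  import scala.collection.mutable

  object BlockAllocationSketch {
    case class BlockMeta(streamId: Int, blockId: String)
    case class Allocated(streamIdToBlocks: Map[Int, Seq[BlockMeta]])

    // Roughly correspond to streamIdToUnallocatedBlockQueues and timeToAllocatedBlocks.
    private val unallocated = new mutable.HashMap[Int, mutable.Queue[BlockMeta]]
    private val timeToAllocated = new mutable.HashMap[Long, Allocated]

    // A receiver reporting a block appends it to the queue of its streamId.
    def addBlock(b: BlockMeta): Unit =
      unallocated.getOrElseUpdate(b.streamId, new mutable.Queue[BlockMeta]) += b

    // allocateBlocksToBatch: drain every queue and file the blocks under this batch time.
    def allocateBlocksToBatch(batchTime: Long, streamIds: Seq[Int]): Unit = {
      val streamIdToBlocks = streamIds.map { id =>
        id -> unallocated.getOrElse(id, mutable.Queue.empty[BlockMeta]).dequeueAll(_ => true).toSeq
      }.toMap
      timeToAllocated.put(batchTime, Allocated(streamIdToBlocks))
    }

    // getBlocksOfBatch: what ReceiverInputDStream.compute reads when building the BlockRDD.
    def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[BlockMeta]] =
      timeToAllocated.get(batchTime).map(_.streamIdToBlocks).getOrElse(Map.empty)

    def main(args: Array[String]): Unit = {
      addBlock(BlockMeta(0, "b1")); addBlock(BlockMeta(0, "b2")); addBlock(BlockMeta(1, "b3"))
      allocateBlocksToBatch(batchTime = 1000L, streamIds = Seq(0, 1))
      println(getBlocksOfBatch(1000L)) // Map(0 -> List(b1, b2), 1 -> List(b3))
      println(getBlocksOfBatch(2000L)) // Map() -- nothing allocated for that batch yet
    }
  }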
DStreamGraph keeps the registered input and output streams, and generateJobs asks every output stream to generate a Job for the batch time:

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
DStream.generateJob wraps the RDD produced by getOrCompute into a Job; in this base implementation the jobFunc just runs an empty job to materialize the RDD:

  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
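Output operations such as foreachRDD and print create a ForEachDStream, which overrides generateJob so that the user's output function, rather than an empty one, becomes the Job's jobFunc. A minimal runnable sketch from the user's side (hostname/port are placeholders), with comments marking where that override takes effect:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object ForeachRDDDemo {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("ForeachRDDDemo").setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(5))
      val lines = ssc.socketTextStream("localhost", 9999) // placeholder source

      // foreachRDD registers a ForEachDStream as an output stream. Its generateJob override
      // wraps this function into the Job's jobFunc, so the count() below is what actually
      // runs for every batch.
      lines.foreachRDD { (rdd, time) =>
        println(s"batch $time received ${rdd.count()} lines")
      }

      ssc.start()
      ssc.awaitTermination()
    }
  }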
The inputInfoTracker used above records, per batch time and per input stream, how many records entered the system:

  // Map to track all the InputInfo related to specific batch time and input stream.
  private val batchTimeToInputInfos =
    new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

  case class StreamInputInfo(
      inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)
Finally, JobScheduler.submitJobSet hands every Job of the batch to the jobExecutor thread pool:

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }
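The jobExecutor receiving these JobHandlers is, as far as I recall from the JobScheduler source, a fixed-size daemon thread pool sized by spark.streaming.concurrentJobs (default 1), so by default batches are processed one Job at a time. A minimal sketch of the submit pattern with plain java.util.concurrent and an invented FakeJob type:

  import java.util.concurrent.{Executors, TimeUnit}

  object JobExecutorSketch {
    // Stand-in for a streaming Job: just the batch time and the work to run.
    case class FakeJob(timeMs: Long, run: () => Unit)

    def main(args: Array[String]): Unit = {
      // Mirrors spark.streaming.concurrentJobs = 1: one job runs at a time, others queue up.
      val concurrentJobs = 1
      val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)

      val jobSet = Seq(
        FakeJob(1000L, () => println("job for batch 1000")),
        FakeJob(1000L, () => println("another job for batch 1000"))
      )

      // submitJobSet: wrap each job in a handler and hand it to the pool.
      jobSet.foreach { job =>
        jobExecutor.execute(new Runnable {
          override def run(): Unit = job.run() // the real JobHandler also reports job started/completed events
        })
      }

      jobExecutor.shutdown()
      jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
    }
  }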
The author spent a great deal of time putting this article together. Please do not simply help yourself to it: reproduction is prohibited, learning from it is welcome, and if you have questions please leave a comment.

秦凯新, Shenzhen, 2018