Spark源码阅读——streaming模块做业生成和提交

Spark源码阅读——streaming模块做业生成和提交


一般咱们开发spark-streaming都会用到以下代码:闭包

val sparkConf = new SparkConf()
    .set("xxx", "")
    ...
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))

//而后咱们使用ssc来建立一个InputDStream
val stream = ssc.socketTextStream("localhost", 9090)
stream.map(...)
    .filter(...)
    .reduce(...)
    .foreachRDD(...)
ssc.start()
ssc.awaitTermination()

咱们知道stream的一系列调用链其实是构建dag的过程,咱们深刻foreachRDD代码中,查看到:socket

private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

DStream::registeride

private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }

DStreamGraph::addOutputStreamoop

def addOutputStream(outputStream: DStream[_]) {
    this.synchronized {
      outputStream.setGraph(this)
      outputStreams += outputStream
    }
  }

这里DStream将本身注册到DStreamGraph里。后续生成做业的时候会遍历这个outputStreams。到这里dag已经构建并注册完毕,下面咱们看StreamingContext启动的以后(start)是如何每隔一段时间生成一个做业的。post

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

这里最核心的代码就是调用了JobScheduler.start(),JobScheduler是在StreamingContext建立的时候初始化的。ui

//JobScheduler::start
  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      ssc.sparkContext,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

这里咱们关注jobGenerator的启动。来看下JobGenerator.startthis

def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

咱们看到这里也初始化并启动了一个事件循环线程,接受JobGeneratorEvent类型的事件,同时第一次启动时调用了startFirstTime方法,spa

private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

在这个方法中启动了DStreamGraph和timer,咱们看timer是个什么东西:线程

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

发现它其实是一个定时器,每隔一段时间就向事件循环线程中发送一个GenerateJobs事件,而这一段时间就是咱们建立StreamingContext时传进来的时间间隔参数。而后咱们来看事件循环线程是如何处理这个事件的:rest

private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }
  
  private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

这里重点关注调用了DStreamGraph的generateJobs方法返回job集合和后续调用JobScheduler.submitJobSet方法

//DStreamGraph::generateJobs
  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

看到这里遍历了上面提到的outputStreams,针对每一个注册的DStream调用其generateJob方法生成job,这里调用的实际上是DStream的子类ForEachDStream的generateJob方法,由于全部的action算子的底层实现最终都会落到ForEachDStream上。

//ForEachDStream::generateJob
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

getOrCompute方法内部会调用compute方法来返回一个RDD,而这个compute方法是一个抽象方法,须要子类去实现,通常来讲InputDStream直接在compute中建立一个RDD,而其余tranform算子的compute方法经过对其依赖的上游算子产生的RDD作变换获得一个新RDD。你们有兴趣能够看一下对接kafka模块的DirectInputDStream是如何实现的。这里返回一个rdd以后,建立了一个闭包封装在Job对象中返回,闭包中走了SparkContext提交job的逻辑,那么咱们接下来看Job中的闭包是何时执行的。 从这里返回一些列调用栈回到JobGenerator.generateJobs方法的JobScheduler.submitJobSet处

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

在submitJobSet中,依次将job封装在JobHandler中,提交到jobExecutor来执行,JobHandler是Runnable接口的一个实现,它的run方法中,比较核心的逻辑以下:

var _eventLoop = eventLoop
if (_eventLoop != null) {
  _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
  PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
    job.run()
  }
  _eventLoop = eventLoop
  if (_eventLoop != null) {
    _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  }
}

这里调用了job.run方法,而在run方法中实际执行的就是咱们传递给Job对象的闭包。整个job的运行是一个阻塞的过程,会独占jobExecutor的一个线程,而JobExecutor的线程数在初始化的时候是经过判断spark.streaming.concurrentJobs参数来指定的,默认为1。 好了,streaming模块生成做业的逻辑已经大致分析完成了。

相关文章
相关标签/搜索