此文是从思惟导图中导出稍做调整后生成的,思惟脑图对代码浏览支持不是很好,为了更好阅读体验,文中涉及到的源码都是删除掉没必要要的代码后的伪代码,如需获取更好阅读体验可下载脑图配合阅读:node
此博文共分为四个部分:git
在 Spark Streaming 程序的入口,咱们都会定义一个 batchDuration,就是须要每隔多长时间就比照静态的 DStreamGraph 来动态生成一个 RDD DAG 实例。在 Spark Streaming 里,整体负责动态做业调度的具体类是 JobScheduler。github
JobScheduler 有两个很是重要的成员:JobGenerator 和 ReceiverTracker。JobScheduler 将每一个 batch 的 RDD DAG 具体生成工做委托给 JobGenerator,而将源头输入数据的记录工做委托给 ReceiverTracker。异步
job运行的总指挥是JobScheduler.start(),socket
JobScheduler 有两个很是重要的成员:JobGenerator 和 ReceiverTracker。JobScheduler 将每一个 batch 的 RDD DAG 具体生成工做委托给 JobGenerator,而将源头输入数据的记录工做委托给 ReceiverTracker。oop
在StreamingContext中启动schedulerpost
class StreamingContext(sc,cp,batchDur){ val scheduler = new JobScheduler(this) start(){ scheduler.start() } }
在JobScheduler中启动recieverTracker和JobGeneratorthis
class JobScheduler(ssc) { var receiverTracker:ReceiverTracker=null var jobGenerator=new JobGenerator(this) val jobExecutor=ThreadUtils.newDaemonFixedThreadPool() if(stared) return // 只启动一次 receiverTracker.start() jobGenerator.start() }
在JobScheduler的start中启动ReceiverTraker:receiverTracker.start():
spa
RecieverTracker 调用launchReceivers方法线程
class ReceiverTracker { var endpoint:RpcEndpointRef=null def start()=synchronized{ endpoint=ssc.env.rpcEnv.setEndpoint( "receiverTracker", new ReceiverTrackerEndpoint() ) launchReceivers() } }
ReceiverTracker将RDD DAG和启动receiver的Func包装成ReceiverSupervisor发送到最优的Excutor节点上
从ReceiverInputDStreams中获取Receivers,并把他们发送到全部的worker nodes:
class ReceiverTracker { var endpoint:RpcEndpointRef= private def launchReceivers(){ // DStreamGraph的属性inputStreams val receivers=inputStreams.map{nis=> val rcvr=nis.getReceiver() // rcvr是对kafka,socket等接受数据的定义 rcvr } // 发送到worker endpoint.send(StartAllReceivers(receivers)) } }
在JobScheduler的start中启动JobGenerator:JobGenerator.start()
首次启动
private def startFirstTime() { // 定义定时器 val startTime = new Time(timer.getStartTime()) // 启动DStreamGraph graph.start(startTime - graph.batchDuration) // 启动定时器 timer.start(startTime.milliseconds) }
graph的生成是在StreamingContext中:
val graph: DStreamGraph={ // 重启服务时 if(isCheckpointPresent){ checkPoint.graph.setContext(this) checkPoint.graph.restoreCheckPointData() checkPoint.graph }else{ // 首次初始化时 val newGraph=new DStreamGraph() newGraph.setBatchDuration(_batchDur) newGraph } }
在GenerateJobs中启动graph:
graph.start(nowTime-batchDuration)
JobGenerator中定义了一个定时器:
val timer=new RecurringTimer(colck,batchDuaraion, longTime=>eventLoop.post( GenerateJobs( new Time(longTime) ) ) )
在JobGenerator启动时会开始执行这个调度器:
timer.start(startTime.milliseconds)
// 来自 JobGenerator
private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { ... private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") ... }
经过代码也能够看到,整个 timer 的调度周期就是 batchDuration,每次调度起来就是作一个很是简单的工做:往 eventLoop 里发送一个消息 —— 该为当前 batch (new Time(longTime)) GenerateJobs 了!
JobGenerator中定义了一个定时器,在定时器中启动生成job操做
class JobGenerator: // 定义定时器 val timer= new RecurringTimer(colck,batchDuaraion, longTime=>eventLoop.post(GenerateJobs( new Time(longTime)))) private def generateJobs(time: Time) { Try { // 1. 将已收到的数据进行一次 allocate receiverTracker.allocateBlocksToBatch(time) // 2. 复制一份新的DAG实例 graph.generateJobs(time) } match { case Success(jobs) => // 3. 获取 meta 信息 val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) // 4. 提交job jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } // 5. checkpoint eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) }
在生成Job并提交到excutor的第二步,
JobGenerator->DStreamGraph->OutputStreams->ForEachDStream->TransformationDStream->InputDStream
具体流程是:
- 1. JobGenerator调用了DStreamGraph里面的gererateJobs(time)方法
- 2. DStreamGraph里的generateJobs方法遍历了outputStreams
- 3. OutputStreams调用了其generateJob(time)方法
- 4. ForEachDStream实现了generateJob方法,调用了:
parent.getOrCompute(time)
递归的调用父类的getOrCompute方法去动态生成物理DAG图
JobScheduler经过线程池执行从JobGenerator提交过来的Job,jobExecutor异步的去处理提交的job
class JobScheduler{ numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) val jobExecutor =ThreadUtils. newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") def submitJobSet(jobSet: JobSet) { jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) }
JobHandler 除了作一些状态记录外,最主要的就是调用 job.run(),
在 ForEachDStream.generateJob(time) 时,是定义了 Job 的运行逻辑,即定义了 Job.func。而在 JobHandler 这里,是真正调用了 Job.run()、将触发 Job.func 的真正执行!
// 来自 JobHandler def run() { ... // 【发布 JobStarted 消息】 _eventLoop.post(JobStarted(job)) PairRDDFunctions.disableOutputSpecValidation.withValue(true) { // 【主要逻辑,直接调用了 job.run()】 job.run() } _eventLoop = eventLoop if (_eventLoop != null) { // 【发布 JobCompleted 消息】 _eventLoop.post(JobCompleted(job)) } ... }
spark.streaming.concurrentJobs job并行度
这里 jobExecutor 的线程池大小,是由 spark.streaming.concurrentJobs 参数来控制的,当没有显式设置时,其取值为 1。
进一步说,这里 jobExecutor 的线程池大小,就是可以并行执行的 Job 数。而回想前文讲解的 DStreamGraph.generateJobs(time) 过程,一次 batch 产生一个 Seq[Job},里面可能包含多个 Job —— 因此,确切的,有几个 output 操做,就调用几回 ForEachDStream.generatorJob(time),就产生出几个 Job
脑图制做参考:https://github.com/lw-lin/CoolplaySpark
完整脑图连接地址:https://sustblog.oss-cn-beijing.aliyuncs.com/blog/2018/spark/srccode/spark-streaming-all.png