站在不一样的角度看jobgit
transaction: Job是由一组RDD上转换和动做组成。github
stage: Job是由ResultStage和多个ShuffleMapState组成web
init:由action操做触发提交执行的一个函数 action操做会触发调用sc.runJob方法,api
Job是一组rdd的转换以及最后动做的操做集合,它是Spark里面计算最大最虚的概念,甚至在spark的任务页面中都没法看到job这个单位。 可是无论怎么样,在spark用户的角度,job是咱们计算目标的单位,每次在一个rdd上作一个动做操做时,都会触发一个job,完成计算并返回咱们想要的数据。 Job是由一组RDD上转换和动做组成,这组RDD之间的转换关系表现为一个有向无环图(DAG),每一个RDD的生成依赖于前面1个或多个RDD。 在Spark中,两个RDD之间的依赖关系是Spark的核心。站在RDD的角度,二者依赖表现为点对点依赖, 可是在Spark中,RDD存在分区(partition)的概念,两个RDD之间的转换会被细化为两个RDD分区之间的转换。 Stage的划分是对一个Job里面一系列RDD转换和动做进行划分。 首先job是因动做而产生,所以每一个job确定都有一个ResultStage,不然job就不会启动。 其次,若是Job内部RDD之间存在宽依赖,Spark会针对它产生一个中间Stage,即为ShuffleStage,严格来讲应该是ShuffleMapStage,这个stage是针对父RDD而产生的, 至关于在父RDD上作一个父rdd.map().collect()的操做。ShuffleMapStage生成的map输入,对于子RDD,若是检测到所本身所“宽依赖”的stage完成计算,就能够启动一个shuffleFectch, 从而将父RDD输出的数据拉取过程,进行后续的计算。 所以一个Job由一个ResultStage和多个ShuffleMapStage组成。数组
https://github.com/ColZer/DigAndBuried/blob/master/spark/shuffle-study.md闭包
调用SparkContext里面的函数重载,将分区数量,须要计算的分区下标等参数设置好 以rdd.count为例:异步
rdd.count // 获取分区数 sc.runJob(this, Utils.getIteratorSize _).sum // 设置须要计算的分区 runJob(rdd, func, 0 until rdd.partitions.length) // 设置须要在每一个partition上执行的函数 runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
定义一个接收计算结果的对象数组并将其返回 构造一个Array,并构造一个函数对象"(index, res) => results(index) = res"继续传递给runJob函数,而后等待runJob函数运行结束,将results返回; 对这里的解释至关在runJob添加一个回调函数,将runJob的运行结果保存到Array到, 回调函数,index表示mapindex, res为单个map的运行结果ide
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U] = { // 定义返回的结果集 val results = new Array[U](partitions.size) // 定义resulthandler runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res) // 返回计算结果 results }
将须要执行excutor的地址和回调函数等传给DAG调度器,由DAG调度器进行具体的submitJob操做。函数
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { // 获取须要发送的excutor地址 val callSite = getCallSite // 闭包封装,防止序列化错误 val cleanedFunc = clean(func) // 提交给dag调度器, dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) // docheckpoint rdd.doCheckpoint() }
注意:dagScheduler.runJob是堵塞的操做,即直到Spark完成Job的运行以前,rdd.doCheckpoint()是不会执行的 上异步的runJob回调用下面这个方法,里面设置了JobWaiter,用来等待job执行完毕。oop
def runJob{ ... // job提交后会返回一个jobwaiter对象 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => ... }
给JOB分配一个ID,并将其放入队列,返回一个阻塞器,等待当前job执行完毕。将结果数据传送给handler function
def submitJob{ // 生成JOB的ID val jobId = nextJobId.getAndIncrement() // 生成阻塞器 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) // post方法的实现:eventQueue.put(event),其实是将此job提交到了一个LinkedBlockingDeque eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) } waiter
在提交job时,咱们将job放到了一个LinkedBlockingDeque队列,而后由EventLoop 负责接收处理请求,触发job的提交,产生一个finalStage. EventLoop是在jobScheduler中启动的时候在JobGenerator中启动的 当从队列中拉去job时,开建立ResultStage:
class EventLoop override def run(): Unit = { try { while (!stopped.get) { // 拉去job val event = eventQueue.take() try { // 触发建立stage onReceive(event) ... } def doOnReceive{ case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) }
建立job:根据JobId,finalStage,excutor地址,job状态监听的JobListener,task的属性properties等生成job,并把job放入Map中记录。
// class DAGScheduler private[scheduler] def handleJobSubmitted() { // 以不一样形式的hashMap存放job jobIdToStageIds = new HashMap[Int, HashSet[Int]] stageIdToStage = new HashMap[Int, Stage] jobIdToActiveJob = new HashMap[Int, ActiveJob] // 初始化finalStage var finalStage: ResultStage = createResultStage(finalRDD, func, partitions, jobId, callSite) // 初始化job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) **listenerBus.post(** **SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))** // 提交finalStage,计算时会判断其以前是否存在shuffleStage,若是存在会优先计算shuffleStage,最后再计算finalStage **submitStage(finalStage)** }
参见: MapOutputTrackerMaster stage的状态分为三类:计算失败,计算完成和未计算完成,迭代的去计算完成父stage后,就能够到下一步,将stage转换到具体的task进行执行。
class DAGScheduler private[scheduler] def handleJobSubmitted() { var finalStage: ResultStage = createResultStage(finalRDD, func, partitions, jobId, callSite) ... submitStage(finalStage) } // 迭代的去判断父stage是否所有计算完成 private def submitStage(stage: Stage) { if(jobId.isDefined){ val missing = getMissingParentStages(stage).sortBy(_.id) if (missing.isEmpty) { // 父stage已经计算完成,能够开始当前计算 submitMissingTasks(stage, jobId.get) } else { // 父stage的map操做未完成,继续进行迭代 for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } // 获取未计算完成的stage private def getMissingParentStages(stage: Stage): List[Stage] = { ... for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency=> // 判断当前stage是否计算完成 val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } ... }
首先利于上面说到的Stage知识获取所须要进行计算的task的分片;由于该Stage有些分片可能已经计算完成了;而后将Task运行依赖的RDD,Func,shuffleDep 进行序列化,经过broadcast发布出去; 而后建立Task对象,提交给taskScheduler调度器进行运行
参见: 获取task分片 对Stage进行遍历全部须要运行的Task分片; 缘由:存在部分task失败之类的状况,或者task运行结果所在的BlockManager被删除了,就须要针对特定分片进行从新计算;即所谓的恢复和重算机制;
class DAGScheduler{ def submitMissingTasks(stage, jobId){ val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() val properties = jobIdToActiveJob(jobId).properties runningStages += stage }
对Stage的运行依赖进行序列化并broadcast给excutors(若是不序列化在数据传输过程当中可能出错) 对ShuffleStage和FinalStage所序列化的内容有所不一样:对于ShuffleStage序列化的是RDD和shuffleDep;而对FinalStage序列化的是RDD和Func 对于FinalStage咱们知道,每一个Task运行过程当中,须要知道RDD和运行的函数,好比咱们这里讨论的Count实现的Func;而对于ShuffleStage,ShuffleDependency记录了父RDD,排序方式,聚合器等,reduce端须要获取这些参数进行初始化和计算。
class DAGScheduler{ def submitMissingTasks(stage, jobId){ ... // consistent view of both variables. RDDCheckpointData.synchronized { taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } partitions = stage.rdd.partitions } taskBinary = sc.broadcast(taskBinaryBytes)
针对每一个须要计算的分片构造一个Task对象, 对于ResultTask就是在分片上调用咱们的Func,而ShuffleMapTask按照ShuffleDep进行 MapOut
class DAGScheduler{ def submitMissingTasks(stage, jobId){ ... val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { // 一个stage会产生多个task任务 case stage: ShuffleMapStage => partitionsToCompute.map { id => new ShuffleMapTask() } case stage: ResultStage => partitionsToCompute.map { id => new ResultTask() } }
调用taskScheduler将task提交给Spark进行调度
class DAGScheduler{ def submitMissingTasks(stage, jobId){ ... if (tasks.size > 0) { // 将taskSet发送给 taskScheduler taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { markStageAsFinished(stage, None) }
DAGScheduler接收到DAGSchedulerEvent后判断其类型是TaskCompletion,不一样的stage的实现方式不同,shuffle的实现更复杂一点
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case completion: CompletionEvent => dagScheduler.handleTaskCompletion(completion) } private[scheduler] def handleTaskCompletion(event: CompletionEvent) { event.reason match { case Success => task match { case rt: ResultTask[_, _] => // 调用jobWaiter的taskSucced通知结果 job.listener.taskSucceeded case smt: ShuffleMapTask => // 调用outputTracker mapOutputTracker.registerMapOutput }
当计算完毕后,JobWaiter同步调用resultHandler处理task返回的结果。
private[scheduler] def handleTaskCompletion(event: CompletionEvent) { event.reason match { case Success => task match { case rt: ResultTask[_, _] => // 调用jobWaiter的taskSucced通知结果 job.listener.taskSucceeded( rt.outputId, event.result) case smt: ShuffleMapTask => } // jobWaiter是JobListner的子类 class JobWaiter extends JobListener{ override def taskSucceeded(index: Int, result: Any): Unit = { synchronized { resultHandler(index, result.asInstanceOf[T]) } if (finishedTasks.incrementAndGet() == totalTasks) { jobPromise.success(()) } } }
参见: MapStatus的注册和获取 将运行结果(mapStatus)传送给outputTrancker
private[scheduler] def handleTaskCompletion(event: CompletionEvent) { event.reason match { case Success => task match { case rt: ResultTask[_, _] => case smt: ShuffleMapTask => // mapOutputTracker.registerMapOutput( shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) }
job执行完毕后执行
private[scheduler] abstract class Stage( val id: Int, // stageId val rdd: RDD[_],// RDD that this stage runs on val numTasks: Int,// task数量 val parents: List[Stage],// 父stage val firstJobId: Int,//当前stage上JobId val callSite: CallSite// 生成RDD存放位置 ) extends Logging {
class ShuffleMapStage( val shuffleDep: ShuffleDependency[_, _, _], mapOutputTrackerMaster: MapOutputTrackerMaster) extends Stage(id, rdd, numTasks, parents, firstJobId, callSite){ // 判断当前stage是否可用 def isAvailable: Boolean = numAvailableOutputs == numPartitions }
每一个运行在Executor上的Task, 经过SparkEnv获取shuffleManager对象, 而后调用getWriter来当前MapID=partitionId的一组Writer. 而后将rdd的迭代器传递给writer.write函数, 由每一个Writer的实现去实现具体的write操做;
class ShuffleMapTask extends Task( def runTask(context: TaskContext): MapStatus = { // 反序列化接收到的数据 val (rdd, dep) = closureSerializer.deserialize( ByteBuffer.wrap(taskBinary.value)) var writer: ShuffleWriter[Any, Any] = null val manager = SparkEnv.get.shuffleManager // 调用ShuffleManager的getWriter方法获取一组writer writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) // 遍历RDD进行write writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } }
上面代码中,在调用rdd的iterator()方法时,会根据RDD实现类的compute方法指定的处理逻辑对数据进行处理,固然,若是该Partition对应的数据已经处理过并存储在MemoryStore或DiskStore,直接经过BlockManager获取到对应的Block数据,而无需每次须要时从新计算。而后,write()方法会将已经处理过的Partition数据输出到磁盘文件。 在Spark Shuffle过程当中,每一个ShuffleMapTask会经过配置的ShuffleManager实现类对应的ShuffleManager对象(其实是在SparkEnv中建立),根据已经注册的ShuffleHandle,获取到对应的ShuffleWriter对象,而后经过ShuffleWriter对象将Partition数据写入内存或文件。
参见: 过滤须要执行的分片 返回须要计算的partition信息
class ShuffleMapStage{ def findMissingPartitions(): Seq[Int] = { mapOutputTrackerMaster .findMissingPartitions(shuffleDep.shuffleId) .getOrElse(0 until numPartitions) } }
参见: reduce端获取 ResultTask不须要进行写操做。直接将计算结果返回。
class ResultTask extends Task { def runTask(context: TaskContext): U = { // 对RDD和函数进行反序列化 val (rdd, func) = ser.deserialize( ByteBuffer.wrap(taskBinary.value) // 调用函数进行计算 func(context, rdd.iterator(partition, context)) } } // RDD的iterator函数, class RDD{ def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } } }
返回须要计算的partition信息,不须要通过tracker,在提交Job的时候会将其保存在ResultStage
class DAGScheduler{ def handleJobSubmitted(){ // 定义resultStage finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) // 将job传递给resultStage finalStage.setActiveJob(job) } } class ResultStage{ // 过滤掉已经完成的 findMissingPartitions(): Seq[Int] = { val job = activeJob.get (0 until job.numPartitions).filter(id => !job.finished(id)) } }
sparkCore源码解析系列: