Job 物理执行图
在 Overview 里咱们初步介绍了 DAG 型的物理执行图,里面包含 stages 和 tasks。这一章主要解决的问题是:
给定 job 的逻辑执行图,如何生成物理执行图(也就是 stages 和 tasks)?
一个复杂 job 的逻辑执行图

代码贴在本章最后。
给定这样一个复杂数据依赖图,如何合理划分 stage,并肯定 task 的类型和个数? 一个直观想法是将先后关联的 RDDs 组成一个 stage,每一个箭头生成一个 task。对于两个 RDD 聚合成一个 RDD 的状况,这三个 RDD 组成一个 stage。这样虽然能够解决问题,但显然效率不高。除了效率问题,这个想法还有一个更严重的问题:
大量中间数据须要存储。对于 task 来讲,其执行结果要么要存到磁盘,要么存到内存,或者二者皆有。若是每一个箭头都是 task 的话,每一个 RDD 里面的数据都须要存起来,占用空间可想而知。 仔细观察一下逻辑执行图会发现:在每一个 RDD 中,每一个 partition 是独立的,也就是说在 RDD 内部,每一个 partition 的数据依赖各自不会相互干扰。所以,一个大胆的想法是将整个流程图当作一个 stage,为最后一个 finalRDD 中的每一个 partition 分配一个 task。图示以下:

全部的粗箭头组合成第一个 task,该 task 计算结束后顺便将 CoGroupedRDD 中已经计算获得的第二个和第三个 partition 存起来。以后第二个 task(细实线)只需计算两步,第三个 task(细虚线)也只须要计算两步,最后获得结果。 这个想法有两个不靠谱的地方:
- 第一个 task 太大,碰到 ShuffleDependency 后,不得不计算 shuffle 依赖的 RDDs 的全部 partitions,并且都在这一个 task 里面计算。
- 须要设计巧妙的算法来判断哪一个 RDD 中的哪些 partition 须要 cache。并且 cache 会占用存储空间。
虽然这是个不靠谱的想法,但有一个可取之处,即
pipeline 思想:数据用的时候再算,并且数据是流到要计算的位置的。好比在第一个 task 中,从 FlatMappedValuesRDD 中的 partition 向前推算,只计算要用的(依赖的) RDDs 及 partitions。在第二个 task 中,从 CoGroupedRDD 到 FlatMappedValuesRDD 计算过程当中,不须要存储中间结果(MappedValuesRDD 中 partition 的所有数据)。 更进一步,从 record 粒度来说,以下图中,第一个 pattern 中先算 g(f(record1)),而后原始的 record1 和 f(record1) 均可以丢掉,而后再算 g(f(record2)),丢掉中间结果,最后算 g(f(record3))。对于第二个 pattern 中的 g,record1 进入 g 后,理论上能够丢掉(除非被手动 cache)。其余 pattern 同理。

回到 stage 和 task 的划分问题,上面不靠谱想法的主要问题是碰到 ShuffleDependency 后没法进行 pipeline。那么只要在 ShuffleDependency 处断开,就只剩 NarrowDependency,而 NarrowDependency chain 是能够进行 pipeline 的。按照此思想,上面 ComplexJob 的划分图以下:

因此划分算法就是:
从后往前推算,遇到 ShuffleDependency 就断开,遇到 NarrowDependency 就将其加入该 stage。每一个 stage 里面 task 的数目由该 stage 最后一个 RDD 中的 partition 个数决定。 粗箭头表示 task。由于是从后往前推算,所以最后一个 stage 的 id 是 0,stage 1 和 stage 2 都是 stage 0 的 parents。
若是 stage 最后要产生 result,那么该 stage 里面的 task 都是 ResultTask,不然都是 ShuffleMapTask。之因此称为 ShuffleMapTask 是由于其计算结果须要 shuffle 到下一个 stage,本质上至关于 MapReduce 中的 mapper。ResultTask 至关于 MapReduce 中的 reducer(若是须要从 parent stage 那里 shuffle 数据),也至关于普通 mapper(若是该 stage 没有 parent stage)。 还有一个问题:算法中提到 NarrowDependency chain 能够 pipeline,但是这里的
ComplexJob 只展现了 OneToOneDependency 和 RangeDependency 的 pipeline,普通 NarrowDependency 如何 pipeline? 回想上一章里面 cartesian(otherRDD) 里面复杂的 NarrowDependency,图示以下:

通过算法划分后结果以下:

图中粗箭头展现了第一个 ResultTask,其余的 task 依此类推。因为该 stage 的 task 直接输出 result,因此这个图包含 6 个 ResultTasks。与 OneToOneDependency 不一样的是这里每一个 ResultTask 须要计算 3 个 RDD,读取两个 data block,而整个读取和计算这三个 RDD 的过程在一个 task 里面完成。当计算 CartesianRDD 中的 partition 时,须要从两个 RDD 获取 records,因为都在一个 task 里面,不须要 shuffle。这个图说明:
不论是 1:1 仍是 N:1 的 NarrowDependency,只要是 NarrowDependency chain,就能够进行 pipeline,生成的 task 个数与该 stage 最后一个 RDD 的 partition 个数相同。
物理图的执行
生成了 stage 和 task 之后,下一个问题就是
task 如何执行来生成最后的 result? 回到 ComplexJob 的物理执行图,若是按照 MapReduce 的逻辑,从前到后执行,map() 产生中间数据 map outpus,通过 partition 后放到本地磁盘。再通过 shuffle-sort-aggregate 后生成 reduce inputs,最后 reduce() 执行获得 result。执行流程以下:

整个执行流程没有问题,但不能直接套用在 Spark 的物理执行图上,由于 MapReduce 的流程图简单、固定,并且没有 pipeline。 回想 pipeline 的思想是
数据用的时候再算,并且数据是流到要计算的位置的。Result 产生的地方的就是要计算的位置,要肯定 “须要计算的数据”,咱们能够从后往前推,须要哪一个 partition 就计算哪一个 partition,若是 partition 里面没有数据,就继续向前推,造成 computing chain。这样推下去,结果就是:须要首先计算出每一个 stage 最左边的 RDD 中的某些 partition。
对于没有 parent stage 的 stage,该 stage 最左边的 RDD 是能够当即计算的,并且每计算出一个 record 后即可以流入 f 或 g(见前面图中的 patterns)。若是 f 中的 record 关系是 1:1 的,那么 f(record1) 计算结果能够当即顺着 computing chain 流入 g 中。若是 f 的 record 关系是 N:1,record1 进入 f() 后也能够被回收。总结一下,computing chain 从后到前创建,而实际计算出的数据从前到后流动,并且计算出的第一个 record 流动到不能再流动后,再计算下一个 record。这样,虽然是要计算后续 RDD 的 partition 中的 records,但并非要求当前 RDD 的 partition 中全部 records 计算获得后再总体向后流动。 对于有 parent stage 的 stage,先等着全部 parent stages 中 final RDD 中数据计算好,而后通过 shuffle 后,问题就又回到了计算 “没有 parent stage 的 stage”。
代码实现:每一个 RDD 包含的 getDependency() 负责确立 RDD 的数据依赖,compute() 方法负责接收 parent RDDs 或者 data block 流入的 records,进行计算,而后输出 record。常常能够在 RDD 中看到这样的代码
firstParent[T].iterator(split, context).map(f)
。firstParent 表示该 RDD 依赖的第一个 parent RDD,iterator() 表示 parentRDD 中的 records 是一个一个流入该 RDD 的,map(f) 表示每流入一个 recod 就对其进行 f(record) 操做,输出 record。为了统一接口,这段 compute() 仍然返回一个 iterator,来迭代 map(f) 输出的 records。
总结一下:
整个 computing chain 根据数据依赖关系自后向前创建,遇到 ShuffleDependency 后造成 stage。在每一个 stage 中,每一个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records 一个个 fetch 过来。 若是要本身设计一个 RDD,那么须要注意的是 compute() 只负责定义 parent RDDs => output records 的计算逻辑,具体依赖哪些 parent RDDs 由
getDependency()
定义,具体依赖 parent RDD 中的哪些 partitions 由
dependency.getParents()
定义。 例如,在 CartesianRDD 中,
// RDD x = (RDD a).cartesian(RDD b)
// 定义 RDD x 应该包含多少个 partition,每一个 partition 是什么类型
override def getPartitions: Array[Partition] = {
// create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
// 定义 RDD x 中的每一个 partition 怎么计算获得
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
// s1 表示 RDD x 中的 partition 依赖 RDD a 中的 partitions(这里只依赖一个)
// s2 表示 RDD x 中的 partition 依赖 RDD b 中的 partitions(这里只依赖一个)
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
// 定义 RDD x 中的 partition i 依赖于哪些 RDD 中的哪些 partitions
//
// 这里 RDD x 依赖于 RDD a,同时依赖于 RDD b,都是 NarrowDependency
// 对于第一个依赖,RDD x 中的 partition i 依赖于 RDD a 中的
// 第 List(i / numPartitionsInRdd2) 个 partition
// 对于第二个依赖,RDD x 中的 partition i 依赖于 RDD b 中的
// 第 List(id % numPartitionsInRdd2) 个 partition
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
},
new NarrowDependency(rdd2) {
def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
}
)
生成 job
前面介绍了逻辑和物理执行图的生成原理,那么,
怎么触发 job 的生成?已经介绍了 task,那么 job 是什么? 下表列出了能够触发执行图生成的典型
action(),其中第二列是
processPartition()
,定义如何计算 partition 中的 records 获得 result。第三列是
resultHandler()
,定义如何对从各个 partition 收集来的 results 进行计算来获得最终结果。
Action |
finalRDD(records) => result |
compute(results) |
reduce(func) |
(record1, record2) => result, (result, record i) => result |
(result1, result 2) => result, (result, result i) => result |
collect() |
Array[records] => result |
Array[result] |
count() |
count(records) => result |
sum(result) |
foreach(f) |
f(records) => result |
Array[result] |
take(n) |
record (i<=n) => result |
Array[result] |
first() |
record 1 => result |
Array[result] |
takeSample() |
selected records => result |
Array[result] |
takeOrdered(n, [ordering]) |
TopN(records) => result |
TopN(results) |
saveAsHadoopFile(path) |
records => write(records) |
null |
countByKey() |
(K, V) => Map(K, count(K)) |
(Map, Map) => Map(K, count(K)) |
用户的 driver 程序中一旦出现 action(),就会生成一个 job,好比
foreach()
会调用
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
,向 DAGScheduler 提交 job。若是 driver 程序后面还有 action(),那么其余 action() 也会生成 job 提交。因此,driver 有多少个 action(),就会生成多少个 job。这就是 Spark 称 driver 程序为 application(可能包含多个 job)而不是 job 的缘由。 每个 job 包含 n 个 stage,最后一个 stage 产生 result。好比,第一章的 GroupByTest 例子中存在两个 job,一共产生了两组 result。在提交 job 过程当中,DAGScheduler 会首先划分 stage,而后先提交
无 parent stage 的 stages,并在提交过程当中肯定该 stage 的 task 个数及类型,并提交具体的 task。无 parent stage 的 stage 提交完后,依赖该 stage 的 stage 才可以提交。从 stage 和 task 的执行角度来说,一个 stage 的 parent stages 执行完后,该 stage 才能执行。
提交 job 的实现细节
下面简单分析下 job 的生成和提交代码,提交过程在 Architecture 那一章也会有图文并茂的分析:
- rdd.action() 会调用
DAGScheduler.runJob(rdd, processPartition, resultHandler)
来生成 job。
- runJob() 会首先经过
rdd.getPartitions()
来获得 finalRDD 中应该存在的 partition 的个数和类型:Array[Partition]。而后根据 partition 个数 new 出来未来要持有 result 的数组 Array[Result](partitions.size)
。
- 最后调用 DAGScheduler 的
runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler)
来提交 job。cleanedFunc 是 processParittion 通过闭包清理后的结果,这样能够被序列化后传递给不一样节点的 task。
- DAGScheduler 的 runJob 继续调用
submitJob(rdd, func, partitions, allowLocal, resultHandler)
来提交 job。
- submitJob() 首先获得一个 jobId,而后再次包装 func,向 DAGSchedulerEventProcessActor 发送 JobSubmitted 信息,该 actor 收到信息后进一步调用
dagScheduler.handleJobSubmitted()
来处理提交的 job。之因此这么麻烦,是为了符合事件驱动模型。
- handleJobSubmmitted() 首先调用 finalStage = newStage() 来划分 stage,而后submitStage(finalStage)。因为 finalStage 可能有 parent stages,实际先提交 parent stages,等到他们执行完,finalStage 须要再次提交执行。再次提交由 handleJobSubmmitted() 最后的 submitWaitingStages() 负责。
分析一下 newStage() 如何划分 stage:
- 该方法在 new Stage() 的时候会调用 finalRDD 的 getParentStages()。
- getParentStages() 从 finalRDD 出发,反向 visit 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 加入到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage。
- 一个 ShuffleMapStage(不是最后造成 result 的 stage)造成后,会将该 stage 最后一个 RDD 注册到
MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
,这一步很重要,由于 shuffle 过程须要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置。
分析一下 submitStage(stage) 如何提交 stage 和 task:
- 先肯定该 stage 的 missingParentStages,使用
getMissingParentStages(stage)
。若是 parentStages 均可能已经执行过了,那么就为空了。
- 若是 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将本身加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
- 若是 missingParentStages 为空,说明该 stage 能够当即执行,那么就调用
submitMissingTasks(stage, jobId)
来生成和提交具体的 task。若是 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。若是 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用taskScheduler.submitTasks(taskSet)
来提交一整个 taskSet。
- 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每个 taskSet 被包装成 manager: TaskSetMananger,而后交给
schedulableBuilder.addTaskSetManager(manager)
。schedulableBuilder 能够是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步是通知backend.reviveOffers()
去执行 task,backend 的类型是 SchedulerBackend。若是在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。
- SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子类,
backend.reviveOffers()
实际上是向 DriverActor 发送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的时候,会启动 DriverActor。DriverActor 收到 ReviveOffers 消息后,会调用launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
来 launch tasks。scheduler 就是 TaskSchedulerImpl。scheduler.resourceOffers()
从 FIFO 或者 Fair 调度器那里得到排序后的 TaskSetManager,并通过TaskSchedulerImpl.resourceOffer()
,考虑 locality 等因素来肯定 task 的所有信息 TaskDescription。调度细节这里暂不讨论。
- DriverActor 中的 launchTasks() 将每一个 task 序列化,若是序列化大小不超过 Akka 的 akkaFrameSize,那么直接将 task 送到 executor 那里执行
executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
。
Discussion
至此,咱们讨论了:
- driver 程序如何触发 job 的生成
- 如何从逻辑执行图获得物理执行图
- pipeline 思想与实现
- 生成与提交 job 的实际代码
还有不少地方没有深刻讨论,如:
- 链接 stage 的 shuffle 过程
- task 运行过程及运行位置
下一章重点讨论 shuffle 过程。 从逻辑执行图的创建,到将其转换成物理执行图的过程很经典,过程当中的 dependency 划分,pipeline,stage 分割,task 生成 都是有条不紊,有理有据的。
ComplexJob 的源代码
package internals
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
object complexJob {
def main(args: Array[String]) {
val sc = new SparkContext("local", "ComplexJob test")
val data1 = Array[(Int, Char)](
(1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
(5, 'e'), (3, 'f'),
(2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)
val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))
val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
(3, "C"), (4, "D"))
val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))
val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)
val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)
result.foreachWith(i => i)((x, i) => println("[result " + i + "] " + x))
println(result.toDebugString)
}
}