从Spark应用的提交到执行完成有不少步骤,为了便于理解,咱们把应用执行的整个过程划分为三个阶段。而咱们知道Spark有多种运行模式,不一样模式下这三个阶段的执行流程也不相同。sql
本节介绍这三个阶段的划分,并概要介绍不一样模式下各个阶段的执行流程,各个模式的详细流程会在后面的章节进行分析。apache
咱们知道,Spark应用能够在多种模式下运行。所谓多种模式主要是针对资源分配方式来讲的,Spark应用能够在yarn,k8s,mesos等分布式资源管理平台上运行,也能够启动自带的master和worker端来分配和管理资源(standalone模式)。例如:咱们能够经过如下命令来向yarn提交一个spark任务:app
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
$SPARK_HOME/examples/jars/spark-examples*.jar
代码1 spark应用提交命令dom
要注意的是,在执行以上应用提交命令时yarn资源管理集群必须已经启动。另外,Spark应用的执行是经过Driver端和Executor端共同配合完成的。分布式
要完成以上应用的执行,须要经历不少步骤,为了便于更好的理解Spark应用从提交到执行完成的整个过程,咱们把整个过程划分红三个阶段:ide
应用的提交函数
执行环境的准备学习
任务的调度和执行ui
如图1所示:spa
图1 应用执行整体流程
无论以哪一种模式运行,Spark应用的执行过程均可以划分红这三个阶段。下面对这三个阶段分别进行说明。
这三个阶段以及每一个阶段要完成的目标如图2所示。
图2 Spark应用执行的3阶段目标概述
咱们根据如下代码为例,来说解Spark应用执行的各个阶段。
# HelloWorld.scala
import scala.math.random
import org.apache.spark.sql.SparkSession
object HelloWorld {
def main(args: Array[String]) {
val spark =SparkSession.builder.appName("HelloWorld").getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
}
}
这个阶段在Driver端完成,主要目标是:准备依赖包并肯定Spark应用的执行主类。具体的任务包括:
解析任务提交的参数,并对参数进行解析和保存。
准备(可能会下载)任务启动参数指定的依赖文件或程序包。
根据Spark应用的执行模式和应用的编写语言,来肯定执行的主类名称。
实例化执行主类,生成SparkApplication对象,并调用SparkApplication#start()函数来运行Spark应用(如果Java或scala代码实际上是:执行Spark应用中的main函数)。
注意:第1阶段完成时,Driver端并无向资源管理平台申请任何资源,也没有启动任何Spark内部的服务。
经过第1阶段,已经找到了运行在Driver端的Spark应用的执行主类,并建立了SparkApplication对象:app。此时,在app.start()函数中会直接调用主类的main函数开始执行应用,从而进入第2阶段。
第2阶段主要目标是:建立SparkSession(包括SparkContext和SparkEnv),完成资源的申请和Executor的建立。第2阶段完成后Task的执行环境就准备好了。
也就是说,第2阶段不只会在Driver端进行初始化,并且还要准备好Executor。这一阶段的任务主要是在Driver端执行建立SparkSession的代码来完成,也就是执行下面一行代码:
val spark =SparkSession.builder.appName("HelloWorld").getOrCreate()
第2阶段的Driver端主要完成如下步骤:
建立SparkContext和SparkEnv对象,在建立这两个对象时,向Cluster Manager申请资源,启动各个服务模块,并对服务模块进行初始化。
这些服务模块包括:DAG调度服务,任务调度服务,shuffle服务,文件传输服务,数据块管理服务,内存管理服务等。
第2阶段的Executor端主要完成如下步骤:
Driver端向Cluster Manager申请资源,如果Yarn模式会在NodeManager上建立ApplicationMaster,并由ApplicationMaster向Cluster Manager来申请资源,并启动Container,在Container中启动Executor。
在启动Executor时向Driver端注册BlockManager服务,并建立心跳服务RPC环境,经过该RPC环境向Driver汇报Executor的状态信息。
详细的执行步骤,会在后面介绍每种模式的运行原理时,详细分析。第2阶段执行完成后的Spark集群状态以下图:
经过第2阶段已经完成了Task执行环境的初始化,此时,在Driver端已经完成了SparkContext和SparkEnv的建立,资源已经申请到了,而且已经启动了Executor。
这一阶段会执行接下来的数据处理的代码:
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
第3阶段Driver端主要完成如下步骤:
执行Spark的处理代码,当执行map操做时,生成新的RDD;
当执行Action操做时,触发Job的提交,此时会执行如下步骤:
根据RDD的血缘,把Job划分红相互依赖的Stage;
把每一个Stage拆分红一个或多个Task;
把这些Task提交给已经建立好的Executor去执行;
获取Executor的执行状态信息,直到Executor完成全部Task的执行;
获取执行结果和最终的执行状态。
本节介绍了Spark应用的执行过程,经过本节的学习应该对Spark应用的执行过程有一个整体的理解。接下来会根据具体的运行模式来详细分析每一个阶段的执行步骤。