Apache Spark源码分析-- Job的提交与运行

本文以wordCount为例,详细说明spark建立和运行job的过程,重点是在进程及线程的建立。java

实验环境搭建 node

在进行后续操做前,确保下列条件已知足。web

1. 下载spark binary 0.9.1shell

2. 安装scalaapache

3. 安装sbt浏览器

4. 安装java微信

启动spark-shell单机模式运行,即local模式 jvm

local模式运行很是简单,只要运行如下命令便可,假设当前目录是$SPARK_HOME函数

MASTER=local bin/spark-shell 学习

"MASTER=local"就是代表当前运行在单机模式

local cluster方式运行

localcluster模式是一种伪cluster模式,在单机环境下模拟standalone的集群,启动顺序分别以下

1. 启动master

2. 启动worker

3. 启动spark-shell

master$SPARK_HOME/sbin/start-master.sh

注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。

master主要是运行类 org.apache.spark.deploy.master.Master在8080端口启动监听,日志以下图所示

修改配置

1. 进入$SPARK_HOME/conf目录

2. 将spark-env.sh.template重命名为spark-env.sh

3. 修改spark-env.sh,添加以下内容

export SPARK_MASTER_IP=localhostexport SPARK_LOCAL_IP=localhost运行workerbin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M

worker启动完成,链接到master。打开maser的webui能够看到链接上来的worker. Master WEb UI的监听地址是http://localhost:8080

启动spark-shellMASTER=spark://localhost:7077 bin/spark-shell

若是一切顺利,将看到下面的提示信息。

Created spark context..Spark context available as sc.

能够用浏览器打开localhost:4040来查看以下内容

1. stages

2. storage

3. environment

4. executors

wordcount

上述环境准备稳当以后,咱们在sparkshell中运行一下最简单的例子,在spark-shell中输入以下代码

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

上述代码统计在README.md中含有Spark的行数有多少

部署过程详解

Spark布置环境中组件构成以下图所示。

 



  • Driver Program 简要来讲在spark-shell中输入的wordcount语句对应于上图的Driver Program.

  • Cluster Manager 就是对应于上面提到的master,主要起到deploy management的做用

  • Worker Node 与Master相比,这是slave node。上面运行各个executor,executor能够对应于线程。executor处理两种基本的业务逻辑,一种就是driver     programme,另外一种就是job在提交以后拆分红各个stage,每一个stage能够运行一到多个task

Notes: 在集群(cluster)方式下, Cluster Manager运行在一个jvm进程之中,而worker运行在另外一个jvm进程中。在local cluster中,这些jvm进程都在同一台机器中,若是是真正的standalone或Mesos及Yarn集群,worker与master或分布于不一样的主机之上。

JOB的生成和运行

job生成的简单流程以下

1. 首先应用程序建立SparkContext的实例,如实例为sc

2. 利用SparkContext的实例来建立生成RDD

3. 通过一连串的transformation操做,原始的RDD转换成为其它类型的RDD

4. 当action做用于转换以后RDD时,会调用SparkContext的runJob方法

5. sc.runJob的调用是后面一连串反应的起点,关键性的跃变就发生在此处

调用路径大体以下

1. sc.runJob->dagScheduler.runJob->submitJob

2. DAGScheduler::submitJob会建立JobSummitted的event发送给内嵌类eventProcessActor

3. eventProcessActor在接收到JobSubmmitted以后调用processEvent处理函数

4. job到stage的转换,生成finalStage并提交运行,关键是调用submitStage

5. 在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖窄依赖两种

6. 若是计算中发现当前的stage没有任何依赖或者全部的依赖都已经准备完毕,则提交task

7. 提交task是调用函数submitMissingTasks来完成

8. task真正运行在哪一个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks

9. TaskSchedulerImpl中会根据Spark的当前运行模式来建立相应的backend,若是是在单机运行则建立LocalBackend

10. LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件

11. receiveOffers->executor.launchTask->TaskRunner.run

代码片断executor.lauchTask

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {    val tr = new TaskRunner(context, taskId, serializedTask)    runningTasks.put(taskId, tr)    threadPool.execute(tr)  }

说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor以内。

运算结果是包装成为MapStatus而后经过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣能够自行勾勒。

更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

关注超人学院java免费学习交流群:

相关文章
相关标签/搜索