Spark源码分析:多种部署方式之间的区别与联系(1)

 从官方的文档咱们能够知道,Spark的部署方式有不少种:local、Standalone、Mesos、YARN.....不一样部署方式的后台处理进程是不同的,可是若是咱们从代码的角度来看,其实流程都差很少。
  从代码中,咱们能够得知其实Spark的部署方式其实比官方文档中介绍的还要多,这里我来列举一下: 微信

  一、local:这种方式是在本地启动一个线程来运行做业;
  二、local[N]:也是本地模式,可是启动了N个线程;
  三、local[*]:仍是本地模式,可是用了系统中全部的核;
  四、local[N,M]:这里有两个参数,第一个表明的是用到的核个数;第二个参数表明的是允许该做业失败M次。上面的几种模式没有指定M参数,其默认值都是1;
  五、local-cluster[N, cores, memory]:本地伪集群模式,参数的含义我就不说了,看名字就知道;式;
  六、spark:// :这是用到了Spark的Standalone模
  七、(mesos|zk)://:这是Mesos模式;
  八、yarn-standalone\yarn-cluster\yarn-client:这是YARN模式。前面两种表明的是集群模式;后面表明的是客户端模式;
  九、simr://:这种你就不知道了吧?simr实际上是Spark In MapReduce的缩写。咱们知道MapReduce 1中是没有YARN的,若是你在MapReduce 1中使用Spark,那么就用这种模式吧。

  整体来讲,上面列出的各类部署方式运行的流程大体同样:都是从SparkContext切入,在SparkContext的初始化过程当中主要作了如下几件事:
  一、根据SparkConf建立SparkEnv oop

01 // Create the Spark execution environment (cache, map output tracker, etc)
02   private[spark] val env = SparkEnv.create(
03     conf,
04     "<driver>",
05     conf.get("spark.driver.host"),
06     conf.get("spark.driver.port").toInt,
07     isDriver = true,
08     isLocal = isLocal,
09     listenerBus = listenerBus)
10   SparkEnv.set(env)

  二、初始化executor的环境变量executorEnvs
  这个步骤代码太多了,我就不贴出来。
  三、建立TaskScheduler post

1 // Create and start the scheduler
2   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

  四、建立DAGScheduler this

1 @volatile private[spark] var dagScheduler: DAGScheduler = _
2   try {
3     dagScheduler = new DAGScheduler(this)
4   } catch {
5     case e: Exception => throw
6       newSparkException("DAGScheduler
7                      cannot be initialized due to %s".format(e.getMessage))
8   }

  五、启动TaskScheduler spa

1 // start TaskScheduler after taskScheduler
2 // sets DAGScheduler reference in DAGScheduler's
3   // constructor
4   taskScheduler.start()

  那么,DAGScheduler和TaskScheduler都是什么?
  DAGScheduler称为做业调度,它基于Stage的高层调度模块的实现,它为每一个Job的Stages计算DAG,记录哪些RDD和Stage的输出已经实物化,而后找到最小的调度方式来运行这个Job。而后以Task Sets的形式提交给底层的任务调度模块来具体执行。
  TaskScheduler称为任务调度。它是低层次的task调度接口,目前仅仅被TaskSchedulerImpl实现。这个接口能够以插件的形式应用在不一样的task调度器中。每一个TaskScheduler只给一个SparkContext调度task,这些调度器接受来自DAGScheduler中的每一个stage提交的tasks,并负责将这些tasks提交给cluster运行。若是提交失败了,它将会重试;并处理stragglers。全部的事件都返回到DAGScheduler中。
  在建立DAGScheduler的时候,程序已经将taskScheduler做为参数传进去了,代码以下: 插件

01 def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
02     this(
03       sc,
04       taskScheduler,
05       sc.listenerBus,
06       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
07       sc.env.blockManager.master,
08       sc.env)
09   }
10  
11   def this(sc: SparkContext) = this(sc, sc.taskScheduler)

也就是DAGScheduler封装了TaskScheduler。TaskScheduler中有两个比较重要的方法: 线程

1 // Submit a sequence of tasks to run.
2 def submitTasks(taskSet: TaskSet): Unit
3  
4 // Cancel a stage.
5 def cancelTasks(stageId: Int, interruptThread: Boolean)

  这些方法在DAGScheduler中被调用,而TaskSchedulerImpl实现了TaskScheduler,为各类调度模式提供了任务调度接口,在TaskSchedulerImpl中还实现了resourceOffers和statusUpdate两个接口给Backend调用,用于提供调度资源和更新任务状态。
  在YARN模式中,还提供了YarnClusterScheduler类,他只是简单地继承TaskSchedulerImpl类,主要重写了getRackForHost(hostPort: String)和postStartHook() 方法。继承图以下: orm


若是想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共账号: iteblog_hadoop


在下篇文章中,我将介绍上面九种部署模式涉及到的各类类及其之间的关系。欢迎关注本博客!这里先列出下篇文章用到的类图
相关文章
相关标签/搜索