Spark 源码分析(三): SparkContext 初始化之 TaskScheduler 建立与启动

前面已经分析到了 driver 进程成功在某台 worker 上启动了,下面就开始执行咱们写的那些代码了。以一个 wordcount 程序为例,代码以下:java

val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./file/localfile")
    val words = lines.flatMap(line => line.split(" "))
    val wordPairs = words.map(word => (word, 1))
    val wordCounts = wordPairs.reduceByKey(_ + _)
    wordCounts.foreach(wordCount => println(wordCount._1 + " " + wordCount._2))
复制代码

首先会去初始化咱们的 SparkContext 对象,在初始化 SparkContext 对象前会先建立一个 SparkConf 对象用来配置各类参数。SparkContext 对象的初始化代码在 org.apache.spark.SparkContext 的 374-594 行(spark 2.1.1 的源码中),这块代码大概作了这些事情:算法

1,建立 Spark 的执行环境 SparkEnv;apache

2,建立并初始化 SparkUI;后端

3,Hadoop 相关配置和 Executor 环境变量的设置;缓存

4,建立心跳接收器,用来和 Executor 作通讯;ide

5,建立和启动 TaskScheduler;函数

6,建立和启动 DAGScheduler;oop

7,初始化 BlockManager;ui

8,启动测量系统 MetricsSystem;this

9,建立和启动 ExecutorAllocationManager;

10,建立和启动 ContextCleaner;

11,Spark Environment Update;

12,向系统的测量系统注册 DAGSchedulerSource,BlockManagerSource,executorAllocationManagerSource;

13,标记当前 SparkContext 为激活状态(这个代码在 SparkContext 类的最后,2237 行);

以上就是 SparkContext 初始化过程,咱们最主要的是分析 TaskScheduler 和 DAGScheduler 的建立和启动过程,这篇主要来看 TaskScheduler。

建立和启动 TaskScheduler 的代码从 501 行开始,代码以下:

// 建立 TaskScheduler,DAGScheduler
		val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // 开启 taskScheduler
    _taskScheduler.start()
复制代码

上面代码能够看出经过调用 SparkContext.createTaskScheduler 方法去建立 TaskScheduler。

createTaskScheduler 方法会根据传入的 master 参数进行模式匹配,咱们使用的是 spark 的 standalone cluster 模式,对应的 master 确定是这种的:“spark://xxxx”。

因此会匹配到 SPARK_REGEX(sparkUrl) 这里,这里的 SPARK_REGEX 的匹配规则是 val SPARK_REGEX = """spark://(.*)""".r

匹配成功后,会去建立 TaskSchedulerImpl 对象、StandaloneSchedulerBackend 对象,而后而后将 StandaloneSchedulerBackend 对象带入到 TaskSchedulerImpl 的 initialize 方法中进行初始化操做。最后返回 StandaloneSchedulerBackend 和 TaskSchedulerImpl 对象。

这里的 TaskSchedulerImpl 是 TaskScheduler trait 的实现类,做用是:从 DAGScheduler 接收不一样的 stage 任务,并向集群提交这些任务。

以上分析对应的代码以下:

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
复制代码

下面主要看 scheduler.initialize(backend) 这里,执行 TaskSchedulerImpl 的 initialize 方法。这个方法要传入一个 SchedulerBackend 对象。

initialize 的过程当中首先会将 StandaloneSchedulerBackend 对象持有到该 TaskSchedulerImpl 对象里。

而后去建立调度池 rootPool,这里面缓存了调度队列等相关信息。

而后再去根据 schedulingMode 去进行模式匹配使用哪一种调度算法,schedulingMode 初始化时候默认值是 FIFO,因此这里默认的调度算法会匹配到 FIFO 模式。

模式匹配成功后建立 FIFOSchedulableBuilder,用来操做 rootPool 中的调度队列。

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // 建立调度池
    rootPool = new Pool("", schedulingMode, 0, 0)
    // 模式匹配调度算法
    // schedulingMode 有个默认值为 FIFO,具体能够点到对应的代码看
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
复制代码

初始化完成后就会返回 (backend, scheduler)

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
复制代码

而后为 SparkContext 对象中对应的 _schedulerBackend 和 _taskScheduler 对象赋值,这里的 _schedulerBackend 是 StandaloneSchedulerBackend,_taskScheduler 是 TaskSchedulerImpl

_schedulerBackend = sched
    _taskScheduler = ts
复制代码

接下来就是建立 DAGScheduler。

DAGScheduler 主要做用是:分析用户提交的应用,根据 RDD 之间的依赖关系建立 DAG 图,而后根据 DAG 图划分红多个 stage,为每一个 stage 分一组 task 去处理一批数据。而后将这些 task 交给 TaskScheduler,TaskScheduler 会经过 ClusterManager 在集群中找到符合要求的 Worker 上的 Executor 去启动这些 task。

具体源码能够看后面的文章。

_dagScheduler = new DAGScheduler(this)
复制代码

new DAGScheduler(this) 这个构造函数里 this 就是当前的 SparkContext 对象,后续代码里会经过 SparkContext.taskScheduler 方法拿到里面的 _taskScheduler 对象,而后会调用 TaskScheduler.setDAGScheduler 方法设置好 DAGScheduler 的引用。这里设置好以后就会开启 TaskScheduler。

_taskScheduler.start()
复制代码

_taskScheduler.start() 方法中回去调用 backend.start 方法,在这里就是 StandaloneSchedulerBackend 中的 start 方法。

override def start() {
    // 这里会去调用 StandaloneSchedulerBackend 的 start 方法
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
复制代码

StandaloneSchedulerBackend 是一个 SchedulerBackend trait 的实现类,是 TaskScheduler 调度后端接口,不一样的集群部署模式会有不一样的实现。

TaskScheduler 给 task 分配资源的时候其实是经过 SchedulerBackend 去完成的。StandaloneSchedulerBackend 是用于 standalone cluster 模式下的 SchedulerBackend。做用于 driver 内,用于和 Executor 通讯,Task 的资源分配。

StandaloneSchedulerBackend 的 start 方法会去调用其父类的 start 方法,也就是 CoarseGrainedSchedulerBackend 的 start 方法。

这个方法内会去建立一个 DriverEndPointRef。

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // 根据配置参数属性去建立 DriverEndPointRef
    driverEndpoint = createDriverEndpointRef(properties)
  }
复制代码

createDriverEndpointRef 这个方法内部会先想 rpcEnv 上注册一个建立好的 DriverEndpoint。

protected def createDriverEndpointRef( properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  	// 向 rpcEnv 注册 createDriverEndpoint 这个方法返回的 DriverEndpoint
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }
复制代码

createDriverEndpoint 这个方法是用来建立 DriverEndpoint 的,这个 DriverEndpoint 就是用来 提交 task 到 Executor,并接收 Executor 的返回结果的。

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  // 建立 DriverEndpoint 
  new DriverEndpoint(rpcEnv, properties)
  }
复制代码

至此,TaskScheduler 建立和启动已经完成。

相关文章
相关标签/搜索