Spark Source Code Analysis – SparkContext

Spark源码分析之-scheduler模块 (Spark source analysis: the scheduler module)
That post is very well written and made analyzing the Spark source much easier for me.
Here I walk through it again in my own words.

Let's start with a simple Spark program:

val sc = new SparkContext(……)
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()

 

1. SparkContext

This is the entry point of Spark: anywhere that needs to use Spark must first create a SparkContext.

The most important initialization work in SparkContext is starting the TaskScheduler and the DAGScheduler; these two are the core of Spark.

Spark's design is very clean: the whole DAG abstraction layer is separated from the actual task execution.
The DAGScheduler is responsible for interpreting the Spark operations, generating stages, forming the DAG, and finally splitting it into tasks that are submitted to the TaskScheduler; it does only the static analysis.
The TaskScheduler is dedicated to task execution: it handles only resource management, task assignment, and reporting of execution status.
The benefit is that Spark can support different resource schedulers and execution platforms simply by providing different TaskScheduler implementations; currently Spark supports local, standalone, Mesos, YARN...

class SparkContext(
    val master: String,
    val appName: String,
    val sparkHome: String = null,
    val jars: Seq[String] = Nil,
    val environment: Map[String, String] = Map(),
    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
    // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
  extends Logging {

  // Create and start the scheduler
  private var taskScheduler: TaskScheduler = {
  //.......
  }
  taskScheduler.start()

  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
  dagScheduler.start()
}
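
To tie this back to the example at the top, here is a minimal construction sketch. Only master and appName are supplied; everything else falls back to the defaults shown in the signature above. The master URL "local[2]" and the application name are placeholders of my own, not values from the source.

// Minimal sketch: only master and appName are required.
// "local[2]" runs everything in-process with two threads; a standalone, Mesos
// or YARN master URL could be passed instead to target a cluster.
val sc = new SparkContext(
  master  = "local[2]",
  appName = "SparkContextDemo")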

 

2. sc.textFile

Then, of course, the data to be processed has to be loaded. The most commonly used call is textFile, which simply creates a HadoopRDD as the starting RDD.

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
      .map(pair => pair._2.toString)
  }
  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minSplits: Int = defaultMinSplits
      ) : RDD[(K, V)] = {
    val conf = new JobConf(hadoopConfiguration)
    FileInputFormat.setInputPaths(conf, path)
    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
  }
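
Since textFile is just hadoopFile plus a map over the (key, value) pairs, the same RDD of lines can be built by hand. A small usage sketch, reusing the sc from section 1 (pair._1 is the byte offset of each line, pair._2 its text):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Equivalent to sc.textFile("README.md"): build the HadoopRDD of (offset, line)
// pairs explicitly and keep only the text of each line.
val lines = sc
  .hadoopFile("README.md", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map(pair => pair._2.toString)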

 

3. Transform and Action

The filter transformation invoked here is simple; see the earlier blog posts.
The key call is the count action. What distinguishes an action is that it calls runJob,
so until an action is invoked, the job is never actually executed (see the sketch after the count source below).

  def count(): Long = { // runJob is only called inside actions, which is why transformations are lazy
    sc.runJob(this, (iter: Iterator[T]) => { // count uses a simplified runJob that takes only the rdd and func; the rest is filled in with defaults
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }).sum
  }
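
To make the lazy/eager split concrete, here is the running example again, separated into its two halves (reusing textFile from section 2):

// filter is a transformation: it only records "keep the lines containing Spark";
// no task is launched here.
val sparkLines = textFile.filter(line => line.contains("Spark"))

// count is an action: it calls sc.runJob, which hands the whole lineage
// (HadoopRDD -> filtered RDD) over to the DAGScheduler for execution.
val numSparkLines: Long = sparkLines.count()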

 

4. sc.runJob

The key point is that it delegates to dagScheduler.runJob. (A sketch of how count()'s simplified call reaches this full version follows the source below.)

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver (the process that created the SparkContext) rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassManifest](
      rdd: RDD[T], // only the final RDD needs to be passed in; the earlier ones can be derived from its dependencies
      func: (TaskContext, Iterator[T]) => U, // the logic of the action, e.g. the counting logic of count
      partitions: Seq[Int],  // which partitions to run on
      allowLocal: Boolean, // whether simple actions may be executed locally on the driver
      resultHandler: (Int, U) => Unit) { // used in JobWaiter.taskSucceeded to process each task result
    val callSite = Utils.formatSparkCallSite
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
      localProperties.get)
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    rdd.doCheckpoint()
    result
  }
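
For completeness, here is a sketch (not the actual source; names such as rdd.partitions vs. rdd.splits differ between Spark versions) of how a simplified overload like the one count() uses can reach the full runJob above, and how resultHandler gathers one result per partition, which count() then sums:

// Sketch only: run func on every partition, never locally, and collect the
// per-partition results into an array via the resultHandler callback.
def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  val wrappedFunc = (context: TaskContext, iter: Iterator[T]) => func(iter)
  val partitions = 0 until rdd.partitions.size
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, wrappedFunc, partitions, allowLocal = false,
    (index: Int, res: U) => results(index) = res)
  results
}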