Spark学习之Spark调优与调试(一)

1、使用SparkConf配置Spark

  对 Spark 进行性能调优,一般就是修改 Spark 应用的运行时配置选项。Spark 中最主要的配置机制是经过 SparkConf 类对 Spark 进行配置。当建立出一个 SparkContext 时,就须要建立出一个 SparkConf 的实例。shell

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    
    // 建立一个conf对象
    val conf = new SparkConf()
    conf.set("spark.app.name", "My Spark App")
    conf.set("spark.master", "local[4]")
    conf.set("spark.ui.port", "36000")   // 重载默认端口配置   
    // 使用这个配置对象建立一个SparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 设置日志显示级别
    
  }
}

  Spark 容许经过 spark-submit 工具动态设置配置项。当应用被 spark-submit 脚本启动时,脚本会把这些配置项设置到运行环境中。当一个新的 SparkConf 被建立出来时,这些环境变量会被检测出来而且自动配好。这样,在使用spark-submit 时,用户应用中只要建立一个“空”的 SparkConf ,并直接传给 SparkContext的构造方法就好了。apache

  spark-submit 工具为经常使用的 Spark 配置项参数提供了专用的标记,还有一个通用标记--conf 来接收任意 Spark 配置项的值。并发

$ bin/spark-submit \
    --class com.example.MyApp \
    --master local[4] \
    --name "My Spark App" \
    --conf spark.ui.port=36000 \
    myApp.jar

  spark-submit 也支持从文件中读取配置项的值。这对于设置一些与环境相关的配置项比较有用,方便不一样用户共享这些配置(好比默认的 Spark 主节点)。默认状况下, spark-submit 脚本会在 Spark 安装目录中找到 conf/spark-defaults.conf 文件,尝试读取该文件中以空格隔开的键值对数据。你也能够经过 spark-submit 的 --properties-File 标记,自定义该文件的路径。app

$ bin/spark-submit \
    --class com.example.MyApp \
    --properties-file my-config.conf \
    myApp.jar

## Contents of my-config.conf ##
spark.master local[4]
spark.app.name "My Spark App"
spark.ui.port 36000 

  有时,同一个配置项可能在多个地方被设置了。例如,某用户可能在程序代码中直接调用了 setAppName() 方法,同时也经过 spark-submit 的 --name 标记设置了这个值。针对这种状况,Spark 有特定的优先级顺序来选择实际配置。优先级最高的是在用户代码中显式调用 set() 方法设置的选项。其次是经过 spark-submit 传递的参数,再次是写在配置文件中的值,最后是系统的默认值。 下表列出了一些经常使用的配置项。工具

  

  

2、Spark执行的组成部分:做业、任务和步骤

  下面经过一个示例应用来展现 Spark 执行的各个阶段,以了解用户代码如何被编译为下层的执行计划。咱们实现的是一个简单的日志分析应用。输入数据是一个由不一样严重等级的日志消息和一些分散的空行组成的文本文件,咱们但愿计算其中各级别的日志消息的条数。性能

  

    val input = sc.textFile("words.txt")  // 读取输入文件
    // 切分为单词而且删掉空行 若是大于0的话删除不掉空行
    val tokenized = input.map(line=>line.split(" ")).filter(words=>words.size>1)  //若是大于0的话删除不掉空行
    val counts = tokenized.map(words=>(words(0),1)).reduceByKey((a,b)=>a+b) // 提取出日志等级并进行计数

  这一系列代码生成了一个叫做 counts 的 RDD,其中包含各级别日志对应的条目数。在shell 中执行完这些命令以后,程序没有执行任何行动操做。相反,程序定义了一个 RDD对象的有向无环图(DAG),咱们能够在稍后行动操做被触发时用它来进行计算。每一个RDD 维护了其指向一个或多个父节点的引用,以及表示其与父节点之间关系的信息。好比,当你在 RDD 上调用 val b = a.map() 时, b 这个 RDD 就存下了对其父节点 a 的一个引用。这些引用使得 RDD 能够追踪到其全部的祖先节点。ui

  Spark 提供了 toDebugString() 方法来查看 RDD 的谱系。spa

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    
    // 建立一个conf对象
    val conf = new SparkConf()
    conf.set("spark.app.name", "My Spark App")
    conf.set("spark.master", "local[4]")
    // conf.set("spark.ui.port", "36000")   // 重载默认端口配置   
    // 使用这个配置对象建立一个SparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 设置日志显示级别
    val input = sc.textFile("words.txt")  // 读取输入文件
    // 切分为单词而且删掉空行 若是大于0的话删除不掉空行
    val tokenized = input.map(line=>line.split(" ")).filter(words=>words.size>1)  //若是大于0的话删除不掉空行
    val counts = tokenized.map(words=>(words(0),1)).reduceByKey((a,b)=>a+b) // 提取出日志等级并进行计数
    
    println(input.toDebugString)  // 经过toDebugString查看RDD的谱系
    println("====================================================")
    println(tokenized.toDebugString)
    println("====================================================")
    println(counts.toDebugString)
    
  }
}

  

  在调用行动操做以前,RDD 都只是存储着可让咱们计算出具体数据的描述信息。要触发实际计算,须要对 counts 调用一个行动操做,好比使用 collect() 将数据收集到驱动器程序中。scala

    counts.collect().foreach(println)

  Spark 调度器会建立出用于计算行动操做的 RDD 物理执行计划。咱们在此处调用 RDD 的collect() 方法,因而 RDD 的每一个分区都会被物化出来并发送到驱动器程序中。Spark 调度器从最终被调用行动操做的 RDD(在本例中是 counts )出发,向上回溯全部必须计算的 RDD。调度器会访问 RDD 的父节点、父节点的父节点,以此类推,递归向上生成计算全部必要的祖先 RDD 的物理计划。咱们以最简单的状况为例,调度器为有向图中的每一个RDD 输出计算步骤,步骤中包括 RDD 上须要应用于每一个分区的任务。而后以相反的顺序执行这些步骤,计算得出最终所求的 RDD。日志

相关文章
相关标签/搜索