Spark Core知识点复习-1

Day1111

Spark任务调度
Spark几个重要组件
Spark Core
    RDD的概念和特性
    生成RDD的两种类型
    RDD算子的两种类型
    算子练习
    分区
    RDD的依赖关系
    DAG:有向无环图
    任务提交
    缓存
    checkPoint
    自定义排序
    自定义分区器
    自定义累加器
    广播变量
    Spark Shuffle过程
    Spark优化过程
    SparkSQL
    集成Hive

一.Spark Core

1 Spark任务调度:

|->:standalone
    |->:local
    |->:Yarn
    |->:Mesos

2 Spark几个重要的组件

|->:Master:管理Worker,负责接收Driver发送的注册信息(任务信息)
        |->:Worker:负责本节点资源和任务的管理,启动Exector进程
        |->:Exector:负责计算任务
        |->:Driver:用来提交任务(SparkSubmit进程)

3 Spark Core: RDD的概念和特性

数据的描述
    1):一组分片(Partition),即数据集的基本组成单位。对于RDD来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度。用户能够在建立RDD时指定RDD的分片个数,若是没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
    2):一个计算每一个分区的函数。Spark中RDD的计算是以分片为单位的,每一个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不须要保存每次计算的结果。
    3):RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,因此RDD之间就会造成相似于流水线同样的先后依赖关系。在部分分区数据丢失时,Spark能够经过这个依赖关系从新计算丢失的分区数据,而不是对RDD的全部分区进行从新计算。
    4):一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD自己的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
    5):一个列表,存储存取每一个Partition的优先位置(preferred location)。对于一个HDFS文件来讲,这个列表保存的就是每一个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽量地将计算任务分配到其所要处理数据块的存储位置。

        基本特性:可分区,函数,依赖,分区器,就近原则
RDD的弹性
    1): 自动进行内存和磁盘数据存储的切换
        Spark优先把数据放到内存中,若是内存放不下,就会放到磁盘里面,程序进行自动的存储切换
    2): 基于血统的高效容错机制
        在RDD进行转换和动做的时候,会造成RDD的Lineage依赖链,当某一个RDD失效的时候,能够经过从新计算上游的RDD来从新生成丢失的RDD数据。
    3): Task若是失败会自动进行特定次数的重试
        RDD的计算任务若是运行失败,会自动进行任务的从新计算,默认次数是4次。
    4): Stage若是失败会自动进行特定次数的重试
        若是Job的某个Stage阶段计算失败,框架也会自动进行任务的从新计算,默认次数也是4次。
    5): Checkpoint和Persist可主动或被动触发
        RDD能够经过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也能够将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的全部父RDD依赖都会被移除。
    6): 数据调度弹性
        Spark把这个JOB执行模型抽象为通用的有向无环图DAG,能够将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。
    7): 数据分片的高度弹性
        能够根据业务的特征,动态调整数据分片的个数,提高总体的应用执行效率。
    RDD全称叫作弹性分布式数据集(Resilient Distributed Datasets):它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能经过其余RDD转换而建立,为此,RDD支持丰富的转换操做(如map, join, filter, groupBy等),经过这种转换操做,新的RDD则包含了如何从其余RDDs衍生所必需的信息,因此说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会造成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是经过血缘关系(Lineage)一鼓作气的,即便出现数据分区丢失,也能够经过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组肯定性操做构成的DAG,而后写回稳定存储。另外RDD还能够将数据集缓存到内存中,使得在多个操做之间能够重用数据集,基于这个特色能够很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。能够说Spark最初也就是实现RDD的一个分布式系统,后面经过不断发展壮大成为如今较为完善的大数据生态系统,简单来说,Spark-RDD的关系相似于Hadoop-MapReduce关系。

4 生成RDD的两种类型

1:从集合中建立RDD
        val conf = new SparkConf().setAppName("Test").setMaster("local")
      val sc = new SparkContext(conf)
      //这两个方法都有第二参数是一个默认值2  分片数量(partition的数量)
      //scala集合经过makeRDD建立RDD,底层实现也是parallelize
      val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6))
     //scala集合经过parallelize建立RDD
      val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))
    2:从外部存储建立RDD
      al rdd3 = sc.textFile("hdfs://hadoop01:8020/word.txt")

5 RDD算子的两种类型

|->:transformation算子:转化成新RDD
    |->:Action算子:转化成非RDD

6 算子练习

|->迭代类型算子:map,flatMap,mapPartitions,foreach,foreachPartition...
        |->shuffle类算子:
                            |->byKey:groupBy,reduceByKey(不必定),groupByKey,sortBy,SortByKey...
                            |->重分区算子:repartition(必然发生shuffle),colaesce(不必定,多分区变少分区不须要发生shuffle),partitionBy(发生shuffle),repartitionAndSortWithinPartitions
                            |->join类算子:join(不必定),fullOuterJoi,leftOuterJoin,rightOuterJoin
                            |->去重类算子:distinct,countApproxDistinct(返回去重的个数)
        |->聚合类算子:reduce,reduceByKey,aggregate,aggregateByKey,fold,foldByKey,combineByKey,combineByKey,countByKey,countByValue
        |->排序类算子:sortBy,sortByKey
优化: 
  1.map,mapPartition优化:必定要分数据量和对应的物力资源来肯定到底使用哪一个算子
        数据量 |       map   |   mapPartition
                |   每一个元素   |        每一个分区
      --------------------------------------
        比较大 |               |         优先选择
       海量数据   |     优先选择 |   可能发生OOM

  2.foreach,foreachPartition优化:须要考虑到持久化时可以承受的链接数
         场景 |     foreach       |    foreachPartition
                |       每一个元素      |     每一个分区
      ---------------------------------------------------------
      链接数据库   |   每一个元素对应一个链接    |    优先选择(一个分区对应一个链接)
       海量数据   |     优先选择           |    可能发生OOM

  3.groupByKey,reduceByKey:若是能用reduceByKey解决的需求就用reduceByKey
         场景    |    groupByKey      |    reduceByKey(局部聚合)
      ---------------------------------------------------------
                 |                |       优先选择
  4.join+filter(过滤):为了不join过程产生很大的数据集的状况,能够先filter再join
    filter:过滤后再计算可能发生严重的数据倾斜,能够在过滤后先调整
  5.序列化调优:
    :RDD在计算过程当中,调用的算子和传入算子的函数都是在Executor端执行,除此以外都是在Driver端执行的
class SearchFunction(val query: String) extends Serializable {
  //第一个方法是判断输入的字符串是否存在query 存在返回true,不存在返回false
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  // 问题:"isMatch"表示"this.isMatch",所以咱们要传递整个"this"
  def getMatchFunctionReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => this.isMatch(x))// 等价于:rdd.filter(isMatch)
  // 问题:"query"表示"this.query",所以咱们要传递整个"this"
  def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => x.contains(this.query))
  // 安全:只把咱们须要的字段拿出来放入局部变量中
  def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
    val _query = this.query
    rdd.filter(x => x.contains(_query))
  }
}
object SearchFunctions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(SearchFunctions.getClass.getName).setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List("hello java", "hello scala hello", "hello hello"))
    val sf = new SearchFunction("hello")
    sf.getMatchFunctionReference(rdd).foreach(println)
    sf.getMatchesFieldReference(rdd).foreach(println)
    sf.getMatchesNoReference(rdd).foreach(println)
    sc.stop()
  }
}
class Rules extends Serializable {
  val rulesMap = Map("xiaoli" -> 23, "xiaoming" -> 26)
}
object ObjectRules extends Serializable {
  val rulesMap = Map("jack" -> 27, "lucy" -> 22)
}
object SerializeTest_1 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //map方法中的函数是在Executor的某个Task中执行的
    val res = lines.map(x => {
      val rules = new Rules
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
    })
    println(res.collect.toBuffer)
    /*
    ArrayBuffer(
    (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@5c3d762c),
    (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@736d5f3b),
    (localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@374cd5ba))
     */
    sc.stop()
  }
}
object SerializeTest_2 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //该对象在Driver中建立
    val rules = new Rules
    //map方法中的函数是在Executor的某个Task中执行的
    val res = lines.map(x => {
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
    })
    println(res.collect.toBuffer)
    /*
  ArrayBuffer(
  (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@48158406),
  (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@a287af2),
  (localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@a287af2))
     */
    sc.stop()
  }
}
object SerializeTest_3 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //该对象在Driver中建立单例对象
    val rules = ObjectRules
    //map方法中的函数是在Executor的某个Task中执行的
    val res = lines.map(x => {
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
    })
    println(res.collect.toBuffer)
    /*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593))
     */
    sc.stop()
  }
}
object SerializeTest_4 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //该对象在Driver中建立单例对象
        //map方法中的函数是在Executor的某个Task中执行的
    val res = lines.map(x => {
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      /*
      不用在Driver端去建立对象,Rules不用实现序列化
       */
      (hostname, threadName, ObjectRules.rulesMap.getOrElse(x, 0), ObjectRules.toString)
    })
    println(res.collect.toBuffer)
    /*
    ArrayBuffer(
    (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
    (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
    (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6))
     */
    sc.stop()
  }
}

7 分区

textFile分片过程:由指定的cpu核数+指定的分区数+block块的大小+文件的个数,通过分片算法获得最终的分区数

8 RDD的依赖关系

|->宽依赖:一对多  一个父RDD分区会被多个子RDD使用
        |->窄依赖:一对一,多对一
        |->为何区分宽窄依赖:
                    |->1:有宽窄依赖就能够进行相应的容错
                    |->2:宽依赖决定了stage的划分的依据

9 DAG

为何划分stage:主要是为了生成task,stage划分过程实际上就将rdd的依赖按照shuffle来分为一个到多个的范围,task执行过程根本不会跨stage
        task数量 = stage数量 * 分区数(注:前提是没有手动更改分区数)
        若是手动更改分区数,该stage的task数据由最后的分区数决定的
相关文章
相关标签/搜索