sparkCore-RDD详解

1.1 什么是RDD

1.1.1 产生背景

当初设计RDD主要是为了解决三个问题:html

  • Fast:Spark以前的Hadoop用的是MapReduce的编程模型,没有很好的利用分布式内存系统,中间结果都须要保存到external disk,运行效率很低。RDD模型是in-memory computing的,中间结果不须要被物化(materialized),它的persistence机制,能够保存中间结果重复使用,对须要迭代运算的机器学习应用和交互式数据挖掘应用,加速显著。Spark快还有一个缘由是开头提到过的Delay Scheduling机制,它得益于RDD的Dependency设计。
  • General: MapReduce编程模型只能提供有限的运算种类(Map和Reduce),RDD但愿支持更普遍更多样的operators(map,flatMap,filter等等),而后用户能够任意地组合他们。

The ability of RDDs to accommodate computing needs that were previously met only by introducing new frameworks is, we believe, the most credible evidence of the power of the RDD abstraction.java

  • Fault tolerance: 其余的in-memory storage on clusters,基本单元是可变的,用细粒度更新(fine-grained updates)方式改变状态,如改变table/cell里面的值,这种模型的容错只能经过复制多个数据copy,须要传输大量的数据,容错效率低下。而RDD是不可变的immutable,经过粗粒度变换(coarse-grained transformations),好比map,filter和join,能够把相同的运算同时做用在许多数据单元上,这样的变换只会产生新的RDD而不改变旧的RDD。这种模型可让Spark用Lineage很高效地容错(后面会有介绍)。

1.1.2 RDD定义

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents(表明) an immutable(不变的),partitioned collection of elements that can be operated on in parallel

RDD是spark的核心,也是整个spark的架构基础,RDD是弹性分布式集合(Resilient Distributed Datasets)的简称。node

1.1.3 RDD特色

  • immutable:只读,任何操做都不会改变RDD自己,只会创造新的RDDgit

  • fault-tolerant:容错,经过Lineage能够高效容错github

  • partitioned:分片,RDD以partition做为最小存储和计算单元,分布在cluster的不一样nodes上,一个node能够有多个partitions,一个partition只能在一个node上web

  • in parallel:并行,一个Task对应一个partition,Tasks之间相互独立能够并行计算apache

  • persistence:持久化,用户能够把会被重复使用的RDDs保存到storage上(内存或者磁盘)编程

  • partitioning:分区,用户能够选择RDD元素被partitioned的方式来优化计算,好比两个须要被join的数据集能够用相同的方式作hash-partitioned,这样能够减小shuffle提升性能缓存

1.1.4 RDD抽象概念

一个RDD定义了对数据的一个操做过程, 用户提交的计算任务能够由多个RDD构成。多个RDD能够是对单个/多个数据的多个操做过程。多个RDD之间的关系使用依赖来表达。操做过程就是用户自定义的函数。服务器

RDD(弹性分布式数据集)去掉形容词,主体为:数据集。若是认为RDD就是数据集,那就有点理解错了。我的认为:RDD是定义对partition数据项转变的高阶函数,应用到输入源数据,输出转变后的数据,即:RDD是一个数据集到另一个数据集的映射,而不是数据自己。 这个概念相似数学里的函数f(x) = ax^2 + bx + c。这个映射函数能够被序列化,所要被处理的数据被分区后分布在不一样的机器上,应用一下这个映射函数,得出结果,聚合结果。

这些集合是弹性的,若是数据集一部分丢失,则能够对它们进行重建。具备自动容错、位置感知调度和可伸缩性,而容错性是最难实现的,大多数分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。对于大规模数据分析系统,数据检查点操做成本高,主要缘由是大规模数据在服务器之间的传输带来的各方面的问题,相比记录数据的更新,RDD也只支持粗粒度的转换共享状态而非细粒度的更新共享状态,也就是记录如何从其余RDD转换而来(即lineage),以便恢复丢失的分区。

RDDs 很是适合将相同操做应用在整个数据集的全部的元素上的批处理应用. 在这些场景下, RDDs 能够利用血缘关系图来高效的记住每个 transformations 的步骤, 而且不须要记录大量的数据就能够恢复丢失的分区数据. RDDs 不太适合用于须要异步且细粒度的更新共享状态的应用, 好比一个 web 应用或者数据递增的 web 爬虫应用的存储系统。

1.2 RDD特色

Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

每一个特性都对应RDD.scala中的一个方法实现:

  • a list of partition 由多个机器里面的partition组成的

  • a function for computing each split 并行计算

  • a list of dependencies on other RDDS rdd间存在依赖关系,记录数据转换间的依赖

  • a partitioner for key-vaue RDDS 可进行从新分区(只有key value的partition有)

  • a list of preferred locations to compute each spilt on 用最指望的位置进行计算

1.3 RDD操做

1.3.1 RDD建立

  1. parallelize:从普通Scala集合建立
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
  1. 从Hadoop文件系统或与Hadoop兼容的其余持久化存储系统建立,如Hive、HBase
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
  1. 从父RDD转换获得新的RDD
val fromParent=distFile.map(s=>s.length)

1.3.2 操做方式

RDD在宏观来看相似于java中对象的概念,咱们在java中对对象上做用一系列操做(方法)获得最终结果。一样的咱们在RDD上进行一系列操做(算子)将一个RDD转换为另外一个RDD,最终获得咱们所须要的RDD。RDD算子主要包括:

  • Transformation算子:Transformation操做是延迟计算的,即从一个RDD转换成另外一个RDD的转换操做不是 立刻执行,须要等到有Action操做时,才真正出发执行,如Map、Filter等操做

  • Action算子:Action算子会出发Spark提交做业(Job),并将数据输出到Spark系统,如collect、count等

RDD操做特色 惰性求值

transformation算子做用在RDD时,并非当即触发计算,只是记录须要操做的指令。等到有Action算子出现时才真正开始触发计算。

textFile等读取数据操做和persist和cache缓存操做也是惰性的

为何要使用惰性求值呢:使用惰性求值能够把一些操做合并到一块儿来减小数据的计算步骤,提升计算效率。

从惰性求值角度看RDD就是一组spark计算指令的列表

1.3.4 缓存策略

RDD的缓存策略在StorageLevel中实现,经过对是否序列化,是否存储多个副本等条件的组合造成了多种缓存方式。例如:MEMORY_ONLY_SER存储在内存中并进行序列化,当内存不足时,不进行本地化;MEMORY_AND_DISK_2优先存储内存中,内存中无空间时,存储在本地磁盘,并有两个副本。

class StorageLevel private(
    // 缓存方式
    private var _useDisk: Boolean, 		// 是否使用磁盘
    private var _useMemory: Boolean, 	// 是否使用内存
    private var _useOffHeap: Boolean,	// 是否使用堆外内存
    private var _deserialized: Boolean, // 是否序列化
    private var _replication: Int = 1)	// 存储副本,默认一个
  extends Externalizable {
      
  // 条件组合结果
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

策略选择顺序

  • 默认选择MEMORY_ONLY

  • 若是内存不足,选择MEMORY_ONLY_SER

  • 若是须要作容错,选择MEMORY_ONLY_SER_2

  • 若是中间计算RDD的代价比较大时,选择MEMORY_AND_DISK

控制操做

  1. persist操做,能够将RDD持久化到不一样层次的存储介质,以便后续操做重复使用。

   1)cache:RDD[T] 默认使用MEMORY_ONLY

   2)persist:RDD[T] 默认使用MEMORY_ONLY

   3)Persist(level:StorageLevel):RDD[T] eg: myRdd.persist(StorageLevels.MEMORY_ONLY_SER)

  1. checkpoint

  将RDD持久化到HDFS中,与persist操做不一样的是checkpoint会切断此RDD以前的依赖关系,而persist依然保留RDD的依赖关系。

1.3.5 RDD回收

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the `RDD.unpersist()` method.

spark有一个监控线程去检测内存使用状况,当内存不足时使用LRU进行淘汰old data,也能够经过RDD.unpersist()方法手动移除缓存。

1.3.6 RDD保存

  • saveAsTextFile()将RDD中的元素保存在指定目录中,这个目录位于任何Hadoop支持的存储系统中
  • saveAsObjectFile()将原RDD中的元素序列化成Java对象,存储在指定目录中
  • saveAsSequenceFile() 将键值对型RDD以SequenceFile的格式保存。键值对型RDD也能够以文本形式保存

须要注意的是,上面的方法都把一个目录名字做为入参,而后在这个目录为每一个RDD分区建立一个文件夹。这种设计不只能够高效并且可容错。由于每一个分区被存成一个文件,因此Spark在保存RDD的时候能够启动多个任务,并行执行,将数据写入文件系统中,这样也保证了写入数据的过程当中可容错,一旦有一个分区写入文件的任务失败了,Spark能够在重启一个任务,重写刚才失败任务建立的文件。

2. RDD详解

2.1 RDD分区

RDD 表示并行计算的计算单元是使用分区(Partition)

2.1.1 分区实现

RDD 内部的数据集合在逻辑上和物理上被划分红多个小子集合,这样的每个子集合咱们将其称为分区,分区的个数会决定并行计算的粒度,而每个分区数值的计算都是在一个单独的任务中进行,所以并行任务的个数,也是由 RDD(其实是一个阶段的末 RDD,调度章节会介绍)分区的个数决定的。

RDD 只是数据集的抽象,分区内部并不会存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的编号,经过 RDD 编号 + 分区编号能够惟一肯定该分区对应的块编号,利用底层数据存储层提供的接口,就能从存储介质(如:HDFS、Memory)中提取出分区对应的数据

怎么切分是Partitioner定义的, Partitioner有两个接口: numPartitions分区数, getPartition(key: Any): Int根据传入的参数肯定分区号。实现了Partitioner的有:

  1. HashPartitioner
  2. RangePartitioner
  3. GridPartitioner
  4. PythonPartitioner

一个RDD有了Partitioner, 就能够对当前RDD持有的数据进行划分

2.1.2 分区个数

RDD 分区的一个分配原则是:尽量使得分区的个数等于集群的CPU核数

RDD 能够经过建立操做或者转换操做获得。转换操做中,分区的个数会根据转换操做对应多个 RDD 之间的依赖关系肯定,窄依赖子 RDD 由父 RDD 分区个数决定,Shuffle 依赖由子 RDD 分区器决定。

建立操做中,程序开发者能够手动指定分区的个数,例如 sc.parallelize (Array(1, 2, 3, 4, 5), 2) 表示建立获得的 RDD 分区个数为 2,在没有指定分区个数的状况下,Spark 会根据集群部署模式,来肯定一个分区个数默认值。

对于 parallelize 方法,默认状况下,分区的个数会受 Apache Spark 配置参数 spark.default.parallelism 的影响,不管是以本地模式、Standalone 模式、Yarn 模式或者是 Mesos 模式来运行 Apache Spark,分区的默认个数等于对 spark.default.parallelism 的指定值,若该值未设置,则 Apache Spark 会根据不一样集群模式的特征,来肯定这个值。

本地模式,默认分区个数等于本地机器的 CPU 核心总数(或者是用户经过 local[N] 参数指定分配给 Apache Spark 的核心数目),集群模式(Standalone 或者 Yarn)默认分区个数等于集群中全部核心数目的总和,或者 2,取二者中的较大值(conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)))

对于 textFile 方法,默认分区个数等于 min(defaultParallelism, 2)

2.1.3 分区内部记录个数

分区分配的另外一个分配原则是:尽量使同一 RDD 不一样分区内的记录的数量一致。

对于转换操做获得的 RDD,若是是窄依赖,则分区记录数量依赖于父 RDD 中相同编号分区是如何进行数据分配的,若是是 Shuffle 依赖,则分区记录数量依赖于选择的分区器,分区器有哈希分区和范围分区。哈希分区器没法保证数据被平均分配到各个分区,而范围分区器则能作到这一点

对于textFile 方法分区内数据的大小则是由 Hadoop API 接口 FileInputFormat.getSplits 方法决定(见 HadoopRDD 类),获得的每个分片即为 RDD 的一个分区,分片内数据的大小会受文件大小、文件是否可分割、HDFS 中块大小等因素的影响,但整体而言会是比较均衡的分配

2.2 RDD依赖

2.2.1 依赖与 RDD

RDD 的容错机制是经过记录更新来实现的,且记录的是粗粒度的转换操做。在外部,咱们将记录的信息称为血统(Lineage)关系,而到了源码级别,Apache Spark 记录的则是 RDD 之间的依赖Dependency关系。在一次转换操做中,建立获得的新 RDD 称为子 RDD,提供数据的 RDD 称为父 RDD,父 RDD 可能会存在多个,咱们把子 RDD 与父 RDD 之间的关系称为依赖关系,或者能够说是子 RDD 依赖于父 RDD。

依赖只保存父 RDD 信息,转换操做的其余信息,如数据处理函数,会在建立 RDD 时候,保存在新的 RDD 内。依赖在 Apache Spark 源码中的对应实现是 Dependency 抽象类,每一个 Dependency 子类内部都会存储一个 RDD 对象,对应一个父 RDD,若是一次转换转换操做有多个父 RDD,就会对应产生多个 Dependency 对象,全部的 Dependency 对象存储在子 RDD 内部,经过遍历 RDD 内部的 Dependency 对象,就能获取该 RDD 全部依赖的父 RDD。

2.2.2 依赖分类

Apache Spark 将依赖进一步分为两类,分别是窄依赖Narrow DependencyShuffle 依赖(Shuffle Dependency,在部分文献中也被称为 Wide Dependency,即宽依赖

窄依赖中,父 RDD 中的一个分区最多只会被子 RDD 中的一个分区使用,换句话说,父 RDD 中,一个分区内的数据是不能被分割的,必须整个交付给子 RDD 中的一个分区。

窄依赖可进一步分类成一对一依赖和范围依赖,对应实现分别是 OneToOneDependency 类和RangeDependency 类。一对一依赖表示子 RDD 分区的编号与父 RDD 分区的编号彻底一致的状况,若两个 RDD 之间存在着一对一依赖,则子 RDD 的分区个数、分区内记录的个数都将继承自父 RDD。范围依赖是依赖关系中的一个特例,只被用于表示 UnionRDD 与父 RDD 之间的依赖关系。相比一对一依赖,除了第一个父 RDD,其余父 RDD 和子 RDD 的分区编号再也不一致,Apache Spark 统一将unionRDD与父 RDD 之间(包含第一个 RDD)的关系都叫作范围依赖。

依赖类图:

graph TD
A[Dependency<br>依赖关系基类]--- B[NarrowDependency<br>窄依赖]
A---C[ShuffleDenpendency<br>shuffle依赖]
B---D[OneToOneDependency<br>一对一依赖]
B---E[RangeDependency<br>范围依赖]

下图展现了几类常见的窄依赖及其对应的转换操做。

窄依赖

Shuffle 依赖中,父 RDD 中的分区可能会被多个子 RDD 分区使用。由于父 RDD 中一个分区内的数据会被分割,发送给子 RDD 的全部分区,所以 Shuffle 依赖也意味着父 RDD 与子 RDD 之间存在着 Shuffle 过程。下图展现了几类常见的Shuffle依赖及其对应的转换操做。

窄依赖

Shuffle 依赖的对应实现为ShuffleDependency 类,其实现比较复杂,主要经过如下成员完成:

  • rdd:用于表示 Shuffle 依赖中,子 RDD 所依赖的父 RDD。
  • shuffleId:Shuffle 的 ID 编号,在一个 Spark 应用程序中,每一个 Shuffle 的编号都是惟一的。
  • shuffleHandle:Shuffle 句柄,ShuffleHandle 内部通常包含 Shuffle ID、Mapper 的个数以及对应的 Shuffle 依赖,在执行 ShuffleMapTask 时候,任务能够经过 ShuffleManager 获取获得该句柄,并进一步获得 Shuffle 相关信息。
  • partitioner:分区器,用于决定 Shuffle 过程当中 Reducer 的个数(其实是子 RDD 的分区个数)以及 Map 端的一条数据记录应该分配给哪个 Reducer,也能够被用在 CoGroupedRDD 中,肯定父 RDD 与子 RDD 之间的依赖关系类型。
  • serializer:序列化器。用于 Shuffle 过程当中 Map 端数据的序列化和 Reduce 端数据的反序列化。
  • KeyOrdering:键值排序策略,用于决定子 RDD 的一个分区内,如何根据键值对 类型数据记录进行排序。
  • Aggregator:聚合器,内部包含了多个聚合函数,比较重要的函数有 createCombiner:V => CmergeValue: (C, V) => C 以及 mergeCombiners: (C, C) => C。例如,对于 groupByKey 操做,createCombiner 表示把第一个元素放入到集合中,mergeValue 表示一个元素添加到集合中,mergeCombiners 表示把两个集合进行合并。这些函数被用于 Shuffle 过程当中数据的聚合。
  • mapSideCombine:用于指定 Shuffle 过程当中是否须要在 map 端进行 combine 操做。若是指定该值为 true,因为 combine 操做须要用到聚合器中的相关聚合函数,所以 Aggregator 不能为空,不然 Apache Spark 会抛出异常。例如:groupByKey 转换操做对应的ShuffleDependency中,mapSideCombine = false,而 reduceByKey 转换操做中,mapSideCombine = true

依赖关系是两个 RDD 之间的依赖,所以若一次转换操做中父 RDD 有多个,则可能会同时包含窄依赖和 Shuffle 依赖,下图所示的 Join 操做,RDD a 和 RDD c 采用了相同的分区器,两个 RDD 之间是窄依赖,Rdd b 的分区器与 RDD c 不一样,所以它们之间是 Shuffle 依赖,具体实现可参见 CoGroupedRDD 类的 getDependencies 方法。这里可以再次发现:一个依赖对应的是两个 RDD,而不是一次转换操做。

窄依赖

2.2.3 依赖与容错机制

介绍完依赖的类别和实现以后,回过头来,从分区的角度继续探究 Apache Spark 是如何经过依赖关系来实现容错机制的。下图给出了一张依赖关系图,fileRDD 经历了 mapreduce 以及filter 三次转换操做,获得了最终的 RDD,其中,mapfilter 操做对应的依赖为窄依赖,reduce 操做对应的是 Shuffle 依赖。

fault-tolrarnt0

假设最终 RDD 第一块分区内的数据由于某些缘由丢失了,因为 RDD 内的每个分区都会记录其对应的父 RDD 分区的信息,所以沿着下图所示的依赖关系往回走,咱们就能找到该分区数据最终来源于 fileRDD 的全部分区,再沿着依赖关系日后计算路径中的每个分区数据,便可获得丢失的分区数据。

fault-tolrarnt0

这个例子并非特别严谨,按照咱们的思惟,只有执行了持久化,存储在存储介质中的 RDD 分区才会出现数据丢失的状况,可是上例中最终的 RDD 并无执行持久化操做。事实上,Apache Spark 将没有被持久化数据从新被计算,以及持久化的数据第一次被计算,也等价视为数据“丢失”,在 1.7 节中咱们会看到这一点。

2.2.4 依赖与并行计算

在上一节中咱们看到,在 RDD 中,能够经过计算链Computing Chain来计算某个 RDD 分区内的数据,咱们也知道分区是并行计算的基本单位,这时候可能会有一种想法:可否把 RDD 每一个分区内数据的计算当成一个并行任务,每一个并行任务包含一个计算链,将一个计算链交付给一个 CPU 核心去执行,集群中的 CPU 核心一块儿把 RDD 内的全部分区计算出来。

答案是能够,这得益于 RDD 内部分区的数据依赖相互之间并不会干扰,而 Apache Spark 也是这么作的,但在实现过程当中,仍有不少实际问题须要去考虑。进一步观察窄依赖、Shuffle 依赖在作并行计算时候的异同点。

先来看下方左侧的依赖图,依赖图中全部的依赖关系都是窄依赖(包括一对一依赖和范围依赖),能够看到,不只计算链是独立不干扰的(因此能够并行计算),全部计算链内的每一个分区单元的计算工做也不会发生重复,如右侧的图所示。这意味着除非执行了持久化操做,不然计算过程当中产生的中间数据咱们没有必要保留 —— 由于当前分区的数据只会给计算链中的下一个分区使用,而不用专门保留给其余计算链使用。

paralle1

再来观察 Shuffle 依赖的计算链,如图下方左侧的图中,既有窄依赖,又有 Shuffle 依赖,因为 Shuffle 依赖中,子 RDD 一个分区的数据依赖于父 RDD 内全部分区的数据,当咱们想计算末 RDD 中一个分区的数据时,Shuffle 依赖处须要把父 RDD 全部分区的数据计算出来,如右侧的图所示(紫色表示最后两个分区计算链共同通过的地方) —— 而这些数据,在计算末 RDD 另一个分区的数据时候,一样会被用到。

paralle2

若是咱们作到计算链的并行计算的话,这就意味着,要么 Shuffle 依赖处父 RDD 的数据在每次须要使用的时候都重复计算一遍,要么想办法把父 RDD 数据保存起来,提供给其他分区的数据计算使用。

Apache Spark 采用的是第二种办法,但保存数据的方法可能与想象中的会有所不一样,Spark 把计算链从 Shuffle 依赖处断开,划分红不一样的阶段Stage,阶段之间存在依赖关系(其实就是 Shuffle 依赖),从而能够构建一张不一样阶段之间的有向无环图DAG

2.3 RDD lineage

RDD的逻辑执行计划和物理执行计划详解...

RDD逻辑执行计划

RDD是经过一系列transformation操做进行计算的,而这些transformation操做造成的图就是DAG,也就是逻辑执行计划

RDD物理执行计划

根据RDD延迟计算特性,其真正在触发计算是在有output时发生的,outPutRdd上记录了其上级依赖的RDD,依次向前直到碰到inputRdd,这个经过依赖反向去获取RDD的过程造成的就是物理执行计划。

逻辑执行计划所包含的RDD和物理执行计划所包含的RDD不必定是对等的

能够经过toDebugString查看RDD的lineage

2.4 RDD 计算函数

前往查看详情...

2.5 RDD 分区器

前往查看详情...

参考

https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

https://ihainan.gitbooks.io/spark-source-code/content/section1/rddPartitions.html

http://spark.apachecn.org/paper/zh/spark-rdd.html

相关文章
相关标签/搜索