Spark 系列(三)—— 弹性式数据集RDDs

1、RDD简介

RDD 全称为 Resilient Distributed Datasets,是 Spark 最基本的数据抽象,它是只读的、分区记录的集合,支持并行操做,能够由外部数据集或其余 RDD 转换而来,它具备如下特性:html

  • 一个 RDD 由一个或者多个分区(Partitions)组成。对于 RDD 来讲,每一个分区会被一个计算任务所处理,用户能够在建立 RDD 时指定其分区个数,若是没有指定,则默认采用程序所分配到的 CPU 的核心数;
  • RDD 拥有一个用于计算分区的函数 compute;
  • RDD 会保存彼此间的依赖关系,RDD 的每次转换都会生成一个新的依赖关系,这种 RDD 之间的依赖关系就像流水线同样。在部分分区数据丢失后,能够经过这种依赖关系从新计算丢失的分区数据,而不是对 RDD 的全部分区进行从新计算;
  • Key-Value 型的 RDD 还拥有 Partitioner(分区器),用于决定数据被存储在哪一个分区中,目前 Spark 中支持 HashPartitioner(按照哈希分区) 和 RangeParationer(按照范围进行分区);
  • 一个优先位置列表 (可选),用于存储每一个分区的优先位置 (prefered location)。对于一个 HDFS 文件来讲,这个列表保存的就是每一个分区所在的块的位置,按照“移动数据不如移动计算“的理念,Spark 在进行任务调度的时候,会尽量的将计算任务分配到其所要处理数据块的存储位置。

RDD[T] 抽象类的部分相关代码以下:java

// 由子类实现以计算给定分区
def compute(split: Partition, context: TaskContext): Iterator[T]

// 获取全部分区
protected def getPartitions: Array[Partition]

// 获取全部依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps

// 获取优先位置列表
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

// 分区器 由子类重写以指定它们的分区方式
@transient val partitioner: Option[Partitioner] = None

2、建立RDD

RDD 有两种建立方式,分别介绍以下:git

2.1 由现有集合建立

这里使用 spark-shell 进行测试,启动命令以下:github

spark-shell --master local[4]

启动 spark-shell 后,程序会自动建立应用上下文,至关于执行了下面的 Scala 语句:shell

val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]")
val sc = new SparkContext(conf)

由现有集合建立 RDD,你能够在建立时指定其分区个数,若是没有指定,则采用程序所分配到的 CPU 的核心数:apache

val data = Array(1, 2, 3, 4, 5)
// 由现有集合建立 RDD,默认分区数为程序所分配到的 CPU 的核心数
val dataRDD = sc.parallelize(data) 
// 查看分区数
dataRDD.getNumPartitions
// 明确指定分区数
val dataRDD = sc.parallelize(data,2)

执行结果以下:编程

2.2 引用外部存储系统中的数据集

引用外部存储系统中的数据集,例如本地文件系统,HDFS,HBase 或支持 Hadoop InputFormat 的任何数据源。数组

val fileRDD = sc.textFile("/usr/file/emp.txt")
// 获取第一行文本
fileRDD.take(1)

使用外部存储系统时须要注意如下两点:缓存

  • 若是在集群环境下从本地文件系统读取数据,则要求该文件必须在集群中全部机器上都存在,且路径相同;
  • 支持目录路径,支持压缩文件,支持使用通配符。

2.3 textFile & wholeTextFiles

二者均可以用来读取外部文件,可是返回格式是不一样的:网络

  • textFile:其返回格式是 RDD[String] ,返回的是就是文件内容,RDD 中每个元素对应一行数据;
  • wholeTextFiles:其返回格式是 RDD[(String, String)],元组中第一个参数是文件路径,第二个参数是文件内容;
  • 二者都提供第二个参数来控制最小分区数;
  • 从 HDFS 上读取文件时,Spark 会为每一个块建立一个分区。
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}

3、操做RDD

RDD 支持两种类型的操做:transformations(转换,从现有数据集建立新数据集)和 actions(在数据集上运行计算后将值返回到驱动程序)。RDD 中的全部转换操做都是惰性的,它们只是记住这些转换操做,但不会当即执行,只有遇到 action 操做后才会真正的进行计算,这相似于函数式编程中的惰性求值。

val list = List(1, 2, 3)
// map 是一个 transformations 操做,而 foreach 是一个 actions 操做
sc.parallelize(list).map(_ * 10).foreach(println)
// 输出: 10 20 30

4、缓存RDD

4.1 缓存级别

Spark 速度很是快的一个缘由是 RDD 支持缓存。成功缓存后,若是以后的操做使用到了该数据集,则直接从缓存中获取。虽然缓存也有丢失的风险,可是因为 RDD 之间的依赖关系,若是某个分区的缓存数据丢失,只须要从新计算该分区便可。

Spark 支持多种缓存级别 :

Storage Level
(存储级别)
Meaning(含义)
MEMORY_ONLY 默认的缓存级别,将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。若是内存空间不够,则部分分区数据将再也不缓存。
MEMORY_AND_DISK 将 RDD 以反序列化的 Java 对象的形式存储 JVM 中。若是内存空间不够,将未缓存的分区数据存储到磁盘,在须要使用这些分区时从磁盘读取。
MEMORY_ONLY_SER
将 RDD 以序列化的 Java 对象的形式进行存储(每一个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增长 CPU 的计算负担。仅支持 Java 和 Scala 。
MEMORY_AND_DISK_SER
相似于 MEMORY_ONLY_SER,可是溢出的分区数据会存储到磁盘,而不是在用到它们时从新计算。仅支持 Java 和 Scala。
DISK_ONLY 只在磁盘上缓存 RDD
MEMORY_ONLY_2,
MEMORY_AND_DISK_2, etc
与上面的对应级别功能相同,可是会为每一个分区在集群中的两个节点上创建副本。
OFF_HEAP MEMORY_ONLY_SER 相似,但将数据存储在堆外内存中。这须要启用堆外内存。

启动堆外内存须要配置两个参数:

  • spark.memory.offHeap.enabled :是否开启堆外内存,默认值为 false,须要设置为 true;
  • spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,须要设置为正值。

4.2 使用缓存

缓存数据的方法有两个:persistcachecache 内部调用的也是 persist,它是 persist 的特殊化形式,等价于 persist(StorageLevel.MEMORY_ONLY)。示例以下:

// 全部存储级别均定义在 StorageLevel 对象中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()

4.3 移除缓存

Spark 会自动监视每一个节点上的缓存使用状况,并按照最近最少使用(LRU)的规则删除旧数据分区。固然,你也可使用 RDD.unpersist() 方法进行手动删除。

5、理解shuffle

5.1 shuffle介绍

在 Spark 中,一个任务对应一个分区,一般不会跨分区操做数据。但若是遇到 reduceByKey 等操做,Spark 必须从全部分区读取数据,并查找全部键的全部值,而后汇总在一块儿以计算每一个键的最终结果 ,这称为 Shuffle

5.2 Shuffle的影响

Shuffle 是一项昂贵的操做,由于它一般会跨节点操做数据,这会涉及磁盘 I/O,网络 I/O,和数据序列化。某些 Shuffle 操做还会消耗大量的堆内存,由于它们使用堆内存来临时存储须要网络传输的数据。Shuffle 还会在磁盘上生成大量中间文件,从 Spark 1.3 开始,这些文件将被保留,直到相应的 RDD 再也不使用并进行垃圾回收,这样作是为了不在计算时重复建立 Shuffle 文件。若是应用程序长期保留对这些 RDD 的引用,则垃圾回收可能在很长一段时间后才会发生,这意味着长时间运行的 Spark 做业可能会占用大量磁盘空间,一般可使用 spark.local.dir 参数来指定这些临时文件的存储目录。

5.3 致使Shuffle的操做

因为 Shuffle 操做对性能的影响比较大,因此须要特别注意使用,如下操做都会致使 Shuffle:

  • 涉及到从新分区操做: 如 repartitioncoalesce
  • 全部涉及到 ByKey 的操做:如 groupByKeyreduceByKey,但 countByKey 除外;
  • 联结操做:如 cogroupjoin

5、宽依赖和窄依赖

RDD 和它的父 RDD(s) 之间的依赖关系分为两种不一样的类型:

  • 窄依赖 (narrow dependency):父 RDDs 的一个分区最多被子 RDDs 一个分区所依赖;
  • 宽依赖 (wide dependency):父 RDDs 的一个分区能够被子 RDDs 的多个子分区所依赖。

以下图,每个方框表示一个 RDD,带有颜色的矩形表示分区:

区分这两种依赖是很是有用的:

  • 首先,窄依赖容许在一个集群节点上以流水线的方式(pipeline)对父分区数据进行计算,例如先执行 map 操做,而后执行 filter 操做。而宽依赖则须要计算好全部父分区的数据,而后再在节点之间进行 Shuffle,这与 MapReduce 相似。
  • 窄依赖可以更有效地进行数据恢复,由于只需从新对丢失分区的父分区进行计算,且不一样节点之间能够并行计算;而对于宽依赖而言,若是数据丢失,则须要对全部父分区数据进行计算并再次 Shuffle。

6、DAG的生成

RDD(s) 及其之间的依赖关系组成了 DAG(有向无环图),DAG 定义了这些 RDD(s) 之间的 Lineage(血统) 关系,经过血统关系,若是一个 RDD 的部分或者所有计算结果丢失了,也能够从新进行计算。那么 Spark 是如何根据 DAG 来生成计算任务呢?主要是根据依赖关系的不一样将 DAG 划分为不一样的计算阶段 (Stage):

  • 对于窄依赖,因为分区的依赖关系是肯定的,其转换操做能够在同一个线程执行,因此能够划分到同一个执行阶段;
  • 对于宽依赖,因为 Shuffle 的存在,只能在父 RDD(s) 被 Shuffle 处理完成后,才能开始接下来的计算,所以遇到宽依赖就须要从新划分阶段。

参考资料

  1. 张安站 . Spark 技术内幕:深刻解析 Spark 内核架构设计与实现原理[M] . 机械工业出版社 . 2015-09-01
  2. RDD Programming Guide
  3. RDD:基于内存的集群计算容错抽象

更多大数据系列文章能够参见 GitHub 开源项目大数据入门指南

相关文章
相关标签/搜索