RDD
全称为 Resilient Distributed Datasets,是 Spark 最基本的数据抽象,它是只读的、分区记录的集合,支持并行操做,能够由外部数据集或其余 RDD 转换而来,它具备如下特性:html
RDD[T]
抽象类的部分相关代码以下:java
// 由子类实现以计算给定分区 def compute(split: Partition, context: TaskContext): Iterator[T] // 获取全部分区 protected def getPartitions: Array[Partition] // 获取全部依赖关系 protected def getDependencies: Seq[Dependency[_]] = deps // 获取优先位置列表 protected def getPreferredLocations(split: Partition): Seq[String] = Nil // 分区器 由子类重写以指定它们的分区方式 @transient val partitioner: Option[Partitioner] = None
RDD 有两种建立方式,分别介绍以下:git
这里使用 spark-shell
进行测试,启动命令以下:github
spark-shell --master local[4]
启动 spark-shell
后,程序会自动建立应用上下文,至关于执行了下面的 Scala 语句:shell
val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]") val sc = new SparkContext(conf)
由现有集合建立 RDD,你能够在建立时指定其分区个数,若是没有指定,则采用程序所分配到的 CPU 的核心数:apache
val data = Array(1, 2, 3, 4, 5) // 由现有集合建立 RDD,默认分区数为程序所分配到的 CPU 的核心数 val dataRDD = sc.parallelize(data) // 查看分区数 dataRDD.getNumPartitions // 明确指定分区数 val dataRDD = sc.parallelize(data,2)
执行结果以下:编程
引用外部存储系统中的数据集,例如本地文件系统,HDFS,HBase 或支持 Hadoop InputFormat 的任何数据源。数组
val fileRDD = sc.textFile("/usr/file/emp.txt") // 获取第一行文本 fileRDD.take(1)
使用外部存储系统时须要注意如下两点:缓存
二者均可以用来读取外部文件,可是返回格式是不一样的:网络
RDD[String]
,返回的是就是文件内容,RDD 中每个元素对应一行数据;RDD[(String, String)]
,元组中第一个参数是文件路径,第二个参数是文件内容;def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...} def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}
RDD 支持两种类型的操做:transformations(转换,从现有数据集建立新数据集)和 actions(在数据集上运行计算后将值返回到驱动程序)。RDD 中的全部转换操做都是惰性的,它们只是记住这些转换操做,但不会当即执行,只有遇到 action 操做后才会真正的进行计算,这相似于函数式编程中的惰性求值。
val list = List(1, 2, 3) // map 是一个 transformations 操做,而 foreach 是一个 actions 操做 sc.parallelize(list).map(_ * 10).foreach(println) // 输出: 10 20 30
Spark 速度很是快的一个缘由是 RDD 支持缓存。成功缓存后,若是以后的操做使用到了该数据集,则直接从缓存中获取。虽然缓存也有丢失的风险,可是因为 RDD 之间的依赖关系,若是某个分区的缓存数据丢失,只须要从新计算该分区便可。
Spark 支持多种缓存级别 :
Storage Level (存储级别) |
Meaning(含义) |
---|---|
MEMORY_ONLY |
默认的缓存级别,将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。若是内存空间不够,则部分分区数据将再也不缓存。 |
MEMORY_AND_DISK |
将 RDD 以反序列化的 Java 对象的形式存储 JVM 中。若是内存空间不够,将未缓存的分区数据存储到磁盘,在须要使用这些分区时从磁盘读取。 |
MEMORY_ONLY_SER |
将 RDD 以序列化的 Java 对象的形式进行存储(每一个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增长 CPU 的计算负担。仅支持 Java 和 Scala 。 |
MEMORY_AND_DISK_SER |
相似于 MEMORY_ONLY_SER ,可是溢出的分区数据会存储到磁盘,而不是在用到它们时从新计算。仅支持 Java 和 Scala。 |
DISK_ONLY |
只在磁盘上缓存 RDD |
MEMORY_ONLY_2 , MEMORY_AND_DISK_2 , etc |
与上面的对应级别功能相同,可是会为每一个分区在集群中的两个节点上创建副本。 |
OFF_HEAP |
与 MEMORY_ONLY_SER 相似,但将数据存储在堆外内存中。这须要启用堆外内存。 |
启动堆外内存须要配置两个参数:
- spark.memory.offHeap.enabled :是否开启堆外内存,默认值为 false,须要设置为 true;
- spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,须要设置为正值。
缓存数据的方法有两个:persist
和 cache
。cache
内部调用的也是 persist
,它是 persist
的特殊化形式,等价于 persist(StorageLevel.MEMORY_ONLY)
。示例以下:
// 全部存储级别均定义在 StorageLevel 对象中 fileRDD.persist(StorageLevel.MEMORY_AND_DISK) fileRDD.cache()
Spark 会自动监视每一个节点上的缓存使用状况,并按照最近最少使用(LRU)的规则删除旧数据分区。固然,你也可使用 RDD.unpersist()
方法进行手动删除。
在 Spark 中,一个任务对应一个分区,一般不会跨分区操做数据。但若是遇到 reduceByKey
等操做,Spark 必须从全部分区读取数据,并查找全部键的全部值,而后汇总在一块儿以计算每一个键的最终结果 ,这称为 Shuffle
。
Shuffle 是一项昂贵的操做,由于它一般会跨节点操做数据,这会涉及磁盘 I/O,网络 I/O,和数据序列化。某些 Shuffle 操做还会消耗大量的堆内存,由于它们使用堆内存来临时存储须要网络传输的数据。Shuffle 还会在磁盘上生成大量中间文件,从 Spark 1.3 开始,这些文件将被保留,直到相应的 RDD 再也不使用并进行垃圾回收,这样作是为了不在计算时重复建立 Shuffle 文件。若是应用程序长期保留对这些 RDD 的引用,则垃圾回收可能在很长一段时间后才会发生,这意味着长时间运行的 Spark 做业可能会占用大量磁盘空间,一般可使用 spark.local.dir
参数来指定这些临时文件的存储目录。
因为 Shuffle 操做对性能的影响比较大,因此须要特别注意使用,如下操做都会致使 Shuffle:
repartition
和 coalesce
;groupByKey
和 reduceByKey
,但 countByKey
除外;cogroup
和 join
。RDD 和它的父 RDD(s) 之间的依赖关系分为两种不一样的类型:
以下图,每个方框表示一个 RDD,带有颜色的矩形表示分区:
区分这两种依赖是很是有用的:
RDD(s) 及其之间的依赖关系组成了 DAG(有向无环图),DAG 定义了这些 RDD(s) 之间的 Lineage(血统) 关系,经过血统关系,若是一个 RDD 的部分或者所有计算结果丢失了,也能够从新进行计算。那么 Spark 是如何根据 DAG 来生成计算任务呢?主要是根据依赖关系的不一样将 DAG 划分为不一样的计算阶段 (Stage):
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南