大数据开发-Spark-RDD的持久化和缓存

1.RDD缓存机制 cache, persist

Spark 速度很是快的一个缘由是 RDD 支持缓存。成功缓存后,若是以后的操做使用到了该数据集,则直接从缓存中获取。虽然缓存也有丢失的风险,可是因为 RDD 之间的依赖关系,若是某个分区的缓存数据丢失,只须要从新计算该分区便可。算法

涉及到的算子:persist、cache、unpersist;都是 Transformation数组

缓存是将计算结果写入不一样的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、堆
外内存、磁盘);缓存

经过缓存,Spark避免了RDD上的重复计算,可以极大地提高计算速度;
RDD持久化或缓存,是Spark最重要的特征之一。能够说,缓存是Spark构建迭代式算法和快速交互式查询的关键因
素;大数据

Spark速度很是快的缘由之一,就是在内存中持久化(或缓存)一个数据集。当持久化一个RDD后,每个节点都将
把计算的分片结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其余动做(Action)中重用。这使
得后续的动做变得更加迅速;使用persist()方法对一个RDD标记为持久化。之因此说“标记为持久化”,是由于出现persist()语句的地方,并不会马
上计算生成RDD并把它持久化,而是要等到遇到第一个行动操做触发真正计算之后,才会把计算结果进行持久化;经过persist()或cache()方法能够标记一个要被持久化的RDD,持久化被触发,RDD将会被保留在计算节点的内存中
并重用;人工智能

何时缓存数据,须要对空间和速度进行权衡。通常状况下,若是多个动做须要用到某个 RDD,而它的计算代价
又很高,那么就应该把这个 RDD 缓存起来;spa

缓存有可能丢失,或者存储于内存的数据因为内存不足而被删除。RDD的缓存的容错机制保证了即便缓存丢失也能保
证计算的正确执行。经过基于RDD的一系列的转换,丢失的数据会被重算。RDD的各个Partition是相对独立的,所以
只须要计算丢失的部分便可,并不须要重算所有Partition。3d

启动堆外内存须要配置两个参数:code

  • spark.memory.offHeap.enabled :是否开启堆外内存,默认值为 false,须要设置为 true;
  • spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,须要设置为正值。

1.1 缓存级别

Spark 速度很是快的一个缘由是 RDD 支持缓存。成功缓存后,若是以后的操做使用到了该数据集,则直接从缓存中获取。虽然缓存也有丢失的风险,可是因为 RDD 之间的依赖关系,若是某个分区的缓存数据丢失,只须要从新计算该分区便可。orm

file

file

Spark 支持多种缓存级别 :对象

Storage Level(存储级别) Meaning(含义)
MEMORY_ONLY 默认的缓存级别,将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。若是内存空间不够,则部分分区数据将再也不缓存。
MEMORY_AND_DISK 将 RDD 以反序列化的 Java 对象的形式存储 JVM 中。若是内存空间不够,将未缓存的分区数据存储到磁盘,在须要使用这些分区时从磁盘读取。
MEMORY_ONLY_SER 将 RDD 以序列化的 Java 对象的形式进行存储(每一个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增长 CPU 的计算负担。仅支持 Java 和 Scala 。
MEMORY_AND_DISK_SER 相似于 MEMORY_ONLY_SER,可是溢出的分区数据会存储到磁盘,而不是在用到它们时从新计算。仅支持 Java 和 Scala。
DISK_ONLY 只在磁盘上缓存 RDD
MEMORY_ONLY_2, MEMORY_AND_DISK_2 与上面的对应级别功能相同,可是会为每一个分区在集群中的两个节点上创建副本。
OFF_HEAP MEMORY_ONLY_SER 相似,但将数据存储在堆外内存中。这须要启用堆外内存。

启动堆外内存须要配置两个参数:

  • spark.memory.offHeap.enabled :是否开启堆外内存,默认值为 false,须要设置为 true;
  • spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,须要设置为正值。

1.2 使用缓存

缓存数据的方法有两个:persistcachecache 内部调用的也是 persist,它是 persist 的特殊化形式,等价于 persist(StorageLevel.MEMORY_ONLY)。示例以下:

// 全部存储级别均定义在 StorageLevel 对象中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()

被缓存的RDD在DAG图中有一个绿色的圆点。

file

1.3 移除缓存

Spark 会自动监视每一个节点上的缓存使用状况,并按照最近最少使用(LRU)的规则删除旧数据分区。固然,你也可使用 RDD.unpersist() 方法进行手动删除。

2.RDD容错机制Checkpoint

2.1 涉及到的算子:checkpoint;也是 Transformation

Spark中对于数据的保存除了持久化操做以外,还提供了检查点的机制;检查点本质是经过将RDD写入高可靠的磁盘,主要目的是为了容错。检查点经过将数据写入到HDFS文件系统实现了

RDD的检查点功能。Lineage过长会形成容错成本太高,这样就不如在中间阶段作检查点容错,若是以后有节点出现问题而丢失分区,从

作检查点的RDD开始重作Lineage,就会减小开销。

2.2 cache 和 checkpoint 区别

cache 和 checkpoint 是有显著区别的,缓存把 RDD 计算出来而后放在内存中,可是 RDD 的依赖链不能丢掉, 当某个点某个 executor 宕了,上面 cache 的RDD就会丢掉, 须要经过依赖链重放计算。不一样的是,checkpoint 是把

RDD 保存在 HDFS中,是多副本可靠存储,此时依赖链能够丢掉,因此斩断了依赖链。

2.3 checkpoint适合场景

如下场景适合使用检查点机制:

  1. DAG中的Lineage过长,若是重算,则开销太大

  2. 在宽依赖上作 Checkpoint 得到的收益更大

与cache相似 checkpoint 也是 lazy 的。

val rdd1 = sc.parallelize(1 to 100000)
// 设置检查点目录

sc.setCheckpointDir("/tmp/checkpoint")

val rdd2 = rdd1.map(_*2)

rdd2.checkpoint

// checkpoint是lazy操做

rdd2.isCheckpointed

// checkpoint以前的rdd依赖关系

rdd2.dependencies(0).rdd

rdd2.dependencies(0).rdd.collect

// 执行一次action,触发checkpoint的执行

rdd2.count

rdd2.isCheckpointed

// 再次查看RDD的依赖关系。能够看到checkpoint后,RDD的lineage被截断,变成从checkpointRDD开始

rdd2.dependencies(0).rdd

rdd2.dependencies(0).rdd.collect

//查看RDD所依赖的checkpoint文件

rdd2.getCheckpointFile

备注:checkpoint的文件做业执行完毕后不会被删除
吴邪,小三爷,混迹于后台,大数据,人工智能领域的小菜鸟。
更多请关注
file

相关文章
相关标签/搜索