0七、RDD持久化


为了不屡次计算同一个RDD(如上面的同一result RDD就调用了两次Action操做),可让Spark对数据进行持久化。当咱们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。若是一个有持久化数据的节点发生故障,Spark会在须要用到缓存的数据时重算丢失的数据分区。

Spark很是重要的一个功能特性就是能够将RDD持久化在内存中。当对RDD执行持久化操做时,每一个节点都会将本身操做的RDD的partition持久化到内存中,而且在以后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操做的场景,就只要对RDD计算一次便可,后面直接使用该RDD,而不须要反复计算屡次该RDD。
 
巧妙使用RDD持久化,甚至在某些场景下,能够将spark应用程序的性能提高10倍。对于迭代式算法和快速交互式应用来讲,RDD持久化,是很是重要的。
 
要持久化一个RDD,只要调用其 cache()或者 persist()方法便可。在该RDD第一次被计算出来时,就会直接缓存在每一个节点中。并且Spark的持久化机制仍是自动容错的,若是持久化的RDD的任何partition丢失了,那么Spark会自动经过其源RDD,使用transformation操做从新计算该partition。
 
cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。若是须要从内存中清理缓存,那么可使用unpersist()方法。
 
Spark本身也会在shuffle操做时,进行数据的持久化,好比写入磁盘,主要是为了在节点失败时,避免须要从新计算整个过程。
 
 RDD持久化策略
 RDD持久化是能够手动选择不一样的策略的。好比能够将RDD持久化在内存中、持久化到磁盘上、使用序列化的方式持久化,多持久化的数据进行多路复用。只要在调用persist()时传入对应的StorageLevel便可。

持久化级别 html

 

MEMORY_ONLY 算法

以非序列化的Java对象的方式持久化在JVM内存中。若是内存没法彻底存储RDD全部的partition,那么那些没有持久化的partition就会在下一次须要使用它的时候,从新被计算 缓存

MEMORY_AND_DISK app

同上,可是当某些partition没法存储在内存中时,会持久化到磁盘中。下次须要使用这些partition时,须要从磁盘上读取 ide

MEMORY_ONLY_SER 性能

MEMORY_ONLY,可是会使用Java序列化方式,将Java对象序列化后进行持久化。能够减小内存开销,可是须要进行反序列化,所以会加大CPU开销 ui

MEMORY_AND_DSK_SER spa

MEMORY_AND_DSK。可是使用序列化方式持久化Java对象 3d

DISK_ONLY orm

使用非序列化Java对象的方式持久化,彻底存储到磁盘上

MEMORY_ONLY_2

MEMORY_AND_DISK_2

等等

若是是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其余节点,从而在数据丢失时,不须要再次计算,只须要使用备份数据便可

 

如何选择RDD持久化策略?
Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:
 
一、优先使用MEMORY_ONLY,若是能够缓存全部数据的话,那么就使用这种策略。由于纯内存速度最快,并且没有序列化,不须要消耗CPU进行反序列化操做。
二、若是MEMORY_ONLY策略,没法存储的下全部数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操做仍是很是快,只是要消耗CPU进行反序列化。
三、若是须要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不须要从新计算了。
四、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如从新计算一次。
 
 正确调用:

注意,须要在第一次调用Acton操做以前就要调用  persist() 方法。
若是要缓存的数据太多,内存中放不下,Spark会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就须要从新计算。可是对于使用内存与磁盘的缓存级别(MEMORY_AND_DISK、MEMORY_AND_DISK_SER)的分区来讲,被移除的分区都会写入磁盘。不论哪种状况,都没必要担忧你的做业由于缓存了太多数据而被打断。
 
/**
 * RDD持久化
 */
public class Persist {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Persist").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // cache()或者persist()的使用,是有规则的
        // 必须在transformation或者textFile等建立了一个RDD以后,直接连续调用cache()或persist()才能够
        // 若是你先建立一个RDD,而后单独另起一行执行cache()或persist()方法,是没有用的
        // 并且,会报错,大量的文件会丢失
        JavaRDD<String> lines = sc.textFile("test.txt").cache();
        long beginTime = System.currentTimeMillis();
        long count = lines.count();
        System.out.println(count);
        long endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds.");
        beginTime = System.currentTimeMillis();
        count = lines.count();
        System.out.println(count);
        endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds.");
        sc.close();
    }
}
相关文章
相关标签/搜索