其余更多java基础文章:
java基础学习(目录)java
RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫作弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它表明一个不可变、只读的,被分区的数据集。操做 RDD 就像操做本地集合同样,有不少的方法能够调用,使用方便,而无需关心底层的调度细节。缓存
Spark Core为咱们提供了三种建立RDD的方式,包括:安全
spark程序须要作的第一件事情,就是建立一个SparkContext对象,它将告诉spark如何访问一个集群,而要建立一个SparkContext对象,你首先要建立一个SparkConf对象,该对象访问了你的应用程序的信息,好比下面的代码:bash
SparkConf conf=new SparkConf();
conf.set("参数", "参数值"); //由于jvm没法得到足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
复制代码
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
//并行集合,是经过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
JavaRDD<Integer> distData = sc.parallelize(data);
复制代码
//经过hdfs上的文件定义一个RDD 这个数据暂时尚未加载到内存,也没有在上面执行动做,lines仅仅指向这个文件
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
复制代码
RDD支持两种类型的操做算子:Transformation与Action。jvm
Transformation操做会由一个RDD生成一个新的 RDD。Transformation操做是延迟计算的,也就是说从一个RDD转换生成另外一个RDD的转换操做不是立刻执行,须要等到Actions操做时,才真正开始运算。分布式
Action操做会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。函数
例如,first() 就是的一个行动操做,它会返回 RDD 的第一个元素。post
result = testlines.first()
复制代码
transformations操做和Action操做的区别在于Spark计算RDD 的方式不一样。对于在任什么时候候transformations获得的新的RDD,Spark只会惰性计算。只有在一个Action操做中用到时,才会真正计算。这种策略也是spark性能高的部分缘由。性能
好比,咱们读取一个文本文件建立一个RDD,而后把其中包含spark的行筛选出来。若是Spark在咱们运行lines = sc.textFile(test.txt) 时就把文件中全部的行都读取到内存中并存储起来,内存开销会很大,而咱们接下来的操做会筛选掉其中的不少数据。相反, 若是Spark 在知道了完整的转化操做链以后,它就能够只计算求结果时真正须要的数据。
事实上,在执行行动操做 first()时,Spark也只是扫描文件直到找到第一个匹配的行为止,而不是读取整个文件。
RDD还有一个叫持久化的机制,就是在不一样操做间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动做中重用。这将使得后续的动做(action)变得更加迅速。缓存是用Spark构建迭代算法的关键。RDD的缓存可以在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。经过缓存,Spark避免了RDD上的重复计算,可以极大地提高计算速度。在Spark应用程序的调优中就会考虑到RDD的持久化的机制。
顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}
复制代码
查看其构造参数
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}
复制代码
能够看到StorageLevel类的主构造器包含了5个参数:
理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。例如:
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
复制代码
就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),而且在多个节点上备份2份(正常的RDD只有一份)
注意: 持久化的单位为Partition
注意: 当使用RDD的MEMORY_ONLY进行持久化的时候,当内存空间不够的时候,不会报OOM,它会选择最小的partiton来持久化在内存,当从新的使用RDD时候,其余的partition会根据依赖关系从新计算
Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不一样权衡。咱们推荐经过下面的过程选择一个合适的存储级别:
当业务场景很是的复杂的时候,RDD的lineage(血统)依赖会很是的长,一旦血统较后面的RDD数据丢失的时候,Spark会根据血统依赖从新的计算丢失的RDD,这样会形成计算的时间过长,Spark提供了一个叫checkPoint的算子来解决这样的业务场景。
Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)
累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。
有时会变量是在driver端建立的,可是由于须要在excutor端使用,因此driver会把变量以task的形式发送到excutor端,若是有不少个task,就会有不少给excutor端携带不少个变量,若是这个变量很是大的时候,就可能会形成内存溢出(以下图所示)。这个时候就引出了广播变量。
int factor = 3;
final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);
...
//在RDD计算中
int factor = factorBroadcast.value();
复制代码
另外,为了确保全部的节点得到相同的变量,对象factorBroadcast广播后只读不可以被修改。
注意事项:
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操做。Accumulator只提供了累加的功能,可是却给咱们提供了多个task对一个变量并行操做的功能。task只能对Accumulator进行累加操做,不能读取它的值。只有Driver程序能够读取Accumulator的值。
final Accumulator<Integer> sum = sc.accumulator(0);
...
//RDD计算中
sum.add(1);
...
//Driver端
System.out.println(sum.value());
复制代码
累加器只能由Spark内部进行更新,并保证每一个任务在累加器的更新操做仅执行一次,也就是说重启任务也不该该更新。在转换操做中,用户必须意识到任务和做业的调度过程从新执行会形成累加器的屡次更新。
累加器一样具备Spark懒加载的求值模型。若是它们在RDD的操做中进行更新,它们的值只在RDD进行行动操做时才进行更新。