Spark学习(二)——RDD基础

其余更多java基础文章:
java基础学习(目录)java


RDD的建立与方法(代码详细)算法

1. RDD概述

RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫作弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它表明一个不可变、只读的,被分区的数据集。操做 RDD 就像操做本地集合同样,有不少的方法能够调用,使用方便,而无需关心底层的调度细节。缓存

2. RDD的建立

Spark Core为咱们提供了三种建立RDD的方式,包括:安全

  1. 使用程序中的集合建立RDD
  2. 使用HDFS文件建立RDD

2.1 Spark初始化

spark程序须要作的第一件事情,就是建立一个SparkContext对象,它将告诉spark如何访问一个集群,而要建立一个SparkContext对象,你首先要建立一个SparkConf对象,该对象访问了你的应用程序的信息,好比下面的代码:bash

SparkConf conf=new SparkConf();
        conf.set("参数", "参数值");     //由于jvm没法得到足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
复制代码

2.2 使用程序中的集合建立RDD

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //并行集合,是经过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);
复制代码

2.3 使用HDFS文件建立RDD

//经过hdfs上的文件定义一个RDD 这个数据暂时尚未加载到内存,也没有在上面执行动做,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
复制代码

3. RDD的两种算子

RDD支持两种类型的操做算子:Transformation与Action。jvm

3.1 Transformation

Transformation操做会由一个RDD生成一个新的 RDD。Transformation操做是延迟计算的,也就是说从一个RDD转换生成另外一个RDD的转换操做不是立刻执行,须要等到Actions操做时,才真正开始运算。分布式

3.2 Action

Action操做会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。函数

例如,first() 就是的一个行动操做,它会返回 RDD 的第一个元素。post

result = testlines.first()
复制代码

transformations操做和Action操做的区别在于Spark计算RDD 的方式不一样。对于在任什么时候候transformations获得的新的RDD,Spark只会惰性计算。只有在一个Action操做中用到时,才会真正计算。这种策略也是spark性能高的部分缘由。性能

好比,咱们读取一个文本文件建立一个RDD,而后把其中包含spark的行筛选出来。若是Spark在咱们运行lines = sc.textFile(test.txt) 时就把文件中全部的行都读取到内存中并存储起来,内存开销会很大,而咱们接下来的操做会筛选掉其中的不少数据。相反, 若是Spark 在知道了完整的转化操做链以后,它就能够只计算求结果时真正须要的数据。

事实上,在执行行动操做 first()时,Spark也只是扫描文件直到找到第一个匹配的行为止,而不是读取整个文件。

3.3 RDD经常使用算子

Spark32个经常使用算子总结
Spark经常使用算子详解

4. RDD的持久化机制

blog.csdn.net/weixin_3560…

RDD还有一个叫持久化的机制,就是在不一样操做间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动做中重用。这将使得后续的动做(action)变得更加迅速。缓存是用Spark构建迭代算法的关键。RDD的缓存可以在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。经过缓存,Spark避免了RDD上的重复计算,可以极大地提高计算速度。在Spark应用程序的调优中就会考虑到RDD的持久化的机制。

4.1 RDD持久化机制

  • Spark很是重要的一个功能特性就是能够将RDD 持久化在内存中,当对RDD执行持久化操做时,每一个节点都会将本身操做的RDD的partition持久化到内存中,而且在以后对该RDD的反复使用中,直接使用内存缓存的partition,这样的话,对于针对一个RDD反复执行多个操做的场景,就只要对RDD计算一次便可,后面直接使用该RDD ,而不须要计算屡次该RDD
  • 要持久化一个RDD,只要调用其cache()或者persist()方法便可。可是并非这两个方法被调用时当即缓存,在该RDD第一次被计算出来时(触发后面的action时),该RDD将会被缓存在计算节点的内存中,并供后面重用。并且Spark的持久化机制仍是自动容错的,若是持久化的RDD的任何partition丢失了,那么Spark会自动经过其源RDD,使用transformation操做从新计算该partition。
  • cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。若是须要从内存中去除缓存,那么可使用unpersist()方法。

4.2 RDD的持久化的级别

顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}
复制代码

查看其构造参数

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}
复制代码

能够看到StorageLevel类的主构造器包含了5个参数:

  • useDisk:使用硬盘(外存)
  • useMemory:使用内存
  • useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆之外的内存,这些内存直接受操做系统管理(而不是虚拟机)。这样作的结果就是能保持一个较小的堆,以减小垃圾收集对应用的影响。
  • deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,能够将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。序列化方式存储对象能够节省磁盘或内存的空间,通常 序列化:反序列化=1:3
  • replication:备份数(在多个节点上备份)

理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。例如:

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
复制代码

就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),而且在多个节点上备份2份(正常的RDD只有一份)

注意: 持久化的单位为Partition
注意: 当使用RDD的MEMORY_ONLY进行持久化的时候,当内存空间不够的时候,不会报OOM,它会选择最小的partiton来持久化在内存,当从新的使用RDD时候,其余的partition会根据依赖关系从新计算

4.3 选择持久化级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不一样权衡。咱们推荐经过下面的过程选择一个合适的存储级别:

  1. 若是你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。由于这是cpu利用率最高的选项,会使RDD上的操做尽量的快。
  2. 若是不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提升对象的空间使用率,可是仍可以至关快的访问。
  3. 除非函数计算RDD的花费较大或者它们须要过滤大量的数据,不要将RDD存储到磁盘上,不然,重复计算一个分区就会和重磁盘上读取数据同样慢。
  4. 若是你但愿更快的错误恢复,能够利用重复存储级别。全部的存储级别均可以经过重复计算丢失的数据来支持完整的容错,可是重复的数据可以使你在RDD上继续运行任务,而不须要重复计算丢失的数据。
  5. 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具备以下优点:
    • 它运行多个执行者共享Tachyon中相同的内存池
    • 它显著地减小垃圾回收的花费
    • 若是单个的执行者崩溃,缓存的数据不会丢失

4.4 checkPoint

当业务场景很是的复杂的时候,RDD的lineage(血统)依赖会很是的长,一旦血统较后面的RDD数据丢失的时候,Spark会根据血统依赖从新的计算丢失的RDD,这样会形成计算的时间过长,Spark提供了一个叫checkPoint的算子来解决这样的业务场景。

  • 为当前RDD设置检查点。该函数将会建立一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程当中,该RDD的全部依赖于父RDD中的信息将所有被移出。对RDD进行checkpoint操做并不会立刻被执行,必须执行Action操做才能触发。

4.4.1 checkPoint优势

  1. 持久化在HDFS上,HDFS默认的3副本备份使得持久化的备份数据更加的安全
  2. 切断RDD的依赖关系:当业务场景复杂的时候,RDD的依赖关系很是的长的时候,当靠后的RDD数据丢失的时候,会经历较长的从新计算的过程,采用checkPoint会转为依赖checkPointRDD,能够避免长的lineage从新计算。

4.4.2 checkPoint的原理

  1. 当finalRDD执行Action类算子计算job任务的时候,Spark会从finalRDD从后往前回溯查看哪些RDD使用了checkPoint算子
  2. 将使用了checkPoint的算子标记
  3. Spark会自动的启动一个job来从新计算标记了的RDD,并将计算的结果存入HDFS,而后切断RDD的依赖关系
  4. 建议在执行checkpoint()方法以前先对rdd进行persisted操做。 在checkPoint的RDD以前先cache RDD,那么Spark就不用启动一个job来计算checkPoint的RDD,而是将持久化到内存的数据直接拷贝到HDFS,进而提升Spark的计算速度,提升应用程序的性能

5. 共享变量

Spark共享变量(广播变量、累加器)

Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)

累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。

5.1 广播变量

有时会变量是在driver端建立的,可是由于须要在excutor端使用,因此driver会把变量以task的形式发送到excutor端,若是有不少个task,就会有不少给excutor端携带不少个变量,若是这个变量很是大的时候,就可能会形成内存溢出(以下图所示)。这个时候就引出了广播变量。

使用广播变量后:

int factor = 3;
final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);
...
//在RDD计算中
int factor = factorBroadcast.value();
复制代码

另外,为了确保全部的节点得到相同的变量,对象factorBroadcast广播后只读不可以被修改。

注意事项:

  • 能不能将一个RDD使用广播变量广播出去?
    不能,由于RDD是不存储数据的。能够将RDD的结果广播出去。 广播变量只能在Driver端定义,不能在Executor端定义。

5.2 累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操做。Accumulator只提供了累加的功能,可是却给咱们提供了多个task对一个变量并行操做的功能。task只能对Accumulator进行累加操做,不能读取它的值。只有Driver程序能够读取Accumulator的值。

final Accumulator<Integer> sum = sc.accumulator(0);
...
//RDD计算中
sum.add(1);
...
//Driver端
System.out.println(sum.value());
复制代码

累加器只能由Spark内部进行更新,并保证每一个任务在累加器的更新操做仅执行一次,也就是说重启任务也不该该更新。在转换操做中,用户必须意识到任务和做业的调度过程从新执行会形成累加器的屡次更新。

累加器一样具备Spark懒加载的求值模型。若是它们在RDD的操做中进行更新,它们的值只在RDD进行行动操做时才进行更新。

相关文章
相关标签/搜索