5.spark core之RDD编程

  spark提供了对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称RDD)。RDD是一个分布式的数据集合,数据能够跨越集群中的多个机器节点,被分区并行执行。
  在spark中,对数据的全部操做不外乎建立RDD、转化已有RDD及调用RDD操做进行求值。spark会自动地将RDD中的数据分发到集群中并行执行。java

五大特性

  • a list of partitions
      RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的list;将数据加载为RDD时,通常会遵循数据的本地性(通常一个hdfs里的block会加载为一个partition)。
  • a function for computing each split
      RDD的每一个partition中都会有function,即函数应用,其做用是实现RDD之间partition的转换。
  • a list of dependencies on other RDDs
      RDD会记录它的依赖,为了容错(重算,cache,checkpoint),即内存中的RDD操做出错或丢失时会进行重算。
  • Optionally,a Partitioner for Key-value RDDs
      可选项,若是RDD里面存的数据是key-value形式,则能够传递一个自定义的Partitioner进行从新分区,例如自定义的Partitioner是基于key进行分区,那则会将不一样RDD里面的相同key的数据放到同一个partition里面。
  • Optionally, a list of preferred locations to compute each split on
      可选项,最优的位置去计算每一个分片,即数据的本地性。

    建立RDD

      spark提供了两种建立RDD的方式:读取外部数据源、将驱动器程序中的集合进行并行化。python

    并行化集合

      使用sparkContext的parallelize()方法将集合并行化。
      parallelize()方法第二个参数可指定分区数。spark会为每一个分区建立一个task任务,一般每一个cpu须要2-4个分区。spark会自动地根据集群大小设置分区数,也支持经过parallelize()方法的第二个参数手动指定。apache

    scala

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)

    java

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);

    python

    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)

      注:除了开发和测试外,这种方式用得很少。这种方式须要把整个数据集先放到一台机器的内存中。编程

    读取外部数据源

      spark可接入多种hadoop支持的数据源来建立分布式数据集。包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。
      spark支持多种存储格式,包括textFiles、SequenceFiles及其余hadoop存储格式。缓存

    scala

    scala> val distFile = sc.textFile("data.txt")
    distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

    java

    JavaRDD<String> distFile = sc.textFile("data.txt");

    python

    >>> distFile = sc.textFile("data.txt")

RDD操做

  RDD支持两种操做:转化操做和行动操做。
算子.png分布式

转化操做

  RDD的转化操做会返回一个新的RDD。转化操做是惰性求值的,只有行动操做用到转化操做生成的RDD时,才会真正进行转化。
转化算子.png
  spark使用lineage(血统)来记录转化操做生成的不一样RDD之间的依赖关系。依赖分为窄依赖(narrow dependencies)和宽依赖(wide dependencies)。ide

  • 窄依赖
    • 子RDD的每一个分区依赖于常数个父分区
    • 输入输出一对一,结果RDD的分区结构不变,主要是map、flatMap
    • 输入输出一对一,但结果RDD的分区结构发生变化,如union、coalesce
    • 从输入中选择部分元素的算子,如filter、distinct、subtract、sample
  • 宽依赖函数

    • 子RDD的每一个分区依赖于全部父RDD分区
    • 对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
    • 对两个RDD基于key进行合并和重组,如join
      转化算子依赖.pngoop

      行动操做

        行动操做则会向驱动器程序返回结果或把结果写入外部系统,会触发实际的计算。
      action算子.png性能

      缓存方式

        RDD经过persist方法或cache方法能够将前面的计算结果缓存,可是并非这两个方法被调用时当即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
        cache最终也是调用了persist方法,默认的存储级别是仅在内存存储一份。
      缓存.jpg
        Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
      缓存方式.png
        缓存有可能丢失,RDD的缓存容错机制保证即便缓存丢失也能保证计算正确执行。经过基于RDD的一系列转换,丢失的数据会被重算,因为RDD的各个Partition是相对独立的,所以只须要计算丢失的部分便可,并不须要重算所有Partition。

      容错机制

      • Lineage机制

        • RDD的Lineage记录的是粗粒度的特定数据Transformation操做行为。当RDD的部分分区数据丢失时,能够经过Lineage来从新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,因此Spark并不适用于全部高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能的提高。

        • Spark Lineage机制是经过RDD的依赖关系来执行的

          • 窄依赖能够在某个计算节点上直接经过计算父RDD的某块数据计算获得子RDD对应的某块数据。

          • 宽依赖则要等到父RDD全部数据都计算完成后,将父RDD的计算结果进行hash并传到对应节点上以后才能计算子RDD。宽依赖要将祖先RDD中的全部数据块所有从新计算,因此在长“血统”链特别是有宽依赖的时候,须要在适当的时机设置数据检查点。
      • Checkpoint机制

        • 简介

          • 当RDD的action算子触发计算结束后会执行checkpoint;Task计算失败的时候会从checkpoint读取数据进行计算。
        • 实现方式(checkpoint有两种实现方式,若是代码中没有设置checkpoint,则使用local的checkpoint模式,若是设置路径,则使用reliable的checkpoint模式。)

          • LocalRDDCheckpointData:临时存储在本地executor的磁盘和内存上。该实现的特色是比较快,适合lineage信息须要常常被删除的场景(如GraphX),可容忍executor挂掉。

          • ReliableRDDCheckpointData:存储在外部可靠存储(如hdfs),能够达到容忍driver 挂掉状况。虽然效率没有存储本地高,可是容错级别最好。

忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。

这里写图片描述

相关文章
相关标签/搜索