Spark 的核心概念 RDD

1.RDD 概述

1.1 什么是 RDD ?

RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 ,是Spark 中最基本的抽象,它表明一个不可变、可分区、里面元素能够并行计算的集合。php

RDD 具备数据流模型特色:自动容错、位置感知性调度和可伸缩。java

RDD 容许用户在执行多个查询时,显示地将工做集缓存在内存中,后续的查询可以重用工做集,这将会极大的提高查询的效率。linux

咱们能够认为 RDD 就是一个代理,咱们操做这个代理就像操做本地集合同样,不需去关心任务调度、容错等问题。android

1.2 RDD 的属性

在 RDD 源码中这样来描述 RDD算法

*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

复制代码
  1. 一组分片(Partition),即数据集的基本组成单位。 对于RDD来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度。用户能够在建立RDD 的时候指定RDD的分片个数,若是没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
  2. 对于RDD来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度。用户能够在建立RDD 的时候指定RDD的分片个数,若是没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
  3. RDD 之间互相存在依赖关系。 RDD 的每次转换都会生成一个新的 RDD ,因此 RDD 以前就会造成相似于流水线同样的先后依赖关系。在部分分区数据丢失时,Spark 能够经过这个依赖关系从新计算丢失部分的分区数据,而不是对 RDD 的全部分区进行从新计算。
  4. 一个Partitioner ,即 RDD 的分片函数 。当前Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner ,另一个是基于范围的 RangePartitioner。只有对于key-value的RDD ,才会有 Partitioner,非 key-value 的RDD 的 Partitioner 的值是None。Partitioner 函数不但决定了RDD 自己的分片数量,也决定了 Parent RDD Shuffle 输出时的分片数量。
  5. 一个列表,存储存取每一个Partition 的优先位置(preferred location)。 对于一个HDFS 文件来讲,这个列表保存的就是每一个 Partition 所在的块位置。安装“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽量地将计算任务分配到其所要处理数据块的存储位置。

2 建立 RDD

2.1 由一个存在的 Scala 集合进行建立

#经过并行化scala集合建立RDD,通常在测试的时候使用
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
复制代码

2.2 由外部的存储系统的数据集建立,包括本地的文件系统,还有全部 Hadoop 支持的数据集,好比 HDFS、Cassandra、Hbase

var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs:192.168.80.131:9000/words.text")
复制代码

2.3 调用一个已经存在了的RDD 的 Transformation,会生成一个新的 RDD。

3 RDD 的编程 API

3.1 Transformation

这种 RDD 中的全部转换都是延迟加载的,也就是说,他们并不会直接就计算结果。相反的,他们只是记住这些应用到基础数据集(例如一个文件)上的转换动做。只有当发生一个返回结果的 Driver 的动做时,这些操做才会真正的运行。这种设计会让Spark 更加有效率的运行。apache

经常使用的 Transformation 操做:编程

转换 含义
map(func) 返回一个新的RDD,该RDD由每个输入元素通过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由通过func函数计算后返回值为true的输入元素组成
flatMap(func) 相似于map,可是每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)
mapPartitions(func) 相似于map,但独立地在RDD的每个分片上运行,所以在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 相似于mapPartitions,但func带有一个整数参数表示分片的索引值,所以在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,能够选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一块儿,与groupByKey相似,reduce任务的个数能够经过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操做
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey相似,可是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars]) 调用外部程序
coalesce(numPartitions) 从新分区 第一个参数是要分多少区,第二个参数是否shuffle 默认false ;少分区变多分区 true ; 多分区变少分区 false
repartition(numPartitions) 从新分区 必须shuffle 参数是要分多少区 少变多
repartitionAndSortWithinPartitions(partitioner) 从新分区+排序 比先分区再排序效率高 对K/V的RDD进行操做

3.2 Action

触发代码的运行操做,咱们一个Spark 应用,至少须要一个 Action 操做。windows

动做 含义
reduce(func) 经过func函数汇集RDD中的全部元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的全部元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(相似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其余支持的文件系统,对于每一个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可使HDFS或者其余Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。
foreach(func) 在数据集的每个元素上,运行函数func进行更新。
foreachPartition(func) 在每一个分区上,运行函数 func

3.3 Spark WordCount 代码示例

执行流程图: api

wc执行流程图

pom.xml 依赖数组

<!-- 导入scala的依赖 -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- 指定hadoop-client API的版本 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
复制代码

scala 版本代码实现:

package com.zhouq.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * scala 版本实现 wc
  *
  */
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //这行代码是由于我在windows 上直接跑,须要去读取 hadoop 上的文件,设置个人用户名。若是是linux 环境能够不设置。视状况而定
    System.setProperty("HADOOP_USER_NAME", "root")
    //建立spark 配置,设置应用程序名字
//    val conf = new SparkConf().setAppName("scalaWordCount")
    val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")

//    conf.set("spark.testing.memory","102457600")
    //建立spark 执行的入口
    val sc = new SparkContext(conf)

    //指定之后从哪里读取数据建立RDD (弹性分布式数据集)
    //取到一行数据
    val lines: RDD[String] = sc.textFile(args(0))

    //切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))

    //按单词和一组合
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //按key 进行聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 排序, false 表示倒序
    val sorted = reduced.sortBy(_._2, false)

    //将结果保存到hdfs中
    sorted.saveAsTextFile(args(1))

    //释放资源
    sc.stop()
  }
}
复制代码

Java7 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

/**
* Java 版WordCount
*/
public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //建立SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定读取数据的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分压平
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception{
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        //将单词进行组合 (a,1),(b,1),(c,1),(a,1)
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String tp) throws Exception {
                return new Tuple2<>(tp, 1);
            }
        });

        //先交换再排序,由于 只有groupByKey 方法
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
//                return new Tuple2<>(tp._2, tp._1);
                return tp.swap();
            }
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交换顺序
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        //输出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
复制代码

Java8 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;

/**
* Java Lambda 表达式版本的  WordCount
*/
public class JavaLambdaWordCount {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //建立SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定读取数据的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分压平
//        lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());

        //将单词进行组合 (a,1),(b,1),(c,1),(a,1)
//        words.mapToPair(tp -> new Tuple2<>(tp,1));
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));

        //先交换再排序,由于 只有groupByKey 方法
//        swaped.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> {
//                return new Tuple2<>(tp._2, tp._1);
            return tp.swap();
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交换顺序
//        sorted.mapToPair(tp -> tp.swap());
        JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());

        //输出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
复制代码

4 RDD 的依赖关系

RDD 和它依赖的 父 RDD(可能有多个) 的关系有两种不一样的类型,即 窄依赖(narrow dependency)和宽依赖(wide dependency)。

在这里插入图片描述

窄依赖:窄依赖指的是每个父 RDD 的 Partition 最多被子 RDD 的一个分区使用。能够比喻为独生子女。 宽依赖:宽依赖是多个字 RDD 的Partition 会依赖同一个父 RDD 的 Partition

5 RDD 的持久化

5.1 RDD 的 cache(持久化)

Spark中最重要的功能之一是跨操做在内存中持久化(或缓存)数据集。当您持久保存RDD时,每一个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其余操做中重用它们。这使得将来的行动更快(一般超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可使用persist()或cache()方法标记要保留的RDD 。第一次在动做中计算它,它将保留在节点的内存中。Spark的缓存是容错的 - 若是丢失了RDD的任何分区,它将使用最初建立它的转换自动从新计算。

5.2 何时咱们须要持久化?

  1. 要求的计算速度快
  2. 集群的资源要足够大
  3. 重要: cache 的数据会屡次触发Action
  4. 建议先进行数据过滤,而后将缩小范围后的数据再cache 到内存中.

5.3 如何使用

使用 rdd.persist()或者rdd.cache()

val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
//使用cache 方法来缓存数据到内存
val cache = lines.cache()
//注意查看下面两次count 的时间
cached.count
cached.count

复制代码

5.4 数据缓存的存储级别 StorageLevel

咱们在 StorageLevel.scala 源码中能够看到:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
复制代码

解释一下各个参数的意思:

第一个参数表示: 放到磁盘 第二个参数表示: 放到内存 第三个参数表示: 磁盘中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第四个参数表示: 内存中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第五个参数表示: 存放几份数据(目的是为了怕executor 出现故障致使分区数据丢失,当从新分配任务时,去另外的机器读取备份数据进行从新计算)

OFF_HEAP : 堆外内存,以序列化的格式存储RDD到Tachyon(一个分布式内存存储系统)中

5.5 如何选择存储级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不一样权衡。咱们推荐经过下面的过程选择一个合适的存储级别:

  1. 若是你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。由于这是cpu利用率最高的选项,会使RDD上的操做尽量的快。
  2. 若是不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提升对象的空间使用率,可是仍可以至关快的访问。
  3. 除非函数计算RDD的花费较大或者它们须要过滤大量的数据,不要将RDD存储到磁盘上,不然,重复计算一个分区就会和重磁盘上读取数据同样慢。
  4. 若是你但愿更快的错误恢复,能够利用重复(replicated)存储级别。全部的存储级别均可以经过重复计算丢失的数据来支持完整的容错,可是重复的数据可以使你在RDD上继续运行任务,而不须要重复计算丢失的数据。
  5. 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具备以下优点:
    1. 它运行多个执行者共享Tachyon中相同的内存池
    2. 它显著地减小垃圾回收的花费
    3. 若是单个的执行者崩溃,缓存的数据不会丢失

5.6 删除 cache

Spark自动的监控每一个节点缓存的使用状况,利用最近最少使用原则删除老旧的数据。若是你想手动的删除RDD,可使用 RDD.unpersist()方法

5.7 RDD 的 checkpoint机制

咱们除了把数据缓存到内存中,还能够把数据缓存到HDFS 中,保证中间数据不丢失.

何时咱们须要作chechpoint?

  1. 作复杂的迭代计算,要求保证数据安全,不丢失
  2. 对速度要求不高(跟 cache 到内存进行对比)
  3. 将中间结果保存到 hdfs 中

怎么作 checkpoint ?

首先设置 checkpoint 目录,而后再执行计算逻辑,再执行 checkpoint() 操做。

下面代码使用cache 和 checkpoint 两种方式实现计算每门课最受欢迎老师的 topN

package com.zhouq.spark

import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 求每门课程最受欢迎老师TopN  --2
  *   -- 使用cache
  *   -- 使用checkpoint 通常设置hdfs 目录
  */
object GroupFavTeacher2_cache_checkpoint {
  def main(args: Array[String]): Unit = {
    //前 N
    val topN = args(1).toInt
    //学科集合
    val subjects = Array("bigdata", "javaee", "php")
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
    //建立spark 执行入口
    val sc = new SparkContext(conf)
    //checkpoint 得先设置 sc 的checkpoint 的dir
//    sc.setCheckpointDir("hdfs://hdfs://hadoop1:8020/user/root/ck20190215")

    //指定读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      var teacher = line.substring(index + 1)
      var httpHost = line.substring(0, index)
      var subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })
    //将学科,老师联合当作key
    val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)

    //第一种使用cache RDD 把数据缓存在内存中.标记为cache 的RDD 之后被反复使用,才使用cache
    val cached: RDD[((String, String), Int)] = reduced.cache()

    //第二种 使用checkpoint,得先设置 sc 的 checkpointDir
//   val cached: RDD[((String, String), Int)] = reduced.checkpoint()

    /**
      * 先对学科进行过滤,而后再进行排序,调用RDD 的sortBy进行排序,避免scala 的排序当数据量大时,内存不足的状况.
      * take 是Action 操做,每次take 都会进行一次任务提交,具体查看日志打印状况
      */
    for (sub <- subjects) {
      //过滤出当前的学科
      val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
      //使用RDD 的 sortBy ,内存+磁盘排序,避免scala 中的排序因内存不足致使异常状况.
      //take 是Action 的,因此每次循环都会触发一次提交任务,祥见日志打印状况
      val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
      println(favTeacher.toBuffer)
    }

    /**
      * 前面cache的数据已经计算完了,后面还有不少其余的指标要计算
      * 后面计算的指标也要触发不少次Action,最好将数据缓存到内存
      * 原来的数据占用着内存,把原来的数据释放掉,才能缓存新的数据
      */

    //把原来缓存的数据释放掉
    cached.unpersist(true)

    sc.stop()
  }
}
复制代码

6 DAG 的生成

DAG(Directed Acyclic Graph)叫作有向无环图,原始的RDD经过一系列的转换就就造成了DAG,根据RDD之间的依赖关系的不一样将DAG划分红不一样的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,因为有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,所以宽依赖是划分Stage的依据。

在这里插入图片描述

微信公众号文章连接:Spark RDD

有兴趣欢迎关注,你们一块儿交流学习。

在这里插入图片描述
相关文章
相关标签/搜索