做者:studytime
原文: https://www.studytime.xin
1.一、RDD是什么?数据库
RDD(Resilient Distributed Dataset)叫作弹性分布式数据集,是Spark中最基本的数据抽象,表明一个不可变、可分区、里面的元素可并行计算的集合。编程
1.二、RDD的主要属性?数组
RDD 是Spark 中最基本的数据抽象,是一个逻辑概念,它可能并不对应次磁盘或内存中的物理数据,而仅仅是记录了RDD的由来,父RDD是谁,以及怎样从父RDD计算而来。缓存
spark 源码里面对 RDD 的描述:并发
Internally, each RDD is characterized by five main properties: A list of partitions A function for computing each split A list of dependencies on other RDDs Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
能够知道,每一个 RDD 有如下五部分构成:app
RDD是一个应用层面的逻辑概念。一个RDD多个分片。RDD就是一个元数据记录集,记录了RDD内存全部的关系数据。分布式
Spark 程序设计流程通常以下:ide
步骤一:实例化 sparkContent 对象。sparkContent 封装了程序运行的上下文环境,包括配置信息、数据库管理器、任务调度等。
步骤二:构造 RDD。可经过 sparkContent 提供的函数构造 RDD,常见的 RDD 构造方式分为:将 Scala集合转换为 RDD 和将 Hadoop 文件转换为 RDD。
步骤三:在 RDD 基础上,经过 Spark 提供的 transformation 算子完成数据处理步骤。
步骤四:经过 action 算子将最终 RDD 做为结果直接返回或者保存到文件中。函数
Spark 提供了两大类编程接口,分别为 RDD 操做符以及共享变量。
其中 RDD 操做符包括 transformation 和 action 以及 control API 三类;共享变量包括广播变量和累加器两种。oop
一、 建立conf ,封装了spark配置信息
SparkConf conf = new SparkConf().setAppName(appName); conf.set(“spark.master”, “local”); conf.set(“spark.yarn.queue”, “infrastructure”);
二、 建立 SparkContext,封装了调度器等信息
JavaSparkContext jsc = new JavaSparkContext(conf);
一、Java 集合构建
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> rdd = jsc.parallelize(data, 3)
二、 将文本文件转换为 RDD
jsc.textFile(“/data”, 1) jsc.textFile(“/data/file.txt”, 1) jsc.textFile(“/data/*.txt”, 1) jsc.textFile(“hdfs://bigdata:9000/data/”, 1) jsc.sequenceFile(“/data”, 1) jsc.wholeTextFiles(“/data”, 1)
transformation API 是惰性的,调用这些API比不会触发实际的分布式数据计算,而仅仅是将相关信息记录下来,直到action API才会开始数据计算。
Spark 提供了大量的 transformation API,下面列举了一些经常使用的API:
API | 功能 |
---|---|
map(func) | 将 RDD 中的元素,经过 func 函数逐一映射成另一个值,造成一个新的 RDD |
filter(func) | 将 RDD 中使用 func 函数返回 true 的元素过滤出来,造成一个新的 RDD |
flatMap(func | 相似于map,但每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 相似于 map,但独立地在 RDD 的每个分片上运行,所以在类型为T的 RDD 上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
sample(withReplacement, fraction, seed) | 数据采样函数。根据fraction指定的比例对数据进行采样,能够选择是否使用随机数进行替换,seed 用于指定随机数生成器种子 |
union(otherDataset) | 求两个 RDD (目标 RDD 与指定 RDD)的并集,并以 RDD 形式返回 |
intersection(otherDataset) | 求两个 RDD (目标 RDD 与指定 RDD )的交集,并以 RDD 形式返回 |
distinct([numTasks])) | 对目标 RDD 进行去重后返回一个新的 RDD |
groupByKey([numTasks]) | 针对 key/value 类型的 RDD,将 key 相同的 value 汇集在一块儿。默认任务并发度与父 RDD 相同,可显示设置 [numTasks]大小 |
reduceByKey(func, [numTasks]) | 针对 key/value 类型的 RDD,将 key 相同的 value 汇集在一块儿,将对每组value,按照函数 func 规约,产生新的 RDD |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 与 reduceByKey 相似,但目标 key/value 的类型与最终产生的 RDD 可能不一样 |
sortByKey([ascending], [numTasks]) | 针对 key/value 类型的 RDD,按照 key 进行排序,若 ascending 为 true,则为升序,反之为降序 |
join(otherDataset, [numTasks]) | 针对 key/value 类型的 RDD,对 (K,V) 类型的 RDD 和(K,W)类型的RDD上调用,按照 key 进行等值链接,返回一个相同key对应的全部元素在一块儿的(K,(V,W))的RDD 至关于内链接(求交集) |
cogroup(otherDataset, [numTasks]) | 分组函数,对(K,V)类型的RDD和(K,W)类型的RD按照key进行分组,产生新的 (K,(Iterable<V>,Iterable<W>)) 类型的RDD |
cartesian(otherDataset) | 求两个 RDD 的笛卡尔积 |
coalesce(numPartitions) | 从新分区, 缩减分区数,用于大数据集过滤后,提升小数据集的执行效率 |
repartition(numPartitions) | 从新分区,将目标 RDD 的 partition 数量从新调整为 numPartitions, 少变多 |
glom() | 将RDD中每一个partition中元素转换为数组,并生 成新的rdd2 |
mapValues() | 针对于(K,V)形式的类型只对V进行操做 |
cache | RDD缓存,能够避免重复计算从而减小时间,cache 内部调用了 persist 算子,cache 默认就一个缓存级别 MEMORY-ONLY |
persist | persist 能够选择缓存级别 |
transformation 算子具备惰性执行的特性,他仅仅是记录一些原信息,知道遇到action算子才会触发相关transformation 算子的执行,
Spark 提供了大量的 action API,下面列举了一些经常使用的API:
API | 功能 |
---|---|
reduce(func) | 将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止 |
collect() | 将 RDD 以数组的形式返回给 Driver,经过将计算后的较小结果集返回 |
count() | 计算 RDD 中的元素个数 |
first() | 返回 RDD 中第一个元素 |
take(n) | 以数组的形式返回 RDD 前 n 个元素 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
saveAsTextFile(path) | 将 RDD 存储到文本文件中,并一次调用每一个元素的toString方法将之转换成字符串保存成一行 |
saveAsSequenceFile(path) | 针对 key/value 类型的 RDD,保存成 SequenceFile 格式文件 |
countByKey() | 针对 key/value 类型的 RDD,统计每一个 key出现的次数,并以 hashmap 形式返回 |
foreach(func) | 将 RDD 中的元素一次交给 func 处理 |
aggregate | 先对分区进行操做,再整体操做 |
aggregateByKey | |
lookup(key: K) | 针对 key/value 类型的 RDD, 指定key值,返回RDD中该K对应的全部V值。 |
foreachPartition | 相似于 foreach,但独立地在 RDD 的每个分片上运行,其中可嵌入foreach算子 |
因为 RDD 是粗粒度的操做数据集,每一个 Transformation 操做都会生成一个新的 RDD,因此 RDD 之间就会造成相似流水线的先后依赖关系;RDD 和它依赖的父 RDD(s)的关系有两种不一样的类型,即窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。
宽依赖和窄依赖深度剖析图:
窄依赖:指的是子 RDD 只依赖于父 RDD 中一个固定数量的分区。
宽依赖:指的是子 RDD 的每个分区都依赖于父 RDD 的全部分区。
RDD Stage:
在 Spark 中,Spark 会将每个 Job 分为多个不一样的 Stage, 而 Stage 之间的依赖关系则造成了有向无环图,Spark 会根据 RDD 之间的依赖关系将 DAG 图(有向无环图)划分为不一样的阶段,对于窄依赖,因为 Partition 依赖关系的肯定性,Partition 的转换处理就能够在同一个线程里完成,窄依赖就被 Spark 划分到同一个 stage 中,而对于宽依赖,只能等父 RDD shuffle 处理完成后,下一个 stage 才能开始接下来的计算。
RDD 持久化是 Spark 很是重要的特性之一。用户可显式将一个 RDD 持久化到内存或磁盘中,以便重用该RDD。RDD 持久化是一个分布式的过程,其内部的每一个 Partition 各自缓存到所在的计算节点上。RDD 持久化存储能大大加快数据计算效率,尤为适合迭代式计算和交互式计算。
Spark 提供了 persist 和 cache 两个持久化函数,其中 cache 将 RDD 持久化到内存中,而 persist 则支持多种存储级别。
persist RDD 存储级别:
持久化级别 | 含义 | |
---|---|---|
MEMORY_ONLY | 以非序列化的Java对象的方式持久化在JVM内存中。若是内存没法彻底存储RDD全部的partition,那么那些没有持久化的partition就会在下一次须要使用它的时候,从新被计算。 | |
MEMORY_AND_DISK | 同上,可是当某些partition没法存储在内存中时,会持久化到磁盘中。下次须要使用这些partition时,须要从磁盘上读取。 | |
MEMORY_ONLY_SER | 同MEMORY_ONLY,可是会使用Java序列化方式,将Java对象序列化后进行持久化。能够减小内存开销,可是须要进行反序列化,所以会加大CPU开销 | |
MEMORY_AND_DSK_SER | 同MEMORY_AND_DSK。可是使用序列化方式持久化Java对象。 | |
DISK_ONLY | 使用非序列化Java对象的方式持久化,彻底存储到磁盘上。 | |
MEMORY_ONLY_2 | ||
MEMORY_AND_DISK_2 | 若是是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其余节点,从而在数据丢失时,不须要再次计算,只须要使用备份数据便可。 |
如何选择RDD持久化策略:
Spark 提供的多种持久化级别,主要是为了在 CPU 和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:
除了 cache 和 persist 以外,Spark 还提供了另一种持久化:checkpoint, 它能将 RDD 写入文件系统,提供相似于数据库快照的功能。
于 cache 和 persist, 区别:
代码实例:
sc.checkpoint("hdfs://spark/rdd"); // 设置存放目录 val data = sc.testFile("hdfs://bigdata:9000/input"); val rdd = data.map(..).reduceByKey(...) rdd.checkpoint rdd.count()