大数据开发-Spark-一文理解常见RDD

1.五个基本Properties

  • A list of partitions数组

  • A function for computing each split函数

  • A list of dependencies on other RDDsoop

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)测试

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)大数据

这是RDD的源码中注释中写到的,下面介绍这五种特征属性this

1.1 分区

一组分片(Partition),即数据集的基本组成单位。对于RDD来讲,每一个分片都会被一个计算任务处理,并决
定并行计算的粒度。用户能够在建立RDD时指定RDD的分片个数,若是没有指定,那么就会采用默认值人工智能

1.2 计算的函数

一个对分区数据进行计算的函数。Spark中RDD的计算是以分片为单位的,每一个RDD都会实现 compute 函数以
达到该目的。compute函数会对迭代器进行组合,不须要保存每次计算的结果scala

1.3 依赖关系

RDD之间的存在依赖关系。RDD的每次转换都会生成一个新的RDD,RDD之间造成相似于流水线同样的先后依
赖关系(lineage)。在部分分区数据丢失时,Spark能够经过这个依赖关系从新计算丢失的分区数据,而不是
对RDD的全部分区进行从新计算3d

1.4 分区器

对于 key-value 的RDD而言,可能存在分区器(Partitioner)。Spark 实现了两种类型的分片函数,一个是基于
哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有 key-value 的RDD,才可能有
Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD自己的分片数量,也
决定了parent RDD Shuffle输出时的分片数量code

1.5 优先存储位置

一个列表,存储存储每一个Partition的优先位置(preferred location)。对于一个HDFS文件来讲,这个列表保
存的就是每一个Partition所在的块的位置。按照“移动数据不移动计算”的理念,Spark在任务调度的时候,会尽可
能地将计算任务分配到其所要处理数据块的存储位置

2. RDD转换之间的常见算子

从前面的RDD的基本特征入手,在工做中常编写的程序是,建立RDDRDD的转换,RDD的算子的执行,建立对应着外部系统的数据流入Spark集群的必选步骤,至于之间从集合建立的数据,通常在测试时候使用,因此不细述,RDD的转换对应一个专门的算子叫Transformation其是惰性加载使用的, 而行动对应着触发Transformation执行的操做,通常是输出到集合,或者打印出来,或者返回一个值,另外就是从集群输出到别的系统,这有一个专业词叫Action.

2.1 常见转换算子

转换算子,即从一个RDD到另一个RDD的转换操做,对应一些内置的Compute函数,可是这些函数被有没有shuffle来分为宽依赖算子和窄依赖算子

2.1.1 宽依赖和窄依赖的区别

通常网上文章有两种,一种是搬运定义的,便是否一个父RDD分区会被多个子分区依赖,另一种是看有没有Shuffle,有Shuffle就是宽依赖,没有则是窄依赖,第一种还靠谱点,第二种就是拿自己来讲自己,因此没有参考价值,2.1.3 如何区别宽依赖和窄依赖,能够之间看这个

2.1.2 宽依赖和窄依赖的常见算子

窄依赖常见算子

map(func):对数据集中的每一个元素都使用func,而后返回一个新的RDD
filter(func):对数据集中的每一个元素都使用func,而后返回一个包含使func为true的元素构成的RDD
flatMap(func):与 map 相似,每一个输入元素被映射为0或多个输出元素
mapPartitions(func):和map很像,可是map是将func做用在每一个元素上,而mapPartitions是func做用在整个分
区上。假设一个RDD有N个元素,M个分区(N >> M),那么map的函数将被调用N次,而mapPartitions中的函数
仅被调用M次,一次处理一个分区中的全部元素
mapPartitionsWithIndex(func):与 mapPartitions 相似,多了分区的索引值的信息

glom():将每个分区造成一个数组,造成新的RDD类型 RDD[Array[T]]
sample(withReplacement, fraction, seed):采样算子。以指定的随机种子(seed)随机抽样出数量为fraction的数
据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样

coalesce(numPartitions,false):无shuffle,通常用来减小分区

union(otherRDD) : 求两个RDD的并集

cartesian(otherRDD):笛卡尔积

zip(otherRDD):将两个RDD组合成 key-value 形式的RDD,默认两个RDD的partition数量以及元素数量都相同,否
则会抛出异常。

map 与 mapPartitions 的区别
map:每次处理一条数据
mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易致使
OOM
最佳实践:当内存资源充足时,建议使用mapPartitions,以提升处理效率

宽依赖常见算子

groupBy(func):按照传入函数的返回值进行分组。将key相同的值放入一个迭代器

distinct([numTasks])):对RDD元素去重后,返回一个新的RDD。可传入numTasks参数改变RDD分区数

coalesce(numPartitions, true):有shuffle,不管增长分区仍是减小分区,通常用repartition来代替

repartition(numPartitions):增长或减小分区数,有shuffle

sortBy(func, [ascending], [numTasks]):使用 func 对数据进行处理,对处理后的结果进行排序

intersection(otherRDD) : 求两个RDD的交集

subtract (otherRDD) : 求两个RDD的差集

2.1.3 如何区别宽依赖和窄依赖

这里我建议理解不了的算子,直接从Sparkhistory的依赖图来看,有没有划分Stage,若是划分了就是宽依赖,没有划分就是窄依赖,固然这是实战派的作法,能够在同事或者同窗说明问题的时候,show your code 给他,而后把依赖图拿给他 ,固然做为理论加实践的并行者,我这里再拿一种来判别,是从理解定义开始的,定义说是父RDD分区有没有被多个子分区依赖,那能够从这个角度想一下,父分区单个分区数据,有没有可能流向不一样的子RDD的分区,好比想想distinct算子,或者sortBy算子,全局去重和全局排序,假设刚开始1,2,3在一个分区,通过map(x => (x, null)).reduceByKey((x, y) => x).map(_._1) 去重后,虽然分区数量没有变,可是每一个分区数据必然要看别的分区的数据,才能知道最后本身要不要保留,从输入分区,到输出分区,必然通过汇合重组,因此必然有shuffle的。sortBy同理。

2.2 常见行动算子

Action触发Job。一个Spark程序(Driver程序)包含了多少 Action 算子,那么就有多少Job;
典型的Action算子: collect / count
collect() => sc.runJob() => ... => dagScheduler.runJob() => 触发了Job

collect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)

first():Return the first element in this RDD
take(n):Take the first num elements of the RDD
top(n):按照默认(降序)或者指定的排序规则,返回前num个元素。
takeSample(withReplacement, num, [seed]):返回采样的数据
foreach(func) / foreachPartition(func):与map、mapPartitions相似,区别是 foreach 是 Action
saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)

3. PairRDD常见操做

RDD总体上分为 Value 类型和 Key-Value 类型。
前面介绍的是 Value 类型的RDD的操做,实际使用更多的是 key-value 类型的RDD,也称为 PairRDD。
Value 类型RDD的操做基本集中在 RDD.scala 中;
key-value 类型的RDD操做集中在 PairRDDFunctions.scala 中;

前面介绍的大多数算子对 Pair RDD 都是有效的,RDD的值为key-value的时候便可隐式转换为PairRDD, Pair RDD还有属于本身的 Transformation、Action 算子;

file

3.1 常见PairRDD的Transformation操做

3.1.1 相似 map 操做

mapValues / flatMapValues / keys / values,这些操做均可以使用 map 操做实现,是简化操做。

3.1.2 聚合操做【重要、难点】

PariRDD(k, v)使用范围广,聚合
groupByKey / reduceByKey / foldByKey / aggregateByKey
combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底层实现
subtractByKey:相似于subtract,删掉 RDD 中键与 other RDD 中的键相同的元素

结论:效率相等用最熟悉的方法;groupByKey在通常状况下效率低,尽可能少用

3.1.3 排序操做

sortByKey:sortByKey函数做用于PairRDD,对Key进行排序

3.1.4 join操做

cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin

file

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)
rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect
// 仿照源码实现join操做
rdd3.flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect

3.1.5 Action操做

collectAsMap / countByKey / lookup(key)

file

lookup(key):高效的查找方法,只查找对应分区的数据(若是RDD有分区器的话

4.寄语

实战出真知,想要某种实现的时候,假设刚好你想到某个算子,那么去使用它,不懂的地方看源码,大业可成!
吴邪,小三爷,混迹于后台,大数据,人工智能领域的小菜鸟。
更多请关注
file

相关文章
相关标签/搜索