spark RDD算子详解3

Actions算子数组

本质上在Actions算子中经过SparkContext执行提交做业的runJob操做,触发了RDD DAG的执行。分布式

1.无输出函数

(1)foreach(f)优化

对RDD中的每一个元素都应用f函数操做,不返回RDD和Array,而是返回Uint。spa

图3-25表示foreach算子经过用户自定义函数对每一个数据项进行操做。本例中自定义函数为println(),控制台打印全部数据项。scala

2.HDFScode

saveAsTextFile(path, compressionCodecClass=None)排序

函数将数据输出,存储到HDFS的指定目录。
将RDD中的每一个元素映射转变为(Null, x.toString),而后再将其写入HDFS。内存

图3-26中左侧的方框表明RDD分区,右侧方框表明HDFS的Block。经过函数将RDD的每一个分区存储为HDFS中的一个Block。io

3.Scala集合和数据类型

(1)collect()

collect将分布式的RDD返回为一个单机的scala Array数组。在这个数组上运用scala的函数式操做。

图3-28中的左侧方框表明RDD分区,右侧方框表明单机内存中的数组。经过函数操做,将结果返回到Driver程序所在的节点,以数组形式存储。

(2)collectAsMap()

collectAsMap对(K, V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元素,后面的元素覆盖前面的元素。

图3-29中的左侧方框表明RDD分区,右侧方框表明单机数组。数据经过collectAsMap函数返回给Driver程序计算结果,结果以HashMap形式存储。

(3)reduceByKeyLocally(func)

实现的是先reduce再collectAsMap的功能,先对RDD的总体进行reduce操做,而后再收集全部结果返回为一个HashMap。

(4)lookup(key)

Lookup函数对(Key, Value)型的RDD操做,返回指定Key对应的元素造成的Seq。这个函数处理优化的部分在于,若是这个RDD包含分区器,则只会对应处理K所在的分区,而后返回由(K, V)造成的Seq。若是RDD不包含分区器,则须要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。

图3-30中的左侧方框表明RDD分区,右侧方框表明Seq,最后结果返回到Driver所在节点的应用中。

(5)count()

count返回整个RDD的元素个数。内部函数实现以下。
在图3-31中,返回数据的个数为5。一个方块表明一个RDD分区。

(6)top(num, key=None)

top可返回最大的k个元素。
相近函数说明以下。

top返回最大的k个元素。

take返回最小的k个元素。

takeOrdered返回最小的k个元素,而且在返回的数组中保持元素的顺序。

first至关于top(1)返回整个RDD中的前k个元素,能够定义排序的方式Ordering[T]。返回的是一个含前k个元素的数组。

(7)reduce(f)

经过函数func(接受两个参数,返回一个参数)汇集数据集中的全部元素。这个功能必须可交换且可关联的,从而能够正确的被并行执行。

例子:

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10

(8)fold(zeroValue, op)

fold和reduce的原理相同,可是与reduce不一样,至关于每一个reduce时,迭代器取的第一个元素是zeroValue。

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
相关文章
相关标签/搜索