Spark的算子的分类数组
从大方向来讲,Spark 算子大体能够分为如下两类:缓存
1)Transformation 变换/转换算子:这种变换并不触发提交做业,完成做业中间过程处理。app
Transformation 操做是延迟计算的,也就是说从一个RDD 转换生成另外一个 RDD 的转换操做不是立刻执行,须要等到有 Action 操做的时候才会真正触发运算。分布式
2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 做业。ide
Action 算子会触发 Spark 提交做业(Job),并将数据输出 Spark系统。函数
从小方向来讲,Spark 算子大体能够分为如下三类:oop
1)Value数据类型的Transformation算子,这种变换并不触发提交做业,针对处理的数据项是Value型的数据。
2)Key-Value数据类型的Transfromation算子,这种变换并不触发提交做业,针对处理的数据项是Key-Value型的数据对。优化
3)Action算子,这类算子会触发SparkContext提交Job做业。this
1)Value数据类型的Transformation算子 .net
1、输入分区与输出分区一对一型
一、map算子
二、flatMap算子
三、mapPartitions算子
四、glom算子
2、输入分区与输出分区多对一型
五、union算子
六、cartesian算子
3、输入分区与输出分区多对多型
七、grouBy算子
4、输出分区为输入分区子集型
八、filter算子
九、distinct算子
十、subtract算子
十一、sample算子
十二、takeSample算子
5、Cache型
1三、cache算子
1四、persist算子
2)Key-Value数据类型的Transfromation算子
1、输入分区与输出分区一对一
1五、mapValues算子
2、对单个RDD或两个RDD汇集
单个RDD汇集
1六、combineByKey算子
1七、reduceByKey算子
1八、partitionBy算子
两个RDD汇集
1九、Cogroup算子
3、链接
20、join算子
2一、leftOutJoin和 rightOutJoin算子
3)Action算子
1、无输出
2二、foreach算子
2、HDFS
2三、saveAsTextFile算子
2四、saveAsObjectFile算子
3、Scala集合和数据类型
2五、collect算子
2六、collectAsMap算子
2七、reduceByKeyLocally算子
2八、lookup算子
2九、count算子
30、top算子
3一、reduce算子
3二、fold算子
3三、aggregate算子
将原来 RDD 的每一个数据项经过 map 中的用户自定义函数 f 映射转变为一个新的元素。源码中 map 算子至关于初始化一个 RDD, 新 RDD 叫作 MappedRDD(this, sc.clean(f))。
图 1中每一个方框表示一个 RDD 分区,左侧的分区通过用户自定义函数 f:T->U 映射为右侧的新 RDD 分区。可是,实际只有等到 Action算子触发后,这个 f 函数才会和其余函数在一个stage 中对数据进行运算。在图 1 中的第一个分区,数据记录 V1 输入 f,经过 f 转换输出为转换后的分区中的数据记录 V’1。
图1 map 算子对 RDD 转换
将原来 RDD 中的每一个元素经过函数 f 转换为新的元素,并将生成的 RDD 的每一个集合中的元素合并为一个集合,内部建立 FlatMappedRDD(this,sc.clean(f))。
图 2 表 示 RDD 的 一 个 分 区 ,进 行 flatMap函 数 操 做, flatMap 中 传 入 的 函 数 为 f:T->U, T和 U 能够是任意的数据类型。将分区中的数据经过用户自定义函数 f 转换为新的数据。外部大方框能够认为是一个 RDD 分区,小方框表明一个集合。 V一、 V二、 V3 在一个集合做为 RDD 的一个数据项,可能存储为数组或其余容器,转换为V’一、 V’二、 V’3 后,将原来的数组或容器结合拆散,拆散的数据造成为 RDD 中的数据项。
图2 flapMap 算子对 RDD 转换
mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 行 操 做。 内 部 实 现 是 生 成
MapPartitionsRDD。图 3 中的方框表明一个 RDD 分区。图 3 中,用户经过函数 f (iter)=>iter.f ilter(_>=3) 对分区中全部数据进行过滤,大于和等于 3 的数据保留。一个方块表明一个 RDD 分区,含有 一、 二、 3 的分区过滤只剩下元素 3。
图3 mapPartitions 算子对 RDD 转换
glom函数将每一个分区造成一个数组,内部实现是返回的GlommedRDD。 图4中的每一个方框表明一个RDD分区。图4中的方框表明一个分区。 该图表示含有V一、 V二、 V3的分区经过函数glom造成一数组Array[(V1),(V2),(V3)]。
图 4 glom算子对RDD转换
使用 union 函数时须要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进行去重操做,保存全部元素。若是想去重
可使用 distinct()。同时 Spark 还提供更为简洁的使用 union 的 API,经过 ++ 符号至关于 union 函数操做。
图 5 中左侧大方框表明两个 RDD,大方框内的小方框表明 RDD 的分区。右侧大方框表明合并后的 RDD,大方框内的小方框表明分区。
含有V一、V二、U一、U二、U三、U4的RDD和含有V一、V八、U五、U六、U七、U8的RDD合并全部元素造成一个RDD。V一、V一、V二、V8造成一个分区,U一、U二、U三、U四、U五、U六、U七、U8造成一个分区。
图 5 union 算子对 RDD 转换
对 两 个 RDD 内 的 所 有 元 素 进 行 笛 卡 尔 积 操 做。 操 做 后, 内 部 实 现 返 回CartesianRDD。图6中左侧大方框表明两个 RDD,大方框内的小方框表明 RDD 的分区。右侧大方框表明合并后的 RDD,大方框内的小方框表明分区。图6中的大方框表明RDD,大方框中的小方框表明RDD分区。
例 如: V1 和 另 一 个 RDD 中 的 W一、 W二、 Q5 进 行 笛 卡 尔 积 运 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)。
图 6 cartesian 算子对 RDD 转换
groupBy :将元素经过函数生成相应的 Key,数据就转化为 Key-Value 格式,以后将 Key 相同的元素分为一组。
函数实现以下:
1)将用户函数预处理:
val cleanF = sc.clean(f)
2)对数据 map 进行函数操做,最后再进行 groupByKey 分组操做。
this.map(t => (cleanF(t), t)).groupByKey(p)
其中, p 肯定了分区个数和分区函数,也就决定了并行化的程度。
图7 中方框表明一个 RDD 分区,相同key 的元素合并到一个组。例如 V1 和 V2 合并为 V, Value 为 V1,V2。造成 V,Seq(V1,V2)。
图 7 groupBy 算子对 RDD 转换
filter 函数功能是对元素进行过滤,对每一个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤掉。 内 部 实 现 相 当 于 生 成 FilteredRDD(this,sc.clean(f))。
下面代码为函数的本质实现:
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
图 8 中每一个方框表明一个 RDD 分区, T 能够是任意的类型。经过用户自定义的过滤函数 f,对每一个数据项操做,将知足条件、返回结果为 true 的数据项保留。例如,过滤掉 V2 和 V3 保留了 V1,为区分命名为 V’1。
图 8 filter 算子对 RDD 转换
distinct将RDD中的元素进行去重操做。图9中的每一个方框表明一个RDD分区,经过distinct函数,将数据去重。 例如,重复数据V一、 V1去重后只保留一份V1。
图9 distinct算子对RDD转换
subtract至关于进行集合的差操做,RDD 1去除RDD 1和RDD 2交集中的全部元素。图10中左侧的大方框表明两个RDD,大方框内的小方框表明RDD的分区。 右侧大方框
表明合并后的RDD,大方框内的小方框表明分区。 V1在两个RDD中均有,根据差集运算规则,新RDD不保留,V2在第一个RDD有,第二个RDD没有,则在新RDD元素中包含V2。
图10 subtract算子对RDD转换
sample 将 RDD 这个集合内的元素进行采样,获取全部元素的子集。用户能够设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。
函数参数设置:
‰ withReplacement=true,表示有放回的抽样。
‰ withReplacement=false,表示无放回的抽样。
图 11中 的 每 个 方 框 是 一 个 RDD 分 区。 通 过 sample 函 数, 采 样 50% 的 数 据。V一、 V二、 U一、 U二、U三、U4 采样出数据 V1 和 U一、 U2 造成新的 RDD。
图11 sample 算子对 RDD 转换
takeSample()函数和上面的sample函数是一个原理,可是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果再也不是RDD,而是至关于对采样后的数据进行
Collect(),返回结果的集合为单机的数组。
图12中左侧的方框表明分布式的各个节点上的分区,右侧方框表明单机上返回的结果数组。 经过takeSample对数据采样,设置为采样一份数据,返回结果为V1。
图12 takeSample算子对RDD转换
cache 将 RDD 元素从磁盘缓存到内存。 至关于 persist(MEMORY_ONLY) 函数的功能。
图13 中每一个方框表明一个 RDD 分区,左侧至关于数据分区都存储在磁盘,经过 cache 算子将数据缓存在内存。
图 13 Cache 算子对 RDD 转换
persist 函数对 RDD 进行缓存操做。数据缓存在哪里依据 StorageLevel 这个枚举类型进行肯定。 有如下几种类型的组合(见10), DISK 表明磁盘,MEMORY 表明内存, SER 表明数据是否进行序列化存储。
下面为函数定义, StorageLevel 是枚举类型,表明存储模式,用户能够经过图 14-1 按需进行选择。
persist(newLevel:StorageLevel)
图 14-1 中列出persist 函数能够进行缓存的模式。例如,MEMORY_AND_DISK_SER 表明数据能够存储在内存和磁盘,而且以序列化的方式存储,其余同理。
图 14-1 persist 算子对 RDD 转换
图 14-2 中方框表明 RDD 分区。 disk 表明存储在磁盘, mem 表明存储在内存。数据最初所有存储在磁盘,经过 persist(MEMORY_AND_DISK) 将数据缓存到内存,可是有的分区没法容纳在内存,将含有 V一、 V二、 V3 的RDD存储到磁盘,将含有U1,U2的RDD仍旧存储在内存。
图 14-2 Persist 算子对 RDD 转换
mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操做,而不对 Key 进行处理。
图 15 中的方框表明 RDD 分区。 a=>a+2 表明对 (V1,1) 这样的 Key Value 数据对,数据只对 Value 中的 1 进行加 2 操做,返回结果为 3。
图 15 mapValues 算子 RDD 对转换
下面代码为 combineByKey 函数的定义:
combineByKey[C](createCombiner:(V) C,
mergeValue:(C, V) C,
mergeCombiners:(C, C) C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
说明:
‰ createCombiner: V => C, C 不存在的状况下,好比经过 V 建立 seq C。
‰ mergeValue: (C, V) => C,当 C 已经存在的状况下,须要 merge,好比把 item V
加到 seq C 中,或者叠加。
mergeCombiners: (C, C) => C,合并两个 C。
‰ partitioner: Partitioner, Shuff le 时须要的 Partitioner。
‰ mapSideCombine : Boolean = true,为了减少传输量,不少 combine 能够在 map
端先作,好比叠加,能够先在一个 partition 中把全部相同的 key 的 value 叠加,
再 shuff le。
‰ serializerClass: String = null,传输须要序列化,用户能够自定义序列化类:
例如,至关于将元素为 (Int, Int) 的 RDD 转变为了 (Int, Seq[Int]) 类型元素的 RDD。图 16中的方框表明 RDD 分区。如图,经过 combineByKey, 将 (V1,2), (V1,1)数据合并为( V1,Seq(2,1))。
图 16 comBineByKey 算子对 RDD 转换
reduceByKey 是比 combineByKey 更简单的一种状况,只是两个值合并成一个值,( Int, Int V)to (Int, Int C),好比叠加。因此 createCombiner reduceBykey 很简单,就是直接返回 v,而 mergeValue和 mergeCombiners 逻辑是相同的,没有区别。
函数实现:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
= {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
图17中的方框表明 RDD 分区。经过用户自定义函数 (A,B) => (A + B) 函数,将相同 key 的数据 (V1,2) 和 (V1,1) 的 value 相加运算,结果为( V1,3)。
图 17 reduceByKey 算子对 RDD 转换
partitionBy函数对RDD进行分区操做。
函数定义以下。
partitionBy(partitioner:Partitioner)
若是原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,若是不一致,则至关于根据分区器生成一个新的ShuffledRDD。
图18中的方框表明RDD分区。 经过新的分区策略将原来在不一样分区的V一、 V2数据都合并到了一个分区。
图18 partitionBy算子对RDD转换
cogroup函数将两个RDD进行协同划分,cogroup函数的定义以下。
cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
对在两个RDD中的Key-Value类型的元素,每一个RDD相同Key的元素分别聚合为一个集合,而且返回两个RDD中对应Key的元素集合的迭代器。
(K, (Iterable[V], Iterable[W]))
其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。
图19中的大方框表明RDD,大方框内的小方框表明RDD中的分区。 将RDD1中的数据(U1,1)、 (U1,2)和RDD2中的数据(U1,2)合并为(U1,((1,2),(2)))。
图19 Cogroup算子对RDD转换
join 对两个须要链接的 RDD 进行 cogroup函数操做,将相同 key 的数据可以放到一个分区,在 cogroup 操做以后造成的新 RDD 对每一个key 下的元素进行笛卡尔积的操做,返回的结果再展平,对应 key 下的全部元组造成一个集合。最后返回 RDD[(K, (V, W))]。
下 面 代 码 为 join 的 函 数 实 现, 本 质 是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再 通 过flatMapValues 将合并的数据打散。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }
图 20是对两个 RDD 的 join 操做示意图。大方框表明 RDD,小方框表明 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 作链接后结果为 (V1,(1,1)) 和 (V1,(1,2))。
图 20 join 算子对 RDD 转换
LeftOutJoin(左外链接)和RightOutJoin(右外链接)至关于在join的基础上先判断一侧的RDD元素是否为空,若是为空,则填充为空。 若是不为空,则将数据进行链接运算,并
返回结果。
下面代码是leftOutJoin的实现。
if (ws.isEmpty) {
vs.map(v => (v, None))
} else {
for (v <- vs; w <- ws) yield (v, Some(w))
}
本质上在 Action 算子中经过 SparkContext 进行了提交做业的 runJob 操做,触发了RDD DAG 的执行。
例如, Action 算子 collect 函数的代码以下,感兴趣的读者能够顺着这个入口进行源码剖析:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
/* 提交 Job*/
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
foreach 对 RDD 中的每一个元素都应用 f 函数操做,不返回 RDD 和 Array, 而是返回Uint。图22表示 foreach 算子经过用户自定义函数对每一个数据项进行操做。本例中自定义函数为 println(),控制台打印全部数据项。
图 22 foreach 算子对 RDD 转换
函数将数据输出,存储到 HDFS 的指定目录。
下面为 saveAsTextFile 函数的内部实现,其内部
经过调用 saveAsHadoopFile 进行实现:
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
将 RDD 中的每一个元素映射转变为 (null, x.toString),而后再将其写入 HDFS。
图 23中左侧方框表明 RDD 分区,右侧方框表明 HDFS 的 Block。经过函数将RDD 的每一个分区存储为 HDFS 中的一个 Block。
图 23 saveAsHadoopFile 算子对 RDD 转换
saveAsObjectFile将分区中的每10个元素组成一个Array,而后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。
下面代码为函数内部实现。
map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
图24中的左侧方框表明RDD分区,右侧方框表明HDFS的Block。 经过函数将RDD的每一个分区存储为HDFS上的一个Block。
图24 saveAsObjectFile算子对RDD转换
collect 至关于 toArray, toArray 已通过时不推荐使用, collect 将分布式的 RDD 返回为一个单机的 scala Array 数组。在这个数组上运用 scala 的函数式操做。
图 25中左侧方框表明 RDD 分区,右侧方框表明单机内存中的数组。经过函数操做,将结果返回到 Driver 程序所在的节点,以数组形式存储。
图 25 Collect 算子对 RDD 转换
collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素。
图26中的左侧方框表明RDD分区,右侧方框表明单机数组。 数据经过collectAsMap函数返回给Driver程序计算结果,结果以HashMap形式存储。
图26 CollectAsMap算子对RDD转换
实现的是先reduce再collectAsMap的功能,先对RDD的总体进行reduce操做,而后再收集全部结果返回为一个HashMap。
下面代码为lookup的声明。
lookup(key:K):Seq[V]
Lookup函数对(Key,Value)型的RDD操做,返回指定Key对应的元素造成的Seq。 这个函数处理优化的部分在于,若是这个RDD包含分区器,则只会对应处理K所在的分区,而后返回由(K,V)造成的Seq。 若是RDD不包含分区器,则须要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。
图28中的左侧方框表明RDD分区,右侧方框表明Seq,最后结果返回到Driver所在节点的应用中。
图28 lookup对RDD转换
count 返回整个 RDD 的元素个数。
内部函数实现为:
defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
图 29中,返回数据的个数为 5。一个方块表明一个 RDD 分区。
图29 count 对 RDD 算子转换
top可返回最大的k个元素。 函数定义以下。
top(num:Int)(implicit ord:Ordering[T]):Array[T]
相近函数说明以下。
·top返回最大的k个元素。
·take返回最小的k个元素。
·takeOrdered返回最小的k个元素,而且在返回的数组中保持元素的顺序。
·first至关于top(1)返回整个RDD中的前k个元素,能够定义排序的方式Ordering[T]。
返回的是一个含前k个元素的数组。
reduce函数至关于对RDD中的元素进行reduceLeft函数的操做。 函数实现以下。
Some(iter.reduceLeft(cleanF))
reduceLeft先对两个元素<K,V>进行reduce函数操做,而后将结果和迭代器取出的下一个元素<k,V>进行reduce函数操做,直到迭代器遍历完全部元素,获得最后结果。在RDD中,先对每一个分区中的全部元素<K,V>的集合分别进行reduceLeft。 每一个分区造成的结果至关于一个元素<K,V>,再对这个结果集合进行reduceleft操做。
例如:用户自定义函数以下。
f:(A,B)=>(A._1+”@”+B._1,A._2+B._2)
图31中的方框表明一个RDD分区,经过用户自定函数f将数据进行reduce运算。 示例
最后的返回结果为V1@[1]V2U!@U2@U3@U4,12。
图31 reduce算子对RDD转换
fold和reduce的原理相同,可是与reduce不一样,至关于每一个reduce时,迭代器取的第一个元素是zeroValue。
图32中经过下面的用户自定义函数进行fold运算,图中的一个方框表明一个RDD分区。 读者能够参照reduce函数理解。
fold((”V0@”,2))( (A,B)=>(A._1+”@”+B._1,A._2+B._2))
图32 fold算子对RDD转换
aggregate先对每一个分区的全部元素进行aggregate操做,再对分区的结果进行fold操做。
aggreagate与fold和reduce的不一样之处在于,aggregate至关于采用归并的方式进行数据汇集,这种汇集是并行化的。 而在fold和reduce函数的运算过程当中,每一个分区中须要进行串行处理,每一个分区串行计算完结果,结果再按以前的方式进行汇集,并返回最终汇集结果。
函数的定义以下。
aggregate[B](z: B)(seqop: (B,A) => B,combop: (B,B) => B): B
图33经过用户自定义函数对RDD 进行aggregate的汇集操做,图中的每一个方框表明一个RDD分区。
rdd.aggregate(”V0@”,2)((A,B)=>(A._1+”@”+B._1,A._2+B._2)),(A,B)=>(A._1+”@”+B_1,A._@+B_.2))
最后,介绍两个计算模型中的两个特殊变量。
广播(broadcast)变量:其普遍用于广播Map Side Join中的小表,以及广播大变量等场景。 这些数据集合在单节点内存可以容纳,不须要像RDD那样在节点之间打散存储。
Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算能够复用。 相比Hadoo的distributed cache,广播的内容能够跨做业共享。 Broadcast的底层实现采用了BT机制。
图33 aggregate算子对RDD转换
②表明V。 ③表明U。 accumulator变量:容许作全局累加操做,如accumulator变量普遍使用在应用中记录当前的运行指标的情景。--------------------- 做者:魅影猎鹰 来源:CSDN 原文:https://blog.csdn.net/qq_32595075/article/details/79918644 版权声明:本文为博主原创文章,转载请附上博文连接!