注:某些函数只有PairRDD只有,而普通的RDD则没有,好比gropuByKey、reduceByKey、sortByKey、join、cogroup等函数要根据Key进行分组或直接操做数组
RDD基本转换:ide |
|||
RDD[U] map(f: T => U)函数 T:原RDD中元素类型spa U:新RDD中元素类型orm |
函数将T元素转换为新的U元素排序 |
rdd.map(x => x + 1)索引 |
{1, 2, 3, 3}ip =>{2, 3, 4, 4}ci |
RDD[U] flatMap(f: T => TraversableOnce[U])it TraversableOnce:集合与迭代器的父类 |
函数将T元素转换为含有新类型U元素的集合,并将这些集合展平(两层转换成一层)后的元素造成新的RDD |
rdd.flatMap(x => x.to(3)) |
{1, 2, 3, 3} =>{1, 2, 3, 2, 3, 3, 3} |
RDD[T] filter(f: T => Boolean) |
函数对每一个元素进行过滤,经过的元素造成新的RDD |
rdd.filter(x => x != 1) |
{1, 2, 3, 3} =>{2, 3, 3} |
RDD[T] distinct() |
去重 |
rdd.distinct() |
{1, 2, 3, 3} =>{1, 2, 3} |
RDD[U] mapPartitions(f: Iterator[T] => Iterator[U]) |
与map同样,只是转换时是以分区为单位,将一个分区全部元素包装成Iterator一次性传入函数进行处理,而不像map函数那样每一个元素都会调用一个函数,即这里有几个分区则才调用几回函数
假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次 |
val arr = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(arr, 2) rdd.mapPartitions((it: Iterator[Int]) => { var l = List[Int](); it.foreach((e: Int) => l = e * 2 :: l); l.iterator }) |
=>{2, 4, 6, 8, 10} |
RDD[U] mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U]) |
与mapPartitions相似,不一样的时函数多了个分区索引的参数 |
|
|
RDD[T] union(other: RDD[T]) |
两个RDD 并集,包括重复的元素 |
rdd.union(otherRdd) |
{ 1, 2, 2, 3, 3} { 3, 4, 5} =>{1, 2, 2, 3, 3, 3, 4, 5} |
RDD[T] intersection(other: RDD[T]) |
两个RDD 交集 |
rdd.intersection(otherRdd) |
{ 1, 2, 2, 3, 3} { 3, 4, 5} =>{3} |
RDD[T] subtract(other: RDD[T]) |
两个RDD相减 |
rdd.subtract(otherRdd) |
{ 1, 2, 2, 3, 3} { 3, 4, 5} =>{1, 2, 2} |
RDD[(T, U)] cartesian(other: RDD[U]) |
两个RDD相减笛卡儿积 |
rdd.cartesian(otherRdd) |
{ 1, 2 } { 3, 4} =>{(1,3),(1,4),(2,3),(2,4)} |
RDD[T] sortBy( f: (T) => K, ascending: Boolean,numPartitions: Int)
|
根据转换后的值进行排序,传入的是一个(T) => K 转换函数 |
rdd.sortBy(_._2, false, 1) 这里根据value进行降序排序 |
{("leo", 65), ("tom", 50), ("marry", 100), ("jack", 80)} =>{("marry", 100),("jack", 80),("leo", 65), ("leo", 65)}
|
RDD[Array[T]] glom() |
将RDD的每一个分区中的类型为T的元素转换换数组Array[T] |
|
val arr = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(arr, 2) val arrRDD = rdd.glom()arrRDD.foreach { (arr: Array[Int]) => { println("[ " + arr.mkString(" ") + " ]"); } } =>[ 1 2 ], [ 3 4 5 ] |
|
|
|
|
键-值RDD转换: |
|||
RDD[(K, U)] mapValues[U](f: V => U) K:key类型 V:value类型 |
将value转换为新的U元素,Key不变 |
rdd.mapValues(_ + 1) |
{"class1", 80), ("class2", 70)} =>{"class1", 81), ("class2", 71)}
|
RDD[(K, U)] flatMapValues(f: V => TraversableOnce[U]) |
对[K,V]型数据中的V值flatmap操做 |
rdd.flatMapValues(_.toCharArray()) |
{ (1, "ab"), (2, "bc")} =>{(1, 'a'), (1, 'b'), (2, 'b'), (2, 'c')} |
RDD[(K, Iterable[V])] groupByKey()
|
根据key进行分组,同一组的元素组成Iterable<V>,并以(key, Iterable<V>)元组类型为元素做为新的RDD返回 |
rdd.groupByKey() |
{("class1", 80), ("class2", 75), ("class1", 90), ("class2", 60)} =>{("class1",[80,90]),("class2",[75,60])}
|
RDD[(K, Iterable[T])] groupBy(f: T => K) T:原RDD元素类型 K:新RDD中元素Key的类型 |
根据函数将元素T映射成相应K后,以此K进行分组 |
rdd.groupBy({ case 1 => 1; case 2 => 2; case "二" => 2 }) |
{ 1, 2, "二" } =>{(1,[1]),(2,[2, "二"])} |
RDD[(K, V)] reduceByKey(func: (V, V) => V) |
先根据key进行分组,再对同一组中的的value进行reduce操做:第一次调用函数时传入的是两个Key所对应的value,从第二次日后,传入的两个参数中的第一个为上次函数计算的结果,第二个参数为其它Key的value |
rdd. reduceByKey(_ + _) |
{("class1", 80), ("class2", 75), ("class1", 90), ("class2", 60)} =>{("class1", 170),("class2", 135)} |
RDD[(K, V)] sortByKey() |
根据key的大小进行排序(注:并非先以Key进行分组,再对组类进行排序,而是直接根据Key的值进行排序) |
rdd.sortByKey(false) |
{(65, "leo"), (50, "tom"),(100, "marry"), (85, "jack")} =>{(100, "marry"),(85, "jack"),(65, "eo"),(50, "tom")} |
|
|
|
|
RDD[(K, V)] foldByKey(zeroValue: V)(func: (V, V) => V):
zeroValue:每一个分区相同Key累计时的初始值,以及不一样分区相同Key合并时的初始值 e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication |
对每一个value先进行func操做,且funcfoldByKey函数是经过调用comineByKey函数实现的。 zeroVale:对V进行初始化,其实是经过CombineByKey的createCombiner实现的 V => (zeroValue,V),再经过func函数映射成新的值,即func(zeroValue,V)
func: Value将经过func函数按Key值进行合并(其实是经过CombineByKey的mergeValue,mergeCombiners函数实现的,只不过在这里,这两个函数是相同的) |
val people = List(("Mobin", 1), ("Lucy", 2), ("Amy", 3), ("Amy", 4), ("Lucy", 5)) val rdd = sc.parallelize(people,2) val foldByKeyRDD = rdd.foldByKey(10)((v1, v2) => { println(v1 + " + " + v2 + " = " + (v1 + v2)); v1 + v2 }) //先对每一个V都加10,再对相同Key的value值相加 foldByKeyRDD.foreach(println) |
//处理第一个分区数据 10 + 1 = 11 // ("Mobin", 1) 10 + 2 = 12 // ("Lucy", 2) ===================== //处理第二个分区数据 10 + 3 = 13 // ("Amy", 3) 13 + 4 = 17 // ("Amy", 4)同分区同Key的Val先合并 10 + 5 = 15 // ("Lucy", 5) ===================== //将不一样分区相同Key的Value合并起来 12 + 15 = 27 // "Lucy"跨分区,因此需合并 (Amy,17) (Mobin,11) (Lucy,27) |
RDD[(K, (V, Option[W]))] leftOuterJoin[W](other: RDD[(K, W)]): |
左外链接,包含左RDD的全部数据,若是右边没有与之匹配的用None表示 |
val arr = List(("A", 1), ("A", 2), ("B", 1)) val arr1 = List(("A", "A1"), ("A", "A2")) val rdd = sc.parallelize(arr, 2) val rdd1=sc.parallelize(arr1, 2) val leftOutJoinRDD = rdd.leftOuterJoin(rdd1) leftOutJoinRDD.foreach(println) |
=> (B,(1,None)) (A,(1,Some(A1))) (A,(1,Some(A2))) (A,(2,Some(A1))) (A,(2,Some(A2))) |
RDD[(K, (Option[V], W))] rightOuterJoin[W](other: RDD[(K, W)]) |
右外链接,包含右RDD的全部数据,若是左边没有与之匹配的用None表示 |
val arr = List(("A", 1), ("A", 2)) val arr1 = List(("A", "A1"), ("A", "A2"), ("B", 1)) val rdd = sc.parallelize(arr, 2) val rdd1 = sc.parallelize(arr1, 2) val leftOutJoinRDD = rdd.rightOuterJoin(rdd1) leftOutJoinRDD.foreach(println) |
(B,(None,1)) (A,(Some(1),A1)) (A,(Some(1),A2)) (A,(Some(2),A1)) (A,(Some(2),A2)) |
RDD[(K, (V, W))] join(other: RDD[(K, W)) W:另外一RDD元素的value的类型 |
对两个包含<key,value>对的RDD根据key进行join操做,返回类型<key,Tuple2(key,value)> |
rdd.join(otherRdd) |
{(1, "leo"),(2, "jack"),(3, "tom")} {(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)} =>{(1,("leo",100)),(1,("leo",70)),(2, ("jack",90),(2, ("jack",80),(3, ("tom",60),(3, ("tom",50))} |
RDD[(K, (Iterable[V], Iterable[W]))] cogroup(other: RDD[(K, W)]) |
同join,也是根据key进行join,只不过相同key的value分别存放到Iterable<value>中 |
rdd.cogroup(otherRdd) |
{(1, "leo"),(2, "jack"),(3, "tom")} {(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)} =>{(1,(["leo"],[100,70])),(2, (["jack"],[90,80])),(3, (["tom","lily"],[60,50]))} |
|
|
|
|
T reduce(f: (T, T) => T) |
对全部元素进行reduce操做 |
rdd.reduce(_ + _) |
{1, 2, 2, 3, 3, 3} =>14 |
Array[T] collect() |
将RDD中全部元素返回到一个数组里 注意:This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory. |
rdd.collect() |
{1, 2, 3, 3} =>[1, 2, 3, 3] |
Map[K, V] collectAsMap() |
做用于K-V类型的RDD上,做用与collect不一样的是collectAsMap函数不包含重复的key,对于重复的key,后面的元素覆盖前面的元素 |
rdd.collectAsMap() |
{ ("leo", 65), ("tom", 50), ("tom", 100)} =>{ ("leo", 65), ("tom", 100)} |
Long count() |
统计RDD 中的元素个数 |
rdd.count() |
{1, 2, 3, 3} =>4 |
Map[T, Long] countByValue() |
各元素在 RDD 中出现的次数 注意:This method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which returns an RDD[T, Long] instead of a map. |
rdd.countByValue() |
{1, 2, 3, 3} =>Map(1 -> 1, 3 -> 2, 2 -> 1) |
Map[K, Long] countByKey() |
先根据Key进行分组,再对每组里的value分别进行计数统计 注意:This method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory.
To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[T, Long] instead of a map. |
|
{ ("leo", 65), ("tom", 50), ("tom", 100), ("tom", 100) } =>Map(leo -> 1, tom -> 3) |
T first() |
取第一个元素,实质上是调用take(1)实现的 |
rdd.first() |
{3, 2, 1, 4} =>3 |
Array[T] take(num: Int) |
从 RDD 中返回前 num 个元素 注意:This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory. |
rdd.take(2)
|
{3, 2, 1, 4} =>[3, 2] |
Array[T] top(num: Int ) (implicit ord: Ordering[T])
若是没有传递 ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,能够传递一个自定义的Ordering来覆盖默认提供。 top实现是将Ordering反序后再调用 takeOrdered的:takeOrdered(num)(ord.reverse) |
默认从 RDD 中返回最最大的 num个元素 注意:This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory. |
rdd.top(2) |
{3, 2, 1, 4} =>[4, 3] |
Array[T] takeOrdered(num: Int)(implicit ord: Ordering[T])
若是没有传递 ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,能够传递一个自定义的Ordering来覆盖默认提供 |
与top相反,默认取的是前面最小的num个元素 注意:This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory. |
rdd.takeOrdered(2)(myOrdering) |
{3, 2, 1, 4} =>[1, 2] |
T fold(zeroValue: T)(op: (T, T) => T)
zeroValue:为每一个分区累计的初始值,以及不一样分区累计的初始值 e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication |
和 reduce() 一 样, 但 是 需 要 提供初始值。注意:每一个分区应用op函数时,都会以zeroValue为初始值进行计算,而后将每一个分区的结果合并时,仍是会以zeroValue为初始值进行合并计算 |
val arr = Array(1, 2, 3, 4, 5); val rdd = sc.parallelize(arr, 2) //分红两分区[1, 2] [3, 4, 5] println(rdd.fold(10)((v1, v2) => { println(v1 + " + " + v2 + " = " + (v1 + v2)); v1 + v2 })) |
//处理第一个分区数据 10 + 1 = 11 11 + 2 = 13 //从第二个元素起,每分区内先累加 ===================== //处理第一个分区数据 10 + 3 = 13 13 + 4 = 17 //从第二个元素起,每分区内先累加 17 + 5 = 22 //从第二个元素起,每分区内先累加 ===================== //将各分区汇总起来 10 + 13 = 23 // 汇总时还会使用初始值来做起始 23 + 22 = 45 45 |
|
|
|
|
U aggregate (zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)
初始值类型与原始数据类型能够不一样,但初始值类型决定了返回值类型 |
与fold同样,计算时须要提供初始值,不一样的是,分区的计算函数(seqOp)与分区合并计算函数(combOp)是不一样的,但fold分区计算函数与分区合并计算函数是同一函数 |
rdd.fold(5)(_ + _, _ + _) |
val arr = Array(1, 2, 3, 4); val rdd = sc.parallelize(arr, 2) println(rdd.aggregate(5)( (v1, v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 + v2 }, (v1, v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 + v2 }) ) 过程与结果与上面的fold函数同样 |
Unit saveAsTextFile(path: String) |
将RDD元素保存到文件中,对每一个元素调用toString方法 |
|
|
Unit foreach(f: T => Unit) |
遍历RDD中的每一个元素 |
rdd.foreach(println(_)) |
无
|
|
|
|
|
|
|
|
|
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
createCombiner:在第一次遇到Key时建立组合器函数,将RDD数据集中的V类型值转换C类型值(V => C),
mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner道理的C类型值与此次传入的V类型值合并成一个C类型值(C,V)=>C
mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值
partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner
mapSideCombine:是否在map端进行Combine操做,默认为true
例:统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出
object CombineByKey {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("combinByKey")
val sc = new SparkContext(conf)
val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
val rdd = sc.parallelize(people)
val combinByKeyRDD = rdd.combineByKey(
(x: String) => (List(x), 1),
(peo: (List[String], Int), x: String) => (x :: peo._1, peo._2 + 1),
(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD.foreach(println)
sc.stop()
}
}
输出:
(male,(List(Lufei, Kpop, Mobin),3))
(female,(List(Amy, Lucy),2))
计算过程:
Partition1:
K="male" --> ("male","Mobin") --> createCombiner("Mobin") => peo1 = ( List("Mobin") , 1 )
K="male" --> ("male","Kpop") --> mergeValue(peo1,"Kpop") => peo2 = ( "Kpop" :: peo1_1 , 1 + 1 ) //Key相同调用mergeValue函数对值进行合并
K="female" --> ("female","Lucy") --> createCombiner("Lucy") => peo3 = ( List("Lucy") , 1 )
Partition2:
K="male" --> ("male","Lufei") --> createCombiner("Lufei") => peo4 = ( List("Lufei") , 1 )
K="female" --> ("female","Amy") --> createCombiner("Amy") => peo5 = ( List("Amy") , 1 )
Merger Partition:
K="male" --> mergeCombiners(peo2,peo4) => (List(Lufei,Kpop,Mobin))
K="female" --> mergeCombiners(peo3,peo5) => (List(Amy,Lucy))