The transformation function passed to map is applied to every element of the RDD, whereas mapPartitions applies its function to each partition as a whole. The main difference between the two is the granularity of the call.
mapPartitions can be read back to front: partition first, then run the map function over each partition.
When to use it:
If the mapping frequently needs to create extra objects (for example a database connection), mapPartitions is considerably more efficient than map.
val seqs = Seq(1, 2, 3, 4, 5, 6)  // assumed input: a 6-element collection, matching the print counts below
val numbers: RDD[Int] = sc.parallelize(seqs, 3)

//map
numbers.map(x => {
  println("AAA")  //printed 6 times, once per element
  x * 3
}).collect().foreach(println(_))

/**
 * iterate over the partitions (3 of them)
 */
numbers.mapPartitions(par => {
  println("aaa")  //printed 3 times, once per partition
  par.map(p => p * 3)
}).collect().foreach(println(_))
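To make the point about extra object creation concrete, here is a hedged sketch: DummyConnection is a made-up placeholder for an expensive resource such as a database connection, not a real Spark or JDBC API.

// DummyConnection is a hypothetical stand-in for an expensive resource (e.g. a JDBC connection).
class DummyConnection extends Serializable {
  def write(x: Int): Unit = println(s"write $x")
  def close(): Unit = ()
}

// With map, a connection would be created once per element (6 times here):
numbers.map { x =>
  val conn = new DummyConnection()
  conn.write(x * 3)
  conn.close()
  x * 3
}.collect()

// With mapPartitions, one connection serves a whole partition (3 times here):
numbers.mapPartitions { iter =>
  val conn = new DummyConnection()
  val out = iter.map { x => conn.write(x * 3); x * 3 }.toList
  conn.close()
  out.iterator
}.collect()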
Filter: all elements of the RDD for which the function inside filter returns true form a new dataset.
val seqs = Seq(1,2,3,4,5,6)
//4,5,6
seqs.filter(x => x > 3).foreach(println(_))
map applies the function to each RDD element one by one, mapping it into another RDD,
while flatMap applies the function to every element of the RDD and builds the new RDD
from the contents of all the returned iterators.
The difference between map and flatMap is that map is a plain "mapping", whereas flatMap is "map first, then flatten".
val seqs = Array("aaa AAA","bbb BBB","ccc CCC","ddd DDD")
val numbers = sc.parallelize(seqs)

scala> numbers.map(x => x.split(" ")).collect()
res1: Array[Array[String]] = Array(Array(aaa, AAA), Array(bbb, BBB), Array(ccc, CCC), Array(ddd, DDD))

scala> numbers.flatMap(x => x.split(" ")).collect()
res2: Array[String] = Array(aaa, AAA, bbb, BBB, ccc, CCC, ddd, DDD)
Similar to mapPartitions, but an integer partition index is also passed in as a parameter, so the function must be of type (Int, Iterator[T]) => Iterator[U].
//record each key-value pair together with its partition index
val rdd = sc.parallelize(List((1,1), (1,2), (2,3), (2,4), (3,5), (3,6), (4,7), (4,8), (5,9), (5,10)), 3)

def mapPartIndexFunc(i1: Int, iter: Iterator[(Int,Int)]): Iterator[(Int,(Int,Int))] = {
  var res = List[(Int,(Int,Int))]()
  while (iter.hasNext) {
    var next = iter.next()
    res = res.::(i1, next)
  }
  res.iterator
}

val mapPartIndexRDD = rdd.mapPartitionsWithIndex(mapPartIndexFunc)
mapPartIndexRDD.foreach(println(_))

//output
(0,(1,1))
(0,(1,2))
(0,(2,3))
(1,(2,4))
(1,(3,5))
(1,(3,6))
(2,(4,7))
(2,(4,8))
(2,(5,9))
(2,(5,10))
Randomly samples roughly a fraction of the data using the given random seed. withReplacement controls whether sampled elements are put back: true means sampling with replacement, false means sampling without replacement.
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd.sample(true, 0.57, 5).collect
res10: Array[Int] = Array(8, 8, 8, 9)
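For comparison, a quick sketch of sampling without replacement; the exact elements returned depend on the seed and the Spark version, so no output is claimed here.

// withReplacement = false: each element appears at most once in the sample;
// fraction is the expected proportion of elements kept, not an exact count.
val noReplace = rdd.sample(withReplacement = false, fraction = 0.5, seed = 5)
noReplace.collect()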
Union: it merges rdd1 and rdd2 only logically; the data is not merged or transferred, and duplicates are not removed.
scala> var rdd1 = sc.parallelize(List("aa","aa","bb","cc","dd"));
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> var rdd2 = sc.parallelize(List("aa","dd","ff"));
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd1.union(rdd2).collect();
res3: Array[String] = Array(aa, aa, bb, cc, dd, aa, dd, ff)
RDD1.intersection(RDD2) returns the intersection of the two RDDs, with duplicates removed.
intersection needs to shuffle the data, which is relatively costly.
scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
RDD1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> var RDD2 = sc.parallelize(List("aa","dd","ff"))
RDD2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> RDD1.intersection(RDD2).collect
res5: Array[String] = Array(aa, dd)
distinct removes duplicates. The RDDs we produce may contain duplicate elements, and distinct eliminates them; note, however, that it involves a shuffle, so the operation is expensive.
scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
RDD1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> RDD1.collect
res4: Array[String] = Array(aa, aa, bb, cc, dd)

scala> val distinctRDD = RDD1.distinct.collect
distinctRDD: Array[String] = Array(aa, dd, bb, cc)
groupByKey groups an RDD[(key, value)] by key into the form RDD[(key, Iterable[value])], somewhat like GROUP BY in SQL, for example similar to MySQL's group_concat.
//group the scores by student name
scala> val scoreDetail = sc.parallelize(List(("xiaoming",75),("xiaoming",90),("lihua",95),("lihua",100),("xiaofeng",85)))
scoreDetail: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> scoreDetail.groupByKey().collect().foreach(println(_))
(lihua,CompactBuffer(95, 100))
(xiaoming,CompactBuffer(75, 90))
(xiaofeng,CompactBuffer(85))
reduceByKey takes a function and reduces the values of each key, similar to Scala's reduce.
For example, reducing the RDD {(1, 2), (3, 4), (3, 6)} leaves the keys unchanged and adds up the values of each key.
scala> var mapRDD = sc.parallelize(List((1,2),(3,4),(3,6)))
mapRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> var reduceRDD = mapRDD.reduceByKey(_+_)
reduceRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at reduceByKey at <console>:26

scala> reduceRDD.foreach(x=>println(x))
(1,2)
(3,10)
aggregateByKey aggregates the values of the same key in a pair RDD, using a neutral initial value during the aggregation. Because it aggregates the values per key, the function still returns a pair RDD, whose result is each key together with its aggregated value.
val data = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,7),(3,8)), 1)
//println(data.partitions.size)

/**
 * Take the maximum of two values of the same key. The first step is max(1,3) with 1 as the
 * initial value, giving 3; the second is max(3,2), giving 3; the third is max(3,4), giving 4,
 * so the result for key 1 is (1,4).
 */
def seq(a: Int, b: Int): Int = {
  math.max(a, b)
}

/**
 * Add up the values of the same key from different partitions.
 * With only one partition this function has no effect.
 */
def comb(a: Int, b: Int): Int = {
  a + b
}

//aggregate and print the result
data.aggregateByKey(1)(seq, comb).collect.foreach(println(_))

//inspect the data in each partition
data.mapPartitionsWithIndex { (partid, iter) => {
  var part_map = scala.collection.mutable.Map[String, List[(Int,Int)]]()
  var part_name = "part_" + partid
  part_map(part_name) = List[(Int,Int)]()
  while (iter.hasNext) {
    part_map(part_name) :+= iter.next()  // :+= appends an element to the end of the list
  }
  part_map.iterator
}}.collect().foreach(println(_))
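To see the combine function actually take effect, here is a sketch with the same key spread across two partitions; it assumes Spark's default even slicing of the list, so partition 0 holds (1,3),(1,2) and partition 1 holds (1,4),(2,3).

val data2 = sc.parallelize(List((1,3),(1,2),(1,4),(2,3)), 2)

// Partition 0: seq gives max(1,3)=3, then max(3,2)=3 for key 1.
// Partition 1: seq gives max(1,4)=4 for key 1 and max(1,3)=3 for key 2.
// comb then adds the per-partition results for key 1: 3 + 4 = 7,
// so the expected output is (1,7) and (2,3) (order not guaranteed).
data2.aggregateByKey(1)(seq, comb).collect.foreach(println(_))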
sortByKey sorts a pair RDD by key. Its first argument can be set to true (ascending) or false (descending); the default is true.
scala> val rdd = sc.parallelize(Array((3, 4),(1, 2),(4,4),(2,5), (6,5), (5, 6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> rdd.sortByKey().collect
res4: Array[(Int, Int)] = Array((1,2), (2,5), (3,4), (4,4), (5,6), (6,5))

scala> rdd.sortByKey(true).collect
res5: Array[(Int, Int)] = Array((1,2), (2,5), (3,4), (4,4), (5,6), (6,5))

scala> rdd.sortByKey(false).collect
res6: Array[(Int, Int)] = Array((6,5), (5,6), (4,4), (3,4), (2,5), (1,2))
RDD1.join(RDD2) joins the matching keys of RDD1 and RDD2, similar to an inner join in SQL: it returns only the records that match on both sides.
scala> val RDD1 = sc.parallelize(Array(("A","a1"),("B","b1"),("C","c1"),("D","d1"),("E","e1"),("F","f1")))
RDD1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> val RDD2 = sc.parallelize(Array(("A","a2"),("B","b2"),("C","c1"),("C","c2"),("C","c3"),("E","e2")))
RDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> RDD1.join(RDD2).collect
res8: Array[(String, (String, String))] = Array((B,(b1,b2)), (A,(a1,a2)), (C,(c1,c1)), (C,(c1,c2)), (C,(c1,c3)), (E,(e1,e2)))

scala> RDD2.join(RDD1).collect
res9: Array[(String, (String, String))] = Array((B,(b2,b1)), (A,(a2,a1)), (C,(c1,c1)), (C,(c2,c1)), (C,(c3,c1)), (E,(e2,e1)))
Other variants:
left outer join: anchored on the left side (every record of the left RDD (a) is kept; where the right RDD (b) has a matching record it returns Some(x), otherwise the slot is filled with None).
scala> RDD1.leftOuterJoin(RDD2).collect
res11: Array[(String, (String, Option[String]))] = Array((B,(b1,Some(b2))), (A,(a1,Some(a2))), (C,(c1,Some(c1))), (C,(c1,Some(c2))), (C,(c1,Some(c3))), (E,(e1,Some(e2))), (F,(f1,None)), (D,(d1,None)))

scala> RDD2.leftOuterJoin(RDD1).collect
res12: Array[(String, (String, Option[String]))] = Array((B,(b2,Some(b1))), (A,(a2,Some(a1))), (C,(c1,Some(c1))), (C,(c2,Some(c1))), (C,(c3,Some(c1))), (E,(e2,Some(e1))))
right outer join: anchored on the right side (every record of the right RDD (b) is kept; where the left RDD (a) has a matching record it returns Some(x), otherwise the slot is filled with None).
scala> RDD1.rightOuterJoin(RDD2).collect
res13: Array[(String, (Option[String], String))] = Array((B,(Some(b1),b2)), (A,(Some(a1),a2)), (C,(Some(c1),c1)), (C,(Some(c1),c2)), (C,(Some(c1),c3)), (E,(Some(e1),e2)))

scala> RDD2.rightOuterJoin(RDD1).collect
res14: Array[(String, (Option[String], String))] = Array((B,(Some(b2),b1)), (A,(Some(a2),a1)), (C,(Some(c1),c1)), (C,(Some(c2),c1)), (C,(Some(c3),c1)), (E,(Some(e2),e1)), (F,(None,f1)), (D,(None,d1)))
full outer join: records from both the left and the right side are always kept.
scala> RDD1.fullOuterJoin(RDD2).collect
res16: Array[(String, (Option[String], Option[String]))] = Array((B,(Some(b1),Some(b2))), (A,(Some(a1),Some(a2))), (C,(Some(c1),Some(c1))), (C,(Some(c1),Some(c2))), (C,(Some(c1),Some(c3))), (E,(Some(e1),Some(e2))), (F,(Some(f1),None)), (D,(Some(d1),None)))

scala> RDD2.fullOuterJoin(RDD1).collect
res17: Array[(String, (Option[String], Option[String]))] = Array((B,(Some(b2),Some(b1))), (A,(Some(a2),Some(a1))), (C,(Some(c1),Some(c1))), (C,(Some(c2),Some(c1))), (C,(Some(c3),Some(c1))), (E,(Some(e2),Some(e1))), (F,(None,Some(f1))), (D,(None,Some(d1))))
For the key-value elements of two RDDs, cogroup gathers the elements with the same key in each RDD into a separate collection.
Unlike reduceByKey, it combines the elements with the same key across the two RDDs.
The example below combines the values for the same key from multiple RDDs. rdd1 has no element with key dd (and hence no value for it), so in the combined
result the slot for rdd1 is set to an empty CompactBuffer() rather than being dropped.
scala> val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array(("aa",3),("bb",4),("cc",5),("dd",6),("aa",8)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd3 = rdd1.cogroup(rdd2).collect
rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((aa,(CompactBuffer(1),CompactBuffer(3, 8))), (dd,(CompactBuffer(),CompactBuffer(6))), (bb,(CompactBuffer(2),CompactBuffer(4))), (cc,(CompactBuffer(6),CompactBuffer(5))))
RDD1.cartesian(RDD2) returns the Cartesian product of RDD1 and RDD2; this is very expensive.
scala> var RDD1 = sc.parallelize(List("1","2","3"))
RDD1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> var RDD2 = sc.parallelize(List("a","b","c"))
RDD2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> RDD1.cartesian(RDD2).collect
res0: Array[(String, String)] = Array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c))
There is a special kind of RDD, the PipedRDD, which makes it possible to call external programs, such as CUDA-based C++ programs, so that the computation can run faster. Caffe on Spark and TensorFlow on Spark are also built on this mechanism.
# prepare the script
#!/bin/sh
echo "Running shell script"
while read LINE; do
  echo ${LINE}!
done

# calling it from a Spark RDD
scala> val data = sc.parallelize(List("hi","hello","how","are","you"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24

scala> val scriptPath = "/home/hadoop/echo.sh"
scriptPath: String = /home/hadoop/echo.sh

scala> val pipeRDD = data.pipe(scriptPath)
pipeRDD: org.apache.spark.rdd.RDD[String] = PipedRDD[53] at pipe at <console>:28

scala> pipeRDD.collect()
res21: Array[String] = Array(Running shell script, hi!, hello!, how!, are!, you!)
Both of them repartition the RDD; repartition is simply a convenience wrapper around coalesce with shuffle set to true.
repartition always triggers a shuffle,
while coalesce does not necessarily do so.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
Suppose the RDD has N partitions and needs to be repartitioned into M partitions.
1) N < M. Usually the data is unevenly distributed across the N partitions, and a HashPartitioner is used to repartition it into M partitions; in this case shuffle must be set to true.
2) If N > M and N and M are of a similar order of magnitude (say N is 1000 and M is 100), several of the N partitions can be merged into one new partition until only M remain; here shuffle can be set to false. With shuffle = false, if M > N coalesce has no effect: no shuffle takes place and the parent and child RDDs have a narrow dependency.
3) If N > M and the two differ drastically, setting shuffle to false makes the parent and child RDDs narrowly dependent and puts them in the same stage, which may leave the Spark job with too little parallelism and hurt performance. In particular, when M is 1, set shuffle to true so that the operations before coalesce keep better parallelism.
In short: with shuffle = false, if the number passed in is larger than the current number of partitions, the partition count stays the same; without a shuffle it is impossible to increase the number of partitions of an RDD.
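The cases above can be checked with a minimal sketch, assuming a SparkContext named sc as in the earlier examples:

val rdd = sc.parallelize(1 to 100, 10)                        // N = 10 partitions

// N < M: growing the partition count requires a shuffle
println(rdd.coalesce(20, shuffle = true).partitions.size)    // 20

// N > M, similar order of magnitude: merge without a shuffle (narrow dependency)
println(rdd.coalesce(5, shuffle = false).partitions.size)    // 5

// shuffle = false cannot increase the partition count
println(rdd.coalesce(20, shuffle = false).partitions.size)   // still 10

// repartition(n) is just coalesce(n, shuffle = true)
println(rdd.repartition(20).partitions.size)                 // 20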
Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by key. This is more efficient than calling repartition and then sorting within each partition, because the sorting can be pushed down into the shuffle machinery.
repartitionAndSortWithinPartitions counts as an efficient operator because it beats using repartition followed by sortByKey: its sorting is done during the shuffle itself, shuffling and sorting at the same time.
package core

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object TransformationDemo {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(sparkConf)

    val rdd1: RDD[(Int, Int)] = sc.parallelize(List((1,2),(2,3),(3,7),(4,8),(5,2),(6,5),(7,7)))
    //1
    println(rdd1.partitions.size)

    /**
     * (0,(7,7))
     * (0,(6,5))
     * (0,(5,2))
     * (0,(4,8))
     * (0,(3,7))
     * (0,(2,3))
     * (0,(1,2))
     */
    rdd1.mapPartitionsWithIndex(mapPartIndexFunc).foreach(println)

    //repartition and sort (by key, ascending by default)
    val rdd2: RDD[(Int, Int)] = rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(3))
    //3
    println(rdd2.partitions.size)

    /**
     * (0,(6,5))
     * (0,(3,7))
     * (1,(7,7))
     * (1,(4,8))
     * (1,(1,2))
     * (2,(5,2))
     * (2,(2,3))
     */
    rdd2.mapPartitionsWithIndex(mapPartIndexFunc).foreach(println)

    /**
     * (3,7)
     * (6,5)
     * (1,2)
     * (4,8)
     * (7,7)
     * (2,3)
     * (5,2)
     */
    rdd2.collect().foreach(println)

    sc.stop()
  }

  /**
   * Collect each partition's data together with its partition index.
   * @param i1   partition index
   * @param iter iterator over the partition's elements
   * @return iterator of (partition index, element) pairs
   */
  def mapPartIndexFunc(i1: Int, iter: Iterator[(Int,Int)]): Iterator[(Int,(Int,Int))] = {
    var res = List[(Int,(Int,Int))]()
    while (iter.hasNext) {
      var next = iter.next()
      res = res.::(i1, next)
    }
    res.iterator
  }
}