There are three RDDs: rddA, rddB and rddC. Take the data 1, 2, 3, 4, 5, split it into three partitions, multiply every element by 2, and keep only the values greater than 6.
val rddA = sc.parallelize(List(1, 2, 3, 4, 5), 3)
//rddA: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
val rddB = rddA.map(_ * 2)
//rddB: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1]
val rddC = rddB.filter(_ > 6)
//rddC: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2]
rddC.collect()
//res0: Array[Int] = Array(8, 10)
Calling rddC.toDebugString prints the dependency chain (lineage):
res1: String =
(2) MapPartitionsRDD[2] at filter at <console>:25 []
 |  MapPartitionsRDD[1] at map at <console>:25 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
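Beyond toDebugString, the dependency objects themselves can be inspected. A minimal sketch, assuming the same spark-shell session and the variables defined above:

// map and filter each create a one-to-one mapping between parent and
// child partitions, so every dependency here is an OneToOneDependency.
rddB.dependencies.foreach(d => println(d.getClass.getSimpleName))  // OneToOneDependency
rddC.dependencies.foreach(d => println(d.getClass.getSimpleName))  // OneToOneDependency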
Narrow dependencies, where each partition of the child RDD depends on a fixed number of parent partitions, come in two forms: OneToOneDependency and RangeDependency. RangeDependency is used only by org.apache.spark.rdd.UnionRDD. A UnionRDD stitches several RDDs together into one: the relative order of each parent RDD's partitions is preserved, and only the starting position of each parent's partitions inside the UnionRDD differs.

Common narrow-dependency operators: map, filter, union, join (when the inputs are co-partitioned), mapPartitions, mapValues.
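To see RangeDependency concretely, union two RDDs and inspect the result. A small sketch, assuming the same spark-shell session (the RDDs left and right are made up for illustration):

val left = sc.parallelize(1 to 3, 2)
val right = sc.parallelize(4 to 6, 2)
val unioned = left.union(right)
// The UnionRDD keeps each parent's partitions in their original order and
// records each parent's starting offset via a RangeDependency.
unioned.dependencies.foreach(d => println(d.getClass.getSimpleName))  // RangeDependency (one per parent)
println(unioned.getNumPartitions)  // 4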
Wide dependencies are operations that involve a Shuffle: a child partition may depend on many (or all) partitions of the parent RDD, and each shuffle produces a new stage in the job.

Common wide-dependency operators: groupByKey, join (when the inputs are not co-partitioned), partitionBy, reduceByKey.
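To confirm that such an operator introduces a shuffle, check the dependency type of the resulting RDD. A minimal sketch with a hypothetical pair RDD:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val grouped = pairs.groupByKey()
// Each child partition is built from many parent partitions, so the
// dependency is a ShuffleDependency, i.e. a wide dependency.
grouped.dependencies.foreach(d => println(d.getClass.getSimpleName))  // ShuffleDependency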
A WordCount example that combines narrow- and wide-dependency operators:

val path = "/user/spark/data/wc.txt"
val lines = sc.textFile(path, 3)
// Inspect the data in each partition:
// lines.mapPartitionsWithIndex((n, partition) => {
//   partition.map(x => (s"partition number ${n}", s"partition data ${x}"))
// }).foreach(println)
val words = lines.flatMap(_.split(","))
val wordPair = words.map(x => (x, 1))
val result = wordPair.reduceByKey(_ + _)
result.collect().foreach(println)
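Printing the lineage of result shows where the shuffle splits the job into stages (exact RDD ids and <console> line numbers will differ from run to run):

println(result.toDebugString)
// reduceByKey produces a ShuffledRDD. In the printed lineage the "+-"
// indentation marks the shuffle boundary: textFile/flatMap/map run in one
// stage, and the ShuffledRDD starts a new stage.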
If you found this article helpful, remember to follow the official account.