Spark Operators: RDD Key-Value Transformations (4): cogroup and join

cogroup

##Signatures taking one other RDD

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

 

##Signatures taking two other RDDs

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

 

##Signatures taking three other RDDs

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

 

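cogroup is comparable to a full outer join in SQL: every key that appears in either RDD is returned, and a side with no matching records yields an empty collection. Unlike a SQL join, it emits a single record per key, with the values from each RDD grouped into an Iterable.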
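To make the comparison with a full outer join concrete, here is a minimal sketch that expands a cogroup result into join-style rows and checks it against Spark's built-in `fullOuterJoin`. The object name, the `Row` alias, and the sample data are made up for illustration; it assumes spark-core is on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CogroupVsFullOuterJoin {
  // Illustrative alias for one full-outer-join row.
  type Row = (Int, (Option[String], Option[String]))

  // Returns (rows expanded from cogroup, rows from the built-in fullOuterJoin).
  def run(): (Set[Row], Set[Row]) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("cogroup-vs-fullOuterJoin"))
    try {
      // Hypothetical sample data, not taken from the article.
      val left  = sc.makeRDD(Seq((1, "A"), (3, "C")))
      val right = sc.makeRDD(Seq((1, "x"), (1, "y"), (4, "z")))

      // cogroup emits one record per key; an unmatched side is an empty Iterable.
      val grouped = left.cogroup(right)

      // Expand each grouped record into SQL-style full outer join rows.
      val expanded = grouped.flatMap { case (k, (vs, ws)) =>
        val rows: Iterable[Row] =
          if (vs.isEmpty) ws.map(w => (k, (Option.empty[String], Option(w))))
          else if (ws.isEmpty) vs.map(v => (k, (Option(v), Option.empty[String])))
          else for (v <- vs; w <- ws) yield (k, (Option(v), Option(w)))
        rows
      }

      val builtIn = left.fullOuterJoin(right)
      (expanded.collect().toSet, builtIn.collect().toSet)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (expanded, builtIn) = run()
    println(expanded == builtIn)
  }
}
```

In this sketch key 1 expands to two rows, one for each matching value on the right, while keys 3 and 4 each produce one row with `None` on the unmatched side.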

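The numPartitions parameter specifies the number of partitions in the result.

The partitioner parameter specifies the partitioner (partitioning function) to use.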
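The effect of the two parameters can be checked with a small sketch like the one below. The object name and data are hypothetical; `HashPartitioner` is used only as one readily available `Partitioner` and is not something the article prescribes.

```scala
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object CogroupPartitioning {
  // Returns (partition count when numPartitions = 4,
  //          partition count when HashPartitioner(2) is passed,
  //          partitioner recorded on the second result).
  def run(): (Int, Int, Option[Partitioner]) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("cogroup-partitioning"))
    try {
      // Hypothetical sample data.
      val rdd1 = sc.makeRDD(Seq((1, "A"), (2, "B"), (3, "C")))
      val rdd2 = sc.makeRDD(Seq((1, "x"), (4, "y")))

      // Overload with an explicit number of result partitions.
      val byCount = rdd1.cogroup(rdd2, 4)

      // Overload with an explicit partitioner.
      val byPartitioner = rdd1.cogroup(rdd2, new HashPartitioner(2))

      (byCount.getNumPartitions, byPartitioner.getNumPartitions, byPartitioner.partitioner)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The first result should have 4 partitions and the second 2, with the second result reporting the `HashPartitioner` that was passed in.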

##Example with one other RDD

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism: 12 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "A"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  // Group the values of both RDDs by key and print one record per key
  rdd1.cogroup(rdd2).foreach(println(_))
}

Output:

(1,(CompactBuffer(A),CompactBuffer(A, B)))
(2,(CompactBuffer(B),CompactBuffer(D)))
(3,(CompactBuffer(C, C),CompactBuffer(B, E)))
(4,(CompactBuffer(),CompactBuffer(A)))

 

##Example with two other RDDs

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism: 12 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  val rdd3 = sc.makeRDD(Array((5, "a"), (3, "b"), (4, "c"), (2, "d"), (3, "e"), (1, "f")))
  // Group the values of all three RDDs by key
  rdd1.cogroup(rdd2, rdd3).foreach(println(_))
}

Output:

(1,(CompactBuffer(A),CompactBuffer(1, B),CompactBuffer(f)))
(2,(CompactBuffer(B),CompactBuffer(D),CompactBuffer(d)))
(3,(CompactBuffer(C),CompactBuffer(B, E),CompactBuffer(b, e)))
(4,(CompactBuffer(),CompactBuffer(A),CompactBuffer(c)))
(5,(CompactBuffer(),CompactBuffer(),CompactBuffer(a)))

 

##The example with three other RDDs is omitted; it follows the same pattern as above.
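For readers who want to see the omitted case anyway, a minimal sketch might look like the following. rdd1 to rdd3 reuse the data from the two-RDD example; rdd4, the object name, and the `Groups` alias are hypothetical additions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CogroupThreeOthers {
  // Illustrative alias for the four value groups returned per key.
  type Groups = (Iterable[String], Iterable[String], Iterable[String], Iterable[String])

  def run(): Map[Int, Groups] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("cogroup-three-others"))
    try {
      val rdd1 = sc.makeRDD(Seq((1, "A"), (2, "B"), (3, "C")))
      val rdd2 = sc.makeRDD(Seq((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
      val rdd3 = sc.makeRDD(Seq((5, "a"), (3, "b"), (4, "c"), (2, "d"), (3, "e"), (1, "f")))
      // Hypothetical fourth RDD, not part of the original example.
      val rdd4 = sc.makeRDD(Seq((1, "x"), (6, "y")))

      // One record per key, with one Iterable per input RDD.
      rdd1.cogroup(rdd2, rdd3, rdd4).collect().toMap
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    run().toSeq.sortBy(_._1).foreach(println)
}
```

Key 6 appears only in rdd4, so its record carries three empty collections followed by the single value from rdd4.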

join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

 

join is equivalent to an inner join in SQL: it returns only the records whose key K matches in both RDDs. join works on exactly two RDDs; to join more, chain several join calls.
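Chaining joins across three RDDs could be sketched as follows. The object name and sample data are hypothetical, and the final `mapValues` step is just one way to flatten the nested tuple that chained joins produce.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ChainedJoin {
  def run(): List[(Int, (String, String, String))] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("chained-join"))
    try {
      // Hypothetical sample data.
      val rdd1 = sc.makeRDD(Seq((1, "A"), (2, "B"), (3, "C")))
      val rdd2 = sc.makeRDD(Seq((1, "x"), (2, "y"), (4, "z")))
      val rdd3 = sc.makeRDD(Seq((1, "p"), (3, "q")))

      // Each join is an inner join, so only keys present in all three RDDs survive.
      // The value type nests: RDD[(Int, ((String, String), String))].
      val nested = rdd1.join(rdd2).join(rdd3)

      // Flatten ((a, b), c) into (a, b, c) for readability.
      nested.mapValues { case ((a, b), c) => (a, b, c) }.collect().toList
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

With this data only key 1 is present in all three RDDs, so the result is the single record (1,(A,x,p)).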

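The numPartitions parameter specifies the number of partitions in the result.

The partitioner parameter specifies the partitioner (partitioning function) to use.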

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism: 12 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  // Inner join: key 4 exists only in rdd2, so it is dropped
  rdd1.join(rdd2).foreach(println(_))
}

Output:

(1,(A,1))
(1,(A,B))
(2,(B,D))
(3,(C,B))
(3,(C,E))
