def zipWithIndex(): RDD[(T, Long)]
This function pairs each element of the RDD with that element's ID (its index within the RDD), producing key/value pairs.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // default parallelism is set to 12
  val sc = new SparkContext(new SparkConf()
    .setMaster("local")
    .setAppName("test")
    .set("spark.default.parallelism", "12"))
  val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  val rdd3 = rdd2.zipWithIndex()
  rdd3.collect.foreach(println(_))
}
16/12/20 14:23:41 INFO DAGScheduler: Job 1 finished: collect at ShellTest.scala:23, took 0.050251 s
(A,0)
(B,1)
(C,2)
(D,3)
(E,4)
16/12/20 14:23:41 INFO SparkContext: Invoking stop() from shutdown hook
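Because an element's global index depends on how many elements sit in the earlier partitions, zipWithIndex has to run an extra Spark job to count each partition first whenever the RDD has more than one partition; that is likely why collect shows up as Job 1 in the log above but as Job 0 in the zipWithUniqueId example below. Below is a minimal sketch of the same idea, using a hypothetical helper name manualZipWithIndex (this is not Spark's internal implementation):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper, for illustration only: build contiguous indices by
// first counting every partition, then offsetting each partition's local
// indices by the number of elements in all earlier partitions.
def manualZipWithIndex[T: ClassTag](rdd: RDD[T]): RDD[(T, Long)] = {
  // the extra pass over the data: one count per partition
  val counts = rdd.mapPartitions(it => Iterator(it.size.toLong)).collect()
  // start offset of partition i = total number of elements before it
  val starts = counts.scanLeft(0L)(_ + _)
  rdd.mapPartitionsWithIndex { (pid, it) =>
    it.zipWithIndex.map { case (x, i) => (x, starts(pid) + i) }
  }
}

With the 2-partition RDD above, manualZipWithIndex(rdd2).collect would yield the same (A,0) through (E,4) pairs.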
def zipWithUniqueId(): RDD[(T, Long)]
This function pairs each element of the RDD with a unique ID. The unique IDs are generated as follows:
The first element in each partition gets an ID equal to that partition's index.
The Nth element in each partition gets an ID equal to (the previous element's ID) + (the total number of partitions of the RDD).
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // default parallelism is set to 12
  val sc = new SparkContext(new SparkConf()
    .setMaster("local")
    .setAppName("test")
    .set("spark.default.parallelism", "12"))
  val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  val rdd3 = rdd2.zipWithUniqueId()
  rdd3.collect.foreach(println(_))
}
16/12/20 14:25:55 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:23, took 0.568861 s
(A,0)
(B,2)
(C,1)
(D,3)
(E,5)
16/12/20 14:25:55 INFO SparkContext: Invoking stop() from shutdown hook
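These IDs follow directly from the rule above. With 2 partitions, makeRDD places A and B in partition 0 and C, D, E in partition 1. In partition 0, A gets the partition index 0 and B gets 0 + 2 = 2; in partition 1, C gets 1, D gets 1 + 2 = 3, and E gets 3 + 2 = 5. Because each partition only needs its own index and the total partition count, no extra counting job is required, which matches collect appearing here as Job 0 rather than Job 1.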