Spark Operators: Basic RDD Transformations (5) – mapPartitions, mapPartitionsWithIndex

mapPartitions

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

This function is similar to map, except that the mapping function receives an iterator over each partition of the RDD instead of each individual element. If the mapping needs to create extra objects repeatedly, mapPartitions is much more efficient than map.

For example, when writing all the data of an RDD to a database over JDBC, using map could mean creating a connection for every single element, which is very expensive; with mapPartitions, only one connection per partition is needed.
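
To make this concrete, here is a minimal sketch of the one-connection-per-partition pattern for an RDD[Int] named rdd. The JDBC URL, credentials, and the records table with its value column are hypothetical placeholders, not part of the original example:

import java.sql.DriverManager

// Sketch only: URL, credentials and table are placeholder values
val rowCounts = rdd.mapPartitions { iter =>
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO records(value) VALUES (?)")
  var n = 0L
  try {
    while (iter.hasNext) {
      stmt.setInt(1, iter.next())   // one statement reused for the whole partition
      stmt.executeUpdate()
      n += 1
    }
  } finally {
    stmt.close()
    conn.close()
  }
  Iterator(n)                       // one row count per partition
}
println(rowCounts.sum())            // the action that actually triggers the writes

A write-only job would more often use foreachPartition, but the point is the same: the connection cost is paid once per partition rather than once per element.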

The parameter preservesPartitioning indicates whether to keep the parent RDD's partitioner.
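
As a small sketch (assuming an existing SparkContext sc): after a partitionBy, a mapPartitions that leaves the keys untouched can pass preservesPartitioning = true, so the result keeps the parent's HashPartitioner and a later operation on the same keys can avoid a shuffle:

import org.apache.spark.HashPartitioner

// keys are unchanged, so the parent's partitioner can safely be preserved
val pairs = sc.parallelize(1 to 10).map(i => (i % 3, i)).partitionBy(new HashPartitioner(3))
val doubled = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 2) },  // same keys, new values
  preservesPartitioning = true
)
println(doubled.partitioner)  // Some(HashPartitioner)

The full example below uses the default preservesPartitioning = false.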

import org.apache.spark.{SparkConf, SparkContext}

object ShellTest {
  def main(args: Array[String]): Unit = {
    // default parallelism is set to 12
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
    val rdd1 = sc.makeRDD(1 to 10, 2)
    // sum the elements of a partition and return the sum as a single-element iterator
    val func = (iter: Iterator[Int]) => {
      var i = 0
      while (iter.hasNext) {
        i += iter.next()
      }
      Iterator(i)
    }
    rdd1.mapPartitions(func).collect.foreach(println(_))
  }
}

16/12/20 11:17:14 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:30, took 0.793098 s
15
40

16/12/20 11:17:14 INFO SparkContext: Invoking stop() from shutdown hook

 

mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

It works like mapPartitions, except that the mapping function takes two parameters, the first of which is the partition index.

import org.apache.spark.{SparkConf, SparkContext}

object ShellTest {
  def main(args: Array[String]): Unit = {
    // default parallelism is set to 12
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
    val rdd1 = sc.makeRDD(1 to 10, 2)
    // collect each partition's elements into a list keyed by "part_<partition index>"
    val func = (partIdx: Int, iter: Iterator[Int]) => {
      val part_map = scala.collection.mutable.Map[String, List[Int]]()
      val part_name = "part_" + partIdx
      while (iter.hasNext) {
        val elem = iter.next()
        if (part_map.contains(part_name)) {
          // prepend, so the list ends up in reverse arrival order
          part_map(part_name) = elem :: part_map(part_name)
        } else {
          part_map(part_name) = List(elem)
        }
      }
      part_map.iterator
    }
    rdd1.mapPartitionsWithIndex(func).collect.foreach(println(_))
  }
}

16/12/20 11:11:54 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:37, took 0.748727 s
(part_0,List(5, 4, 3, 2, 1))
(part_1,List(10, 9, 8, 7, 6))

16/12/20 11:11:54 INFO SparkContext: Invoking stop() from shutdown hook
