你真知道如何高效用mapPartitions吗?


1. mappartition简介typescript


首先,说到mapPartitions你们确定想到的是map和MapPartitions的对比。你们都知道mapPartition算子是使用一个函数针对分区计算的,函数参数是一个迭代器。而map只针对每条数据调用的,因此存在访问外部数据库等状况时mapParititons更加高效。
mapPartitions函数:
 /** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }
有代码可知mapPartitions的函数参数是传入一个迭代器,返回值是另外一个迭代器。
map函数:
 /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) }
map函数就是将rdd的元素由T类型转化为U类型。
综上可知,map和foreach这类的是针对一个元素调用一次咱们的函数,也便是咱们的函数参数是单个元素,假如函数内部存在数据库连接、文件等的建立及关闭,那么会致使处理每一个元素时建立一次连接或者句柄,致使性能底下,不少初学者犯过这种毛病。
而foreachpartition/mapPartitions是针对每一个分区调用一次咱们的函数,也便是咱们函数传入的参数是整个分区数据的迭代器,这样避免了建立过多的临时连接等,提高了性能。
下面的例子都是1-20这20个数字,通过map完成a*3的转换:
val a = sc.parallelize(1 to 20, 2)
def mapTerFunc(a : Int) : Int = {a*3}
val mapResult = a.map(mapTerFunc)
println(mapResult.collect().mkString(","))
结果
  
    
  
  
   
   
            
   
   
    
      
    
    
     
     
              
     
     

3. mappartitions低效用法数据库


3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
你们一般的作法都是申请一个迭代器buffer,将处理后的数据加入迭代器buffer,而后返回迭代器。以下面的demo。
val a = sc.parallelize(1 to 20, 2) def terFunc(iter: Iterator[Int]) : Iterator[Int] = {  var res = List[Int]()  while (iter.hasNext) {  val cur = iter.next; res.::= (cur*3) ; } res.iterator}
val result = a.mapPartitions(terFunc)println(result.collect().mkString(","))
结果乱序了,由于个人list是无序的,能够使用LinkList:
30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33

4. mappartitions高效用法数组


注意,3中的例子,会在mappartition执行期间,在内存中定义一个数组而且将缓存全部的数据。假如数据集比较大,内存不足,会致使内存溢出,任务失败。对于这样的案例,Spark的RDD不支持像mapreduce那些有上下文的写方法。其实,浪尖有个方法是无需缓存数据的,那就是自定义一个迭代器类。以下例:
  
    
  
  
   
   
            
   
   
    
      
    
    
     
     
              
     
     
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {  def hasNext : Boolean = {   iter.hasNext }      def next : Int= {        val cur = iter.next cur*3  }}  
val result = a.mapPartitions(v => new CustomIterator(v))println(result.collect().mkString(","))
结果:
   
     
   
   
    
    
             
    
    
3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
其实,主要问题就是返回值类型和参数类型要一致,那么不一致咋办呢?
欢迎留言~
推荐阅读:
27.scala的类型推断
尝尝鲜|Spark 3.1自适应执行计划
从 PageRank Example 谈 Spark 应用程序调优

本文分享自微信公众号 - 浪尖聊大数据(bigdatatip)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。缓存

相关文章
相关标签/搜索