Spark Partition

The purpose of partitioning

A Spark RDD is a distributed dataset. Because the data volume is large, it is split into partitions that are stored in memory across the Worker nodes, so when we operate on an RDD we are in fact operating on each partition's data in parallel. Partitioning by a column in Spark is similar to table partitioning in a relational database: it increases parallelism and improves execution efficiency. When Spark reads a file from HDFS, the default number of partitions equals the number of HDFS blocks of that file; a block is the smallest unit of distributed storage in HDFS.
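
For example, the partition count of an RDD read from HDFS can be checked directly. A minimal sketch, assuming a SparkContext sc and a hypothetical HDFS path:

// With a 128 MB block size, a file of ~1 GB typically yields about 8 partitions.
val lines = sc.textFile("hdfs:///data/events.log")
println(s"partitions = ${lines.getNumPartitions}")

// A higher minimum number of partitions can be requested at read time:
val lines16 = sc.textFile("hdfs:///data/events.log", minPartitions = 16)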

     

1. The difference between RDD repartition and partitionBy

repartition and partitionBy are the two commonly used repartitioning operators on RDDs. Both redistribute the data and both use a HashPartitioner by default. The differences: partitionBy can only be used on a pair RDD (key-value data), and even when both are applied to a pair RDD the results differ. repartition distributes records essentially at random, with no discernible pattern, whereas partitionBy puts all records with the same key into the same partition.

val parRDD = pairRDD.repartition(10)                        // repartition to 10 partitions
val parRDD = pairRDD.partitionBy(new HashPartitioner(10))   // repartition to 10 partitions

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD 

object PartitionDemo {
 
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    
    val rdd = sc.parallelize(List("hello", "jason", "what", "are", "you", "doing","hi","jason","do","you","eat","dinner",
            "hello","jason","do","you","have","some","time","hello","jason","time","do","you","jason","jason"),4) //设置4个分区;
    val word_count = rdd.flatMap(_.split(",")).map((_,1)) 
    val repar = word_count.repartition(10)                       // repartition to 10 partitions
    val parby = word_count.partitionBy(new HashPartitioner(10))  // repartition to 10 partitions
    print(repar)
    print(parby)
  }
  
  // Print each partition's id and its elements; on local[4] the output of different partitions may interleave.
  def print(rdd: RDD[(String, Int)]) = {
    rdd.foreachPartition(pair => {
      println("partition " + TaskContext.get.partitionId + ":")
      pair.foreach(p => { println("  " + p) })
    })
    println()
  }
}

partitionBy can use three kinds of partitioners:

1. HashPartitioner
val parRDD= pairRDD.partitionBy(new HashPartitioner(3))
HashPartitioner determines the partition as: partition = key.hashCode() % numPartitions (Spark uses a non-negative modulo, so the result is never negative).
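
A minimal sketch of how HashPartitioner assigns keys, using the same 3-partition setup:

import org.apache.spark.HashPartitioner

val hashPart = new HashPartitioner(3)
Seq("hello", "jason", "time").foreach { k =>
  // Spark takes a non-negative modulo of key.hashCode, so the result is always in [0, numPartitions)
  println(s"$k -> partition ${hashPart.getPartition(k)}")
}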

2. RangePartitioner
val parRDD= pairRDD.partitionBy(new RangePartitioner(3,counts))
RangePartitioner sorts the keys and splits the key space into 3 contiguous ranges, one per partition; the second constructor argument (counts above) is a pair RDD that it samples to estimate the range boundaries.
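
A minimal sketch of a RangePartitioner in use; counts here is just an example pair RDD built with an assumed SparkContext sc:

import org.apache.spark.RangePartitioner

val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5), ("f", 6)))
// RangePartitioner samples the RDD passed as its second argument to estimate key ranges,
// then assigns each key to the partition whose range contains it (partition 0 gets the smallest keys).
val ranged = counts.partitionBy(new RangePartitioner(3, counts))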

3. CustomPartitioner
A custom Partitioner lets you define your own partitioning rule to match the needs of your application, for example:

import org.apache.spark.Partitioner

class CustomPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    if (key == 1) 0       // keys equal to 1 go to partition 0
    else if (key == 2) 1  // keys equal to 2 go to partition 1
    else 2                // all other keys go to partition 2
  }
}
val parRDD = pairRDD.partitionBy(new CustomPartitioner(3))        

2. DataFrame partitioning

1. repartition: partition by column
val regionDf = peopleDf.repartition($"region")
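
When repartitioning by a column, the number of output partitions defaults to spark.sql.shuffle.partitions (200 unless changed), and it can also be given explicitly. A minimal sketch, assuming a SparkSession spark and that peopleDf has a "region" column:

import spark.implicits._

val byRegion = peopleDf.repartition($"region")      // hash-partitions rows by region, 200 partitions by default
println(byRegion.rdd.getNumPartitions)

val byRegion8 = peopleDf.repartition(8, $"region")  // same, but into exactly 8 partitions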

2. coalesce: coalesce is generally used to merge/reduce partitions, moving data from some partitions into others.
val peopleDF2 = peopleDF.coalesce(2) // originally 4 partitions, reduced to 2; coalesce cannot increase the count, e.g. peopleDF.coalesce(6) still leaves 4 partitions
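
The behaviour noted in the comment is easy to verify (a sketch, assuming peopleDF currently has 4 partitions):

println(peopleDF.rdd.getNumPartitions)                  // 4
println(peopleDF.coalesce(2).rdd.getNumPartitions)      // 2: partitions merged, no full shuffle
println(peopleDF.coalesce(6).rdd.getNumPartitions)      // still 4: coalesce cannot add partitions
println(peopleDF.repartition(6).rdd.getNumPartitions)   // 6: repartition shuffles and can increase the count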

The difference between the two: the repartition algorithm does a full shuffle of the data and creates equal-sized partitions; coalesce combines existing partitions to avoid a full shuffle.

Why use repartition rather than coalesce? A full data shuffle is an expensive operation for large data sets, but our data puddle is only 2,000 rows. The repartition method returns equal-sized text files, which are more efficient for downstream consumers. (non-partitioned) It took 241 seconds to count the rows in the data puddle when the data wasn't repartitioned (on a 5-node cluster). (partitioned) It only took 2 seconds to count the data puddle when the data was partitioned, a 124x speed improvement!

3. DataFrameWriter bucketing and partitioning

1. bucketBy: bucketing and sorting are applicable only to persistent tables. For file-based data sources, the output can also be bucketed and sorted.
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

2. partitionBy: partitioning can be used with both save and saveAsTable.
peopleDF.write.partitionBy("region").format("parquet").save("people_partitioned.parquet")
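
The partitioned output uses a Hive-style directory layout (e.g. people_partitioned.parquet/region=US/part-*.parquet), so filtering on the partition column at read time lets Spark prune whole directories. A minimal sketch of reading it back (the value "US" is just an example):

import org.apache.spark.sql.functions.col

val people = spark.read.parquet("people_partitioned.parquet")
people.filter(col("region") === "US").show()   // only the region=US directory is scanned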

saveAsTable: saving data as a persistent table
A DataFrame can be saved as a persistent table in the Hive metastore with the saveAsTable command. Spark creates a default local Hive metastore (using Derby) for you. Unlike createOrReplaceTempView, saveAsTable materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore. The persistent table still exists after your Spark program restarts, as long as you keep connecting to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the table's name.
When persisting a table you can customize the table path, e.g. df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, a custom table path is not removed and the table data is still there. If no custom path is specified, Spark writes the data to a default table path under the warehouse directory, and that default path is removed when the table is dropped.
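
A minimal sketch of the points above, reusing the df, "/some/path" and "t" example names from the text:

df.write.option("path", "/some/path").saveAsTable("t")

// Even after a restart, the persistent table can be loaded back by name from the same metastore:
val t = spark.table("t")

// Dropping the table removes the metastore entry; with a custom path, the files under /some/path remain.
spark.sql("DROP TABLE t")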

4. JDBC partition

Spark provides the jdbc method for working with databases, and each RDD partition opens its own JDBC connection. Although users can set the number of RDD partitions, after some distributed shuffle operations (such as reduceByKey and join) the RDD falls back to the default parallelism spark.default.parallelism, in which case the number of JDBC connections may exceed the database's maximum. Spark 2.1 added the numPartitions option to control the number of partitions for JDBC reads and writes, which solves this problem. If the partition count at write time still exceeds the maximum, we can call coalesce(numPartitions) before the write to reduce it.

val userDF = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> sourceTable, "lowerBound"->"1", "upperBound"->"886500", "partitionColumn"->"user_id", "numPartitions"->"10")).load()

userDF.write.option("maxRecordsPerFile", 10000).mode("overwrite").parquet(outputDirectory)
userDF.repartition(10000).write.mode("overwrite").parquet(outputDirectory)
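
A minimal sketch of capping the number of JDBC connections on a database write, as described above; url, targetTable and the credentials are placeholders:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "username")
props.setProperty("password", "password")

// Each write partition opens one JDBC connection, so coalesce(8) keeps at most 8 connections open.
userDF.coalesce(8).write.mode("append").jdbc(url, targetTable, props)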

 

Partitioning case study

val df = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> sourceTable, "lowerBound"->"1", "upperBound"->"825485207", "partitionColumn"->"org_id", "numPartitions"->"10")).load()

(1) jdbc partition: df.write.format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_readpar")
(2) maxRecordsPerFile: df.write.option("maxRecordsPerFile", 10000).format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_maxRecd")
(3) repartition: df.repartition(4).write.format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_repar")
(4) rdd key-value partitionBy: df.rdd.map(r => (r.getInt(1), r)).partitionBy(new HashPartitioner(10)).values.saveAsTextFile(s"$filePath/$filename"+"_rddhash")

(1) jdbc partition: the data is unevenly distributed, with some partitions holding many rows and others few; the keys are ordered, and the [lowerBound, upperBound] range is split into per-partition key intervals (a sketch for checking per-partition row counts follows this list).

  

(2) maxRecordsPerFile: same as (1), except that when a partition exceeds maxRecordsPerFile rows it is split into several output files, so the same key can end up in different files.

(3) repartition: produces partitions of roughly equal size (the row count per partition is not guaranteed to be identical); the keys are unordered, so the same key may land in different partitions.

(4) rdd key-value partitionBy: partitions the data according to a given rule via the partitioner, and the rule can be customized.
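
A minimal sketch for inspecting per-partition row counts, which is how observations like the ones above can be checked; df is the JDBC DataFrame loaded earlier:

val sizes = df.rdd
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()

sizes.foreach { case (idx, n) => println(s"partition $idx: $n rows") }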

   

---------------------

Sources:
zhangzeyuan56 (CSDN): https://blog.csdn.net/zhangzeyuan56/article/details/80935034
junzhou134 (CSDN): https://blog.csdn.net/m0_37138008/article/details/78936029
JasonLeeblog (CSDN): https://blog.csdn.net/xianpanjia4616/article/details/84328928
