剖析Spark数据分区之Spark RDD分区

本文来自OPPO互联网技术团队,是《剖析Spark数据分区》系列文章的第二篇,将重点分析Spark RDD的数据分区。该系列共分3篇文章,欢迎持续关注。sql

  • 第一篇:主要分析Hadoop中的分片
  • 第二篇:主要分析Spark RDD的分区;
  • 第三篇:主要分析Spark Streaming,TiSpark中的数据分区;

转载请注名做者,同时欢迎关注OPPO互联网技术团队的公众号:OPPO_tech,一同分享OPPO前沿互联网技术及活动。apache

Spark

咱们以Spark on Yarn为例阐述Spark运行原理。segmentfault

任务运行步骤

1.客户端提交Application到RM,RM判断集群资源是否知足需求 ;分布式

2.RM在集群中选择一台NodeManager启动Application Master(cluster模式);ide

3.Driver在AM所在的NodeManager节点启动进程; 函数

4.AM向ResourceManager申请资源,并在每台NodeManager上启动相应的executors;oop

5.Driver开始进行任务调度,经过Transaction操做造成了RDD血缘关系图,即DAG图,最后经过Action的调用,触发Job并调度执行;源码分析

6.DAGScheduler负责Stage级的调度,主要是将DAG切分红若干个Stages,并将每一个Stage打包成Taskset交给TaskScheduler调度;性能

7.TaskScheduler负责Task级的调度,将DAGScheduler给过来的Taskset按照指定的调度策略分发到Executor上执行;大数据

Spark RDD

RDD 弹性分布式数据集,RDD包含5个特征

1.Compute:

RDD在任务计算时是以分区为单位的,经过Compute计算获得每个分片的数据,不一样的RDD子类能够实现本身的compute方法;

2.getPartitions:

计算获取全部分区列表,RDD是一个分区的集合,一个RDD有一个或者多个分区,分区的数量决定了Spark任务的并行度;

3.getDependencies:

获取RDD的依赖,每一个RDD都有依赖关系(源RDD的依赖关系为空),这些依赖关系成为lineage;

4.getPreferredLocations:

对其余RDD的依赖列表,Spark在进行任务调度时,会尝试将任务分配到数据所在的机器上,从而避免了机器间的数据传输,RDD获取优先位置的方法为getPreferredLocations,通常只有涉及到从外部存储结构中读取数据时才会有优先位置,好比HadoopRDD, ShuffleRDD;

5.Partitioner:

决定数据分到哪一个Partition,对于非key-value类型的RDD,Partitioner为None, 对应key-value类型的RDD,Partitioner默认为HashPartitioner。在进行shuffle操做时,如reduceByKey, sortByKey,Partitioner决定了父RDD shuffle的输出时对应的分区中的数据是如何进行map的;

前2个函数是全部RDD必须的,后三个可选,全部的RDD均继承了该接口。

Spark Partition

因为RDD的数据量很大,所以为了计算方便,须要将RDD进行切分并存储在各个节点的分区当中,从而当咱们对RDD进行各类计算操做时,其实是对每一个分区中的数据进行并行的操做。

也就是一份待处理的原始数据会被按照相应的逻辑切分红多分,每份数据对应RDD的一个Partition,partition的数量决定了task的数量,影响程序的并行度,Partition是伴生的,也就是说每一种RDD都有其对应的Partition实现。

HadoopRDD

Spark常常须要从hdfs读取文件生成RDD,而后进行计算分析。这种从hdfs读取文件生成的RDD就是HadoopRDD。

HadoopRDD主要重写了RDD接口的三个方法:

  1. override def getPartitions: Array[Partition]
  2. override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)]
  3. override def getPreferredLocations(split:Partition): Seq[String]

决定分区数量的逻辑在getPartitions中,实际上调用的是InputFormat.getSplits,

InputFormat是一个接口:org.apache.hadoop.mapred.InputFormat,其中getInputSplit就是 图 – 7 中所展现的。

从源码分析可知,在HadoopRDD这种场景下,RDD的分区数在生成RDD以前就已经决定了,是被HADOOP的参数所决定的,咱们能够经过调整:

spark.hadoop.mapreduce.input.fileinputformat.split.minsize;
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;

的大小来调整HadoopRDD的分区数量。

Spark SQL中的分区

Spark SQL 最终将SQL 语句通过逻辑算子树转换成物理算子树。

在物理算子树中,叶子类型的SparkPlan 节点负责从无到有的建立RDD ,每一个非叶子类型的SparkPlan 节点等价于在RDD 上进行一次Transformation ,即经过调用execute()函数转换成新的RDD ,最终执行collect()操做触发计算,返回结果给用户。

重点分析一下叶子节点:

在Spark SQL 中,LeafExecNode 类型的SparkPlan 负责对初始RDD 的建立。

HiveTableScanExec 会根据Hive数据表存储的HDFS 信息直接生成HadoopRDD;FileSourceScanExec 根据数据表所在的源文件生成FileScanRDD 。

当向Hive metastore中读写Parquet表时文件,转化的方式经过 spark.sql.hive.convertMetastoreParquet 控制。

默认为true,若是设置为 true

会使用 :

org.apache.spark.sql.execution.FileSourceScanExec ,

不然会使用 :

org.apache.spark.sql.hive.execution.HiveTableScanExec

目前FileSourceScanExec包括建立分桶表RDD,非分桶表RDD,无论哪一种方式,最终生成的都是一个FileRDD。

下面分析的建立非分桶表的RDD

FileRDD的getPartition方法:

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

要获取maxSplitBytes,那么决定因素在如下三个参数:

结论:

若是想要使得maxSplitBytes值变大,也就是分区数变小。

可经过将defaultMaxSplitBytes值调大,

也就是spark.sql.files.maxPartitionBytes,

将spark.sql.files.openCostInBytes也调大;

若是若是想要使得maxSplitBytes值变小,也就是分区数变大。

能够将defaultMaxSplitBytes值调小,

也就是spark.sql.files.maxPartitionBytes,

将spark.sql.files.openCostInBytes也调小。

下面分析FileSourceScanExec的建立分桶表的RDD。

经过源码分析,分桶表的分区数量跟桶的数量是一对一关系。

HiveTableScanExec

HiveTableScanExec 会根据 Hive数据表存储的 HDFS 信息直接生成 HadoopRDD。

一般状况下,HiveTableScanExec经过文件数量,大小进行分区。

例如:

读入一份 2048M 大小的数据,hdfs 块大小设置为 128M

1) 该目录有1000个小文件

答案:则会生成1000个partition。

2) 若是只有1个文件,

答案:则会生成 16 个partition

3) 若是有一个大文件1024M,其他999 个文件共 1024M

答案:则会生成 1007个分区。

针对HiveTableScanExec类型的调优可参考HadoopRDD。

RDD transformation

一个RDD经过transformation转换成另一个RDD,那么新生成的分区数量是多少呢?

1) filter(), map(), flatMap(), distinct()

partition数量等于parent RDD的数量。

2) rdd.union(other_rdd)

partition数量等于rdd_size + other_rdd_size

3) rdd.intersection(other_rdd)

partition数量等于max(rdd_size, other_rdd_size)

4) rdd.subtract(other_rdd)

partition数量等于rdd_size

5) rdd.cartesian(other_rdd)

partition数量等于rdd_size * other_rdd_size

RDD coalesce 与 repartition

有时候须要从新设置RDD的分区数量,好比RDD的分区中,RDD分区比较多,可是每一个RDD的数量比较小,分区数量增多可增大任务并行度,可是有可能形成每一个分区的数据量偏少,分区数据量太少致使节点间通讯时间在整个任务执行时长占比被放大,因此须要设置一个比较合理的分区。

有两种方法能够重设RDD分区:分别是coalesce()方法和repartition()。

Repartition是coalesce函数中shuffle为true的特例。

分布式计算中,每一个节点只计算部分数据,也就是只处理一个分片,那么要想求得某个key对应的所有数据,好比reduceByKey、groupByKey,那就须要把相同key的数据拉取到同一个分区,原分区的数据须要被打乱重组,这个按照必定的规则对数据从新分区的过程就是Shuffle(洗牌)。

Shuffle是链接Map和Reduce之间的桥梁,描述的是数据从Map端到Reduce端的过程。

当增长并行度的时候,额外的shuffle是有利的。例如,数据中有一些文件是不可分割的,那么该大文件对应的分区就会有大量的记录,而不是说将数据分散到尽量多的分区内部来使用全部已经申请cpu。在这种状况下,使用Reparition从新产生更多的分区数,以知足后面转换算子所需的并行度,这会提高很大性能。

分析coalesce函数源码

Shuffle = true

数据进行基于新的分区数量进行hash计算, 随机选择输出分区,将输入分区的数据

输出到输出分区中。

Shuffle = false

分析CoalescedRDD源码的getPartitions方法

PartitionCoalescer的做用是:

1) 保证CoalescedRDD的每一个分区基本上对应于它Parent RDD分区的个数相同;

2) CoalescedRDD的每一个分区,尽可能跟它的Parent RDD的本地性形同。好比说CoalescedRDD的分区1对应于它的Parent RDD的1到10这10个分区,可是1到7这7个分区在节点1.1.1.1上;那么 CoalescedRDD的分区1所要执行的节点就是1.1.1.1。这么作的目的是为了减小节点间的数据通讯,提高处理能力;

3) CoalescedRDD的分区尽可能分配到不一样的节点执行;具体实现可参考DefaultPartitionCoalescer类。

下面以一个例子来分析Repartition和Coalesce。

假设源RDD有N个分区,须要从新划分红M个分区

一、Repartition实现:

若是N<M

通常状况下N个分区有数据分布不均匀的情况,利用HashPartitioner函数将数据从新分区为M个,这时须要将shuffle设置为true;

二、Coalesce实现:

若是N>M

1) N和M相差很少,(假如N是1000,M是100)那么就能够将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时能够将shuffle设置为false;

2) 若是M>N时,coalesce是无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系,没法使文件数(partiton)变多。总之若是shuffle为false时,若是传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不通过shuffle,是没法将RDD的分区数变多的;

3) 若是N>M而且二者相差悬殊,这时要看executor数与要生成的partition关系,若是executor数 <= 要生成partition数,coalesce效率高,反之若是用coalesce会致使(executor数-要生成partiton数)个excutor空跑从而下降效率。

结语

Spark做为当前用户使用最普遍的大数据计算引擎之一,在数据分析师中有着普遍的应用,以其处理速度快著称,经过上述的分析,咱们可以以合理的计算资源,包括CPU, 内存,executor来执行计算任务,使得咱们的集群更高效,在相同的计算资源场景下能得到更多的任务产出。

相关文章
相关标签/搜索