Spark调优指南

Spark相关问题java

  1. SparkMR快的缘由?

1) Spark的计算结果能够放入内存,支持基于内存的迭代,MR不支持。node

2) SparkDAG有向无环图,能够实现pipeline的计算模式。mysql

3) 资源调度模式:Spark粗粒度资源调度,MR是细粒度资源调度。算法

资源复用:Spark中的task能够复用同一批Executor的资源。sql

MR里面每个map task对应一个jvm,不能复用资源。数据库

  1. Spark中主要进程的做用?

Driver进程:负责任务的分发和结果的回收。apache

Executor进程:负责具体任务的执行。数组

Master进程:Spark资源管理的主进程,负责资源调度。缓存

Worker进程:Spark资源管理的从进程,woker节点主要运行Executor安全

  1. Spark调优

1. 资源调优

1) .搭建Spark集群的时候要给Spark集群足够的资源(core,memory)

spark安装包的conf下spark-env.sh

SPARK_WORKER_CORES

SPARK_WORKER_MEMORY

SPARK_WORKER_INSTANCE

2) .在提交Application的时候给Application分配更多的资源。

提交命令选项:(在提交Application的时候使用选项)

--executor-cores

--executor-memory

--total-executor-cores

配置信息:(在Application的代码中设置

Spark-default.conf中设置)

spark.executor.cores

spark.executor.memory

spark.max.cores

  1. 并行度调优

原则:一个core通常分配2~3个task,每个task通常处理1G数据task的复杂度相似wc)

提升并行度的方式:

1) .若是读取的数据在HDFS上,下降block块的大小

2) .sc.textFile(path,numPartitions)

3) sc.parallelize(list,numPartitions) 通常用于测试

4) coalesce、repartition能够提升RDD的分区数。

5) 配置信息:

spark.default.parallelism  not set (默认executor core的总个数)

spark.sql.shuffle.partitions 200

6) 自定义分区器

  1. 代码调优
    1. 避免建立重复的RDD,复用同一个RDD
    2. 对屡次使用的RDD进行持久化

如何选择一种最合适的持久化策略?

默认状况下,性能最高的固然是MEMORY_ONLY,但前提是你的内存必须足够足够大,能够绰绰有余地存放下整个RDD的全部数据。由于不进行序列化与反序列化操做,就避免了这部分的性能开销;对这个RDD的后续算子操做,都是基于纯内存中的数据的操做,不须要从磁盘文件中读取数据,性能也很高;并且不须要复制一份数据副本,并远程传送到其余节点上。可是这里必需要注意的是,在实际的生产环境中,恐怕可以直接用这种策略的场景仍是有限的,若是RDD中数据比较多时(好比几十亿),直接用这种持久化级别,会致使JVM的OOM内存溢出异常。

若是使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每一个partition仅仅是一个字节数组而已,大大减小了对象数量,并下降了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。可是后续算子能够基于纯内存进行操做,所以性能整体仍是比较高的。此外,可能发生的问题同上,若是RDD中的数据量过多的话,仍是可能会致使OOM内存溢出的异常。

若是纯内存的级别都没法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由于既然到了这一步,就说明RDD的数据量很大,内存没法彻底放下。序列化后的数据比较少,能够节省内存和磁盘的空间开销。同时该策略会优先尽可能尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

一般不建议使用DISK_ONLY和后缀为_2的级别:由于彻底基于磁盘文件进行数据的读写,会致使性能急剧下降,有时还不如从新计算一次全部RDD。后缀为_2的级别,必须将全部数据都复制一份副本,并发送到其余节点上,数据复制以及网络传输会致使较大的性能开销,除非是要求做业的高可用性,不然不建议使用。

持久化算子:

cache:

MEMORY_ONLY

persist:

MEMORY_ONLY

MEMORY_ONLY_SER

MEMORY_AND_DISK_SER

通常不要选择带有_2的持久化级别。

checkpoint:

① 若是一个RDD的计算时间比较长或者计算起来比较复杂,通常将这个RDD的计算结果保存到HDFS上,这样数据会更加安全。

② 若是一个RDD的依赖关系很是长,也会使用checkpoint,会切断依赖关系,提升容错的效率。

 

  1. 尽可能避免使用shuffle类的算子

使用广播变量来模拟使用join,使用状况:一个RDD比较大,一个RDD比较小。

join算子=广播变量+filter、广播变量+map、广播变量+flatMap

  1. 使用map-side预聚合的shuffle操做

即尽可能使用有combiner的shuffle类算子。

combiner概念:

map端,每个map task计算完毕后进行的局部聚合。

combiner好处:

1) 下降shuffle write写磁盘的数据量。

2) 下降shuffle read拉取数据量的大小。

3) 下降reduce端聚合的次数。

combiner的shuffle类算子:

1) reduceByKey:这个算子在map端是有combiner的,在一些场景中可使用reduceByKey代替groupByKey。

2) aggregateByKey(fun1,func2)

  1. 尽可能使用高性能的算子

使用reduceByKey替代groupByKey

使用mapPartition替代map

使用foreachPartition替代foreach

filter后使用coalesce减小分区数

使用使用repartitionAndSortWithinPartitions替代repartition与sort类操做

使用repartition和coalesce算子操做分区。

  1. 使用广播变量

开发过程当中,会遇到须要在算子函数中使用外部变量的场景(尤为是大变量,好比100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提高性能,函数中使用到外部变量时,默认状况下,Spark会将该变量复制多个副本,经过网络传输到task中,此时每一个task都有一个变量副本。若是变量自己比较大的话(好比100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存致使的频繁GC,都会极大地影响性能。若是使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每一个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,能够大大减小变量副本的数量,从而减小网络传输的性能开销,并减小对Executor内存的占用开销,下降GC的频率。

广播大变量发送方式:Executor一开始并无广播变量,而是task运行须要用到广播变量,会找executor的blockManager要,bloackManager找Driver里面的blockManagerMaster要。

使用广播变量能够大大下降集群中变量的副本数。不使用广播变量,变量的副本数和task数一致。使用广播变量变量的副本和Executor数一致。

 

使用广播变量能够大大的下降集群中变量的副本数。

不使用广播变量:变量的副本数和task数一致。

使用广播变量:变量的副本数与Executor数一致。

广播变量最大能够是多大?

ExecutorMemory*60%*90%*80%    = executorMemory *0.42

 

  1. 使用Kryo优化序列化性能

Spark中,主要有三个地方涉及到了序列化:

1) 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

2) 将自定义的类型做为RDD的泛型类型时(好比JavaRDD<SXT>,SXT是自定义类型),全部自定义类型对象,都会进行序列化。所以这种状况下,也要求自定义的类必须实现Serializable接口。

3) 使用可序列化的持久化策略时(好比MEMORY_ONLY_SER),Spark会将RDD中的每一个partition都序列化成一个大的字节数组。

Kryo序列化器介绍:

Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。因此Kryo序列化优化之后,可让网络传输的数据变少;在集群中耗费的内存资源大大减小。

对于这三种出现序列化的地方,咱们均可以经过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。可是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高不少。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之因此默认没有使用Kryo做为序列化类库,是由于Kryo要求最好要注册全部须要进行序列化的自定义类型,所以对于开发者来讲,这种方式比较麻烦。

Spark中使用Kryo:

Sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

.registerKryoClasses(new Class[]{SpeedSortKey.class})

 

  1. 优化数据结构

java中有三种类型比较消耗内存:

1) 对象,每一个Java对象都有对象头、引用等额外的信息,所以比较占用内存空间。

2) 字符串,每一个字符串内部都有一个字符数组以及长度等额外信息。

3) 集合类型,好比HashMap、LinkedList等,由于集合类型内部一般会使用一些内部类来封装集合元素,好比Map.Entry。

所以Spark官方建议,Spark编码实现中,特别是对于算子函数中的代码,尽可能不要使用上述三种数据结构,尽可能使用字符串替代对象,使用原始类型(好比Int、Long)替代字符串,使用数组替代集合类型,这样尽量地减小内存占用,从而下降GC频率,提高性能。

  1. 使用高性能的库fastutil

fasteutil介绍:

fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;fastutil可以提供更小的内存占用,更快的存取速度;咱们使用fastutil提供的集合类,来替代本身平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,能够减少内存的占用,而且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度。fastutil的每一种集合类型,都实现了对应的Java中的标准接口(好比fastutil的map,实现了Java的Map接口),所以能够直接放入已有系统的任何代码中。

fastutil最新版本要求Java 7以及以上版本。

使用:

见RandomExtractCars.java类

  1. 数据本地化
    1. 数据本地化的级别:

1) PROCESS_LOCAL

task要计算的数据在本进程(Executor)的内存中。

 

2) NODE_LOCAL

① task所计算的数据在本节点所在的磁盘上。

② task所计算的数据在本节点其余Executor进程的内存中。

 

3) NO_PREF

task所计算的数据在关系型数据库中,如mysql。

 

4) RACK_LOCAL

task所计算的数据在同机架的不一样节点的磁盘或者Executor进程的内存中

 

5) ANY

跨机架。

  1. Spark数据本地化调优:

 

Spark中任务调度时,TaskScheduler在分发以前须要依据数据的位置来分发,最好将task分发到数据所在的节点上,若是TaskScheduler分发的task在默认3s依然没法执行的话,TaskScheduler会从新发送这个task到相同的Executor中去执行,会重试5次,若是依然没法执行,那么TaskScheduler会下降一级数据本地化的级别再次发送task。

如上图中,会先尝试1,PROCESS_LOCAL数据本地化级别,若是重试5次每次等待3s,会默认这个Executor计算资源满了,那么会下降一级数据本地化级别到2,NODE_LOCAL,若是仍是重试5次每次等待3s仍是失败,那么仍是会下降一级数据本地化级别到3,RACK_LOCAL。这样数据就会有网络传输,下降了执行效率。

1) 如何提升数据本地化的级别?

能够增长每次发送task的等待时间(默认都是3s),将3s倍数调大, 结合WEBUI来调节:

• spark.locality.wait  

• spark.locality.wait.process

• spark.locality.wait.node

• spark.locality.wait.rack

注意:等待时间不能调大很大,调整数据本地化的级别不要本末倒置,虽然每个task的本地化级别是最高了,但整个Application的执行时间反而加长。

2) 如何查看数据本地化的级别?

经过日志或者WEBUI

 

  1. Spark Shuffle调优
    1. SparkShuffle

spark1.x 中有 两种类型的shuffle (hashShuffleManager  另一个是sortShuffleManager)

spark2.x之后  只有一种shuffle 机制  SortShuffle  管理器叫作SortShuffleManager  

  1. SparkShuffle概念

reduceByKey会将上一个RDD中的每个key对应的全部value聚合成一个value,而后生成一个新的RDD,元素类型是<key,value>对的形式,这样每个key对应一个聚合起来的value。

问题:聚合以前,每个key对应的value不必定都是在一个partition中,也不太可能在同一个节点上,由于RDD是分布式的弹性的数据集,RDD的partition极有可能分布在各个节点上。

如何聚合?

– Shuffle Write:上一个stage的每一个map task就必须保证将本身处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不一样的分区文件中。

 – Shuffle Read:reduce task就会从上一个stage的全部task所在的机器上寻找属于己的那些分区文件,这样就能够保证每个key所对应的value都会汇聚到同一个节点上去处理和聚合。

Spark中有两种Shuffle管理类型,HashShufflManager和SortShuffleManager,Spark1.2以前是HashShuffleManager, Spark1.2引入SortShuffleManager,Spark 2.0+版本中已经将HashShuffleManager丢弃。

  1. HashShuffleManager

1) 普通机制

  • 普通机制示意图

 

  • 执行流程

a) 每个map task将不一样结果写到不一样的buffer中,每一个buffer的大小为32K。buffer起到数据缓存的做用。

b) 每一个buffer文件最后对应一个磁盘小文件。

c) reduce task来拉取对应的磁盘小文件。

  • 总结

① .map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪个磁盘小文件中去。ReduceTask会去Map端拉取相应的磁盘小文件。

② .产生的磁盘小文件的个数:

M(map task的个数)*R(reduce task的个数)

  • 存在的问题

产生的磁盘小文件过多,会致使如下问题:

a) Shuffle Write过程当中会产生不少写磁盘小文件的对象。

b) Shuffle Read过程当中会产生不少读取磁盘小文件的对象。

c) JVM堆内存中对象过多会形成频繁的gc,gc还没法解决运行所须要的内存 的话,就会OOM。

d) 在数据传输过程当中会有频繁的网络通讯,频繁的网络通讯出现通讯故障的可能性大大增长,一旦网络通讯出现了故障会致使shuffle file cannot find 因为这个错误致使的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。

2) 合并机制(considation机制)

  • 合并机制示意图

 

  • 总结

产生磁盘小文件的个数:C(core的个数)*R(reduce的个数)

若是核数比较多的话  那么产生的小文件个数 是否是也不少啊?

  1. SortShuffleManager

1) 普通机制

  • 普通机制示意图

 

  • 执行流程

a) map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M

b) shuffle的时候会有一个定时器,不按期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,好比如今内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。

c) 若是申请成功不会进行溢写,若是申请不成功,这时候会发生溢写磁盘。

d) 在溢写以前内存结构中的数据会进行排序分区

e) 而后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是1万条数据,

f) map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。

g) reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。

  • 总结

产生磁盘小文件的个数: 2*M(map task的个数)

2) bypass机制

  • bypass机制示意图

 

  • 总结

① .bypass运行机制的触发条件以下:

shuffle reduce task的数量小于spark.shuffle.sort.bypassMergeThreshold的参数值。这个值默认是200。

② .产生的磁盘小文件为:2*M(map task的个数)

  1. Shuffle文件寻址

1) MapOutputTracker

MapOutputTracker是Spark架构中的一个模块,是一个主从架构。管理磁盘小文件的地址。

  • MapOutputTrackerMaster是主对象,存在于Driver中。
  • MapOutputTrackerWorker是从对象,存在于Excutor中。

2) BlockManager

BlockManager块管理者,是Spark架构中的一个模块,也是一个主从架构。

  • BlockManagerMaster,主对象,存在于Driver中。

BlockManagerMaster会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知BlockManagerSlave传输或者删除数据。

  • BlockManagerSlave,从对象,存在于Excutor中。

BlockManagerSlave会与BlockManagerSlave之间通讯。

¬ 不管在Driver端的BlockManager仍是在Excutor端的BlockManager都含有三个对象:

① DiskStore:负责磁盘的管理。

② MemoryStore:负责内存的管理。

③ BlockTransferService:负责数据的传输。

3) Shuffle文件寻址图

 

4) Shuffle文件寻址流程

a) map task执行完成后,会将task的执行状况和磁盘小文件的地址封装到MpStatus对象中,经过MapOutputTrackerWorker对象向Driver中的MapOutputTrackerMaster汇报。

b) 在全部的map task执行完毕后,Driver中就掌握了全部的磁盘小文件的地址。

c) reduce task执行以前,会经过Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster获取磁盘小文件的地址。

d) 获取到磁盘小文件的地址后,会经过BlockManager链接数据所在节点,而后经过BlockTransferService进行数据的传输。

e) BlockTransferService默认启动5个task去节点拉取数据。默认状况下,5个task拉取数据量不能超过48M。

 

 

  1. Shuffle调优
    1. SparkShuffle调优配置项如何使用?

1) 在代码中,不推荐使用,硬编码。

new SparkConf().set(“spark.shuffle.file.buffer”,”64”)

2) 在提交spark任务的时候,推荐使用。

spark-submit --conf spark.shuffle.file.buffer=64 –conf ….

3) conf下的spark-default.conf配置文件中,不推荐,由于是写死后全部应用程序都要用。

  1. buffer大小
  2. shuffle read拉取数据量的大小
  3. shuffle聚合内存的比例
  4. 拉取数据重试次数
  5. 重试间隔时间60s
  6. Spark Shuffle的种类
  7. HashShuffle 合并机制
  8. SortShuffle bypass机制 200次

 

  1. Spark内存管理

Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责建立SparkContext上下文,提交任务,task的分发等。Executor负责task的计算任务,并将结果返回给Driver。同时须要为须要持久化的RDD提供储存。Driver端的内存管理比较简单,这里所说的Spark内存管理针对Executor端的内存管理。

Spark内存管理分为静态内存管理和统一内存管理,Spark1.6以前使用的是静态内存管理,Spark1.6以后引入了统一内存管理。

静态内存管理存储内存、执行内存和其余内存的大小在 Spark 应用程序运行期间均为固定的,但用户能够应用程序启动前进行配置。

统一内存管理与静态内存管理的区别在于储存内存和执行内存共享同一块空间,能够互相借用对方的空间。

Spark1.6以上版本默认使用的是统一内存管理,能够经过参数spark.memory.useLegacyMode 设置为true(默认为false)使用静态内存管理。

  1. 静态内存管理分布图

 

  1. 统一内存管理分布图

  

  1. reduce 中OOM如何处理?

1) 减小每次拉取的数据量

2) 提升shuffle聚合的内存比例

3) 提升Excutor的总内存

 

  1. 内存调优

 

好比咱们建立对象  先往伊甸园和s1 中放  满了 发生minoGC 此时 会清空 伊甸园和s1 若是还有对象 那么就往s2中放 若是s2放的下  就放在s2中  s2也满了  会发生小型的minoGC 将对象清空

若是还有数据 将数据+1  加到15  会放入到老年代中   

可是 老年代中的对象 都是经常使用的对象  好比数据库链接池等  老年代若是满了 会发生full GC  若是清空后 还不够用   就会发生GC

 

咱们上面讨论的问题  讨论的task 的内存够不够用   

 

JVM堆内存分为一块较大的Eden和两块较小的Survivor,每次只使用Eden和其中一块Survivor,当回收时将Eden和Survivor中还存活着的对象一次性复制到另一块Survivor上,最后清理掉Eden和刚才用过的Survivor。也就是说当task建立出来对象会首先往Eden和survivor1中存放,survivor2是空闲的,当Eden和survivor1区域放满之后就会触发minor gc小型垃圾回收,清理掉再也不使用的对象。会将存活下来的对象放入survivor2中。

若是存活下来的对象大小大于survivor2的大小,那么JVM就会将多余的对象直接放入到老年代中。

若是这个时候年轻代的内存不是很大的话,就会常常的进行minor gc,频繁的minor gc会致使短期内有些存活的对象(屡次垃圾回收都没有回收掉,一直在用的又不能被释放,这种对象每通过一次minor gc都存活下来)频繁的倒来倒去,会致使这些短生命周期的对象(不必定长期使用)每进行一次垃圾回收就会长一岁。年龄过大,默认15岁,垃圾回收仍是没有回收回去就会跑到老年代里面去了。

这样会致使在老年代中存放大量的短生命周期的对象,老年代应该存放的是数量比较少而且会长期使用的对象,好比数据库链接池对象。这样的话,老年代就会满溢(full gc 由于原本老年代中的对象不多,不多进行full gc 所以采起了不太复杂可是消耗性能和时间的垃圾回收算法)。无论minor gc 仍是 full gc都会致使JVM的工做线程中止。

总结-堆内存不足形成的影响:

1) 频繁的minor gc。

2) 老年代中大量的短生命周期的对象会致使full gc。

3) gc 多了就会影响Spark的性能和运行的速度。

Spark  JVM调优主要是下降gc时间,能够修改Executor内存的比例参数。

RDD缓存、task定义运行的算子函数,可能会建立不少对象,这样会占用大量的堆内存。堆内存满了以后会频繁的GC,若是GC还不可以知足内存的须要的话就会报OOM。好比一个task在运行的时候会建立N个对象,这些对象首先要放入到JVM年轻代中。好比在存数据的时候咱们使用了foreach来将数据写入到内存,每条数据都会封装到一个对象中存入数据库中,那么有多少条数据就会在JVM中建立多少个对象。

Spark中如何内存调优?

Spark Executor堆内存中存放(以静态内存管理为例):RDD的缓存数据和广播变量(spark.storage.memoryFraction 0.6),shuffle聚合内存(spark.shuffle.memoryFraction 0.2),task的运行(0.2)那么如何调优呢?

1) 提升Executor整体内存的大小

2) 下降储存内存比例或者下降聚合内存比例

如何查看gc?

Spark WEBUI中job->stage->task

 

 

 

  1. 调节Executor的堆外内存

Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外内存netty是零拷贝),因此使用了堆外内存。默认状况下,这个堆外内存上限默认是每个executor的内存大小的10%;真正处理大数据的时候,这里都会出现问题,致使spark做业反复崩溃,没法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G。

executor在进行shuffle write,优先从本身本地关联的mapOutPutWorker中获取某份数据,若是本地block manager没有的话,那么会经过TransferService,去远程链接其余节点上executor的block manager去获取,尝试创建远程的网络链接,而且去拉取数据。频繁建立对象让JVM堆内存满溢,进行垃圾回收。正好碰到那个exeuctor的JVM在垃圾回收。处于垃圾回过程当中,全部的工做线程所有中止;至关于只要一旦进行垃圾回收,spark / executor中止工做,没法提供响应,spark默认的网络链接的超时时长是60s;若是卡住60s都没法创建链接的话,那么这个task就失败了。task失败了就会出现shuffle file cannot find的错误。

那么如何调节等待的时长呢?

./spark-submit提交任务的脚本里面添加:

--conf spark.core.connection.ack.wait.timeout=300

Executor因为内存不足或者堆外内存不足了,挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件Reducer端不可以拉取数据。咱们能够调节堆外内存的大小,如何调节?

./spark-submit提交任务的脚本里面添加

yarn下:

--conf  spark.yarn.executor.memoryOverhead=2048 单位M

standalone下:

--conf  spark.memory.offHeap.size=2048单位M

  1. 解决数据倾斜
    1. 提升shuffle操做的并行度

方案实现思路:

在对RDD执行shuffle算子时,给shuffle算子传入一个参数,好比reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,好比group by、join等,须要设置一个参数,即spark.sql.shuffle.partitions,该参数表明了shuffle read task的并行度,该值默认是200,对于不少场景来讲都有点太小。

方案实现原理:

增长shuffle read task的数量,可让本来分配给一个task的多个key分配给多个task,从而让每一个task处理比原来更少的数据。举例来讲,若是本来有5个不一样的key,每一个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增长了shuffle read task之后,每一个task就分配到一个key,即每一个task就处理10条数据,那么天然每一个task的执行时间都会变短了。

  1. 双重聚合

方案适用场景:

RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:

这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每一个key都打上一个随机数,好比10之内的随机数,此时原先同样的key就变成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操做,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。而后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操做,就能够获得最终结果了,好比(hello, 4)。

方案实现原理:

将本来相同的key经过附加随机前缀的方式,变成多个不一样的key,就可让本来被一个task处理的数据分散到多个task上去作局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就能够获得最终的结果。

 

若是一个RDD中有一个key致使数据倾斜,同时还有其余的key,那么通常先对数据集进行抽样,而后找出倾斜的key,再使用filter对原始的RDD进行分离为两个RDD,一个是由倾斜的key组成的RDD1,一个是由其余的key组成的RDD2,那么对于RDD1可使用加随机前缀进行多分区多task计算,对于另外一个RDD2正常聚合计算,最后将结果再合并起来。

  1. reduce join转为map join

BroadCast+filter(或者map)

方案适用场景:

在对RDD使用join类操做,或者是在Spark SQL中使用join语句时,并且join操做中的一个RDD或表的数据量比较小(好比几百M或者一两G),比较适用此方案。

方案实现思路:

不使用join算子进行链接操做,而使用Broadcast变量与map类算子实现join操做,进而彻底规避掉shuffle类的操做,完全避免数据倾斜的发生和出现。将较小RDD中的数据直接经过collect算子拉取到Driver端的内存中来,而后对其建立一个Broadcast变量;接着对另一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照链接key进行比对,若是链接key相同的话,那么就将两个RDD的数据用你须要的方式链接起来。

方案实现原理:

普通的join是会走shuffle过程的,而一旦shuffle,就至关于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。可是若是一个RDD是比较小的,则能够采用广播小RDD全量数据+map算子来实现与join一样的效果,也就是map join,此时就不会发生shuffle操做,也就不会发生数据倾斜。

  1. 采样倾斜key并分拆join操做

方案适用场景:

两个RDD/Hive表进行join的时候,若是数据量都比较大,没法采用“解决方案五”,那么此时能够看一下两个RDD/Hive表中的key分布状况。若是出现数据倾斜,是由于其中某一个RDD/Hive表中的少数几个key的数据量过大,而另外一个RDD/Hive表中的全部key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

对包含少数几个数据量过大的key的那个RDD,经过sample算子采样出一份样原本,而后统计一下每一个key的数量,计算出来数据量最大的是哪几个key。而后将这几个key对应的数据从原来的RDD中拆分出来,造成一个单独的RDD,并给每一个key都打上n之内的随机数做为前缀,而不会致使倾斜的大部分key造成另一个RDD。接着将须要join的另外一个RDD,也过滤出来那几个倾斜key对应的数据并造成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会致使倾斜的大部分key也造成另一个RDD。再将附加了随机前缀的独立RDD与另外一个膨胀n倍的独立RDD进行join,此时就能够将原先相同的key打散成n份,分散到多个task中去进行join了。而另外两个普通的RDD就照常join便可。最后将两次join的结果使用union算子合并起来便可,就是最终的join结果 。

 

  1. 使用随机前缀和扩容RDD进行join

方案适用场景:

若是在进行join操做时,RDD中有大量的key致使数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

该方案的实现思路基本和“解决方案六”相似,首先查看RDD/Hive表中的数据分布状况,找到那个形成数据倾斜的RDD/Hive表,好比有多个key都对应了超过1万条数据。而后将该RDD的每条数据都打上一个n之内的随机前缀。同时对另一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join便可。

 

  1. Spark故障解决(troubleshooting)
    1. shuffle file cannot find:磁盘小文件找不到。

1) connection timeout ----shuffle file cannot find

提升创建链接的超时时间,或者下降gc,下降gc了那么spark不能堆外提供服务的时间就少了,那么超时的可能就会下降。

2) fetch data fail  ---- shuffle file cannot find

提升拉取数据的重试次数以及间隔时间。

3) OOM/executor lost ---- shuffle file cannot find

提升堆外内存大小,提升堆内内存大小。

  1. reduce OOM

BlockManager拉取的数据量大,reduce task处理的数据量小

解决方法:

1) 下降每次拉取的数据量

2) 提升shuffle聚合的内存比例

3) 提升Executor的内存比例

  1. 序列化问题
  2. Null值问题

val rdd = rdd.map{x=>{

x+”~”;

}}

rdd.foreach{x=>{

System.out.println(x.getName())

}}

相关文章
相关标签/搜索