Spark技术内幕:Shuffle的性能调优

经过上面的架构和源码实现的分析,不可贵出Shuffle是Spark Core比较复杂的模块的结论。它也是很是影响性能的操做之一。所以,在这里整理了会影响Shuffle性能的各项配置。尽管大部分的配置项在前文已经解释过它的含义,因为这些参数的确是很是重要,这里算是作一个详细的总结。apache

1.1.1  spark.shuffle.manager

前文也屡次提到过,Spark1.2.0官方支持两种方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0以前仅支持Hash Based Shuffle。Spark 1.1的时候引入了Sort Based Shuffle。Spark 1.2的默认Shuffle机制从Hash变成了Sort。若是须要Hash Based Shuffle,能够将spark.shuffle.manager设置成“hash”便可。网络

若是对性能有比较苛刻的要求,那么就要理解这两种不一样的Shuffle机制的原理,结合具体的应用场景进行选择。架构

Hash Based Shuffle,就是将数据根据Hash的结果,将各个Reducer partition的数据写到单独的文件中去,写数据时不会有排序的操做。这个问题就是若是Reducer的partition比较多的时候,会产生大量的磁盘文件。这会带来两个问题:ide

1)       同时打开的文件比较多,那么大量的文件句柄和写操做分配的临时内存会很是大,对于内存的使用和GC带来不少的压力。尤为是在Sparkon YARN的模式下,Executor分配的内存广泛比较小的时候,这个问题会更严重。性能

2)       从总体来看,这些文件带来大量的随机读,读性能可能会遇到瓶颈。测试

更加细节的讨论能够参见7.1节和7.6.6(尝试去解决写的文件太多的问题)。大数据

 

Sort Based Shuffle会根据实际状况对数据采用不一样的方式进行Sort。这个排序可能仅仅是按照Reducer的partition进行排序,保证同一个Shuffle Map Task的对应于不一样的Reducer的partition的数据均可以写到同一个数据文件,经过一个Offset来标记不一样的Reducer partition的分界。所以一个Shuffle Map Task仅仅会生成一个数据文件(还有一个index索引文件),从而避免了Hash Based Shuffle文件数量过多的问题。优化

 

选择Hash仍是Sort,取决于内存,排序和文件操做等因素的综合影响。spa

对于不须要进行排序的Shuffle并且Shuffle产生的文件数量不是特别多,Hash Based Shuffle多是个更好的选择;毕竟Sort Based Shuffle至少会按照Reducer的partition进行排序。netty

而Sort BasedShuffle的优点就在于Scalability,它的出现实际上很大程度上是解决Hash Based Shuffle的Scalability的问题。因为Sort Based Shuffle还在不断的演进中,所以Sort Based Shuffle的性能会获得不断的改善。

对选择那种Shuffle,若是对于性能要求苛刻,最好仍是经过实际的场景中测试后再决定。不过选择默认的Sort,能够知足大部分的场景须要。

1.1.2  spark.shuffle.spill

这个参数的默认值是true,用于指定Shuffle过程当中若是内存中的数据超过阈值(参考spark.shuffle.memoryFraction的设置),那么是否须要将部分数据临时写入外部存储。若是设置为false,那么这个过程就会一直使用内存,会有Out Of Memory的风险。所以只有在肯定内存足够使用时,才能够将这个选项设置为false。

对于Hash BasedShuffle的Shuffle Write过程当中使用的org.apache.spark.util.collection.AppendOnlyMap就是全内存的方式,而org.apache.spark.util.collection.ExternalAppendOnlyMap对org.apache.spark.util.collection.AppendOnlyMap有了进一步的封装,在内存使用超过阈值时会将它spill到外部存储,在最后的时候会对这些临时文件进行Merge。

而Sort BasedShuffle Write使用到的org.apache.spark.util.collection.ExternalSorter也会有相似的spill。

而对于ShuffleRead,若是须要作aggregate,也可能在aggregate的过程当中将数据spill的外部存储。

1.1.3  spark.shuffle.memoryFraction和spark.shuffle.safetyFraction

在启用spark.shuffle.spill的状况下,spark.shuffle.memoryFraction决定了当Shuffle过程当中使用的内存达到总内存多少比例的时候开始Spill。在Spark 1.2.0里,这个值是0.2。经过这个参数能够设置Shuffle过程占用内存的大小,它直接影响了Spill的频率和GC。

 若是Spill的频率过高,那么能够适当的增长spark.shuffle.memoryFraction来增长Shuffle过程的可用内存数,进而减小Spill的频率。固然为了不OOM(内存溢出),可能就须要减小RDD cache所用的内存,即须要减小spark.storage.memoryFraction的值;可是减小RDD cache所用的内存有可能会带来其余的影响,所以须要综合考量。

在Shuffle过程当中,Shuffle占用的内存数是估计出来的,并非每次新增的数据项都会计算一次占用的内存大小,这样作是为了下降时间开销。可是估计也会有偏差,所以存在实际使用的内存数比估算值要大的状况,所以参数 spark.shuffle.safetyFraction做为一个保险系数下降实际Shuffle过程所须要的内存值,下降实际内存超出用户配置值的风险。

1.1.4  spark.shuffle.sort.bypassMergeThreshold

这个配置的默认值是200,用于设置在Reducer的partition数目少于多少的时候,Sort Based Shuffle内部不使用Merge Sort的方式处理数据,而是直接将每一个partition写入单独的文件。这个方式和Hash Based的方式是相似的,区别就是在最后这些文件仍是会合并成一个单独的文件,并经过一个index索引文件来标记不一样partition的位置信息。从Reducer看来,数据文件和索引文件的格式和内部是否作过Merge Sort是彻底相同的。

这个能够看作SortBased Shuffle在Shuffle量比较小的时候对于Hash Based Shuffle的一种折衷。固然了它和Hash Based Shuffle同样,也存在同时打开文件过多致使内存占用增长的问题。所以若是GC比较严重或者内存比较紧张,能够适当的下降这个值。

1.1.5  spark.shuffle.blockTransferService

在Spark 1.2.0,这个配置的默认值是netty,而以前是nio。这个主要是用于在各个Executor之间传输Shuffle数据。Netty的实现更加简洁,但实际上用户不用太关心这个选项。除非是有特殊的需求,不然采用默认配置就能够。

1.1.6  spark.shuffle.consolidateFiles

这个配置的默认配置是false。主要是为了解决在Hash Based Shuffle过程当中产生过多文件的问题。若是配置选项为true,那么对于同一个Core上运行的Shuffle Map Task不会新产生一个Shuffle文件而是重用原来的。可是每一个Shuffle Map Task仍是须要产生下游Task数量的文件,所以它并无减小同时打开文件的数量。若是须要了解更加详细的细节,能够阅读7.1节。

可是consolidateFiles的机制在Spark 0.8.1就引入了,到Spark 1.2.0仍是没有稳定下来。从源码实现的角度看,实现源码是很是简单的,可是因为涉及本地的文件系统等限制,这个策略可能会带来各类各样的问题。因为它并无减小同时打开文件的数量,所以不能减小由文件句柄带来的内存消耗。若是面临Shuffle的文件数量很是大,那么是否打开这个选项最好仍是经过实际测试后再决定。

 

1.1.7  spark.shuffle.service.enabled

(false)

1.1.8  spark.shuffle.compress和 spark.shuffle.spill.compress

 这两个参数的默认配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用来设置Shuffle过程当中是否对Shuffle数据进行压缩;其中前者针对最终写入本地文件系统的输出文件,后者针对在处理过程须要spill到外部存储的中间数据,后者针对最终的shuffle输出文件。

 如何设置spark.shuffle.compress?

若是下游的Task经过网络获取上游Shuffle Map Task的结果的网络IO成为瓶颈,那么就须要考虑将它设置为true:经过压缩数据来减小网络IO。因为上游Shuffle Map Task和下游的Task现阶段是不会并行处理的,即上游Shuffle Map Task处理完成,而后下游的Task才会开始执行。所以若是须要压缩的时间消耗就是Shuffle MapTask压缩数据的时间 + 网络传输的时间 + 下游Task解压的时间;而不须要压缩的时间消耗仅仅是网络传输的时间。所以须要评估压缩解压时间带来的时间消耗和由于数据压缩带来的时间节省。若是网络成为瓶颈,好比集群广泛使用的是千兆网络,那么可能将这个选项设置为true是合理的;若是计算是CPU密集型的,那么可能将这个选项设置为false才更好。

 如何设置spark.shuffle.spill.compress?

若是设置为true,表明处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回进行merge的时候,要进行解压。所以要综合考虑CPU因为引入压缩解压的消耗时间和Disk IO由于压缩带来的节省时间的比较。在Disk IO成为瓶颈的场景下,这个被设置为true可能比较合适;若是本地硬盘是SSD,那么这个设置为false可能比较合适。

1.1.9  spark.reducer.maxMbInFlight

这个参数用于限制一个ReducerTask向其余的Executor请求Shuffle数据时所占用的最大内存数,尤为是若是网卡是千兆和千兆如下的网卡时。默认值是48MB。设置这个值须要中和考虑网卡带宽和内存。

Spark性能调优之Shuffle调优

   • Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外内存(netty是零拷贝),因此使用了堆外内存

 

    shuffle过程当中常出现的问题

常见问题一:reduce oom?

    问题缘由:

       reduce task 去map端获取数据,reduce一边拉取数据一边聚合,reduce端有一块聚合内存(executor memory * 0.2),也就是这块内存不够

    解决办法:

                          1.增长reduce 聚合操做的内存的比例  

                          2.增长Executor memory的大小  --executor-memory 5G

                          3.减小reduce task每次拉取的数据量 设置spak.reducer.maxSizeInFlight 24m, 拉取的次数就多了,所以创建链接的次数增多,有可能会链接不上(正好遇上map task端进行GC)

 

常见问题二:错误描述--shuffle file cannot find   or   executor lost

   • 什

么时候须要调节Executor的堆外内存大小?

       • shuffle file cannot find (DAGScheduler,resubmitting task)

       • executor lost

       • task lost

       • out of memory

    问题缘由:

        1.map task所运行的executor内存不足,致使executor

挂掉了,executor里面的BlockManager就挂掉了,致使ConnectionManager不能用,也就没法创建链接,从而不能拉取数据

        2.executor并无挂掉

            2.1 BlockManage之间的链接失败(map task所运行的executor正在GC)

            2.2创建链接成功,map task所运行的executor正在GC

        3.reduce task向Driver中的MapOutputTracker获取shuffle file位置的时候出现了问题

    解决办法:

        1.增大Executor内存(即堆内内存) ,申请的堆外内存也会随之增长--executor-memory 5G

        2.增大堆外内存 --conf spark.yarn.executor.memoryoverhead 2048M

   --conf spark.executor.memoryoverhead 2048M

 (默认申请的堆外内存是Executor内存的10%,真正处理大数据的时候,这里都会出现问题,致使spark做业反复崩溃,没法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G)

)

 

buffer 32k    //缓冲区默认大小为32k  SparkConf.set("spark.shuffle.file.buffer","64k")

reduce 48M //reduce端拉取数据的时候,默认大小是48M 

SparkConf.set("spark.reducer.maxSizeInFlight","96M")

    

spark.shuffle.file.buffer

默认值:32k

参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件以前,会先写入buffer缓冲中,待缓冲写满以后,才会溢写到磁盘。

调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比64k),从而减小shuffle write过程当中溢写磁盘文件的次数,也就能够减小磁盘IO次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。

 

spark.reducer.maxSizeInFlight

默认值:48m

参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次可以拉取多少数据。

调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比96m),从而减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。

错误:reduce oom

reduce task去map拉数据,reduce 一边拉数据一边聚合   reduce段有一块聚合内存(executor memory * 0.2)

解决办法:一、增长reduce 聚合的内存的比例  设置spark.shuffle.memoryFraction

二、 增长executor memory的大小  --executor-memory 5G

三、减小reduce task每次拉取的数据量  设置spark.reducer.maxSizeInFlight  24m

 

spark.shuffle.io.maxRetries

默认值:3

参数说明:shuffle read task从shuffle write task所在节点拉取属于本身的数据时,若是由于网络异常致使拉取失败,是会自动进行重试的。该参数就表明了能够重试的最大次数。若是在指定次数以内拉取仍是没有成功,就可能会致使做业执行失败。

调优建议:对于那些包含了特别耗时的shuffle操做的做业,建议增长重试最大次数(好比60次),以免因为JVM的full gc或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度提高稳定性。

shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage

 

spark.shuffle.io.retryWait

默认值:5s

参数说明:具体解释同上,该参数表明了每次重试拉取数据的等待间隔,默认是5s。

调优建议:建议加大间隔时长(好比60s),以增长shuffle操做的稳定性。

 

spark.shuffle.memoryFraction

默认值:0.2

参数说明:该参数表明了Executor内存中,分配给shuffle read task进行聚合操做的内存比例,默认是20%。

调优建议:在资源参数调优中讲解过这个参数。若是内存充足,并且不多使用持久化操做,建议调高这个比例,给shuffle read的聚合操做更多内存,以免因为内存不足致使聚合过程当中频繁读写磁盘。在实践中发现,合理调节该参数能够将性能提高10%左右。

 

spark.shuffle.manager

默认值:sort

参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5之后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默认选项,可是Spark 1.2以及以后的版本默认都是SortShuffleManager了。tungsten-sort与sort相似,可是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

调优建议:因为SortShuffleManager默认会对数据进行排序,所以若是你的业务逻辑中须要该排序机制的话,则使用默认的SortShuffleManager就能够;而若是你的业务逻辑不须要对数据进行排序,那么建议参考后面的几个参数调优,经过bypass机制或优化的HashShuffleManager来避免排序操做,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,由于以前发现了一些相应的bug。

 

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数说明:当ShuffleManager为SortShuffleManager时,若是shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程当中不会进行排序操做,而是直接按照未经优化的HashShuffleManager的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。

调优建议:当你使用SortShuffleManager时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减小了排序的性能开销。可是这种方式下,依然会产生大量的磁盘文件,所以shuffle write性能有待提升。

 

spark.shuffle.consolidateFiles

默认值:false

参数说明:若是使用HashShuffleManager,该参数有效。若是设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的状况下,这种方法能够极大地减小磁盘IO开销,提高性能。

调优建议:若是的确不须要SortShuffleManager的排序机制,那么除了使用bypass机制,还能够尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

相关文章
相关标签/搜索