Spark性能优化指南—— shuffle调优

大多数Spark做业的性能主要就是消耗在了shuffle环节,由于该环节包含了大量的磁盘IO、序列化、网络数据传输等操做。所以,若是要让做业的性能更上一层楼,就有必要对shuffle过程进行调优。可是也必须提醒你们的是,影响一个Spark做业性能的因素,主要仍是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。所以你们务必把握住调优的基本原则,千万不要舍本逐末。下面咱们就给你们详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。算法

#ShuffleManager发展概述
在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得愈来愈先进。网络

在Spark 1.2之前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个很是严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操做影响了性能。数据结构

所以在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来讲,有了必定的改进。主要就在于,每一个Task在进行shuffle操做时,虽然也会产生较多的临时磁盘文件,可是最后会将全部的临时文件合并(merge)成一个磁盘文件,所以每一个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取本身的数据时,只要根据索引读取每一个磁盘文件中的部分数据便可。ide

下面咱们详细分析一下HashShuffleManager和SortShuffleManager的原理。性能

#HashShuffleManager运行原理大数据

##未经优化的HashShuffleManager优化

下图说明了未经优化的HashShuffleManager的原理。这里咱们先明确一个假设前提:每一个Executor只有1个CPU core,也就是说,不管这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。spa

咱们先从shuffle write开始提及。shuffle write阶段,主要就是在一个stage结束计算以后,为了下一个stage能够执行shuffle类的算子(好比reduceByKey),而将每一个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘以前,会先将数据写入内存缓冲中,当内存缓冲填满以后,才会溢写到磁盘文件中去。线程

那么每一个执行shuffle write的task,要为下一个stage建立多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每一个task就要建立多少份磁盘文件。好比下一个stage总共有100个task,那么当前stage的每一个task都要建立100份磁盘文件。若是当前stage有50个task,总共有10个Executor,每一个Executor执行5个Task,那么每一个Executor上总共就要建立500个磁盘文件,全部Executor上会建立5000个磁盘文件。因而可知,未经优化的shuffle write操做所产生的磁盘文件的数量是极其惊人的。排序

接着咱们来讲说shuffle read。shuffle read,一般就是一个stage刚开始时要作的事情。此时该stage的每个task就须要将上一个stage的计算结果中的全部相同key,从各个节点上经过网络都拉取到本身所在的节点上,而后进行key的聚合或链接等操做。因为shuffle write的过程当中,task给下游stage的每一个task都建立了一个磁盘文件,所以shuffle read的过程当中,每一个task只要从上游stage的全部task所在节点上,拉取属于本身的那一个磁盘文件便可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每一个shuffle read task都会有一个本身的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,而后经过内存中的一个Map进行聚合等操做。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操做。以此类推,直到最后将全部数据到拉取完,并获得最终的结果。 输入图片说明

##优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指咱们能够设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true便可开启优化机制。一般来讲,若是咱们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制以后,在shuffle write过程当中,task就不是为下游stage的每一个task建立一个磁盘文件了。此时会出现shuffleFileGroup的概念,每一个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就能够并行执行多少个task。而第一批并行执行的每一个task都会建立一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用以前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。所以,consolidate机制容许不一样的task复用同一批磁盘文件,这样就能够有效将多个task的磁盘文件进行必定程度上的合并,从而大幅度减小磁盘文件的数量,进而提高shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共仍是有10个Executor,每一个Executor执行5个task。那么本来使用未经优化的HashShuffleManager时,每一个Executor会产生500个磁盘文件,全部Executor会产生5000个磁盘文件的。可是此时通过优化以后,每一个Executor建立的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每一个Executor此时只会建立100个磁盘文件,全部Executor只会建立1000个磁盘文件。 输入图片说明

#SortShuffleManager运行原理

SortShuffleManager的运行机制主要分红两种,一种是普通运行机制,另外一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

##普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不一样的shuffle算子,可能选用不一样的数据结构。若是是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边经过Map进行聚合,一边写入内存;若是是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构以后,就会判断一下,是否达到了某个临界阈值。若是达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。

在溢写到磁盘文件以前,会先根据key对内存数据结构中已有的数据进行排序。排序事后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是经过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢以后再一次写入磁盘文件中,这样能够减小磁盘IO次数,提高性能。

一个task将全部数据写入内存数据结构的过程当中,会发生屡次磁盘溢写操做,也就会产生多个临时文件。最后会将以前全部的临时磁盘文件都进行合并,这就是merge过程,此时会将以前全部临时磁盘文件中的数据读取出来,而后依次写入最终的磁盘文件之中。此外,因为一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,所以还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager因为有一个磁盘文件merge的过程,所以大大减小了文件数量。好比第一个stage有50个task,总共有10个Executor,每一个Executor执行5个task,而第二个stage有100个task。因为每一个task最终只有一个磁盘文件,所以此时每一个Executor上只有5个磁盘文件,全部Executor只有50个磁盘文件。
输入图片说明

##bypass运行机制 下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件以下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(好比reduceByKey)。

此时task会为每一个下游task都建立一个临时磁盘文件,并将数据按key进行hash而后根据key的hash值,将key写入对应的磁盘文件之中。固然,写入磁盘文件时也是先写入内存缓冲,缓冲写满以后再溢写到磁盘文件的。最后,一样会将全部临时磁盘文件都合并成一个磁盘文件,并建立一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是如出一辙的,由于都要建立数量惊人的磁盘文件,只是在最后会作一个磁盘文件的合并而已。所以少许的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来讲,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不一样在于:第一,磁盘写机制不一样;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程当中,不须要进行数据的排序操做,也就节省掉了这部分的性能开销。
输入图片说明

#shuffle相关参数调优
如下是Shffule过程当中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。
spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件以前,会先写入buffer缓冲中,待缓冲写满以后,才会溢写到磁盘。
  • 调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比64k),从而减小shuffle write过程当中溢写磁盘文件的次数,也就能够减小磁盘IO次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次可以拉取多少数据。
  • 调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比96m),从而减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于本身的数据时,若是由于网络异常致使拉取失败,是会自动进行重试的。该参数就表明了能够重试的最大次数。若是在指定次数以内拉取仍是没有成功,就可能会致使做业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操做的做业,建议增长重试最大次数(好比60次),以免因为JVM的full gc或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度提高稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数表明了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(好比60s),以增长shuffle操做的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数表明了Executor内存中,分配给shuffle read task进行聚合操做的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。若是内存充足,并且不多使用持久化操做,建议调高这个比例,给shuffle read的聚合操做更多内存,以免因为内存不足致使聚合过程当中频繁读写磁盘。在实践中发现,合理调节该参数能够将性能提高10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5之后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默认选项,可是Spark 1.2以及以后的版本默认都是SortShuffleManager了。tungsten-sort与sort相似,可是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:因为SortShuffleManager默认会对数据进行排序,所以若是你的业务逻辑中须要该排序机制的话,则使用默认的SortShuffleManager就能够;而若是你的业务逻辑不须要对数据进行排序,那么建议参考后面的几个参数调优,经过bypass机制或优化的HashShuffleManager来避免排序操做,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,由于以前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,若是shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程当中不会进行排序操做,而是直接按照未经优化的HashShuffleManager的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减小了排序的性能开销。可是这种方式下,依然会产生大量的磁盘文件,所以shuffle write性能有待提升。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:若是使用HashShuffleManager,该参数有效。若是设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的状况下,这种方法能够极大地减小磁盘IO开销,提高性能。
  • 调优建议:若是的确不须要SortShuffleManager的排序机制,那么除了使用bypass机制,还能够尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。