分为几个部分:
开发调优、资源调优、数据倾斜调优、shuffle调优html
主要包括这几个方面 RDD lineage设计、算子的合理使用、特殊操做的优化等java
shuffle过程当中,各个节点上的相同key都会先写入本地磁盘文件中,而后其余节点须要经过网络传输拉取各个节点上的磁盘文件中的相同key。并且相同key都拉取到同一个节点进行聚合操做时,还有可能会由于一个节点上处理的key过多,致使内存不够存放,进而溢写到磁盘文件中。所以在shuffle过程当中,可能会发生大量的磁盘文件读写的IO操做,以及数据的网络传输操做。磁盘IO和网络数据传输也是shuffle性能较差的主要缘由。数据库
使用map-side预聚合的shuffle操做
所谓的map-side预聚合,说的是在每一个节点本地对相同的key进行一次聚合操做,相似于MapReduce中的本地combiner。map-side预聚合以后,每一个节点本地就只会有一条相同的key,由于多条相同的key都被聚合起来了。其余节点在拉取全部节点上的相同key时,就会大大减小须要拉取的数据数量,从而也就减小了磁盘IO以及网络传输开销。数组
使用reduceByKey/aggregateByKey替代groupByKey
由于reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每一个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来讲比较差。网络
使用mapPartitions替代普通map
mapPartitions类的算子,一次函数调用会处理一个partition全部的数据,而不是一次函数调用处理一条,性能相对来讲会高一些。可是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。由于单次函数调用就要处理掉一个partition全部的数据,若是内存不够,垃圾回收时是没法回收掉太多对象的,极可能出现OOM异常。因此使用这类操做时要慎重!数据结构
使用foreachPartitions替代foreach
原理相似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的全部数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提高仍是颇有帮助的。好比在foreach函数中,将RDD中全部数据写MySQL,那么若是是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会建立一个数据库链接,此时就势必会频繁地建立和销毁数据库链接,性能是很是低下;可是若是用foreachPartitions算子一次性处理一个partition的数据,那么对于每一个partition,只要建立一个数据库链接便可,而后执行批量插入操做,此时性能是比较高的。多线程
使用repartitionAndSortWithinPartitions替代repartition与sort类操做并发
运行时的关键点
Spark是根据shuffle类算子来进行stage的划分。若是咱们的代码中执行了某个shuffle类算子(好比reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。能够大体理解为,shuffle算子执行以前的代码会被划分为一个stage,shuffle算子执行以及以后的代码会被划分为下一个stage。所以一个stage刚开始执行的时候,它的每一个task可能都会从上一个stage的task所在的节点,去经过网络传输拉取须要本身处理的全部key,而后对拉取到的全部相同的key使用咱们本身编写的算子函数执行聚合操做(好比reduceByKey()算子接收的函数)。这个过程就是shuffle。ide
所以Executor的内存主要分为三块:
第一块是让task执行咱们本身编写的代码时使用,默认是占Executor总内存的20%;
第二块是让task经过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操做时使用,默认也是占Executor总内存的20%;
第三块是让RDD持久化时使用,默认占Executor总内存的60%。函数
task的执行速度是跟每一个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每一个Executor进程上分配到的多个task,都是以每一个task一条线程的方式,多线程并发运行的。若是CPU core数量比较充足,并且分配到的task数量比较合理,那么一般来讲,能够比较快速和高效地执行完这些task线程。
了解完了Spark做业运行的基本原理以后,对资源相关的参数就容易理解了。
num-executors
设置Spark做业总共要用多少个Executor进程来执行。建议:50~100个左右的Executor进程
executor-memory
设置每一个Executor进程的内存。检建议:每一个Executor进程的内存设置4G~8G较为合适,num-executors乘以executor-memory,是不能超过队列的最大内存量的,通常1/3~1/2
executor-cores
设置每一个Executor进程的CPU core数量。建议:Executor的CPU core数量设置为2~4个较为合适
driver-memory
要使用collect算子将RDD的数据所有拉取到Driver上进行处理,那么必须确保Driver的内存足够大,不然会出现OOM内存溢出的问题
spark.default.parallelism
==该参数用于设置每一个stage的默认task数量。这个参数极为重要,若是不设置可能会直接影响你的Spark做业性能。==
Spark做业的默认task数量为500~1000个较为合适。若是不设置这个参数,那么此时就会致使Spark本身根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。一般来讲,Spark默认设置的数量是偏少的(好比就几十个task),若是task数量偏少的话,就会致使你前面设置好的Executor的参数都前功尽弃。试想一下,不管你的Executor进程有多少个,内存和CPU core有多大,可是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!所以Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,好比Executor的总CPU core数量为300个,那么设置1000个task是能够的,此时能够充分地利用Spark集群的资源。
spark.storage.memoryFraction
设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。当操做中有较多的RDD持久化操做,该参数的值能够适当提升一些,少的话就相对少一点
spark.shuffle.memoryFraction
设置shuffle过程当中一个task拉取到上个stage的task的输出后,进行聚合操做时可以使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操做。shuffle操做在进行聚合时,若是发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地下降性能。
如下是一份spark-submit命令的示例,你们能够参考一下,并根据本身的实际状况进行调节: ./bin/spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3 \
数据倾斜发生的原理:
在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,好比按照key进行聚合或join等操做。此时若是某个key对应的数据量特别大的话,就会发生数据倾斜。好比大部分key对应10条数据,可是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,而后1秒钟就运行完了;可是个别task可能分配到了100万数据,要运行一两个小时。所以,整个Spark做业的运行进度是由运行时间最长的那个task决定的。
数据倾斜只会发生在shuffle过程当中。这里给你们罗列一些经常使用的而且可能会触发shuffle操做的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所致使的。
咱们均可以在Spark Web UI上深刻看一下当前这个stage各个task分配的数据量,从而进一步肯定是否是task分配的数据不均匀致使了数据倾斜
增长shuffle read task的数量,可让本来分配给一个task的多个key分配给多个task,从而让每一个task处理比原来更少的数据。举例来讲,若是本来有5个key,每一个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增长了shuffle read task之后,每一个task就分配到一个key,即每一个task就处理10条数据,那么天然每一个task的执行时间都会变短了
该方案一般没法完全解决数据倾斜,由于若是出现一些极端状况,好比某个key对应的数据量有100万,那么不管你的task数量增长到多少,这个对应着100万数据的key确定仍是会分配到一个task中去处理,所以注定仍是会发生数据倾斜的。
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每一个key都打上一个随机数,好比10之内的随机数,此时原先同样的key就变成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操做,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。而后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操做,就能够获得最终结果了,==仅仅适用于聚合类的shuffle操做==。
普通的join是会走shuffle过程的,而一旦shuffle,就至关于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。可是若是一个RDD是比较小的,则能够采用广播小RDD全量数据+map算子来实现与join一样的效果,也就是map join,此时就不会发生shuffle操做,也就不会发生数据倾斜。具体原理以下图所示
适用场景较少,由于这个方案只适用于一个大表和一个小表的状况。
对于join致使的数据倾斜,若是只是某几个key致使了倾斜,能够将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了
==若是致使倾斜的key特别多的话,好比成千上万个key都致使数据倾斜,那么这种方式也不适合==
将原先同样的key经过附加随机前缀变成不同的key,而后就能够将这些处理后的“不一样key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案上一个的不一样之处就在于,上一种方案是尽可能只对少数倾斜key对应的数据进行特殊处理,因为处理过程须要扩容RDD,所以上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的状况,无法将部分key拆分出来进行单独处理,所以只能对整个RDD进行数据扩容,对内存资源要求很高。
大多数Spark做业的性能主要就是消耗在了shuffle环节,由于该环节包含了大量的磁盘IO、序列化、网络数据传输等操做。所以,若是要让做业的性能更上一层楼,就有必要对shuffle过程进行调优。
负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得愈来愈先进。
在Spark 1.2之前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager有着一个很是严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操做影响了性能。
所以在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来讲,有了必定的改进。主要就在于,每一个Task在进行shuffle操做时,虽然也会产生较多的临时磁盘文件,可是最后会将全部的临时文件合并(merge)成一个磁盘文件,所以每一个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取本身的数据时,只要根据索引读取每一个磁盘文件中的部分数据便可。
shuffle write阶段
一个stage结束计算以后,为了下一个stage能够执行shuffle类的算子,而将每一个task处理的数据按key进行“分类”,将相同key都写入(经过一个内存缓冲buffer)同一个磁盘文件中,而每个磁盘文件都只属于下游stage的一个task。
shuffle read
一般就是一个stage刚开始时要作的事情。此时该stage的每个task就须要将上一个stage的计算结果中的全部相同key,从各个节点上经过网络都拉取write阶段建立的属于本身的那一个磁盘文件到本身所在的节点上,而后进行key的聚合或链接等操做。
spark.shuffle.consolidateFiles默认为false,若是设置为ture开启优化机制。 write过程当中,task就不是为下游stage的每一个task建立一个磁盘文件了。==此时会出现shuffleFileGroup的概念==,每一个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就能够并行执行多少个task。而第一批并行执行的每一个task都会建立一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用以前已有的shuffleFileGroup,包括其中的磁盘文件。
在该模式下,数据会先写入一个内存数据结构中,此时根据不一样的shuffle算子,可能选用不一样的数据结构。若是是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边经过Map进行聚合,一边写入内存;若是是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构以后,就会判断一下,是否达到了某个临界阈值。若是达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。
在溢写到磁盘文件以前,会先根据key对内存数据结构中已有的数据进行排序。排序事后,会分批将数据写入磁盘文件
会产生多个临时文件。最后会将以前全部的临时磁盘文件都进行合并,这就是merge过程,此时会将以前全部临时磁盘文件中的数据读取出来,而后依次写入最终的磁盘文件之中。
SortShuffleManager因为有一个磁盘文件merge的过程,所以大大减小了文件数量。好比第一个stage有50个task,总共有10个Executor,每一个Executor执行5个task,而第二个stage有100个task。因为每一个task最终只有一个磁盘文件,所以此时每一个Executor上只有5个磁盘文件,全部Executor只有50个磁盘文件。
bypass运行机制
下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件以下:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
不是聚合类的shuffle算子(好比reduceByKey)。
此时task会为每一个下游task都建立一个临时磁盘文件,并将数据按key进行hash而后根据key的hash值,将key写入对应的磁盘文件之中。固然,写入磁盘文件时也是先写入内存缓冲,缓冲写满以后再溢写到磁盘文件的。最后,一样会将全部临时磁盘文件都合并成一个磁盘文件,并建立一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是如出一辙的,由于都要建立数量惊人的磁盘文件,只是在最后会作一个磁盘文件的合并而已。所以少许的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来讲,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不一样在于:第一,磁盘写机制不一样;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程当中,不须要进行数据的排序操做,也就节省掉了这部分的性能开销。
spark.shuffle.file.buffer
默认32K ,shuffle write task缓冲大小,适当跳大(64K)能够减小shuffle write过程当中溢写磁盘文件的次数
spark.reducer.maxSizeInFlight
默认48M,shuffle read task的buffer缓冲大小
spark.shuffle.io.maxRetries
默认3,huffle read task从shuffle write task拉取数据若是失败的重试次数,数据量特别大的时候能够增长,调节该参数能够大幅度提高稳定性。
spark.shuffle.io.retryWait
默认5S,每次重试拉取数据的等待间隔,增大(60S),提升稳定性
spark.shuffle.memoryFraction
默认0.2,Executor内存中,分配给shuffle read task进行聚合操做的内存比例
spark.shuffle.manager
设置ShuffleManager的类型,可选项:hash、sort和tungsten-sort。业务若是须要排序,就用sort,不须要排序就用bypass机制,避免排序。
spark.shuffle.sort.bypassMergeThreshold
默认200,当ShuffleManager为SortShuffleManager时,若是shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程当中不会进行排序操做
spark.shuffle.consolidateFiles
默认false,设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件
原本绝大部分观点来自美团点评(https://tech.meituan.com/spark-tuning-basic.html),通过本身的实践发如今开发,资源,数据倾斜方面的调优能够有着明显的效果,而shuffle调优的结果并不明显。因此你们在调优的过程当中必定要注意优先级。