Spark Shuffle

1. Shuffle相关

当Map的输出结果要被Reduce使用时,输出结果须要按key哈希,而且分发到每个Reducer上去,这个过程就是shuffle。因为shuffle涉及到了磁盘的读写和网络的传输,所以shuffle性能的高低直接影响到了整个程序的运行效率。概念上shuffle就是一个沟通数据链接(map和reduce)的桥梁。每一个ReduceTask从每一个Map Task产生数的据中读取一片数据,极限状况下可能触发M*R个数据拷贝通道(M是MapTask数目,R是Reduce Task数目)。算法

在Spark1.1以前,其shuffle只存在一种模式,即hash base。在Spark1.1版本以后加入了sort base。Spark1.1默认采用的shuffle模式仍是hash base。在Spark1.2中,sort base将做为默认模式。固然,你能够经过shuffle manager进行配置。apache

2. Spark shuffle流程

·        首先每个Mapper会根据Reducer的数量建立出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。网络

·        其次Mapper产生的结果会根据设置的partition算法填充到每一个bucket中去。这里的partition算法是能够自定义的,固然默认的算法是根据key哈希到不一样的bucket中去。数据结构

·        当Reducer启动时,它会根据本身task的id和所依赖的Mapper的id从远端或是本地的block manager中取得相应的bucket做为Reducer的输入进行处理。并发

这里的bucket是一个抽象概念,在实现中每一个bucket能够对应一个文件,能够对应文件的一部分或是其余等。app

一般shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。首先,Map阶段需根据Reduce阶段的Task数量决定每一个MapTask输出的数据分片数目,有多种方式存放这些数据分片:框架

1) 保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上);ide

2) 每一个分片一个文件(如今Spark采用的方式,若干年前MapReduce采用的方式),或者全部分片放到一个数据文件中,外加一个索引文件记录每一个分片在数据文件中的偏移量(如今MapReduce采用的方式)。性能

在Map端,不一样的数据存放方式各有优缺点和适用场景。通常而言,shuffle在Map端的数据要存储到磁盘上,以防止容错触发重算带来的庞大开销(若是保存到Reduce端内存中,一旦Reduce Task挂掉了,全部Map Task须要重算)。但数据在磁盘上存放方式有多种可选方案,在MapReduce前期设计中,采用了如今Spark的方案(目前一直在改进),每一个Map Task为每一个Reduce Task产生一个文件,该文件只保存特定Reduce Task需处理的数据,这样会产生M*R个文件,若是M和R很是庞大,好比均为1000,则会产生100w个文件,产生和读取这些文件会产生大量的随机IO,效率很是低下。解决这个问题的一种直观方法是减小文件数目,经常使用的方法有:1) 将一个节点上全部Map产生的文件合并成一个大文件(MapReduce如今采用的方案),2) 每一个节点产生{(slot数目)*R}个文件(Spark优化后的方案)。对后面这种方案简单解释一下:不论是MapReduce 1.0仍是Spark,每一个节点的资源会被抽象成若干个slot,因为一个Task占用一个slot,所以slot数目可当作是最多同时运行的Task数目。若是一个Job的Task数目很是多,限于slot数目有限,可能须要运行若干轮。这样,只须要由第一轮产生{(slot数目)*R}个文件,后续几轮产生的数据追加到这些文件末尾便可。所以,后一种方案可减小大做业产生的文件数目。优化

在Reduce端,各个Task会并发启动多个线程同时从多个Map Task端拉取数据。因为Reduce阶段的主要任务是对数据进行按组规约。也就是说,须要将数据分红若干组,以便以组为单位进行处理。你们知道,分组的方式很是多,常见的有:Map/HashTable(key相同的,放到同一个value list中)和Sort(按key进行排序,key相同的一组,经排序后会挨在一块儿),这两种方式各有优缺点,第一种复杂度低,效率高,可是须要将数据所有放到内存中,第二种方案复杂度高,但可以借助磁盘(外部排序)处理庞大的数据集。Spark前期采用了第一种方案,而在最新的版本中加入了第二种方案, MapReduce则从一开始就选用了基于sort的方案。

3. shuffle分析


3.1 shuffle写


Spark中须要Shuffle输出的ShuffleMapTask会为每一个ResultTask建立对应的Bucket,ShuffleMapTask产生的结果会根据设置的partitioner获得对应的BucketId,而后填充到相应的Bucket中区。每一个ShuffleMapTask的输出结果可能包含全部的ResultTask所须要的数据,因此每一个ShuffleMapTask建立Bucket的数目是和ResultTask的数目相等。

ShuffleMapTask建立的Bucket对应磁盘上的一个文件,用于存储结果,此文件也被称为BlockFile。经过属性spark.shuffle.file.buffer.kb配置的缓冲区就是用来建立FastBufferedOutputStream输出流的。若是在配置文件中设置了属性spark.shuffle.consolidateFiles为true的话,ShuffleMapTask所产生的Bucket就不必定单独对应一个文件了,而是对应文件的一部分,这样作会大量减小产生的BlockFile文件数量。

ShuffleMapTask在某个节点上第一次执行时,会被每一个ResultTask建立一个输出文件,并把这些文件组织成ShuffleFileGroup,当这个ShuffleMapTask执行结束后,当前建立的ShuffleFileGroup能够被释放掉,进行循环使用,当又有ShuffleMapTask在这个节点执行时,不须要建立新的输出文件,而是在上次的ShuffleFileGroup中已经建立的文件里追加写一个Segment;若是当前的ShuffleMapTask还没执行完,此时又在此节点上启动了新的ShuffleMapTask,那么新的ShuffleMapTask只能又建立新的输出文件再组成一个ShuffleFileGroup来进行结果输出。

3.2 shuffle读

前面ShuffleMapTask写结果,如今轮到ResultTask去读那些数据了。Spark可使用两种方式来读取数据,一种是普通的Socket方式,一种是使用Netty框架。使用Netty方式的话,能够经过配置属性spark.shuffle.use.netty为true来启动。

ResultTask读数据时,会经过BlockManager根据BlockID把相关的数据返回给ResultTask。若是使用是Netty框架,BlockManaget会建立ShuffleSender专门用于发送数据。若是ResultTask所须要的数据刚好在本节点,那就直接去磁盘上读便可,不在经过网络获取,这点比MapReduce作得更好,MapReduce取数据时,即便数据在本地仍是要走一遍网络传输。

Spark默认的Shuffle过程当中的数据都没有通过排序(Hash模式),这一点也要比MapReduce框架节省不少时间。ResultTask读取过来的数据首先存放到HashMap中,若是数据量比较小,占用内存空间不会太大,若是数据量比较大,那就须要较多内存,内存不足该如何解决?

Spark提供了两种方式,根据spark.shuffle.spill的设置,当内存不够时,直接就失败。若是设置了能够Spill到磁盘,那就把内存中的数据溢写到磁盘中。写到磁盘前,先把内存中的HashMap排序,而且把内存缓冲区中的数据排序以后和写到磁盘上文件数据组成一个最小堆,每次从最小堆中读取最小的数据。


4. sort与hash模式

用来配置所使用的shuffle manager,目前可用的有:

org.apache.spark.shuffle.sort.HashShuffleManager(配置参数值为hash)org.apache.spark.shuffle.sort.SortShuffleManager(配置参数值为sort)

可在spark-default.conf中加入以下内容使用SORT模式:

Spark.shuffle.maager SORT

这两个ShuffleManager如何选择呢,首先须要了解他们在实现方式上的区别。

HashShuffleManager,故名思义也就是在Shuffle的过程当中写数据时不作排序操做,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中。带来的问题就是若是Reduce分区的数量比较大的话,将会产生大量的磁盘文件。若是文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外因为同时打开的文件句柄数量众多,序列化,以及压缩等操做须要分配的临时内存空间也可能会迅速膨胀到没法接受的地步,对内存的使用和GC带来很大的压力,在Executor内存比较小的状况下尤其突出,例如Spark on Yarn模式。

SortShuffleManager,是1.1版本以后实现的一个试验性(也就是一些功能和接口还在开发演变中)的ShuffleManager,它在写入分区数据的时候,首先会根据实际状况对数据采用不一样的方式进行排序操做,底线是至少按照Reduce分区Partition进行排序,这样来至于同一个Map任务Shuffle到不一样的Reduce分区中去的全部数据均可以写入到同一个外部磁盘文件中去,用简单的Offset标志不一样Reduce分区的数据在这个文件中的偏移量。这样一个Map任务就只须要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题

二者的性能比较,取决于内存,排序,文件操做等因素的综合影响。

对于不须要进行排序的Shuffle操做来讲,如repartition等,若是文件数量不是特别巨大,HashShuffleManager面临的内存问题不大,而SortShuffleManager须要额外的根据Partition进行排序,显然HashShuffleManager的效率会更高。

而对于原本就须要在Map端进行排序的Shuffle操做来讲,如ReduceByKey等,使用HashShuffleManager虽然在写数据时不排序,但在其它的步骤中仍然须要排序,而SortShuffleManager则能够将写数据和排序两个工做合并在一块儿执行,所以即便不考虑HashShuffleManager的内存使用问题,SortShuffleManager依旧可能更快。


5. Shuffle相关属性


可在conf/spark-defaults.conf中配置,或者是在spark-submit --conf中提供参数

(eg.spark-submit --conf spark.shuffle.spill=false)

属性名

缺省值

含义

spark.shuffle.consolidateFiles

false

若是为true,shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来讲,合并文件能够提升文件系统性能,若是使用的是ext4xfs文件系统,建议设置为true;对于ext3,因为文件系统的限制,设置为true反而会使内核>8的机器下降性能

spark.shuffle.spill

true

若是为true,经过使数据一出道磁盘对reduce阶段内存的使用进行限制。移除的阈值由spark.shuffle.memoryFraction指定

spark.shuffle.spill.compress

true

shuffle时是否压缩溢出的数据。具体压缩方式由spark.io.compression.codec属性设定。

spark.shuffle.memoryFraction

0.2

只有spark.shuffle.spill设为true,此选项才有意义,决定了当shuffle过程当中使用的内存达到总内存多少比例的时候开始Spill,默认为20%,若是Spill的太频繁,能够适当增长该数值,减小Spill次数。

spark.shuffle.compress

true

是否压缩map的输出文件,一般选择压缩。具体压缩方式spark.io.compression.codec属性设定。

spark.shuffle.file.buffer.kb

32

每一个shuffle的文件输出流的内存缓冲区大小,单位是kb。这些缓冲区减小了建立shuffle中间文件时的系统调用以及磁盘寻道的次数。

spark.reducer.maxMbInFlight

48

设定同时从reduce任务中取出的Map输出最大值.单位是MB。由于要为每一份输出建立一个缓冲区进行接收,这表示每一个reduce任务要消耗固定大小的内存,因此,尽可能使这个选项的值较小,除非有大量的内存可用。

spark.shuffle.manager

HASH

对数据进行shuffle时执行的shuffle管理器。基于Hashshuffle管理器是默认的,可是从spark1.1开始,出现了基于排序的shuffle管理器,后者在小的executor环境下,如YARN中会有更好的内存效率。要使用后者,将值设定为SORT

spark.shuffle.sort.

bypassMergeThreshold

200

该参数只适用于spark.shuffle.manager设置为SORT,由于SortShuffleManager在处理不须要排序的shuffle操做时,会因为排序引发性能降低,该参数决定了在Reduce分区少于200时,不使用Merge Sort的方式处理数据,而是与Hash Shuffle相似,直接将分区文件写入调度的文件,不一样的是在最后仍是会将这些文件合并成一个独立的文件。经过取出Sort步骤来加快处理速度,带价是须要并发打开多个文件,致使农村消耗增长,本质是相对HashShuffleManager的一个折衷方案,若是GC问题严重,能够下降该值

相关文章
相关标签/搜索