HashShuffle调优概述算法
大多数Spark做业的性能主要就是消耗在了shuffle环 节,由于该环节包含了大量的磁盘IO、序列化、网络数据传输等操做。所以,若是要让做业的性能更上一层楼,就有必要对shuffle过程进行调优。可是也 必须提醒你们的是,影响一个Spark做业性能的因素,主要仍是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占 到一小部分而已。所以你们务必把握住调优的基本原则,千万不要舍本逐末。下面咱们就给你们详细讲解shuffle的原理,以及相关参数的说明,同时给出各 个参数的调优建议。网络
ShuffleManager发展概述数据结构
在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。性能
在Spark 1.2之前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个很是严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操做影响了性能。优化
所以在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。SortShuffleManager相较于 HashShuffleManager来讲,有了必定的改进。主要就在于,每一个Task在进行shuffle操做时,虽然也会产生较多的临时磁盘文件,但 是最后会将全部的临时文件合并(merge)成一个磁盘文件,所以每一个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取本身的数据时,只要根据索引读取每一个磁盘文件中的部分数据便可。spa
未优化的HashShuffleManager线程
图说明了未经优化的HashShuffleManager的原理。这里咱们先明确一个假设前提:每一个Executor只有1个CPU core,也就是说,不管这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。blog
shuffle write:shuffle write阶段就是在一个stage结束计算以后,为了下一个stage能够执行shuffle类的算子(好比reduceByKey),而将每一个 task处理的数据按key进行“分类”。也就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘以前,会先将数据写入内存缓冲中,当内存缓冲填满以后,才会溢写到磁盘文件中去。排序
那么每一个执行shuffle write的task,要为下一个stage建立多少个磁盘文件呢?下一个stage的task有多少个,当前stage的每一个task就要建立多少份磁盘文件。好比下一个stage总共有100个task,那么当前stage的每一个task都要建立100份磁盘文件。若是当前stage有50个 task,则总共会创建5000个磁盘文件。因而可知,未经优化的shuffle write操做所产生的磁盘文件的数量是极其惊人的。索引
shuffle read:shuffle read,一般就是一个stage刚开始时要作的事情。此时该stage的每个task就须要将上一个stage的计算结果中的全部相同key,从各个 节点上经过网络都拉取到本身所在的节点上,而后进行key的聚合或链接等操做。因为shuffle write的过程当中,task给下游stage的每一个task都建立了一个磁盘文件,所以shuffle read的过程当中,每一个task只要从上游stage的全部task所在节点上,拉取属于本身的那一个磁盘文件便可。
shuffle read的拉取过程是一边拉取一边进行聚合的。每一个shuffle read task都会有一个本身的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,而后经过内存中的一个Map进行聚合等操做。聚合完一批数 据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操做。以此类推,直到最后将全部数据到拉取完,并获得最终的结果。
优化后的HashShuffleManager
图说明了优化后的HashShuffleManager的原理。这里说的优化是指咱们能够设置一个参数,spark.shuffle.consolidateFiles=true。该参数默认值为false,一般来讲若是咱们使用HashShuffleManager,那么都建议开启这个选项。
开启consolidate机制以后,在shuffle write过程当中,task不会为下游stage的每一个task建立一个磁盘文件,此时会出现shuffleFileGroup的概念,每一个 shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就能够并行执行多少个task。而第一批并行执行的每一个task都会建立一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用以前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。 这样就能够有效将多个task的磁盘文件进行必定程度上的合并,从而大幅度减小磁盘文件的数量,进而提高shuffle write的性能。
优化与未优化对比
假设第一个stage有 50个task, 第二个stage有100个task,总共仍是有10个Executor(CPU核心数),每一个Executor执行5个task。
未经优化的HashShuffleManager:每一个Executor会产生500个磁盘文件,全部Executor会产生5000个磁盘文件的。
每一个Executor建立的磁盘文件: 单个Executor执行task数 * 下一个stage的task数量 (5*100)
全部Executor建立的磁盘文件: 当前stage的task数 * 下一个stage的task数量 (50*100)
优化以后的HashShuffleManager: 每一个Executor此时只会建立100个磁盘文件,全部Executor只会建立1000个磁盘文件。
每一个Executor建立的磁盘文件:单个Executor CPU核心数量 * 下一个stage的task数量 (1*100)
全部 Executor建立的磁盘文件: 全部Executor CPU核心数量 * 下一个stage的task数量(10*100)
SortShuffle概述
普通机制:
a) map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M
b) 在shuffle的时候会有一个定时器,不按期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,好比如今内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。
c) 若是申请成功不会进行溢写,若是申请不成功,这时候会发生溢写磁盘。
d) 在溢写以前内存结构中的数据会进行排序分区
e) 而后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是1万条数据,
f) map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件(有序),同时生成一个索引文件。
g) reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。
产生磁盘小文件的个数: 2*M(map task的个数)索引文件-和磁盘文件
bypass机制:
a) bypass运行机制的触发条件以下:
shuffle reduce task的数量小于spark.shuffle.sort.bypassMergeThreshold的参数值。这个值默认是200。
b)产生的磁盘小文件为:2*M(map task的个数)