ShuffleManager 原理

在 Spark 的源码中,负责 shuffle 过程的执行、计算、处理的组件主要是 ShuffleManager。网络

在 Spark 1.2 之前,默认的 shuffle 计算引擎是 HashShuffleManager。该 ShuffleMananger 有一个很是严重的弊端,就是会产生大量的磁盘文件,进而有大量的磁盘 IO 操做,比较影响性能。数据结构

所以在 Spark 1.2 以后,默认的 ShuffleManager 改为了 SortShuffleManager。SortShuffleManager 相对来讲,有了必定的改进。主要就在于,每一个 Task 在 Shuffle Write 操做时,虽然也会产生较大的磁盘文件,但最后会将全部的临时文件合并 (merge) 成一个磁盘文件,所以每一个 Task 就只有一个磁盘文件。在下一个 Stage 的 Shuffle Read Task 拉取本身数据的时候,只要根据索引拉取每一个磁盘文件中的部分数据便可。ide

一,HashShuffleManager 运行原理

普通模式下,在 Shuffle Write 阶段,每一个 Task 将数据按照 Key 进行 Hash 计算,而后按照计算结果,将相同的 Key 对应的数据写入内存缓冲区,当内存缓冲区写满以后会直接溢写到磁盘文件。这里须要写多少个磁盘文件,和下一个 stage 的 Shuffle Read Task 的数量一致。性能

而后,Shuffle Read 阶段的每一个 Task 会拉取 Shuffle Write 阶段全部相同 Key 的文件,一遍拉取一遍聚合。每一个 Shuffle Read 阶段的 Task 都有本身的缓冲区,每次只能拉取与缓冲区大小一致的数据,而后经过内存中的 Map 进行聚合等操做,聚合完一批再取下一批数据。大数据

好比,当前 Stage 有 5 个 Executor,每一个 Executor 分配一个 cpu core,有 50 个 task,每一个 Executor 执行 10 个 task;下一个 stage 有100 个 task。那么在 Shuffle Write 阶段每一个 task 要建立 100 个磁盘文件,每一个 Executor 进程要建立 1000 个文件,一共要建立 1000 * 5 = 5000 个磁盘文件,数量不少。优化

具体执行原理图以下图所示:spa

针对 HashShuffleManager 咱们能够设置一个参数:spark.shuffle.consolidateFiles。这个参数的值默认是 fasle,若是设置成 true 以后就会开启优化机制。3d

当开启这个参数以后,在 Shuffle Write 阶段写文件的时候会复用文件,每一个 task 不会为 Shuffle Read 阶段的 task 都建立一份文件。此时会出现一个 shuffleFileGroup 的概念,每一个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量和 Shuffle Read 阶段的 task 数量一致。每一个 Executor 上有多少个 cpu core 就会并行执行几个 task,每一个 task 会建立一个 shuffleFileGroup,而后后续并行执行的 task 会复用前面生成的这个 shuffleFileGroup。code

好比,当前 stage 有 5 个 Executor,每一个 Executor 分配 3 个 cpu core,一共有 50 个 task,每一个 Executor 执行 10 个 task,Shuffle Read 阶段有 100 个 task。那么此时,每一个 Executor 进程会建立 3 * 100 个文件,一共会建立 5 * 3 * 100 个文件。cdn

具体原理如图示:

二,SortShuffleManager 运行原理

SortShuffleManager 运行机制有两种,一种是普通运行机制,另外一种是 bypass 运行机制。当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数值时 (默认是 200 ) ,就会启用 bypass 机制。

1,普通机制

在该模式下,Shuffle Write 阶段会将数据写入一个内存的数据结构中,此时根据不一样的算子会有不一样的数据结构。好比是 reduceByKey 这种聚合类的 shuffle 算子,会选用 Map 数据结构,一遍用 Map 进行聚合(HashShuffleManager 聚合操做是放在 Shuffle Read 阶段),一遍写入内存;若是是 join 相关的普通 shuffle 算子的话,会用 Array 数据结构,直接写入内存。当内存达到临界阈值以后,会将内存中的数据进行排序,而后分批次写入磁盘 (默认每批次有 1W 条数据),在写入磁盘的时候不会像 HashShuffleManager 那样直接写入磁盘,这里会先写入内存缓冲流,当缓冲流满溢以后一次性写入磁盘。

此时也会生成大批量的文件,最后会将以前全部的临时磁盘文件进行合并,这就是 merge 过程 (就是将全部的临时磁盘文件中的数据读取出来,而后依次写入最终的文件中)。每一个 task 最终会生成一份磁盘文件和一份索引文件,索引文件中标示了下游每一个 task 的数据在文件中的 start offset 和 end offset。

好比,当前 stage 有 5 个 Executor,每一个 Executor 分配 1 个 cpu core,共有 50 个 task,每一个 Executor 执行 10 个 task;下一个 stage 有 100 个 task。那么每一个 Executor 建立 10 个磁盘文件,一共有 50 个磁盘文件。

具体以下图所示:

2,bypass 机制

触发该机制的条件:

1,shuffle reduce 端的 task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数值的时候;

2,不是聚合类的shuffle算子(好比reduceByKey);

该机制下,当前 stage 的每一个 task 会将数据的 key 进行 hash,而后将相同 hash 的 key 锁对应的数据写入到同一个内存缓冲区,缓冲写满后会溢写到磁盘文件,这里和 HashShuffleManager一致。

而后会进入 merge 阶段,将全部的磁盘文件合并成一个磁盘文件,并建立一个索引文件。

相比较于普通机制,这里有两个地方不一样:

1,将数据写入内存时候,普通模式是将数据写入 Map 或者 Array 这样的内存数据结构中,这里是根据 key 的 Hash 值直接写入内存;

2,该模式下在写入磁盘以前不会排序;

3,磁盘写机制不一样。

具体如图示:

三,shuffle 相关的参数

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件以前,会先写入 buffer 缓冲中,待缓冲写满以后,才会溢写到磁盘。
  • 调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比 64k),从而减小 shuffle write 过程当中溢写磁盘文件的次数,也就能够减小磁盘 IO 次数,进而提高性能。在实践中发现,合理调节该参数,性能会有 1%~5% 的提高。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置 shuffle read task 的 buffer 缓冲大小,而这个 buffer 缓冲决定了每次可以拉取多少数据。
  • 调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比 96m),从而减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。在实践中发现,合理调节该参数,性能会有 1%~5% 的提高。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于本身的数据时,若是由于网络异常致使拉取失败,是会自动进行重试的。该参数就表明了能够重试的最大次数。若是在指定次数以内拉取仍是没有成功,就可能会致使做业执行失败。
  • 调优建议:对于那些包含了特别耗时的 shuffle 操做的做业,建议增长重试最大次数(好比 60 次),以免因为 JVM 的 full gc 或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数能够大幅度提高稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数表明了每次重试拉取数据的等待间隔,默认是 5s。
  • 调优建议:建议加大间隔时长(好比 60s),以增长 shuffle 操做的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数表明了 Executor 内存中,分配给 shuffle read task 进行聚合操做的内存比例,默认是 20%。
  • 调优建议:在资源参数调优中讲解过这个参数。若是内存充足,并且不多使用持久化操做,建议调高这个比例,给 shuffle read 的聚合操做更多内存,以免因为内存不足致使聚合过程当中频繁读写磁盘。在实践中发现,合理调节该参数能够将性能提高 10% 左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置 ShuffleManager 的类型。Spark 1.5 之后,有三个可选项:hash、sort 和 tungsten-sort。HashShuffleManager 是 Spark 1.2 之前的默认选项,可是 Spark 1.2 以及以后的版本默认都是 SortShuffleManager 了。tungsten-sort 与 sort 相似,可是使用了 tungsten 计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:因为 SortShuffleManager 默认会对数据进行排序,所以若是你的业务逻辑中须要该排序机制的话,则使用默认的 SortShuffleManager 就能够;而若是你的业务逻辑不须要对数据进行排序,那么建议参考后面的几个参数调优,经过 bypass 机制或优化的 HashShuffleManager 来避免排序操做,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort 要慎用,由于以前发现了一些相应的 bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当 ShuffleManager 为 SortShuffleManager 时,若是 shuffle read task 的数量小于这个阈值(默认是200),则 shuffle write 过程当中不会进行排序操做,而是直接按照未经优化的 HashShuffleManager 的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。
  • 调优建议:当你使用 SortShuffleManager 时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于 shuffle read task 的数量。那么此时就会自动启用 bypass 机制,map-side 就不会进行排序了,减小了排序的性能开销。可是这种方式下,依然会产生大量的磁盘文件,所以 shuffle write 性能有待提升。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:若是使用 HashShuffleManager,该参数有效。若是设置为 true,那么就会开启 consolidate 机制,会大幅度合并 shuffle write 的输出文件,对于 shuffle read task 数量特别多的状况下,这种方法能够极大地减小磁盘 IO 开销,提高性能。
  • 调优建议:若是的确不须要 SortShuffleManager 的排序机制,那么除了使用 bypass 机制,还能够尝试将 spark.shffle.manager 参数手动指定为 hash,使用 HashShuffleManager,同时开启 consolidate 机制。在实践中尝试过,发现其性能比开启了 bypass 机制的 SortShuffleManager 要高出 10%~30%。
相关文章
相关标签/搜索