Shuffle 过程
上一章里讨论了 job 的物理执行图,也讨论了流入 RDD 中的 records 是怎么被 compute() 后流到后续 RDD 的,同时也分析了 task 是怎么产生 result,以及 result 怎么被收集后计算出最终结果的。然而,咱们尚未讨论
数据是怎么经过 ShuffleDependency 流向下一个 stage 的?
对比 Hadoop MapReduce 和 Spark 的 Shuffle 过程
若是熟悉 Hadoop MapReduce 中的 shuffle 过程,可能会按照 MapReduce 的思路去想象 Spark 的 shuffle 过程。然而,它们之间有一些区别和联系。
从 high-level 的角度来看,二者并无大的差异。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不一样的 partition 送到不一样的 reducer(Spark 里 reducer 多是下一个 stage 里的 ShuffleMapTask,也多是 ResultTask)。Reducer 之内存做缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好之后进行 reduce() (Spark 里多是后续的一系列操做)。
从 low-level 的角度来看,二者差异不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 能够处理大规模的数据,由于其输入数据能够经过
外排获得(mapper 对每段数据先作排序,reducer 的 shuffle 对排好序的每段数据作归并)。目前的 Spark 默认选择的是 hash-based,一般使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提早排序。若是用户须要通过排序的数据,那么须要本身调用相似 sortByKey() 的操做;若是你是Spark 1.1的用户,能够将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将做为默认的Shuffle实现。
从实现角度来看,二者也有很多差异。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每一个阶段各司其职,能够按照过程式的编程思想来逐一实现每一个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不一样的 stage 和一系列的 transformation(),因此 spill, merge, aggregate 等操做须要蕴含在 transformation() 中。 若是咱们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,
问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?
Shuffle write
因为不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之因此要持久化,一方面是要减小内存存储空间压力,另外一方面也是为了 fault-tolerance。 shuffle write 的任务很简单,那么实现也很简单:将 shuffle write 的处理逻辑加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,该 stage 的 final RDD 每输出一个 record 就将其 partition 并持久化。图示以下:

上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,能够同时运行两个 task。每一个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每一个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为
spark.shuffle.file.buffer.kb
,默认是 32KB(Spark 1.1 版本之前是 100KB)。
其实 bucket 是一个广义的概念,表明 ShuffleMapTask 输出结果通过 partition 后要存放的地方,这里为了细化数据存放位置和数据名称,仅仅用 bucket 表示缓冲区。
ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算获得 finalRDD 中对应 partition 的 records。每获得一个 record 就将其送到对应的 bucket 里,具体是哪一个 bucket 由
partitioner.partition(record.getKey()))
决定。每一个 bucket 里面的数据会不断被写到本地磁盘上,造成一个 ShuffleBlockFile,或者简称
FileSegment。以后的 reducer 会去 fetch 属于本身的 FileSegment,进入 shuffle read 阶段。 这样的实现很简单,但有几个问题:
- 产生的 FileSegment 过多。每一个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。通常 Spark job 的 M 和 R 都很大,所以磁盘上会存在大量的数据文件。
- 缓冲区占用内存空间大。每一个 ShuffleMapTask 须要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 MR 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区能够被回收,但一个 worker node 上同时存在的 bucket 个数能够达到 cores R 个(通常 worker 同时能够运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了
cores * R * 32 KB
。对于 8 核 1000 个 reducer 来讲,占用内存就是 256MB。
目前来看,第二个问题尚未好的方法解决,由于写磁盘终究是要开缓冲区的,缓冲区过小会影响 IO 速度。但第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法。先上图:

能够明显看出,在一个 core 上连续执行的 ShuffleMapTasks 能够共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 造成 ShuffleBlock i,后执行的 ShuffleMapTask 能够将输出数据直接追加到 ShuffleBlock i 后面,造成 ShuffleBlock i',每一个 ShuffleBlock 被称为
FileSegment。下一个 stage 的 reducer 只须要 fetch 整个 ShuffleFile 就好了。这样,每一个 worker 持有的文件数降为 cores * R。FileConsolidation 功能能够经过
spark.shuffle.consolidateFiles=true
来开启。
Shuffle read
先看一张包含 ShuffleDependency 的物理执行图,来自 reduceByKey:

很天然地,要计算 ShuffleRDD 中的数据,必须先把 MapPartitionsRDD 中的数据 fetch 过来。那么问题就来了:
- 在何时 fetch,parent stage 中的一个 ShuffleMapTask 执行完仍是等所有 ShuffleMapTasks 执行完?
- 边 fetch 边处理仍是一次性 fetch 完再处理?
- fetch 来的数据存放到哪里?
- 怎么得到要 fetch 的数据的存放位置?
解决问题:
- 在何时 fetch?当 parent stage 的全部 ShuffleMapTasks 结束后再 fetch。理论上讲,一个 ShuffleMapTask 结束后就能够 fetch,可是为了迎合 stage 的概念(即一个 stage 若是其 parent stages 没有执行完,本身是不能被提交执行的),仍是选择所有 ShuffleMapTasks 执行完再去 fetch。由于 fetch 来的 FileSegments 要先在内存作缓冲,因此一次 fetch 的 FileSegments 总大小不能太大。Spark 规定这个缓冲界限不能超过
spark.reducer.maxMbInFlight
,这里用 softBuffer 表示,默认大小为 48MB。一个 softBuffer 里面通常包含多个 FileSegment,但若是某个 FileSegment 特别大的话,这一个就能够填满甚至超过 softBuffer 的界限。
- 边 fetch 边处理仍是一次性 fetch 完再处理?边 fetch 边处理。本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到所有数据都 shuffle-sort 后再开始 reduce()。由于 Spark 不要求 shuffle 后的数据全局有序,所以不必等到所有数据 shuffle 完成后再处理。那么如何实现边 shuffle 边处理,并且流入的 records 是无序的?答案是使用能够 aggregate 的数据结构,好比 HashMap。每 shuffle 获得(从缓冲的 FileSegment 中 deserialize 出来)一个 \<key, value\=""> record,直接将其放进 HashMap 里面。若是该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是
func(hashMap.get(Key), Value)
,好比上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value
,并将 func 的结果从新 put(key) 到 HashMap 中去。这个 func 功能上至关于 reduce(),但实际处理数据的方式与 MapReduce reduce() 有差异,差异至关于下面两段程序的差异。 // MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result
}
// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result
}
MapReduce 能够在 process 函数里面能够定义任何数据结构,也能够将部分或所有的 values 都 cache 后再进行处理,很是灵活。而 Spark 中的 func 的输入参数是固定的,一个是上一个 record 的处理结果,另外一个是当前读入的 record,它们通过 func 处理后的结果被下一个 record 处理时使用。所以一些算法好比求平均数,在 process 里面很好实现,直接sum(values)/values.length
,而在 Spark 中 func 能够实现sum(values)
,但很差实现/values.length
。更多的 func 将会在下面的章节细致分析。
- fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,通过处理后的数据放在内存 + 磁盘上。这里咱们主要讨论处理后的数据,能够灵活设置这些数据是“只用内存”仍是“内存+磁盘”。若是
spark.shuffle.spill = false
就只用内存。内存使用的是AppendOnlyMap
,相似 Java 的HashMap
,内存+磁盘使用的是ExternalAppendOnlyMap
,若是内存空间不足时,ExternalAppendOnlyMap
能够将 \<k, v\=""> records 进行 sort 后 spill 到磁盘上,等到须要它们的时候再进行归并,后面会详解。使用“内存+磁盘”的一个主要问题就是如何在二者之间取得平衡?在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用一样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。
- 怎么得到要 fetch 的数据的存放位置?在上一章讨论物理执行图中的 stage 划分的时候,咱们强调 “一个 ShuffleMapStage 造成后,会将该 stage 最后一个 final RDD 注册到
MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)
,这一步很重要,由于 shuffle 过程须要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置”。所以,reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每一个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。
至此,咱们已经讨论了 shuffle write 和 shuffle read 设计的核心思想、算法及某些实现。接下来,咱们深刻一些细节来讨论。
典型 transformation() 的 shuffle read
1. reduceByKey(func)
上面初步介绍了 reduceByKey() 是如何实现边 fetch 边 reduce() 的。须要注意的是虽然 Example(WordCount) 中给出了各个 RDD 的内容,但一个 partition 里面的 records 并非同时存在的。好比在 ShuffledRDD 中,每 fetch 来一个 record 就当即进入了 func 进行处理。MapPartitionsRDD 中的数据是 func 在所有 records 上的处理结果。从 record 粒度上来看,reduce() 能够表示以下:

能够看到,fetch 来的 records 被逐个 aggreagte 到 HashMap 中,等到全部 records 都进入 HashMap,就获得最后的处理结果。惟一要求是 func 必须是 commulative 的(参见上面的 Spark 的 reduce() 的代码)。 ShuffledRDD 到 MapPartitionsRDD 使用的是 mapPartitionsWithContext 操做。 为了减小数据传输量,MapReduce 能够在 map 端先进行 combine(),其实在 Spark 也能够实现,只须要将上图 ShuffledRDD => MapPartitionsRDD 的 mapPartitionsWithContext 在 ShuffleMapStage 中也进行一次便可,好比 reduceByKey 例子中 ParallelCollectionRDD => MapPartitionsRDD 完成的就是 map 端的 combine()。
对比 MapReduce 的 map()-reduce() 和 Spark 中的 reduceByKey():
- map 端的区别:map() 没有区别。对于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上进行 combine()。
- reduce 端区别:MapReduce 的 shuffle 阶段先 fetch 数据,数据量到达必定规模后 combine(),再将剩余数据 merge-sort 后 reduce(),reduce() 很是灵活。Spark 边 fetch 边 reduce()(在 HashMap 上执行 func),所以要求 func 符合 commulative 的特性。
从内存利用上来对比:
- map 端区别:MapReduce 须要开一个大型环形缓冲区来暂存和排序 map() 的部分输出结果,但 combine() 不须要额外空间(除非用户本身定义)。 Spark 须要 HashMap 内存数据结构来进行 combine(),同时输出 records 到磁盘上时也须要一个小的 buffer(bucket)。
- reduce 端区别:MapReduce 须要一部份内存空间来存储 shuffle 过来的数据,combine() 和 reduce() 不须要额外空间,由于它们的输入数据分段有序,只需归并一下就能够获得。在 Spark 中,fetch 时须要 softBuffer,处理数据时若是只使用内存,那么须要 HashMap 来持有处理后的结果。若是使用内存+磁盘,那么在 HashMap 存放一部分处理后的数据。
2. groupByKey(numPartitions)

与 reduceByKey() 流程同样,只是 func 变成
result = result ++ record.value
,功能是将每一个 key 对应的全部 values 连接在一块儿。result 来自 hashMap.get(record.key),计算后的 result 会再次被 put 到 hashMap 中。与 reduceByKey() 的区别就是 groupByKey() 没有 map 端的 combine()。对于 groupByKey() 来讲 map 端的 combine() 只是减小了重复 Key 占用的空间,若是 key 重复率不高,不必 combine(),不然,最好可以 combine()。
3. distinct(numPartitions)

与 reduceByKey() 流程同样,只是 func 变成
result = result == null? record.value : result
,若是 HashMap 中没有该 record 就将其放入,不然舍弃。与 reduceByKey() 相同,在map 端存在 combine()。
4. cogroup(otherRDD, numPartitions)

CoGroupedRDD 可能有 0 个、1 个或者多个 ShuffleDependency。但并非要为每个 ShuffleDependency 创建一个 HashMap,而是全部的 Dependency 共用一个 HashMap。与 reduceByKey() 不一样的是,HashMap 在 CoGroupedRDD 的 compute() 中创建,而不是在 mapPartitionsWithContext() 中创建。 粗线表示的 task 首先 new 出一个 Array[ArrayBuffer(), ArrayBuffer()],ArrayBuffer() 的个数与参与 cogroup 的 RDD 个数相同。func 的逻辑是这样的:每当从 RDD a 中 shuffle 过来一个 \<key, value\=""> record 就将其添加到 hashmap.get(Key) 对应的 Array 中的第一个 ArrayBuffer() 中,每当从 RDD b 中 shuffle 过来一个 record,就将其添加到对应的 Array 中的第二个 ArrayBuffer()。 CoGroupedRDD => MappedValuesRDD 对应 mapValues() 操做,就是将 [ArrayBuffer(), ArrayBuffer()] 变成 [Iterable[V], Iterable[W]]。
5. intersection(otherRDD) 和 join(otherRDD, numPartitions)

这两个操做中均使用了 cogroup,因此 shuffle 的处理方式与 cogroup 同样。
6. sortByKey(ascending, numPartitions)

sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的处理逻辑与 reduceByKey() 不太同样,没有使用 HashMap 和 func 来处理 fetch 过来的 records。 sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的处理逻辑是:将 shuffle 过来的一个个 record 存放到一个 Array 里,而后按照 Key 来对 Array 中的 records 进行 sort。
7. coalesce(numPartitions, shuffle = true)

coalesce() 虽然有 ShuffleDependency,但不须要对 shuffle 过来的 records 进行 aggregate,因此没有创建 HashMap。每 shuffle 一个 record,就直接流向 CoalescedRDD,进而流向 MappedRDD 中。
Shuffle read 中的 HashMap
HashMap 是 Spark shuffle read 过程当中频繁使用的、用于 aggregate 的数据结构。Spark 设计了两种:一种是全内存的 AppendOnlyMap,另外一种是内存+磁盘的 ExternalAppendOnlyMap。下面咱们来分析一下
二者特性及内存使用状况。
1. AppendOnlyMap
AppendOnlyMap 的官方介绍是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是相似 HashMap,但没有
remove(key)
方法。其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,白色部分存储 Value。以下图:

当要 put(K, V) 时,先 hash(K) 找存放位置,
若是存放位置已经被占用,就使用 Quadratic probing 探测方法来找下一个空闲位置。对于图中的 K6 来讲,第三次查找找到 K4 后面的空闲位置,放进去便可。get(K6) 的时候相似,找三次找到 K6,取出紧挨着的 V6,与先来的 value 作 func,结果从新放到 V6 的位置。 迭代 AppendOnlyMap 中的元素的时候,从前到后扫描输出。 若是 Array 的利用率达到 70%,那么就扩张一倍,并对全部 key 进行 rehash 后,从新排列每一个 key 的位置。 AppendOnlyMap 还有一个
destructiveSortedIterator(): Iterator[(K, V)]
方法,能够返回 Array 中排序后的 (K, V) pairs。实现方法很简单:先将全部 (K, V) pairs compact 到 Array 的前端,并使得每一个 (K, V) 占一个位置(原来占两个),以后直接调用 Array.sort() 排序,不过这样作会破坏数组(key 的位置变化了)。
2. ExternalAppendOnlyMap

相比 AppendOnlyMap,ExternalAppendOnlyMap 的实现略复杂,但逻辑其实很简单,相似 Hadoop MapReduce 中的 shuffle-merge-combine-sort 过程: ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 过程与原始的 AppendOnlyMap 如出一辙。
若是 AppendOnlyMap 快被装满时检查一下内存剩余空间是否能够够扩展,够就直接在内存中扩展,不够就 sort 一下 AppendOnlyMap,将其内部全部 records 都 spill 到磁盘上。图中 spill 了 4 次,每次 spill 完在磁盘上生成一个 spilledMap 文件,而后从新 new 出来一个 AppendOnlyMap。最后一个 (K, V) record insert 到 AppendOnlyMap 后,表示全部 shuffle 来的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已经被处理完,由于每次 insert 的时候,新来的 record 只与 AppendOnlyMap 中的 records 进行 aggregate,并非与全部的 records 进行 aggregate(一些 records 已经被 spill 到磁盘上了)。所以当须要 aggregate 的最终结果时,须要对 AppendOnlyMap 和全部的 spilledMaps 进行全局 merge-aggregate。
全局 merge-aggregate 的流程也很简单:先将 AppendOnlyMap 中的 records 进行 sort,造成 sortedMap。而后利用 DestructiveSortedIterator 和 DiskMapIterator 分别从 sortedMap 和各个 spilledMap 读出一部分数据(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 须要具备相同的 hash(key),因此图中第一个 spilledMap 只读出前三个 records 进入 StreamBuffer。mergeHeap 顾名思义就是使用堆排序不断提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并将其一个个放入 mergeBuffers 中,放入的时候与已经存在于 mergeBuffers 中的 StreamBuffer 进行 merge-combine,第一个被放入 mergeBuffers 的 StreamBuffer 被称为 minBuffer,那么 minKey 就是 minBuffer 中第一个 record 的 key。当 merge-combine 的时候,与 minKey 相同的 records 被 aggregate 一块儿,而后输出。整个 merge-combine 在 mergeBuffers 中结束后,StreamBuffer 剩余的 records 随着 StreamBuffer 从新进入 mergeHeap。一旦某个 StreamBuffer 在 merge-combine 后变为空(里面的 records 都被输出了),那么会使用 DestructiveSortedIterator 或 DiskMapIterator 从新装填 hash(key) 相同的 records,而后再从新进入 mergeHeap。 整个 insert-merge-aggregate 的过程有三点须要进一步探讨一下:
- 内存剩余空间检测 与 Hadoop MapReduce 规定 reducer 中 70% 的空间可用于 shuffle-sort 相似,Spark 也规定 executor 中
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
的空间(默认是0.3 * 0.8
)可用于 ExternalOnlyAppendMap。Spark 略保守是否是?更保守的是这 24% 的空间不是彻底用于一个 ExternalOnlyAppendMap 的,而是由在 executor 上同时运行的全部 reducer 共享的。为此,exectuor 专门持有一个 ShuffleMemroyMap: HashMap[threadId, occupiedMemory]
来监控每一个 reducer 中 ExternalOnlyAppendMap 占用的内存量。每当 AppendOnlyMap 要扩展时,都会计算ShuffleMemroyMap 持有的全部 reducer 中的 AppendOnlyMap 已占用的内存 + 扩展后的内存 是会否会大于内存限制,大于就会将 AppendOnlyMap spill 到磁盘。有一点须要注意的是前 1000 个 records 进入 AppendOnlyMap 的时候不会启动是否要 spill 的检查,须要扩展时就直接在内存中扩展。
- AppendOnlyMap 大小估计 为了获知 AppendOnlyMap 占用的内存空间,能够在每次扩展时都将 AppendOnlyMap reference 的全部 objects 大小都算一遍,而后加和,但这样作很是耗时。因此 Spark 设计了粗略的估算算法,算法时间复杂度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小变化及一共 insert 的 records 的个数来估算大小,具体见
SizeTrackingAppendOnlyMap
和SizeEstimator
。
- Spill 过程 与 shuffle write 同样,在 spill records 到磁盘上的时候,会创建一个 buffer 缓冲区,大小仍为
spark.shuffle.file.buffer.kb
,默认是 32KB。另外,因为 serializer 也会分配缓冲区用于序列化和反序列化,因此若是一次 serialize 的 records 过多的话缓冲区会变得很大。Spark 限制每次 serialize 的 records 个数为 spark.shuffle.spill.batchSize
,默认是 10000。
Discussion
经过本章的介绍能够发现,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不一样的 transformation() 的语义去设计不一样的 shuffle-aggregate 策略,再加上不一样的内存数据结构来混搭出合理的执行流程。 这章主要讨论了 Spark 是怎么在不排序 records 的状况下完成 shuffle write 和 shuffle read,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通讯角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。 另外,Jerry Shao 写的
详细探究Spark的shuffle实现 很赞,里面还介绍了 shuffle 过程在 Spark 中的进化史。目前 sort-based 的 shuffle 也在实现当中,stay tuned。