在MapReduce框架中,shuffle是链接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须通过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark做为MapReduce框架的一种实现,天然也实现了shuffle的逻辑,本文就深刻研究Spark的shuffle是如何实现的,有什么优缺点,与Hadoop MapReduce的shuffle有什么不一样。git
Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果须要按key哈希,而且分发到每个Reducer上去,这个过程就是shuffle。因为shuffle涉及到了磁盘的读写和网络的传输,所以shuffle性能的高低直接影响到了整个程序的运行效率。github
下面这幅图清晰地描述了MapReduce算法的整个流程,其中shuffle phase是介于Map phase和Reduce phase之间。算法
概念上shuffle就是一个沟通数据链接的桥梁,那么实际上shuffle这一部分是如何实现的的呢,下面咱们就以Spark为例讲一下shuffle在Spark中的实现。apache
先以图为例简单描述一下Spark中shuffle的整一个流程:缓存
首先每个Mapper会根据Reducer的数量建立出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。网络
其次Mapper产生的结果会根据设置的partition算法填充到每一个bucket中去。这里的partition算法是能够自定义的,固然默认的算法是根据key哈希到不一样的bucket中去。app
当Reducer启动时,它会根据本身task的id和所依赖的Mapper的id从远端或是本地的block manager中取得相应的bucket做为Reducer的输入进行处理。框架
这里的bucket是一个抽象概念,在实现中每一个bucket能够对应一个文件,能够对应文件的一部分或是其余等。socket
接下来咱们分别从shuffle write和shuffle fetch这两块来说述一下Spark的shuffle进化史。ide
在Spark 0.6和0.7的版本中,对于shuffle数据的存储是以文件的方式存储在block manager中,与rdd.persist(StorageLevel.DISk_ONLY)
采起相同的策略,能够参看:
override def run(attemptId: Long): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions ... // Partition the map output. val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)]) for (elem <- rdd.iterator(split, taskContext)) { val pair = elem.asInstanceOf[(Any, Any)] val bucketId = dep.partitioner.getPartition(pair._1) buckets(bucketId) += pair } ... val blockManager = SparkEnv.get.blockManager for (i <- 0 until numOutputSplits) { val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i // Get a Scala iterator from Java map val iter: Iterator[(Any, Any)] = buckets(i).iterator val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false) totalBytes += size } ...}
我已经将一些干扰代码删去。能够看到Spark在每个Mapper中为每一个Reducer建立一个bucket,并将RDD计算结果放进bucket中。须要注意的是每一个bucket是一个ArrayBuffer
,也就是说Map的输出结果是会先存储在内存。
接着Spark会将ArrayBuffer中的Map输出结果写入block manager所管理的磁盘中,这里文件的命名方式为:shuffle_ + shuffle_id + "_" + map partition id + "_" + shuffle partition id
。
早期的shuffle write有两个比较大的问题:
Map的输出必须先所有存储到内存中,而后写入磁盘。这对内存是一个很是大的开销,当内存不足以存储全部的Map output时就会出现OOM。
每个Mapper都会产生Reducer number个shuffle文件,若是Mapper个数是1k,Reducer个数也是1k,那么就会产生1M个shuffle文件,这对于文件系统是一个很是大的负担。同时在shuffle数据量不大而shuffle文件又很是多的状况下,随机写也会严重下降IO的性能。
在Spark 0.8版本中,shuffle write采用了与RDD block write不一样的方式,同时也为shuffle write单首创建了ShuffleBlockManager
,部分解决了0.6和0.7版本中遇到的问题。
首先咱们来看一下Spark 0.8的具体实现:
override def run(attemptId: Long): MapStatus = { ... val blockManager = SparkEnv.get.blockManager var shuffle: ShuffleBlocks = null var buckets: ShuffleWriterGroup = null try { // Obtain all the block writers for shuffle blocks. val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) buckets = shuffle.acquireWriters(partition) // Write the map output to its associated buckets. for (elem <- rdd.iterator(split, taskContext)) { val pair = elem.asInstanceOf[Product2[Any, Any]] val bucketId = dep.partitioner.getPartition(pair._1) buckets.writers(bucketId).write(pair) } // Commit the writes. Get the size of each bucket block (total block size). var totalBytes = 0L val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => writer.commit() writer.close() val size = writer.size() totalBytes += size MapOutputTracker.compressSize(size) } ... } catch { case e: Exception => // If there is an exception from running the task, revert the partial writes // and throw the exception upstream to Spark. if (buckets != null) { buckets.writers.foreach(_.revertPartialWrites()) } throw e } finally { // Release the writers back to the shuffle block manager. if (shuffle != null && buckets != null) { shuffle.releaseWriters(buckets) } // Execute the callbacks on task completion. taskContext.executeOnCompleteCallbacks() } }}
在这个版本中为shuffle write添加了一个新的类ShuffleBlockManager
,由ShuffleBlockManager
来分配和管理bucket。同时ShuffleBlockManager
为每个bucket分配一个DiskObjectWriter
,每一个write handler拥有默认100KB的缓存,使用这个write handler将Map output写入文件中。能够看到如今的写入方式变为buckets.writers(bucketId).write(pair)
,也就是说Map output的key-value pair是逐个写入到磁盘而不是预先把全部数据存储在内存中在总体flush到磁盘中去。
ShuffleBlockManager
的代码以下所示:
private[spark]class ShuffleBlockManager(blockManager: BlockManager) { def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { new ShuffleBlocks { // Get a group of writers for a map task. override def acquireWriters(mapId: Int): ShuffleWriterGroup = { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId) blockManager.getDiskBlockWriter(blockId, serializer, bufferSize) } new ShuffleWriterGroup(mapId, writers) } override def releaseWriters(group: ShuffleWriterGroup) = { // Nothing really to release here. } } }}
Spark 0.8显著减小了shuffle的内存压力,如今Map output不须要先所有存储在内存中,再flush到硬盘,而是record-by-record写入到磁盘中。同时对于shuffle文件的管理也独立出新的ShuffleBlockManager
进行管理,而不是与rdd cache文件在一块儿了。
可是这一版Spark 0.8的shuffle write仍然有两个大的问题没有解决:
首先依旧是shuffle文件过多的问题,shuffle文件过多一是会形成文件系统的压力过大,二是会下降IO的吞吐量。
其次虽然Map output数据再也不须要预先在内存中evaluate显著减小了内存压力,可是新引入的DiskObjectWriter
所带来的buffer开销也是一个不容小视的内存开销。假定咱们有1k个Mapper和1k个Reducer,那么就会有1M个bucket,于此同时就会有1M个write handler,而每个write handler默认须要100KB内存,那么总共须要100GB的内存。这样的话仅仅是buffer就须要这么多的内存,内存的开销是惊人的。固然实际状况下这1k个Mapper是分时运行的话,所需的内存就只有cores * reducer numbers * 100KB
大小了。可是reducer数量不少的话,这个buffer的内存开销也是蛮厉害的。
为了解决shuffle文件过多的状况,Spark 0.8.1引入了新的shuffle consolidation,以期显著减小shuffle文件的数量。
首先咱们以图例来介绍一下shuffle consolidation的原理。
假定该job有4个Mapper和4个Reducer,有2个core,也就是能并行运行两个task。咱们能够算出Spark的shuffle write共须要16个bucket,也就有了16个write handler。在以前的Spark版本中,每个bucket对应的是一个文件,所以在这里会产生16个shuffle文件。
而在shuffle consolidation中每个bucket并不是对应一个文件,而是对应文件中的一个segment,同时shuffle consolidation所产生的shuffle文件数量与Spark core的个数也有关系。在上面的图例中,job的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到以前的8个文件后面,这样一共就只有8个shuffle文件,而在文件内部这有16个不一样的segment。所以从理论上讲shuffle consolidation所产生的shuffle文件数量为C×R,其中C是Spark集群的core number,R是Reducer的个数。
须要注意的是当 M=C时shuffle consolidation所产生的文件数和以前的实现是同样的。
Shuffle consolidation显著减小了shuffle文件的数量,解决了以前版本一个比较严重的问题,可是writer handler的buffer开销过大依然没有减小,若要减小writer handler的buffer开销,咱们只能减小Reducer的数量,可是这又会引入新的问题,下文将会有详细介绍。
讲完了shuffle write的进化史,接下来要讲一下shuffle fetch了,同时还要讲一下Spark的aggregator,这一块对于Spark实际应用的性能相当重要。
Shuffle write写出去的数据要被Reducer使用,就须要shuffle fetcher将所需的数据fetch过来,这里的fetch包括本地和远端,由于shuffle数据有可能一部分是存储在本地的。Spark对shuffle fetcher实现了两套不一样的框架:NIO经过socket链接去fetch数据;OIO经过netty server去fetch数据。分别对应的类是BasicBlockFetcherIterator
和NettyBlockFetcherIterator
。
在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator
,而BasicBlockFetcherIterator
在shuffle数据量比较大的状况下performance始终不是很好,没法充分利用网络带宽,为了解决这个问题,添加了新的shuffle fetcher来试图取得更好的性能。对于早期shuffle性能的评测能够参看Spark usergroup。固然如今BasicBlockFetcherIterator
的性能也已经好了不少,使用的时候能够对这两种实现都进行测试比较。
接下来讲一下aggregator。咱们都知道在Hadoop MapReduce的shuffle过程当中,shuffle fetch过来的数据会进行merge sort,使得相同key下的不一样value按序归并到一块儿供Reducer使用,这个过程能够参看下图:
全部的merge sort都是在磁盘上进行的,有效地控制了内存的使用,可是代价是更多的磁盘IO。
那么Spark是否也有merge sort呢,仍是以别的方式实现,下面咱们就细细说明。
首先虽然Spark属于MapReduce体系,可是对传统的MapReduce算法进行了必定的改变。Spark假定在大多数用户的case中,shuffle数据的sort不是必须的,好比word count,强制地进行排序只会使性能变差,所以Spark并不在Reducer端作merge sort。既然没有merge sort那Spark是如何进行reduce的呢?这就要说到aggregator了。
aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。当咱们在作word count reduce计算count值的时候,它会将shuffle fetch到的每个key-value pair更新或是插入到hashmap中(若在hashmap中没有查找到,则插入其中;若查找到则更新value值)。这样就不须要预先把全部的key-value进行merge sort,而是来一个处理一个,省下了外部排序这一步骤。但同时须要注意的是reducer的内存必须足以存放这个partition的全部key和count值,所以对内存有必定的要求。
在上面word count的例子中,由于value会不断地更新,而不须要将其所有记录在内存中,所以内存的使用仍是比较少的。考虑一下若是是group by key这样的操做,Reducer须要获得key对应的全部value。在Hadoop MapReduce中,因为有了merge sort,所以给予Reducer的数据已是group by key了,而Spark没有这一步,所以须要将key和对应的value所有存放在hashmap中,并将value合并成一个array。能够想象为了可以存放全部数据,用户必须确保每个partition足够小到内存可以容纳,这对于内存是一个很是严峻的考验。所以Spark文档中建议用户涉及到这类操做的时候尽可能增长partition,也就是增长Mapper和Reducer的数量。
增长Mapper和Reducer的数量当然能够减少partition的大小,使得内存能够容纳这个partition。可是咱们在shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增长的更多,由此带来write handler所需的buffer也会更多。在一方面咱们为了减小内存的使用采起了增长task数量的策略,另外一方面task数量增多又会带来buffer开销更大的问题,所以陷入了内存使用的两难境地。
为了减小内存的使用,只能将aggregator的操做从内存移到磁盘上进行,Spark社区也意识到了Spark在处理数据规模远远大于内存大小时所带来的问题。所以PR303提供了外部排序的实现方案,相信在Spark 0.9 release的时候,这个patch应该能merge进去,到时候内存的使用量能够显著地减小。
本文详细地介绍了Spark的shuffle实现是如何进化的,以及遇到问题解决问题的过程。shuffle做为Spark程序中很重要的一个环节,直接影响了Spark程序的性能,现现在的Spark版本虽然shuffle实现还存在着种种问题,可是相比于早期版本,已经有了很大的进步。开源代码就是如此不停地迭代推动,随着Spark的普及程度愈来愈高,贡献的人愈来愈多,相信后续的版本会有更大的提高。