Spark的shuffle剖析!php
1、什么是shuffle?python
shuffle是洗牌的意思,总的来讲,就是分散在各个节点的数据,在通过计算以后,须要从新将数据进行分配,以进行下一步的计算。好比wordcount,显示在3台节点上,分别计算了spark的数量、hadoop的数量、scala的数量,结果以下:算法
节点1: (spark 1) (hadoop 1)数组
节点2: (hadoop 1) (scala 1)网络
节点3: (hadoop 1) (spark 3)数据结构
在通过计算以后,下一步就须要汇总了,那么汇总就涉及到shuffle,把数据要分类传输,好比spark的都到节点1,hadoop 的都到节点2,scala的都到节点3。框架
2、为何shuffle重要?
ide
Spark大会上,全部的演讲嘉宾都认为shuffle是最影响性能的地方,在MapReduce框架中,shuffle是链接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须通过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。函数
3、从技术上结识shuffle?
oop
Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果须要按key哈希,而且分发到每个Reducer上去,这个过程就是shuffle。因为shuffle涉及到了磁盘的读写和网络的传输,所以shuffle性能的高低直接影响到了整个程序的运行效率。
下面这幅图清晰地描述了MapReduce算法的整个流程,其中shuffle phase是介于Map phase和Reduce phase之间。
4、shuffle阶段的划分
Spark的操做模型是基于RDD的,当调用RDD的reduceByKey、groupByKey等相似的操做的时候,就须要有shuffle了。再拿出reduceByKey这个来说。
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = { reduceByKey(new HashPartitioner(numPartitions), func) }
reduceByKey的时候,咱们能够手动设定reduce的个数,若是不指定的话,就可能不受控制了。
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse for (r <- bySize if r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { new HashPartitioner(bySize.head.partitions.size) } }
View Code
若是不指定reduce个数的话,就按默认的走:
一、若是自定义了分区函数partitioner的话,就按你的分区函数来走。
二、若是没有定义,那么若是设置了 spark.default.parallelism ,就使用哈希的分区方式,reduce个数就是设置的这个值。
三、若是这个也没设置,那就按照输入数据的分片的数量来设定。若是是hadoop的输入数据的话,这个就多了。。。你们可要当心啊。
设定完以后,它会作三件事情,也就是以前讲的3次RDD转换。
//map端先按照key合并一次val combined = self.mapPartitionsWithContext((context, iter) => { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true)//reduce抓取数据val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializer)//合并数据,执行reduce计算partitioned.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true)
View Code
一、在第一个MapPartitionsRDD这里先作一次map端的聚合操做。
二、SHuffledRDD主要是作从这个抓取数据的工做。
三、第二个MapPartitionsRDD把抓取过来的数据再次进行聚合操做。
四、步骤1和步骤3都会涉及到spill的过程。
怎么作的聚合操做,回去看RDD那章。
5、Shuffle的中间结果如何存储
做业提交的时候,DAGScheduler会把Shuffle的过程切分红map和reduce两个Stage(以前一直被我叫作shuffle前和shuffle后),具体的切分的位置在上图的虚线处。
map端的任务会做为一个ShuffleMapTask提交,最后在TaskRunner里面调用了它的runTask方法。
override def runTask(context: TaskContext): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions metrics = Some(context.taskMetrics) val blockManager = SparkEnv.get.blockManager val shuffleBlockManager = blockManager.shuffleBlockManager var shuffle: ShuffleWriterGroup = null var success = false try { // serializer为空的状况调用默认的JavaSerializer,也能够经过spark.serializer来设置成别的 val ser = Serializer.getSerializer(dep.serializer) // 实例化Writer,Writer的数量=numOutputSplits=前面咱们说的那个reduce的数量 shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser) // 遍历rdd的元素,按照key计算出来它所在的bucketId,而后经过bucketId找到相应的Writer写入 for (elem <- rdd.iterator(split, context)) { val pair = elem.asInstanceOf[Product2[Any, Any]] val bucketId = dep.partitioner.getPartition(pair._1) shuffle.writers(bucketId).write(pair) } // 提交写入操做. 计算每一个bucket block的大小 var totalBytes = 0L var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() writer.close() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() MapOutputTracker.compressSize(size) } // 更新 shuffle 监控参数. val shuffleMetrics = new ShuffleWriteMetrics shuffleMetrics.shuffleBytesWritten = totalBytes shuffleMetrics.shuffleWriteTime = totalTime metrics.get.shuffleWriteMetrics = Some(shuffleMetrics) success = true new MapStatus(blockManager.blockManagerId, compressedSizes) } catch { case e: Exception => // 出错了,取消以前的操做,关闭writer if (shuffle != null && shuffle.writers != null) { for (writer <- shuffle.writers) { writer.revertPartialWrites() writer.close() } } throw e } finally { // 关闭writer if (shuffle != null && shuffle.writers != null) { try { shuffle.releaseWriters(success) } catch { case e: Exception => logError("Failed to release shuffle writers", e) } } // 执行注册的回调函数,通常是作清理工做 context.executeOnCompleteCallbacks() } }
View Code
遍历每个记录,经过它的key来肯定它的bucketId,再经过这个bucket的writer写入数据。
下面咱们看看ShuffleBlockManager的forMapTask方法吧。
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup { shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) private val shuffleState = shuffleStates(shuffleId) private var fileGroup: ShuffleFileGroup = null val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { fileGroup = getUnusedFileGroup() Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) // 从已有的文件组里选文件,一个bucket一个文件,即要发送到同一个reduce的数据写入到同一个文件 blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize) } } else { Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => // 按照blockId来生成文件,文件数为map数*reduce数 val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockFile = blockManager.diskBlockManager.getFile(blockId) if (blockFile.exists) { if (blockFile.delete()) { logInfo(s"Removed existing shuffle file $blockFile") } else { logWarning(s"Failed to remove existing shuffle file $blockFile") } } blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) } }
几个重要的结论:
一、map的中间结果是写入到本地硬盘的,而不是内存。
二、默认是一个map的中间结果文件是M*R(M=map数量,R=reduce的数量),设置了spark.shuffle.consolidateFiles为true以后是R个文件,根据bucketId把要分到同一个reduce的结果写入到一个文件中。
三、consolidateFiles采用的是一个reduce一个文件,它还记录了每一个map的写入起始位置,因此查找的时候,先经过reduceId查找到哪一个文件,再同坐mapId查找索引当中的起始位置offset,长度length=(mapId + 1).offset -(mapId).offset,这样就能够肯定一个FileSegment(file, offset, length)。
四、Finally,存储结束以后, 返回了一个new MapStatus(blockManager.blockManagerId, compressedSizes),把blockManagerId和block的大小都一块儿返回。
6、Shuffle的数据如何拉取过来
ShuffleMapTask结束以后,最后在DAGScheduler的handleTaskCompletion方法当中。
一、把结果添加到Stage的outputLocs数组里,它是按照数据的分区Id来存储映射关系的partitionId->MapStaus。
二、stage结束以后,经过mapOutputTracker的registerMapOutputs方法,把这次shuffle的结果outputLocs记录到mapOutputTracker里面。
这个stage结束以后,就到ShuffleRDD运行了,咱们看一下它的compute函数。
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
它是经过ShuffleFetch的fetch方法来抓取的,具体实如今BlockStoreShuffleFetcher里面。
一、MapOutputTrackerWorker向MapOutputTrackerMaster获取shuffle相关的map结果信息。
二、把map结果信息构形成BlockManagerId --> Array(BlockId, size)的映射关系。
三、经过BlockManager的getMultiple批量拉取block。
四、返回一个可遍历的Iterator接口,并更新相关的监控参数。
咱们继续看getMultiple方法。分两种状况处理,分别是netty的和Basic的,Basic的就不讲了,就是经过ConnectionManager去指定的BlockManager那里获取数据,上一章恰好说了。
咱们讲一下Netty的吧,这个是须要设置的才能启用的,不知道性能会不会好一些呢?
看NettyBlockFetcherIterator的initialize方法,再看BasicBlockFetcherIterator的initialize方法,发现Basic的不能同时抓取超过48Mb的数据。
在NettyBlockFetcherIterator的sendRequest方法里面,发现它是经过ShuffleCopier来试下的。
这块接下来就是netty的客户端调用的方法了,我对这个不了解。在服务端的处理是在DiskBlockManager内部启动了一个ShuffleSender的服务,最终的业务处理逻辑是在FileServerHandler。
它是经过getBlockLocation返回一个FileSegment,下面这段代码是ShuffleBlockManager的getBlockLocation方法。
获取的方法前面说了,经过reduceId找到文件,再经过mapId找到它的起始位置。可是这里有个疑问了,若是启用了consolidateFiles,一个reduce的所需数据都在一个文件里,是否是就能够把整个文件一块儿返回呢,而不是经过N个map过不须要来分屡次读取?仍是惧怕一次发送一个大文件容易失败?这就不得而知了。
到这里整个过程就讲完了。能够看得出来Shuffle这块仍是作了一些优化的,可是这些参数并无启用,有须要的朋友能够本身启用一下试试效果。
参考资料: