本篇,咱们来看一下spark内核中另外一个重要的模块,Shuffle管理器ShuffleManager。shuffle能够说是分布式计算中最重要的一个概念了,数据的join,聚合去重等操做都须要这个步骤。另外一方面,spark之因此比mapReduce的性能高其中一个主要的缘由就是对shuffle过程的优化,一方面spark的shuffle过程更好地利用内存(也就是咱们前面在分析内存管理时所说的执行内存),另外一方面对于shuffle过程当中溢写的磁盘文件归并排序和引入索引文件。固然,spark性能高的另外一个主要缘由还有对计算链的优化,把多步map类型的计算chain在一块儿,大大减小中间过程的落盘,这也是spark显著区别于mr的地方。
spark新版本的Shuffle管理器默认是SortShuffleManager。java
SparkEnv初始化部分的代码:apache
val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
看shuffle管理器的源码,咱们首先应该ShuffleManager的调用时机。想一下shuffle的过程,无非就是两个步骤,写和读。写是在map阶段,将数据按照必定的分区规则归类到不一样的分区中,读是在reduce阶段,每一个分区从map阶段的输出中拉取属于本身的数据,因此咱们分析ShuffleManager源码基本也能够沿着这个思路。咱们先来分析写的过程,由于对于一个完整的shuffle过程,确定是先写而后才读的。
回顾一下以前的对做业运行过程的分析,咱们应该还记得做业被切分红任务后是在executor端执行的,而Shuffle阶段的的stage被切分红了ShuffleMapTask,shuffle的写过程正是在这个类中完成的,咱们看一下代码:api
能够看到经过ShuffleManager.getWriter获取了一个shuffle写入器,从而将rdd的计算数据写入磁盘。数组
override def runTask(context: TaskContext): MapStatus = { // Deserialize the RDD using the broadcast variable. val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis() val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() // 反序列化RDD和shuffle, 关键的步骤 // 这里思考rdd和shuffle反序列化时,内部的SparkContext对象是怎么反序列化的 val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L var writer: ShuffleWriter[Any, Any] = null try { // shuffle管理器 val manager = SparkEnv.get.shuffleManager // 获取一个shuffle写入器 writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) // 这里能够看到rdd计算的核心方法就是iterator方法 // SortShuffleWriter的write方法能够分为几个步骤: // 将上游rdd计算出的数据(经过调用rdd.iterator方法)写入内存缓冲区, // 在写的过程当中若是超过 内存阈值就会溢写磁盘文件,可能会写多个文件 // 最后将溢写的文件和内存中剩余的数据一块儿进行归并排序后写入到磁盘中造成一个大的数据文件 // 这个排序是先按分区排序,在按key排序 // 在最后归并排序后写的过程当中,没写一个分区就会手动刷写一遍,并记录下这个分区数据在文件中的位移 // 因此实际上最后写完一个task的数据后,磁盘上会有两个文件:数据文件和记录每一个reduce端partition数据位移的索引文件 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 主要是删除中间过程的溢写文件,向内存管理器释放申请的内存 writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } }
这里根据shuffle类型获取不一样的ShuffleWriter对象,大多数状况下,都是SortShuffleWriter类型,因此咱们直接看SortShuffleWriter.write方法。数据结构
/** Get a writer for a given partition. Called on executors by map tasks. */ // 获取一个shuffle存储器,在executor端被调用,在执行一个map task调用 override def getWriter[K, V]( handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = { numMapsForShuffle.putIfAbsent( handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) val env = SparkEnv.get handle match { case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => new UnsafeShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], bypassMergeSortHandle, mapId, context, env.conf) case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) } }
总结一下这个方法的主要逻辑:app
在stop方法中还会作一些收尾工做,统计磁盘io耗时,删除中间溢写文件分布式
override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (dep.mapSideCombine) { // map端进行合并的状况,此时用户应该提供聚合器和顺序 require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } // 将map数据所有写入排序器中, // 这个过程当中可能会生成多个溢写文件 sorter.insertAll(records) // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). // mapId就是shuffleMap端RDD的partitionId // 获取这个map分区的shuffle输出文件名 val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) // 加一个uuid后缀 val tmp = Utils.tempFileWith(output) try { val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) // 这一步将溢写到的磁盘的文件和内存中的数据进行归并排序, // 并溢写到一个文件中,这一步写的文件是临时文件名 val partitionLengths = sorter.writePartitionedFile(blockId, tmp) // 这一步主要是写入索引文件,使用move方法原子第将临时索引和临时数据文件重命名为正常的文件名 shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) // 返回一个状态对象,包含shuffle服务Id和各个分区数据在文件中的位移 mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") } } }
咱们首先看一下获取shuffle输出文件名,是经过IndexShuffleBlockResolver组件获取的,而它的内部又是经过BlockManager内部的DiskBlockManager分配文件名的,这个DiskBlockManager我在以前分析块管理器时提到过,它的做用就是管理文件名的分配,以及spark使用的目录,子目录的建立删除等。咱们看到对于数据文件和索引文件的命名规则是不同的,他们的命名规则分别定义在ShuffleDataBlockId和ShuffleIndexBlockId中。ide
def getDataFile(shuffleId: Int, mapId: Int): File = { blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) } private def getIndexFile(shuffleId: Int, mapId: Int): File = { blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) }
咱们根据SortShuffleWriter中的调用顺序,首先看一下ExternalSorter.insertAll方法:性能
而后将数据一条一条地循环插入内存的存储结构中,同时考虑到map端合并的状况优化
def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined // 在map端进行合并的状况 if (shouldCombine) { // Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { addElementsRead() kv = records.next() // 向内存缓冲中插入一条数据 map.changeValue((getPartition(kv._1), kv._1), update) // 若是缓冲超过阈值,就会溢写到磁盘生成一个文件 // 每写入一条数据就检查一遍内存 maybeSpillCollection(usingMap = true) } } else {// 再也不map端合并的状况 // Stick values into our buffer while (records.hasNext) { addElementsRead() val kv = records.next() buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) maybeSpillCollection(usingMap = false) } } }
咱们看一个稍微复杂一点的结构,AppendOnlyMap,
能够看出,这个结构设计仍是很精良的,这里面有个很重的方法,incrementSize方法中会判断当前数据量的大小,若是超过阈值就会扩容,这个扩容的方法比较复杂,就是一个从新hash再分布的过程,不过有一点,发不管是在插入新数据仍是从新hash再分布的过程当中,对于hash碰撞的处理策略必定要相同,不然可能形成不一致。
// 向数组中插入一个kv对, def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] // 处理key为空的状况 if (k.eq(null)) { // 若是是第一次插入空值,那么须要将大小增长1 if (!haveNullValue) { incrementSize() } nullValue = updateFunc(haveNullValue, nullValue) haveNullValue = true return nullValue } var pos = rehash(k.hashCode) & mask // 线性探测法处理hash碰撞 // 这里是一个加速的线性探测,即第一次碰撞时走1步, // 第二次碰撞时走2步,第三次碰撞时走3步 var i = 1 while (true) { val curKey = data(2 * pos) if (curKey.eq(null)) {// 若是旧值不存在,直接插入 val newValue = updateFunc(false, null.asInstanceOf[V]) data(2 * pos) = k data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] incrementSize() return newValue } else if (k.eq(curKey) || k.equals(curKey)) {// 若是旧值存在,须要更新 val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] return newValue } else {// 发生hash碰撞,向后探测,跳跃性的探测 val delta = i pos = (pos + delta) & mask i += 1 } } null.asInstanceOf[V] // Never reached but needed to keep compiler happy }
咱们回到ExternalSorter的插入方法中,没插入一条数据都要检查内存占用,判断是否须要溢写到磁盘,若是须要就溢写到磁盘。
这个方法里调用了map.estimateSize来估算当前插入的数据的内存占用大小,对于内存占用的追踪和估算的功能是在SizeTracker特质中实现的,这个特质我在以前分析MemoryStore时提到过,在将对象类型的数据插入内存中时使用了一个中间态的数据结构DeserializedValuesHolder,它的内部有一个SizeTrackingVector,这个类就是经过继承SizeTracker特征从而实现对象大小的追踪和估算。
private def maybeSpillCollection(usingMap: Boolean): Unit = { var estimatedSize = 0L if (usingMap) { estimatedSize = map.estimateSize() if (maybeSpill(map, estimatedSize)) { map = new PartitionedAppendOnlyMap[K, C] } } else { estimatedSize = buffer.estimateSize() if (maybeSpill(buffer, estimatedSize)) { buffer = new PartitionedPairBuffer[K, C] } } if (estimatedSize > _peakMemoryUsedBytes) { _peakMemoryUsedBytes = estimatedSize } }
首先检查当前内存占用是否超过阈值,若是超过会申请一次执行内存,若是没有申请到足够的执行内存,那么依然须要溢写到磁盘
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false // 每写入32条数据检查一次 if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold // 向内存管理器申请执行内存 val granted = acquireMemory(amountToRequest) myMemoryThreshold += granted // If we were granted too little memory to grow further (either tryToAcquire returned 0, // or we already had more memory than myMemoryThreshold), spill the current collection // 若是内存占用超过了阈值,那么就须要溢写 shouldSpill = currentMemory >= myMemoryThreshold } shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold // Actually spill if (shouldSpill) { _spillCount += 1 logSpillage(currentMemory) // 溢写到磁盘 spill(collection) _elementsRead = 0 _memoryBytesSpilled += currentMemory // 释放内存 releaseMemory() } shouldSpill }
接着上面的方法,
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { // 获取一个排序后的迭代器 val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator) // 将数据写入磁盘文件中 val spillFile = spillMemoryIteratorToDisk(inMemoryIterator) spills += spillFile }
这个方法返回按照分区和key排序过的迭代器,其具体的排序逻辑在AppendOnlyMap.destructiveSortedIterator中
这段代码分为两块,首先对数组进行压紧,是的稀疏的数据所有转移到数组的头部;
而后对数组按照比较器进行排序,比较器首先是按照分区进行比较,若是分区相同才按照key进行比较;
而后返回一个迭代器,这个迭代器仅仅是对数组的封装。经过这个方法,咱们大概知道了AppendonlyMap的排序逻辑。
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = { destroyed = true // Pack KV pairs into the front of the underlying array // 这段代码将稀疏的数据所有转移到数组头部,将数据压紧 var keyIndex, newIndex = 0 while (keyIndex < capacity) { if (data(2 * keyIndex) != null) { data(2 * newIndex) = data(2 * keyIndex) data(2 * newIndex + 1) = data(2 * keyIndex + 1) newIndex += 1 } keyIndex += 1 } assert(curSize == newIndex + (if (haveNullValue) 1 else 0)) // 根据比较器对数据进行排序 new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator) new Iterator[(K, V)] { var i = 0 var nullValueReady = haveNullValue def hasNext: Boolean = (i < newIndex || nullValueReady) def next(): (K, V) = { if (nullValueReady) { nullValueReady = false (null.asInstanceOf[K], nullValue) } else { val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V]) i += 1 item } } } }
回到ExternalSorter.spill方法中,在获取了通过排序后 的迭代器以后,咱们就能够将数据溢写到磁盘上了。
这个方法的代码我不贴了,总结一下主要步骤:
总结一下数据经过ExternalSorter向磁盘溢写的全过程:
总结一下主要的步骤:
循环将数据写入磁盘,每当一个分区的数据写完后,进行一次刷写,将数据从os的文件缓冲区同步到磁盘上,而后获取此时的文件长度,记录下每一个分区在文件中的位移
def writePartitionedFile( blockId: BlockId, outputFile: File): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics().shuffleWriteMetrics) // 若是前面没有数据溢写到磁盘中, // 则只须要将内存中的数据溢写到磁盘 if (spills.isEmpty) { // Case where we only have in-memory data val collection = if (aggregator.isDefined) map else buffer // 返回排序后的迭代器 val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext) { val partitionId = it.nextPartition() while (it.hasNext && it.nextPartition() == partitionId) { it.writeNext(writer) } // 写完一个分区刷写一次 val segment = writer.commitAndGet() // 记录下分区的数据在文件中的位移 lengths(partitionId) = segment.length } } else {// 有溢写到磁盘的文件 // We must perform merge-sort; get an iterator by partition and write everything directly. // 封装一个用于归并各个溢写文件以及内存缓冲区数据的迭代器 // TODO 这个封装的迭代器是实现归并排序的关键 for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { for (elem <- elements) { writer.write(elem._1, elem._2) } // 每写完一个分区,主动刷写一次,获取文件位移, // 这个位移就是写入的分区的位移, // reduce端在拉取数据时就会根据这个位移直接找到应该拉取的数据的位置 val segment = writer.commitAndGet() lengths(id) = segment.length } } } writer.close() // 写完后更新一些统计信息 context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes) // 返回每一个reduce端分区数据在文件中的位移信息 lengths }
仍然回到SortShuffleWriter.write方法,最后一步调用了IndexShuffleBlockResolver.writeIndexFileAndCommit方法,
这个方法的做用主要是将每一个的分区的位移值写入到一个索引文件中,并将临时的索引文件和临时的数据文件重命名为正常的文件名(重命名操做是一个原子操做)
我总结shuffle写数据的过程,能够分为两个主要的步骤: