Shuffle过程主要分为Shuffle write和Shuffle read两个阶段,2.0版本以后hash shuffle被删除,只保留sort shuffle,下面结合代码分析:java
Spark在初始化SparkEnv的时候,会在create()方法里面初始化ShuffleManagerapache
// Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER) val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
这里能够看到包含sort和tungsten-sort两种shuffle,经过反射建立了ShuffleManager,ShuffleManager是一个特质,核心方法有下面几个:数组
private[spark] trait ShuffleManager { /** * 注册一个shuffle返回句柄 */ def registerShuffle[K, V, C]( shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle /** 获取一个Writer根据给定的分区,在executors执行map任务时被调用 */ def getWriter[K, V]( handle: ShuffleHandle, mapId: Long, context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] /** * 获取一个Reader根据reduce分区的范围,在executors执行reduce任务时被调用 */ def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] ... }
SortShuffleManager是ShuffleManager的惟一实现类,对于以上三个方法的实现以下:缓存
/** * Obtains a [[ShuffleHandle]] to pass to tasks. */ override def registerShuffle[K, V, C]( shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { // 1.首先检查是否符合BypassMergeSort if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) { // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't // need map-side aggregation, then write numPartitions files directly and just concatenate // them at the end. This avoids doing serialization and deserialization twice to merge // together the spilled files, which would happen with the normal code path. The downside is // having multiple files open at a time and thus more memory allocated to buffers. new BypassMergeSortShuffleHandle[K, V]( shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) // 2.不然检查是否可以序列化 } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient: new SerializedShuffleHandle[K, V]( shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else { // Otherwise, buffer map outputs in a deserialized form: new BaseShuffleHandle(shuffleId, dependency) } }
1.首先检查是否符合BypassMergeSort,这里须要知足两个条件,首先是当前shuffle依赖中没有map端的聚合操做,其次是分区数要小于spark.shuffle.sort.bypassMergeThreshold的值,默认为200,若是知足这两个条件,会返回BypassMergeSortShuffleHandle,启用bypass merge-sort shuffle机制数据结构
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = { // We cannot bypass sorting if we need to do map-side aggregation. if (dep.mapSideCombine) { false } else { // 默认值为200 val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD) dep.partitioner.numPartitions <= bypassMergeThreshold } }
2.若是不知足上面条件,检查是否知足canUseSerializedShuffle()方法,若是知足该方法中的3个条件,则会返回SerializedShuffleHandle,启用tungsten-sort shuffle机制app
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = { val shufId = dependency.shuffleId val numPartitions = dependency.partitioner.numPartitions // 序列化器须要支持Relocation if (!dependency.serializer.supportsRelocationOfSerializedObjects) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " + s"${dependency.serializer.getClass.getName}, does not support object relocation") false // 不能有map端聚合操做 } else if (dependency.mapSideCombine) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " + s"map-side aggregation") false // 分区数不能大于16777215+1 } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " + s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions") false } else { log.debug(s"Can use serialized shuffle for shuffle $shufId") true } }
3.若是以上两个条件都不知足的话,会返回BaseShuffleHandle,采用基本sort shuffle机制ide
/** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). * Called on executors by reduce tasks. */ override def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startPartition, endPartition) new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) }
这里返回BlockStoreShuffleReader函数
/** Get a writer for a given partition. Called on executors by map tasks. */ override def getWriter[K, V]( handle: ShuffleHandle, mapId: Long, context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = { val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent( handle.shuffleId, _ => new OpenHashSet[Long](16)) mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) } val env = SparkEnv.get // 根据handle获取不一样ShuffleWrite handle match { case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => new UnsafeShuffleWriter( env.blockManager, context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf, metrics, shuffleExecutorComponents) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, bypassMergeSortHandle, mapId, env.conf, metrics, shuffleExecutorComponents) case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => new SortShuffleWriter( shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents) } }
这里会根据handle获取不一样ShuffleWrite,若是是SerializedShuffleHandle,使用UnsafeShuffleWriter,若是是BypassMergeSortShuffleHandle,采用BypassMergeSortShuffleWriter,不然使用SortShuffleWriterui
如上文所说,当开启bypass机制后,会使用BypassMergeSortShuffleWriter,若是serializer支持relocation而且map端没有聚合同时分区数目不大于16777215+1三个条件都知足,使用UnsafeShuffleWriter,不然使用SortShuffleWriterthis
BypassMergeSortShuffleWriter继承ShuffleWriter,用java实现,会将map端的多个输出文件合并为一个文件,同时生成一个索引文件,索引记录到每一个分区的初始地址,write()方法以下:
@Override public void write(Iterator<Product2<K, V>> records) throws IOException { assert (partitionWriters == null); // 新建一个ShuffleMapOutputWriter ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents .createMapOutputWriter(shuffleId, mapId, numPartitions); try { // 若是没有数据的话 if (!records.hasNext()) { // 返回全部分区的写入长度 partitionLengths = mapOutputWriter.commitAllPartitions(); // 更新mapStatus mapStatus = MapStatus$.MODULE$.apply( blockManager.shuffleServerId(), partitionLengths, mapId); return; } final SerializerInstance serInstance = serializer.newInstance(); final long openStartTime = System.nanoTime(); // 建立和分区数相等的DiskBlockObjectWriter FileSegment partitionWriters = new DiskBlockObjectWriter[numPartitions]; partitionWriterSegments = new FileSegment[numPartitions]; // 对于每一个分区 for (int i = 0; i < numPartitions; i++) { // 建立一个临时的block final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile = blockManager.diskBlockManager().createTempShuffleBlock(); // 获取temp block的file和id final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); // 对于每一个分区,建立一个DiskBlockObjectWriter partitionWriters[i] = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be // included in the shuffle write time. // 建立文件和写入文件都须要大量时间,也须要包含在shuffle写入时间里面 writeMetrics.incWriteTime(System.nanoTime() - openStartTime); // 若是有数据的话 while (records.hasNext()) { final Product2<K, V> record = records.next(); final K key = record._1(); // 对于每条数据按key写入相应分区对应的文件 partitionWriters[partitioner.getPartition(key)].write(key, record._2()); } for (int i = 0; i < numPartitions; i++) { try (DiskBlockObjectWriter writer = partitionWriters[i]) { // 提交 partitionWriterSegments[i] = writer.commitAndGet(); } } // 将全部分区文件合并成一个文件 partitionLengths = writePartitionedData(mapOutputWriter); // 更新mapStatus mapStatus = MapStatus$.MODULE$.apply( blockManager.shuffleServerId(), partitionLengths, mapId); } catch (Exception e) { try { mapOutputWriter.abort(e); } catch (Exception e2) { logger.error("Failed to abort the writer after failing to write map output.", e2); e.addSuppressed(e2); } throw e; } }
合并文件的方法writePartitionedData()以下,默认采用零拷贝的方式来合并文件:
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException { // Track location of the partition starts in the output file if (partitionWriters != null) { // 开始时间 final long writeStartTime = System.nanoTime(); try { for (int i = 0; i < numPartitions; i++) { // 获取每一个文件 final File file = partitionWriterSegments[i].file(); ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i); if (file.exists()) { // 采起零拷贝方式 if (transferToEnabled) { // Using WritableByteChannelWrapper to make resource closing consistent between // this implementation and UnsafeShuffleWriter. Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper(); // 在这里会调用Utils.copyFileStreamNIO方法,最终调用FileChannel.transferTo方法拷贝文件 if (maybeOutputChannel.isPresent()) { writePartitionedDataWithChannel(file, maybeOutputChannel.get()); } else { writePartitionedDataWithStream(file, writer); } } else { // 不然采起流的方式拷贝 writePartitionedDataWithStream(file, writer); } if (!file.delete()) { logger.error("Unable to delete file for partition {}", i); } } } } finally { writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } partitionWriters = null; } return mapOutputWriter.commitAllPartitions(); }
UnsafeShuffleWriter也是继承ShuffleWriter,用java实现,write方法以下:
@Override public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException { // Keep track of success so we know if we encountered an exception // We do this rather than a standard try/catch/re-throw to handle // generic throwables. // 跟踪异常 boolean success = false; try { while (records.hasNext()) { // 将数据插入ShuffleExternalSorter进行外部排序 insertRecordIntoSorter(records.next()); } // 合并并输出文件 closeAndWriteOutput(); success = true; } finally { if (sorter != null) { try { sorter.cleanupResources(); } catch (Exception e) { // Only throw this error if we won't be masking another // error. if (success) { throw e; } else { logger.error("In addition to a failure during writing, we failed during " + "cleanup.", e); } } } } }
这里主要有两个方法:
@VisibleForTesting void insertRecordIntoSorter(Product2<K, V> record) throws IOException { assert(sorter != null); // 获取key和分区 final K key = record._1(); final int partitionId = partitioner.getPartition(key); // 重置缓冲区 serBuffer.reset(); // 将key和value写入缓冲区 serOutputStream.writeKey(key, OBJECT_CLASS_TAG); serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG); serOutputStream.flush(); // 获取序列化数据大小 final int serializedRecordSize = serBuffer.size(); assert (serializedRecordSize > 0); // 将序列化后的数据插入ShuffleExternalSorter处理 sorter.insertRecord( serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId); }
该方法会将数据进行序列化,而且将序列化后的数据经过insertRecord()方法插入外部排序器中,insertRecord()方法以下:
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) throws IOException { // for tests assert(inMemSorter != null); // 若是数据条数超过溢写阈值,直接溢写磁盘 if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold); spill(); } // Checks whether there is enough space to insert an additional record in to the sort pointer // array and grows the array if additional space is required. If the required space cannot be // obtained, then the in-memory data will be spilled to disk. // 检查是否有足够的空间插入额外的记录到排序指针数组中,若是须要额外的空间对数组进行扩容,若是空间不够,内存中的数据将会被溢写到磁盘上 growPointerArrayIfNecessary(); final int uaoSize = UnsafeAlignedOffset.getUaoSize(); // Need 4 or 8 bytes to store the record length. // 须要额外的4或8个字节存储数据长度 final int required = length + uaoSize; // 若是须要更多的内存,会想TaskMemoryManager申请新的page acquireNewPageIfNecessary(required); assert(currentPage != null); final Object base = currentPage.getBaseObject(); //Given a memory page and offset within that page, encode this address into a 64-bit long. //This address will remain valid as long as the corresponding page has not been freed. // 经过给定的内存页和偏移量,将当前数据的逻辑地址编码成一个long型 final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor); // 写长度值 UnsafeAlignedOffset.putSize(base, pageCursor, length); // 移动指针 pageCursor += uaoSize; // 写数据 Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length); // 移动指针 pageCursor += length; // 将编码的逻辑地址和分区id传给ShuffleInMemorySorter进行排序 inMemSorter.insertRecord(recordAddress, partitionId); }
在这里对于数据的缓存和溢写不借助于其余高级数据结构,而是直接操做内存空间
growPointerArrayIfNecessary()方法以下:
/** * Checks whether there is enough space to insert an additional record in to the sort pointer * array and grows the array if additional space is required. If the required space cannot be * obtained, then the in-memory data will be spilled to disk. */ private void growPointerArrayIfNecessary() throws IOException { assert(inMemSorter != null); // 若是没有空间容纳新的数据 if (!inMemSorter.hasSpaceForAnotherRecord()) { // 获取当前内存使用量 long used = inMemSorter.getMemoryUsage(); LongArray array; try { // could trigger spilling // 分配给缓存原来两倍的容量 array = allocateArray(used / 8 * 2); } catch (TooLargePageException e) { // The pointer array is too big to fix in a single page, spill. // 若是超出了一页的大小,直接溢写,溢写方法见后面 // 一页的大小为128M,在PackedRecordPointer类中 // static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes spill(); return; } catch (SparkOutOfMemoryError e) { // should have trigger spilling if (!inMemSorter.hasSpaceForAnotherRecord()) { logger.error("Unable to grow the pointer array"); throw e; } return; } // check if spilling is triggered or not if (inMemSorter.hasSpaceForAnotherRecord()) { // 若是有了剩余空间,则代表不必扩容,释放分配的空间 freeArray(array); } else { // 不然把原来的数组复制到新的数组 inMemSorter.expandPointerArray(array); } } }
spill()方法以下:
@Override public long spill(long size, MemoryConsumer trigger) throws IOException { if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) { return 0L; } logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", Thread.currentThread().getId(), Utils.bytesToString(getMemoryUsage()), spills.size(), spills.size() > 1 ? " times" : " time"); // Sorts the in-memory records and writes the sorted records to an on-disk file. // This method does not free the sort data structures. // 对内存中的数据进行排序而且将有序记录写到一个磁盘文件中,这个方法不会释放排序的数据结构 writeSortedFile(false); final long spillSize = freeMemory(); // 重置ShuffleInMemorySorter inMemSorter.reset(); // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the // records. Otherwise, if the task is over allocated memory, then without freeing the memory // pages, we might not be able to get memory for the pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); return spillSize; }
writeSortedFile()方法:
private void writeSortedFile(boolean isLastFile) { // This call performs the actual sort. // 返回一个排序好的迭代器 final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords = inMemSorter.getSortedIterator(); // If there are no sorted records, so we don't need to create an empty spill file. if (!sortedRecords.hasNext()) { return; } final ShuffleWriteMetricsReporter writeMetricsToUse; // 若是为true,则为输出文件,不然为溢写文件 if (isLastFile) { // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes. writeMetricsToUse = writeMetrics; } else { // We're spilling, so bytes written should be counted towards spill rather than write. // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count // them towards shuffle bytes written. writeMetricsToUse = new ShuffleWriteMetrics(); } // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to // be an API to directly transfer bytes from managed memory to the disk writer, we buffer // data through a byte array. This array does not need to be large enough to hold a single // record; // 建立一个字节缓冲数组,大小为1m final byte[] writeBuffer = new byte[diskWriteBufferSize]; // Because this output will be read during shuffle, its compression codec must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more details. // 建立一个临时的shuffle block final Tuple2<TempShuffleBlockId, File> spilledFileInfo = blockManager.diskBlockManager().createTempShuffleBlock(); // 获取文件和id final File file = spilledFileInfo._2(); final TempShuffleBlockId blockId = spilledFileInfo._1(); final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter. // Our write path doesn't actually use this serializer (since we end up calling the `write()` // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work // around this, we pass a dummy no-op serializer. // 不作任何转换的序列化器,由于须要一个实例来构造DiskBlockObjectWriter final SerializerInstance ser = DummySerializerInstance.INSTANCE; int currentPartition = -1; final FileSegment committedSegment; try (DiskBlockObjectWriter writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) { final int uaoSize = UnsafeAlignedOffset.getUaoSize(); // 遍历 while (sortedRecords.hasNext()) { sortedRecords.loadNext(); final int partition = sortedRecords.packedRecordPointer.getPartitionId(); assert (partition >= currentPartition); if (partition != currentPartition) { // Switch to the new partition // 若是切换到了新的分区,提交当前分区,而且记录当前分区大小 if (currentPartition != -1) { final FileSegment fileSegment = writer.commitAndGet(); spillInfo.partitionLengths[currentPartition] = fileSegment.length(); } // 而后切换到下一个分区 currentPartition = partition; } // 获取指针,经过指针获取页号和偏移量 final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); final Object recordPage = taskMemoryManager.getPage(recordPointer); final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); // 获取剩余数据 int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); // 跳过数据前面存储的长度 long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length while (dataRemaining > 0) { final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); // 将数据拷贝到缓冲数组中 Platform.copyMemory( recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); // 从缓冲数组中转入DiskBlockObjectWriter writer.write(writeBuffer, 0, toTransfer); // 更新位置 recordReadPosition += toTransfer; // 更新剩余数据 dataRemaining -= toTransfer; } writer.recordWritten(); } // 提交 committedSegment = writer.commitAndGet(); } // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted, // then the file might be empty. Note that it might be better to avoid calling // writeSortedFile() in that case. // 记录溢写文件的列表 if (currentPartition != -1) { spillInfo.partitionLengths[currentPartition] = committedSegment.length(); spills.add(spillInfo); } // 若是是溢写文件,更新溢写的指标 if (!isLastFile) { writeMetrics.incRecordsWritten( ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten()); taskContext.taskMetrics().incDiskBytesSpilled( ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten()); } }
encodePageNumberAndOffset()方法以下:
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) { // 若是开启了堆外内存,偏移量为绝对地址,可能须要64位进行编码,因为页大小限制,将其减去当前页的基地址,变为相对地址 if (tungstenMemoryMode == MemoryMode.OFF_HEAP) { // In off-heap mode, an offset is an absolute address that may require a full 64 bits to // encode. Due to our page size limitation, though, we can convert this into an offset that's // relative to the page's base offset; this relative offset will fit in 51 bits. offsetInPage -= page.getBaseOffset(); } return encodePageNumberAndOffset(page.pageNumber, offsetInPage); } @VisibleForTesting public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) { assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page"; // 高13位为页号,低51位为偏移量 // 页号左移51位,再拼偏移量和上一个低51位都为1的掩码0x7FFFFFFFFFFFFL return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS); }
ShuffleInMemorySorter的insertRecord()方法以下:
public void insertRecord(long recordPointer, int partitionId) { if (!hasSpaceForAnotherRecord()) { throw new IllegalStateException("There is no space for new record"); } array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId)); pos++; }
PackedRecordPointer.packPointer()方法:
public static long packPointer(long recordPointer, int partitionId) { assert (partitionId <= MAXIMUM_PARTITION_ID); // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page. // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses. // 将页号右移24位,和低27位拼在一块儿,这样逻辑地址被压缩成40位 final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24; final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS); // 将分区号放在高24位上 return (((long) partitionId) << 40) | compressedAddress; }
getSortedIterator()方法:
public ShuffleSorterIterator getSortedIterator() { int offset = 0; // 使用基数排序对内存分区ID进行排序。基数排序要快得多,可是在添加指针时须要额外的内存做为保留内存 if (useRadixSort) { offset = RadixSort.sort( array, pos, PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX, PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false); // 不然采用timSort排序 } else { MemoryBlock unused = new MemoryBlock( array.getBaseObject(), array.getBaseOffset() + pos * 8L, (array.size() - pos) * 8L); LongArray buffer = new LongArray(unused); Sorter<PackedRecordPointer, LongArray> sorter = new Sorter<>(new ShuffleSortDataFormat(buffer)); sorter.sort(array, 0, pos, SORT_COMPARATOR); } return new ShuffleSorterIterator(pos, array, offset); }
@VisibleForTesting void closeAndWriteOutput() throws IOException { assert(sorter != null); updatePeakMemoryUsed(); serBuffer = null; serOutputStream = null; // 获取溢写文件 final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; final long[] partitionLengths; try { // 合并溢写文件 partitionLengths = mergeSpills(spills); } finally { // 删除溢写文件 for (SpillInfo spill : spills) { if (spill.file.exists() && !spill.file.delete()) { logger.error("Error while deleting spill file {}", spill.file.getPath()); } } } // 更新mapstatus mapStatus = MapStatus$.MODULE$.apply( blockManager.shuffleServerId(), partitionLengths, mapId); }
mergeSpills()方法:
private long[] mergeSpills(SpillInfo[] spills) throws IOException { long[] partitionLengths; // 若是没有溢写文件,建立空的 if (spills.length == 0) { final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions()); return mapWriter.commitAllPartitions(); // 若是只有一个溢写文件,将它合并输出 } else if (spills.length == 1) { Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter = shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId); if (maybeSingleFileWriter.isPresent()) { // Here, we don't need to perform any metrics updates because the bytes written to this // output file would have already been counted as shuffle bytes written. partitionLengths = spills[0].partitionLengths; maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths); } else { partitionLengths = mergeSpillsUsingStandardWriter(spills); } // 若是有多个,合并输出,合并的时候有NIO和BIO两种方式 } else { partitionLengths = mergeSpillsUsingStandardWriter(spills); } return partitionLengths; }
SortShuffleWriter会使用PartitionedAppendOnlyMap或PartitionedPariBuffer在内存中进行排序,若是超过内存限制,会溢写到文件中,在全局输出有序文件的时候,对以前的全部输出文件和当前内存中的数据进行全局归并排序,对key相同的元素会使用定义的function进行聚合,入口为write()方法:
override def write(records: Iterator[Product2[K, V]]): Unit = { // 建立一个外部排序器,若是map端有预聚合,就传入aggregator和keyOrdering,不然不须要传入 sorter = if (dep.mapSideCombine) { new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } // 将数据放入ExternalSorter进行排序 sorter.insertAll(records) // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). // 建立一个输出Wrtier val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter( dep.shuffleId, mapId, dep.partitioner.numPartitions) // 将外部排序的数据写入Writer sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter) val partitionLengths = mapOutputWriter.commitAllPartitions() // 更新mapstatus mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId) }
insertAll()方法:
def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined // 是否须要map端聚合 if (shouldCombine) { // Combine values in-memory first using our AppendOnlyMap // 使用AppendOnlyMap在内存中聚合values // 获取mergeValue()函数,将新值合并到当前聚合结果中 val mergeValue = aggregator.get.mergeValue // 获取createCombiner()函数,建立聚合初始值 val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null // 若是一个key当前有聚合值,则合并,若是没有建立初始值 val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } // 遍历 while (records.hasNext) { // 增长读取记录数 addElementsRead() kv = records.next() // map为PartitionedAppendOnlyMap,将分区和key做为key,聚合值做为value map.changeValue((getPartition(kv._1), kv._1), update) // 是否须要溢写到磁盘 maybeSpillCollection(usingMap = true) } // 若是不须要map端聚合 } else { // Stick values into our buffer while (records.hasNext) { addElementsRead() val kv = records.next() // buffer为PartitionedPairBuffer,将分区和key加进去 buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) // 是否须要溢写到磁盘 maybeSpillCollection(usingMap = false) } } }
该方法主要是判断在插入数据时,是否须要在map端进行预聚合,分别采用两种数据结构来保存
maybeSpillCollection()方法里面会调用maybeSpill()方法检查是否须要溢写,若是发生溢写,从新构造一个map或者buffer结构从头开始缓存,以下:
private def maybeSpillCollection(usingMap: Boolean): Unit = { var estimatedSize = 0L if (usingMap) { estimatedSize = map.estimateSize() // 判断是否须要溢写 if (maybeSpill(map, estimatedSize)) { map = new PartitionedAppendOnlyMap[K, C] } } else { estimatedSize = buffer.estimateSize() // 判断是否须要溢写 if (maybeSpill(buffer, estimatedSize)) { buffer = new PartitionedPairBuffer[K, C] } } if (estimatedSize > _peakMemoryUsedBytes) { _peakMemoryUsedBytes = estimatedSize } } protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false // 若是读取的记录数是32的倍数,而且预估map或者buffer内存占用大于默认的5m阈值 if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool // 尝试申请2*currentMemory-5m的内存 val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = acquireMemory(amountToRequest) // 更新阈值 myMemoryThreshold += granted // If we were granted too little memory to grow further (either tryToAcquire returned 0, // or we already had more memory than myMemoryThreshold), spill the current collection // 判断,若是仍是不够,肯定溢写 shouldSpill = currentMemory >= myMemoryThreshold } // 若是shouldSpill为false,可是读取的记录数大于Integer.MAX_VALUE,也是须要溢写 shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold // Actually spill if (shouldSpill) { // 溢写次数+1 _spillCount += 1 logSpillage(currentMemory) // 溢写缓存的集合 spill(collection) _elementsRead = 0 _memoryBytesSpilled += currentMemory // 释放内存 releaseMemory() } shouldSpill }
maybeSpill()方法里面会调用spill()进行溢写,以下:
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { // 根据给定的比较器进行排序,返回排序结果的迭代器 val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator) // 将迭代器中的数据溢写到磁盘文件中 val spillFile = spillMemoryIteratorToDisk(inMemoryIterator) // ArrayBuffer记录全部溢写的文件 spills += spillFile }
spillMemoryIteratorToDisk()方法以下:
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator) : SpilledFile = { // Because these files may be read during shuffle, their compression must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. // 建立一个临时块 val (blockId, file) = diskBlockManager.createTempShuffleBlock() // These variables are reset after each flush var objectsWritten: Long = 0 val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics // 建立溢写文件的DiskBlockObjectWriter val writer: DiskBlockObjectWriter = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) // List of batch sizes (bytes) in the order they are written to disk // 记录写入批次大小 val batchSizes = new ArrayBuffer[Long] // How many elements we have in each partition // 记录每一个分区条数 val elementsPerPartition = new Array[Long](numPartitions) // Flush the disk writer's contents to disk, and update relevant variables. // The writer is committed at the end of this process. // 将内存中的数据按批次刷写到磁盘中 def flush(): Unit = { val segment = writer.commitAndGet() batchSizes += segment.length _diskBytesSpilled += segment.length objectsWritten = 0 } var success = false try { // 遍历map或者buffer中的记录 while (inMemoryIterator.hasNext) { val partitionId = inMemoryIterator.nextPartition() require(partitionId >= 0 && partitionId < numPartitions, s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})") // 写入并更新计数值 inMemoryIterator.writeNext(writer) elementsPerPartition(partitionId) += 1 objectsWritten += 1 // 写入条数达到10000条时,将这批刷写到磁盘 if (objectsWritten == serializerBatchSize) { flush() } } // 遍历完之后,将剩余的刷写到磁盘 if (objectsWritten > 0) { flush() } else { writer.revertPartialWritesAndClose() } success = true } finally { if (success) { writer.close() } else { // This code path only happens if an exception was thrown above before we set success; // close our stuff and let the exception be thrown further writer.revertPartialWritesAndClose() if (file.exists()) { if (!file.delete()) { logWarning(s"Error deleting ${file}") } } } } // 返回溢写文件 SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition) }
接下来就是排序合并操做,调用ExternalSorter.writePartitionedMapOutput()方法:
def writePartitionedMapOutput( shuffleId: Int, mapId: Long, mapOutputWriter: ShuffleMapOutputWriter): Unit = { var nextPartitionId = 0 // 若是没有发生溢写 if (spills.isEmpty) { // Case where we only have in-memory data val collection = if (aggregator.isDefined) map else buffer // 根据指定的比较器进行排序 val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext()) { val partitionId = it.nextPartition() var partitionWriter: ShufflePartitionWriter = null var partitionPairsWriter: ShufflePartitionPairsWriter = null TryUtils.tryWithSafeFinally { partitionWriter = mapOutputWriter.getPartitionWriter(partitionId) val blockId = ShuffleBlockId(shuffleId, mapId, partitionId) partitionPairsWriter = new ShufflePartitionPairsWriter( partitionWriter, serializerManager, serInstance, blockId, context.taskMetrics().shuffleWriteMetrics) // 将分区内的数据依次取出 while (it.hasNext && it.nextPartition() == partitionId) { it.writeNext(partitionPairsWriter) } } { if (partitionPairsWriter != null) { partitionPairsWriter.close() } } nextPartitionId = partitionId + 1 } // 若是发生溢写,将溢写文件和缓存数据进行归并排序,排序完成后按照分区依次写入ShufflePartitionPairsWriter } else { // We must perform merge-sort; get an iterator by partition and write everything directly. // 这里会进行归并排序 for ((id, elements) <- this.partitionedIterator) { val blockId = ShuffleBlockId(shuffleId, mapId, id) var partitionWriter: ShufflePartitionWriter = null var partitionPairsWriter: ShufflePartitionPairsWriter = null TryUtils.tryWithSafeFinally { partitionWriter = mapOutputWriter.getPartitionWriter(id) partitionPairsWriter = new ShufflePartitionPairsWriter( partitionWriter, serializerManager, serInstance, blockId, context.taskMetrics().shuffleWriteMetrics) if (elements.hasNext) { for (elem <- elements) { partitionPairsWriter.write(elem._1, elem._2) } } } { if (partitionPairsWriter != null) { partitionPairsWriter.close() } } nextPartitionId = id + 1 } } context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes) }
partitionedIterator()方法:
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { val usingMap = aggregator.isDefined val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer if (spills.isEmpty) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID // 若是没有溢写,而且没有排序,只按照分区id排序 if (ordering.isEmpty) { // The user hasn't requested sorted keys, so only sort by partition ID, not key groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None))) // 若是没有溢写可是排序,先按照分区id排序,再按key排序 } else { // We do need to sort by both partition ID and key groupByPartition(destructiveIterator( collection.partitionedDestructiveSortedIterator(Some(keyComparator)))) } } else { // Merge spilled and in-memory data // 若是有溢写,就将溢写文件和内存中的数据归并排序 merge(spills, destructiveIterator( collection.partitionedDestructiveSortedIterator(comparator))) } }
归并方法以下:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) : Iterator[(Int, Iterator[Product2[K, C]])] = { // 读取溢写文件 val readers = spills.map(new SpillReader(_)) val inMemBuffered = inMemory.buffered // 遍历分区 (0 until numPartitions).iterator.map { p => val inMemIterator = new IteratorForPartition(p, inMemBuffered) // 合并溢写文件和内存中的数据 val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator) // 若是有聚合逻辑,按分区聚合,对key按照keyComparator排序 if (aggregator.isDefined) { // Perform partial aggregation across partitions (p, mergeWithAggregation( iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined)) // 若是没有聚合,可是有排序逻辑,按照ordering作归并 } else if (ordering.isDefined) { // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey); // sort the elements without trying to merge them (p, mergeSort(iterators, ordering.get)) // 什么都没有直接归并 } else { (p, iterators.iterator.flatten) } } }
在write()方法中调用commitAllPartitions()方法输出数据,其中调用writeIndexFileAndCommit()方法写出数据和索引文件,以下:
def writeIndexFileAndCommit( shuffleId: Int, mapId: Long, lengths: Array[Long], dataTmp: File): Unit = { // 建立索引文件和临时索引文件 val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) try { // 获取shuffle data file val dataFile = getDataFile(shuffleId, mapId) // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure // the following check and rename are atomic. // 对于每一个executor只有一个IndexShuffleBlockResolver,确保原子性 synchronized { // 检查索引是否和数据文件已经有了对应关系 val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) if (existingLengths != null) { // Another attempt for the same task has already written our map outputs successfully, // so just use the existing partition lengths and delete our temporary map outputs. // 若是存在对应关系,说明shuffle write已经完成,删除临时索引文件 System.arraycopy(existingLengths, 0, lengths, 0, lengths.length) if (dataTmp != null && dataTmp.exists()) { dataTmp.delete() } } else { // 若是不存在,建立一个BufferedOutputStream // This is the first successful attempt in writing the map outputs for this task, // so override any existing index and data files with the ones we wrote. val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) Utils.tryWithSafeFinally { // We take in lengths of each block, need to convert it to offsets. // 获取每一个分区的大小,累加偏移量,写入临时索引文件 var offset = 0L out.writeLong(offset) for (length <- lengths) { offset += length out.writeLong(offset) } } { out.close() } // 删除可能存在的其余索引文件 if (indexFile.exists()) { indexFile.delete() } // 删除可能存在的其余数据文件 if (dataFile.exists()) { dataFile.delete() } // 将临时文件重命名成正式文件 if (!indexTmp.renameTo(indexFile)) { throw new IOException("fail to rename file " + indexTmp + " to " + indexFile) } if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { throw new IOException("fail to rename file " + dataTmp + " to " + dataFile) } } } } finally { if (indexTmp.exists() && !indexTmp.delete()) { logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}") } } }
Spark在初始化SparkEnv的时候,会在create()方法里面初始化ShuffleManager,包含sort和tungsten-sort两种shuffle
ShuffleManager是一个特质,核心方法有registerShuffle()、getReader()、getWriter(),
SortShuffleManager是ShuffleManager的惟一实现类,在registerShuffle()方法里面选择采用哪一种shuffle机制,getReader()方法只会返回一种BlockStoreShuffleReader,getWriter()方法根据不一样的handle选择不一样的Writer,共有三种
BypassMergeSortShuffleWriter:若是当前shuffle依赖中没有map端的聚合操做,而且分区数小于spark.shuffle.sort.bypassMergeThreshold的值,默认为200,启用bypass机制,核心方法有:write()、writePartitionedData()(合并全部分区文件,默认采用零拷贝方式)
UnsafeShuffleWriter:若是serializer支持relocation而且map端没有聚合同时分区数目不大于16777215+1三个条件都知足,采用该Writer,核心方法有:write()、insertRecordIntoSorter()(将数据插入外部选择器排序)、closeAndWriteOutput()(合并并输出文件),前一个方法里核心方法有:insertRecord()(将序列化数据插入外部排序器)、growPointerArrayIfNecessary()(若是须要额外空间须要对数组扩容或溢写到磁盘)、spill()(溢写到磁盘)、writeSortedFile()(将内存中的数据进行排序并写出到磁盘文件中)、encodePageNumberAndOffset()(对当前数据的逻辑地址进行编码,转成long型),后面的方法里核心方法有:mergeSpills()(合并溢写文件),合并文件的时候有BIO和NIO两种方式
SortShuffleWriter:若是上面二者都不知足,采用该Writer,该Writer会使用PartitionedAppendOnlyMap或PartitionedPariBuffer在内存中进行排序,若是超过内存限制,会溢写到文件中,在全局输出有序文件的时候,对以前的全部输出文件和当前内存中的数据进行全局归并排序,对key相同的元素会使用定义的function进行聚合核心方法有:write()、insertAll()(将数据放入ExternalSorter进行排序)、maybeSpillCollection()(是否须要溢写到磁盘)、maybeSpill()、spill()、spillMemoryIteratorToDisk()(将内存中数据溢写到磁盘)、writePartitionedMapOutput()、commitAllPartitions()里面调用writeIndexFileAndCommit()方法写出数据和索引文件