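This blog series summarizes and shares cases drawn from real commercial environments, and offers Spark source-code walkthroughs and practical guidance for production use; please keep following the series. Copyright notice: this Spark source-code walkthrough and commercial practice series belongs entirely to the author, Qin Kaixin (秦凯新). Reposting is prohibited; you are welcome to study it.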
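ShuffleExternalSorter is functionally similar to the external sorter ExternalSorter, but the two differ in places. Before analyzing ShuffleExternalSorter in detail, let's first look at where it sits in the figure below. As you can see, the final caller is UnsafeShuffleWriter. I will analyze UnsafeShuffleWriter in detail in the next section.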
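ShuffleInMemorySorter: sorts the inserted records in memory; the algorithm is still TimSort, with radix sort as an alternative.
spills: the list of metadata entries for the spill files.
numElementsForSpillThreshold: the number of elements at which a spill to disk is forced. It can be configured through the spark.shuffle.spill.numElementsForceSpillThreshold property; the default is 1M.
taskMemoryManager: the TaskMemoryManager through which the sorter allocates and frees memory pages.
allocatedPages: the list of pages (i.e. MemoryBlock instances) that have already been allocated.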
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
* this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
* itself).
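Spill check: the condition inMemSorter.numRecords() >= numElementsForSpillThreshold decides whether to spill; if it holds, a spill is triggered immediately.
growPointerArrayIfNecessary: checks the available space and grows the pointer array if needed.
acquireNewPageIfNecessary: checks the space left in the current page and requests a new page if it is insufficient.
Platform.copyMemory: copies the record data into the memory block represented by the page.
inMemSorter.insertRecord: stores the record's metadata in an internal long array so that it can be sorted. The upper 24 bits hold the partition ID, the middle 13 bits hold the page number, and the lower 27 bits hold the offset.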
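The 24/13/27-bit layout described above can be illustrated with a small sketch. This is not Spark's own class: the name PackedPointerSketch and its methods are hypothetical, and the code only assumes the bit widths stated in the text.

```java
// Hypothetical illustration of the 64-bit layout described above:
// [24-bit partition ID][13-bit page number][27-bit offset in page].
// The class and method names are assumptions made for this sketch.
final class PackedPointerSketch {
    static final int OFFSET_BITS = 27;
    static final int PAGE_BITS = 13;
    static final int PARTITION_BITS = 24;

    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;
    static final long PAGE_MASK = (1L << PAGE_BITS) - 1;
    static final long PARTITION_MASK = (1L << PARTITION_BITS) - 1;

    private PackedPointerSketch() {}

    /** Packs the three fields into one long, rejecting values that do not fit. */
    static long pack(int partitionId, int pageNumber, long offsetInPage) {
        if (partitionId < 0 || partitionId > PARTITION_MASK) {
            throw new IllegalArgumentException("partition ID does not fit in 24 bits");
        }
        if (pageNumber < 0 || pageNumber > PAGE_MASK) {
            throw new IllegalArgumentException("page number does not fit in 13 bits");
        }
        if (offsetInPage < 0 || offsetInPage > OFFSET_MASK) {
            throw new IllegalArgumentException("offset does not fit in 27 bits");
        }
        return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
            | ((long) pageNumber << OFFSET_BITS)
            | offsetInPage;
    }

    /** Extracts the upper 24 bits. */
    static int partitionId(long packed) {
        return (int) ((packed >>> (PAGE_BITS + OFFSET_BITS)) & PARTITION_MASK);
    }

    /** Extracts the middle 13 bits. */
    static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & PAGE_MASK);
    }

    /** Extracts the lower 27 bits. */
    static long offsetInPage(long packed) {
        return packed & OFFSET_MASK;
    }
}
```

Because the partition ID occupies the most significant bits, sorting the packed longs by those bits groups records by partition without touching the record data itself.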
/**
 * Write a record to the shuffle sorter.
 */
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {
  // for tests
  assert(inMemSorter != null);
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {  // <= key step: forced spill check
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }
  growPointerArrayIfNecessary();  // <= key step: make room in the pointer array
  // Need 4 bytes to store the record length.
  final int required = length + 4;
  acquireNewPageIfNecessary(required);
  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  Platform.putInt(base, pageCursor, length);
  pageCursor += 4;
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);  // <= key step: copy the record into the page
  pageCursor += length;
  inMemSorter.insertRecord(recordAddress, partitionId);  // <= key step: record the pointer and partition ID for later sorting
}
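To make the page layout used by insertRecord concrete, here is a minimal sketch in which a heap ByteBuffer stands in for a memory page. Each record is stored as a 4-byte length followed by its bytes, and the cursor advances by length + 4, as in the code above. The class PageLayoutSketch and its methods are hypothetical; the real sorter works on MemoryBlock pages through Platform calls and acquires a new page instead of returning -1.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a single memory page; not part of Spark.
final class PageLayoutSketch {
    private final ByteBuffer page;  // heap buffer playing the role of a page
    private int pageCursor = 0;     // next free byte in the page

    PageLayoutSketch(int pageSize) {
        this.page = ByteBuffer.allocate(pageSize);
    }

    /**
     * Appends [4-byte length][payload] at the cursor.
     * Returns the offset at which the record starts, or -1 if it does not fit
     * (the real sorter would request a new page at this point).
     */
    int insertRecord(byte[] record) {
        final int required = record.length + 4;  // 4 bytes for the length prefix
        if (pageCursor + required > page.capacity()) {
            return -1;
        }
        final int recordOffset = pageCursor;
        page.putInt(pageCursor, record.length);  // write the length prefix
        pageCursor += 4;
        System.arraycopy(record, 0, page.array(), pageCursor, record.length);
        pageCursor += record.length;
        return recordOffset;
    }

    /** Reads back the record whose length prefix starts at the given offset. */
    byte[] readRecord(int recordOffset) {
        final int length = page.getInt(recordOffset);
        final byte[] out = new byte[length];
        System.arraycopy(page.array(), recordOffset + 4, out, 0, length);
        return out;
    }
}
```

The offset returned by insertRecord plays the role of the record address: it is the only thing the in-memory sorter needs to keep, since the length can always be read back from the page.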
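writeSortedFile: sorts the in-memory records and writes them out to disk. Records are ordered by partition ID, and the sort can be done in one of two ways: a comparison sort on the partition ID, or radix sort (Radix Sort).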
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }
  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
    Thread.currentThread().getId(),
    Utils.bytesToString(getMemoryUsage()),
    spills.size(),
    spills.size() > 1 ? " times" : " time");
  writeSortedFile(false);  // <= key step: sort the records and write them to a spill file
  final long spillSize = freeMemory();
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages
  // holding the records. Otherwise, if the task is over allocated memory, then without
  // freeing the memory pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}
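The two ways of ordering records by partition ID that writeSortedFile relies on can be sketched as follows. This is an illustration under assumptions, not Spark's code: PartitionSortSketch is a hypothetical class, the partition ID is assumed to sit in the upper 24 bits of each long as described earlier, and the JDK's object sort (a TimSort) stands in for Spark's own sorter.

```java
import java.util.Arrays;

// Hypothetical sketch: order packed record pointers by the partition ID
// stored in their upper 24 bits, once by comparison and once by radix sort.
final class PartitionSortSketch {
    static final int PARTITION_SHIFT = 40;  // 13 page bits + 27 offset bits

    private PartitionSortSketch() {}

    static int partitionId(long packed) {
        return (int) (packed >>> PARTITION_SHIFT);
    }

    /** Comparison sort on the partition ID; Arrays.sort on objects is a stable TimSort. */
    static long[] sortByComparison(long[] pointers) {
        Long[] boxed = Arrays.stream(pointers).boxed().toArray(Long[]::new);
        Arrays.sort(boxed, (a, b) -> Integer.compare(partitionId(a), partitionId(b)));
        return Arrays.stream(boxed).mapToLong(Long::longValue).toArray();
    }

    /** Least-significant-digit radix sort over the three bytes of the partition ID. */
    static long[] sortByRadix(long[] pointers) {
        long[] src = pointers.clone();
        long[] dst = new long[src.length];
        for (int shift = PARTITION_SHIFT; shift < 64; shift += 8) {
            int[] counts = new int[257];
            for (long p : src) {
                counts[(int) ((p >>> shift) & 0xFF) + 1]++;  // histogram of this byte
            }
            for (int i = 0; i < 256; i++) {
                counts[i + 1] += counts[i];                  // prefix sums give start indexes
            }
            for (long p : src) {
                dst[counts[(int) ((p >>> shift) & 0xFF)]++] = p;  // stable scatter
            }
            long[] tmp = src;
            src = dst;
            dst = tmp;
        }
        return src;
    }
}
```

Both methods are stable and look only at the partition bits, so they produce the same order; the radix version avoids comparisons entirely and makes a fixed three passes over the array.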
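There is still a lot to dig into on this topic, but given how little reference material is available, I have to stop here for now; I will keep improving this post.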
Qin Kaixin (秦凯新), in Shenzhen