This blog series distills and shares cases drawn from real commercial environments, accompanied by Spark source-code walkthroughs and hands-on guidance; please stay tuned. Copyright notice: this series of Spark source-code readings and commercial-practice guides belongs to the author (Qin Kaixin). Reproduction is prohibited; learning from it is welcome.
I have used this diagram many times before, so please bear with me; after all, these posts all revolve around the same theme: shuffle. The English comments in the source are already very detailed, so here is just a brief introduction.
The official English description is as follows:
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
* driver and on each executor, based on the spark.shuffle.manager setting. The driver
* registers shuffles with it, and executors (or tasks running locally in the driver) can ask
* to read and write data.
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
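The "pluggable" part boils down to name resolution: SparkEnv maps the spark.shuffle.manager setting, either a short name or a fully-qualified class name, to a manager class and instantiates it reflectively. The standalone sketch below (class name and method are hypothetical, not Spark's actual code) illustrates that resolution; the table contents reflect the fact that both "sort" and "tungsten-sort" resolve to SortShuffleManager:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class ShuffleManagerResolution {
    // Short-name table: both built-in names point at the same sort-based manager.
    static final Map<String, String> SHORT_NAMES = new HashMap<>();
    static {
        SHORT_NAMES.put("sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
        SHORT_NAMES.put("tungsten-sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
    }

    /** Resolve a spark.shuffle.manager value: a known short name maps to its
     *  class name; anything else is treated as a user-supplied class name. */
    static String resolve(String setting) {
        return SHORT_NAMES.getOrDefault(setting.toLowerCase(Locale.ROOT), setting);
    }
}
```

Because unknown values pass through unchanged, a user can plug in a custom manager simply by setting the property to its fully-qualified class name.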
static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;

private final BlockManager blockManager;
private final IndexShuffleBlockResolver shuffleBlockResolver;
private final TaskMemoryManager memoryManager;
private final SerializerInstance serializer;
private final Partitioner partitioner;
private final ShuffleWriteMetrics writeMetrics;
private final int shuffleId;
private final int mapId;
private final TaskContext taskContext;
private final SparkConf sparkConf;
// Whether to use NIO's file-channel-to-file-channel copy (transferTo);
// configured via the spark.file.transferTo property, default true.
private final boolean transferToEnabled;
// Initial size of the sort buffer; configured via
// spark.shuffle.sort.initialBuffer.size, default 4096.
private final int initialSortBufferSize;
private final int inputBufferSizeInBytes;
private final int outputBufferSizeInBytes;
@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
// Peak memory usage in bytes.
private long peakMemoryUsedBytes = 0;
Now for the most interesting code:
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());   // <= key step: feed each mapTask record into the sorter
    }
    closeAndWriteOutput();                      // <= key step: persist the mapTask output to disk
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
            "cleanup.", e);
        }
      }
    }
  }
}
Writing the mapTask records into the sorter, which sorts in memory but performs no aggregation:
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);   // <= key step: compute the target partition
  serBuffer.reset();
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);
  // <= key step: hand the serBuffer byte array to the Tungsten-backed sorter
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
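The partitioner.getPartition(key) call is where each record gets routed. For the common HashPartitioner the logic is a non-negative modulus of the key's hashCode; the standalone sketch below (class and method names are my own) shows the idea:

```java
public class HashPartitionerSketch {
    /** Java's % operator can return a negative result for a negative operand,
     *  so fold the remainder into the range [0, mod). */
    static int nonNegativeMod(int x, int mod) {
        int rawMod = x % mod;
        return rawMod + (rawMod < 0 ? mod : 0);
    }

    /** Mirrors HashPartitioner's behavior: null keys go to partition 0,
     *  everything else is hashed into one of numPartitions buckets. */
    static int getPartition(Object key, int numPartitions) {
        return key == null ? 0 : nonNegativeMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(getPartition("a", 4)); // "a".hashCode() == 97, 97 % 4 == 1
    }
}
```

Only this partition id is used for ordering later: the sorter sorts records by partition id, not by key, which is why no aggregation or key ordering happens on this path.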
Persisting the mapTask output to disk:
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tmp = Utils.tempFileWith(output);
  try {
    try {
      // <= key step: merge all spill files into the final block file
      partitionLengths = mergeSpills(spills, tmp);
    } finally {
      for (SpillInfo spill : spills) {
        if (spill.file.exists() && !spill.file.delete()) {
          logger.error("Error while deleting spill file {}", spill.file.getPath());
        }
      }
    }
    // <= key step: write the index file and commit
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
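What writeIndexFileAndCommit records is, for each reduce partition, where its bytes begin in the merged data file. The index is essentially a running sum over partitionLengths with a leading 0 (numPartitions + 1 long values in total), so a reducer for partition i reads the byte range [offsets[i], offsets[i+1]). A minimal sketch of that computation, assuming this cumulative-offset layout:

```java
import java.util.Arrays;

public class IndexFileSketch {
    /** Cumulative offsets: offsets[i] is where partition i begins in the data
     *  file, and offsets[numPartitions] is the total data-file length. */
    static long[] indexOffsets(long[] partitionLengths) {
        long[] offsets = new long[partitionLengths.length + 1];
        for (int i = 0; i < partitionLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionLengths[i];
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Partitions of 10, 0 and 5 bytes -> offsets [0, 10, 10, 15].
        System.out.println(Arrays.toString(indexOffsets(new long[]{10, 0, 5})));
    }
}
```

Note that an empty partition simply contributes a zero-length range, which is why the same partitionLengths array can also feed MapStatus on the last line above.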
Internally, UnsafeShuffleWriter mainly uses Tungsten-managed (off-heap) memory as its cache, although it may also use JVM heap memory. This is a clear difference from ExternalSortWriter.
Qin Kaixin, in Shenzhen, 1:19 a.m.