This blog series distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and hands-on guidance; please keep following the series. Copyright notice: this Spark source-code and business-practice series belongs to its author (Qin Kaixin); reproduction is prohibited, and you are welcome to learn from it.

I have used the same diagram several times before, so please bear with me; after all, these posts all revolve around one theme, shuffle. The English comments in the source are already very detailed, so only a brief introduction is given here.

The official English description is as follows:
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
* driver and on each executor, based on the spark.shuffle.manager setting. The driver
* registers shuffles with it, and executors (or tasks running locally in the driver) can ask
* to read and write data.
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
SortShuffleManager manages sort-based shuffle: incoming records are sorted by their target partition IDs, and each map task eventually produces a single consolidated output file on disk. There are two sides to this, a write path and a read path.

Write path: for example, SortShuffleWriter's write() method takes the map task's records through an in-memory buffer, aggregates and sorts them, and writes them to disk; intermediate steps such as spilling and merging happen along the way before the data finally lands on disk.

Read path: for example, BlockStoreShuffleReader's read() method uses a ShuffleBlockFetcherIterator to iterate over the fetched blocks, buffering, aggregating, and sorting them in memory, with part of the data spilling to disk when necessary.
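To make the two paths concrete, here is a minimal sketch (a hypothetical local job, not taken from the Spark sources): any shuffle-inducing operator such as reduceByKey exercises a ShuffleWriter on the map side and a BlockStoreShuffleReader on the reduce side.

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ShuffleDemo").setMaster("local[2]"))
    val counts = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _) // shuffle boundary: map-side write, reduce-side read
    counts.collect().foreach(println)
    sc.stop()
  }
}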
The English explanation in the source is very precise:
* In sort-based shuffle, incoming records are sorted according to their target partition ids, then
* written to a single map output file. Reducers fetch contiguous regions of this file in order to
* read their portion of the map output. In cases where the map output data is too large to fit in
* memory, sorted subsets of the output are spilled to disk and those on-disk files are merged
* to produce the final output file.
numMapsForShuffle: a member field that maps each shuffle ID to the number of map tasks in that shuffle.

shuffleBlockResolver: an IndexShuffleBlockResolver, documented as follows:
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
* Data of shuffle blocks from the same map task are stored in a single consolidated data file.
* The offsets of the data blocks in the data file are stored in a separate index file.
*
* We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
* as the filename postfix for data file, and ".index" as the filename postfix for index file.
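For reference, these two members are declared roughly as follows in Spark 2.x's SortShuffleManager (a sketch based on the source; details may differ between versions). Following the naming rule above, the output of map task 5 in shuffle 0 would land in shuffle_0_5_0.data with a companion shuffle_0_5_0.index.

import java.util.concurrent.ConcurrentHashMap

// shuffle ID -> number of map tasks; populated lazily by getWriter below
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

// resolves logical shuffle blocks to the consolidated .data/.index files
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

With these members in place, registerShuffle chooses one of three handles for every new shuffle, depending on the dependency: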
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
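The two predicates driving this three-way choice look roughly like this in Spark 2.x (a simplified sketch; see SortShuffleWriter.shouldBypassMergeSort and SortShuffleManager.canUseSerializedShuffle for the authoritative versions):

import org.apache.spark.{ShuffleDependency, SparkConf}

// Bypass path: no map-side aggregation and at most
// spark.shuffle.sort.bypassMergeThreshold (default 200) partitions.
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    false // map-side combine needs aggregation, so sorting cannot be bypassed
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}

// Serialized (tungsten-sort) path: the serializer must support relocating
// serialized objects, there must be no aggregator, and the partition count
// must fit the 24-bit encoding of PackedRecordPointer (1 << 24 = 16777216).
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  dependency.serializer.supportsRelocationOfSerializedObjects &&
    dependency.aggregator.isEmpty &&
    dependency.partitioner.numPartitions <= (1 << 24)
}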
getReader returns a reader (BlockStoreShuffleReader) for the data in partitions startPartition through endPartition - 1 of the map tasks' output:
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
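For context, the caller of getReader is the reduce task: ShuffledRDD.compute obtains the reader roughly as follows (a sketch based on Spark 2.x; each reduce task reads exactly one partition, hence the [split.index, split.index + 1) range):

// inside ShuffledRDD[K, V, C]
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}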
getWriter obtains a ShuffleWriter for a given ShuffleHandle:
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf) // <= the key point
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf) // <= the key point
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) // <= the key point
  }
}
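On the map side, ShuffleMapTask.runTask is the caller that ties everything together, roughly like this (a sketch based on Spark 2.x):

var writer: ShuffleWriter[Any, Any] = null
try {
  // getWriter pairs the handle registered earlier with one of the three writers above
  writer = SparkEnv.get.shuffleManager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  // write() drains the map task's records into the shuffle data/index files
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
} catch {
  case e: Exception =>
    if (writer != null) writer.stop(success = false)
    throw e
}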
Qin Kaixin, in Shenzhen