This blog series distills and shares cases drawn from real production environments, along with Spark source-code walkthroughs and hands-on guidance; please stay tuned. Copyright notice: this Spark source-code and practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to study it.
The figure above shows the data structures Spark uses to buffer task data in memory; their roles are summarized below:
AppendOnlyMap: wraps the basic in-memory operations a Task performs: insert, update, aggregate, and sort.
SizeTrackingAppendOnlyMap: additionally samples its own size and estimates how much memory it occupies.
PartitionedAppendOnlyMap:
(1) Overrides the partitionedDestructiveSortedIterator method of the WritablePartitionedPairCollection trait; it calls AppendOnlyMap's destructiveSortedIterator to compact and sort the underlying array, then returns an iterator.
(2) Overrides the insert method of the WritablePartitionedPairCollection trait so that records are keyed by (partition, key) when inserted.
PartitionedPairBuffer: an in-memory buffer that only supports ordered insertion; it acts purely as a buffer, with no changeValue (aggregate) or update operations. A simplified sketch of both structures follows this list.
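To make the contrast concrete, here is a minimal, self-contained sketch (plain Scala collections, not Spark's actual classes) of the two insertion modes: a map keyed by (partition, key) that aggregates in place, versus a buffer that only appends.
import scala.collection.mutable

// Aggregating mode: a changeValue-style upsert keyed by (partition, key).
val aggMap = mutable.HashMap.empty[(Int, String), Int]
def changeValue(partition: Int, key: String, value: Int): Unit =
  aggMap((partition, key)) = aggMap.getOrElse((partition, key), 0) + value

changeValue(0, "a", 1)
changeValue(0, "a", 2)          // merged into the existing entry: (0,a) -> 3

// Buffering mode: plain ordered insertion, duplicates are kept as separate records.
val buffer = mutable.ArrayBuffer.empty[((Int, String), Int)]
val rec1 = ((0, "a"), 1)
val rec2 = ((0, "a"), 2)
buffer += rec1
buffer += rec2                  // no aggregation, both records are retained

println(aggMap)   // Map((0,a) -> 3)
println(buffer)   // ArrayBuffer(((0,a),1), ((0,a),2))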
ShuffleManager is a trait whose only implementation is SortShuffleManager (HashShuffleManager was removed in Spark 2.0). What does it do? It manages the shuffle: SortShuffleManager relies on the storage subsystem to take a MapTask's data from memory (AppendOnlyMap) through spills to disk, a process that involves sorting, aggregation, and other non-trivial work.
As shown in the logical architecture of Figure 2.1, ExternalSorter is one of the components underpinning SortShuffleManager. Its main members are as follows:
aggregator: the Aggregator used to combine the map task's output.
partitioner: the Partitioner that assigns a partition to each key of the map task's output.
ordering: the implementation used to sort the map task's output by key.
map: notice this one? It is used when aggregating: new PartitionedAppendOnlyMap[K, C]. When an aggregator is set, the map side combines intermediate results in this in-memory structure before spilling them to disk.
Data structures to store in-memory objects before we spill. Depending on whether we have an
Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
store them in an array buffer.
buffer: notice this one? It is used when not aggregating: new PartitionedPairBuffer[K, C]. When no aggregator is set, the map side simply stores intermediate results in this in-memory structure before spilling them to disk.
keyComparator: the default comparator, which compares keys within a partition by their hash values.
spills: the array caching the spilled files, new ArrayBuffer[SpilledFile], whose element type is shown below:
private[this] case class SpilledFile(
file: File,
blockId: BlockId,
serializerBatchSizes: Array[Long],
elementsPerPartition: Array[Long])
_peakMemoryUsedBytes: the peak size observed for the in-memory data structure, tracked inside maybeSpillCollection.
Peak size of the in-memory data structure observed so far, in bytes
initialMemoryThreshold: the initial memory threshold used to track the collection's memory usage, taken from SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024); see the configuration sketch below.
myMemoryThreshold: starts out equal to initialMemoryThreshold and drives the maybeSpill decision.
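As a small illustration (the app name and master are placeholders), the starting threshold can be overridden through SparkConf using the key quoted above:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("spill-threshold-demo")   // hypothetical
  .setMaster("local[2]")                // hypothetical
  // start tracking from 32 MB instead of the 5 MB default shown above
  .set("spark.shuffle.spill.initialMemoryThreshold", (32 * 1024 * 1024).toString)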
After a Spark map task finishes processing, it writes its data to disk; note that everything eventually lands on disk. Before the write, however, Spark sorts or aggregates the map output in memory. In memory, not on disk. The source code below makes all of this clear.
The mergeValue function merges a new value into the aggregated result.
The createCombiner function creates the initial value of the aggregation.
The update function: for each incoming record (while records.hasNext), if the key already has an aggregated value, update calls mergeValue to fold the new value into it; otherwise it calls createCombiner to turn the value into the initial aggregate.
The partition index together with the key is what gets passed as the key argument to AppendOnlyMap's changeValue method.
The maybeSpillCollection function then performs a possible spill to disk. A small stand-alone illustration of these aggregation functions follows.
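The sketch below (plain Scala, not Spark's Aggregator class) replays the createCombiner / mergeValue / update interplay with a simple sum aggregator, mirroring the shape of the update closure used in insertAll:
// Plain stand-ins for the aggregator functions (a sum aggregator, purely illustrative):
val createCombiner: Int => Int = v => v                  // the first value for a key seeds the combiner
val mergeValue: (Int, Int) => Int = (c, v) => c + v      // later values are folded into the combiner

// Same shape as the update closure in insertAll: kv holds the record currently being processed.
var kv: (String, Int) = null
val update: (Boolean, Int) => Int = (hadValue, oldValue) =>
  if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)

// Feed three records for the same key, the way insertAll would:
var combined = 0
var hadValue = false
for (rec <- Iterator("a" -> 1, "a" -> 2, "a" -> 4)) {
  kv = rec
  combined = update(hadValue, combined)
  hadValue = true
}
println(combined)   // 7: createCombiner(1), then mergeValue(1, 2), then mergeValue(3, 4)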
insertAll: breathtaking, done in one stroke. If you want to understand the in-memory buffering, insertAll is the entry point through which a MapTask's records are written.
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update) // ====> key highlight
maybeSpillCollection(usingMap = true)
}
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) // ====> key highlight
maybeSpillCollection(usingMap = false)
}
}
}
maybeSpillCollection decides whether the in-memory collection has to spill. Although ExternalSorter uses PartitionedAppendOnlyMap and PartitionedPairBuffer, which is fine while the data volume stays small, a surge in data would otherwise drive the system to OOM.
maybeSpillCollection also controls how often data is written to disk; spilling too frequently during shuffle hurts disk I/O efficiency.
It relies on PartitionedAppendOnlyMap and PartitionedPairBuffer, whose memory usage is sampled and estimated.
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
Once the estimated size exceeds the threshold, the spill begins; the decision hinges on myMemoryThreshold (a worked numeric example follows the code).
* Spill some data to disk to release memory, which will be called by TaskMemoryManager
* when there is not enough memory for the task.
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection) // ====> key highlight
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
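A quick worked example with assumed figures shows how the doubling rule plays out: with a 5 MB threshold and a 6 MB collection, the sorter requests 2 * 6 - 5 = 7 MB more; if only 2 MB is granted, the threshold grows to 7 MB, which still covers the 6 MB collection, so no spill happens yet.
// Worked example of maybeSpill's doubling rule (all figures assumed for illustration):
val mb = 1024L * 1024L
var myMemoryThreshold = 5 * mb                                // initial threshold (default 5 MB)
val currentMemory = 6 * mb                                    // estimated size of the in-memory collection

val amountToRequest = 2 * currentMemory - myMemoryThreshold   // 2 * 6 - 5 = 7 MB requested
val granted = 2 * mb                                          // suppose the memory pool grants only 2 MB
myMemoryThreshold += granted                                  // threshold grows to 7 MB

val shouldSpill = currentMemory >= myMemoryThreshold          // 6 MB < 7 MB, so keep buffering for now
println(s"requested ${amountToRequest / mb} MB, spill = $shouldSpill")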
Because PartitionedAppendOnlyMap is backed by hash-based storage, the spill proceeds as follows:
Notice what happens: it calls PartitionedAppendOnlyMap.destructiveSortedWritablePartitionedIterator, the first move of the Dugu Nine Swords: ==> compact ==> sort ==> return an iterator.
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
}
(1) Iterate over the in-memory data and write it to disk.
(2) Create a unique blockId and temporary file through diskBlockManager and start writing to it.
(3) Records are written in partition order.
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
: SpilledFile = {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
// These variables are reset after each flush
var objectsWritten: Long = 0
val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
val writer: DiskBlockObjectWriter =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
// List of batch sizes (bytes) in the order they are written to disk
val batchSizes = new ArrayBuffer[Long]
// How many elements we have in each partition
val elementsPerPartition = new Array[Long](numPartitions)
// Flush the disk writer's contents to disk, and update relevant variables.
// The writer is committed at the end of this process.
def flush(): Unit = {
val segment = writer.commitAndGet()
batchSizes += segment.length
_diskBytesSpilled += segment.length
objectsWritten = 0
}
var success = false
try {
while (inMemoryIterator.hasNext) {
val partitionId = inMemoryIterator.nextPartition()
require(partitionId >= 0 && partitionId < numPartitions,
s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
inMemoryIterator.writeNext(writer)
elementsPerPartition(partitionId) += 1
objectsWritten += 1
if (objectsWritten == serializerBatchSize) {
flush()
}
}
if (objectsWritten > 0) {
flush()
} else {
writer.revertPartialWritesAndClose()
}
success = true
} finally {
if (success) {
writer.close()
} else {
// This code path only happens if an exception was thrown above before we set success;
// close our stuff and let the exception be thrown further
writer.revertPartialWritesAndClose()
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting ${file}")
}
}
}
}
SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
Now ExternalSorter's writePartitionedFile takes the stage: by consolidating the in-memory data and the spill files, each MapTask ultimately produces a single, final Block file.
Note that if there are no spill files on disk (the spills array is empty), the in-memory data is simply written out in partition order.
Note that if there are spill files on disk (the spills array is not empty), they are read back, merged and sorted (with aggregation where applicable) together with the in-memory data, and written out as the single final Block file.
* Write all the data added into this ExternalSorter into a file in the disk store. This is
* called by the SortShuffleWriter.
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
if (spills.isEmpty) {
// Case where we only have in-memory data
val collection = if (aggregator.isDefined) map else buffer
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
val partitionId = it.nextPartition() // ====> key highlight
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) { // ====> key highlight
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
}
writer.close()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}
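Which of these branches a real job exercises depends on the shuffle operator. The sketch below assumes Spark 2.x semantics and a hypothetical local job, purely to show the mapping: reduceByKey enables map-side combine and therefore exercises the PartitionedAppendOnlyMap/aggregation path, while sortByKey does no map-side combine, so the map side only buffers records sorted by partition id.
import org.apache.spark.{SparkConf, SparkContext}

object ShufflePathDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical local job, only meant to show which ExternalSorter path each operator takes.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shuffle-path-demo"))
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), 2)

    // reduceByKey enables map-side combine: the map task aggregates into PartitionedAppendOnlyMap.
    pairs.reduceByKey(_ + _).collect()

    // sortByKey does no map-side combine: records go into PartitionedPairBuffer on the map side,
    // sorted only by partition id; key ordering is applied on the reduce side.
    pairs.sortByKey().collect()

    sc.stop()
  }
}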
First, look at Spark's own explanation: groupByPartition produces one IteratorForPartition iterator per partition:
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
* partition without reading the previous one). Guaranteed to return a key-value pair for each
* partition, in order of partition ID.
*
* For now, we just merge all the spilled files in once pass, but this can be modified to
* support hierarchical merging.
* Exposed for testing.
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
if (spills.isEmpty) { // ====> key highlight: the spills check is made again here
// Special case: if we have only in-memory data, we don't need to merge streams,
// and perhaps we don't even need to sort by anything other than partition ID
if (!ordering.isDefined) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
// We do need to sort by both partition ID and key
groupByPartition(destructiveIterator( // ====> key highlight
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// Merge spilled and in-memory data
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator))) // ====> key highlight
}
}
groupByPartition produces one IteratorForPartition per partition, yet all of them share the same underlying data source. The real star here is IteratorForPartition, which is remarkably clever:
* Given a stream of ((partition, key), combiner) pairs assumed to be sorted by partition ID,
* group together the pairs for each partition into a sub-iterator.
* @param data an iterator of elements, assumed to already be sorted by partition ID
private def groupByPartition(data: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] =
{
val buffered = data.buffered
(0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered)))
}
IteratorForPartition is painstaking in its simplicity: it filters the shared source by checking whether the given partitionId matches the partition id at the head of the stream:
* An iterator that reads only the elements for a given partition ID from an
* underlying buffered stream, assuming this partition is the next one to be read. Used to
* make it easier to return partitioned iterators from our in-memory collection.
private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])
extends Iterator[Product2[K, C]]
{
override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
override def next(): Product2[K, C] = {
if (!hasNext) {
throw new NoSuchElementException
}
val elem = data.next()
(elem._1._2, elem._2)
}
}
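Since IteratorForPartition is private to ExternalSorter, the following standalone mimic (illustration only, not Spark's class) shows the same idea: slicing a partition-sorted stream of ((partition, key), value) pairs one partition at a time, strictly in order.
// A mimic of IteratorForPartition over a partition-sorted stream (illustration only):
class PartitionSlice(partitionId: Int, data: BufferedIterator[((Int, String), Int)])
  extends Iterator[(String, Int)] {
  // Only advances while the head record still belongs to the requested partition.
  override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
  override def next(): (String, Int) = {
    if (!hasNext) throw new NoSuchElementException
    val elem = data.next()
    (elem._1._2, elem._2)   // drop the partition id, keep (key, value)
  }
}

val stream: BufferedIterator[((Int, String), Int)] = Iterator(
  ((0, "a"), 1), ((0, "b"), 2),   // partition 0
  ((1, "c"), 3)                   // partition 1
).buffered

// Consume partitions strictly in order, exactly as groupByPartition expects:
for (p <- 0 until 2) {
  println(s"partition $p: " + new PartitionSlice(p, stream).toList)
}
// partition 0: List((a,1), (b,2))
// partition 1: List((c,3))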
* Merge a sequence of sorted files, giving an iterator over partitions and then over elements
* inside each partition. This can be used to either write out a new file or return data to
* the user.
*
* Returns an iterator over all the data written to this object, grouped by partition. For each
* partition we then have an iterator over its contents, and these are expected to be accessed
* in order (you can't "skip ahead" to one partition without reading the previous one).
* Guaranteed to return a key-value pair for each partition, in order of partition ID.
The peach and the plum do not speak, yet a path forms beneath them. merge ultimately returns an iterator that makes it easy to write the final file in partition order:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
val readers = spills.map(new SpillReader(_)) // ====> key highlight: read the spilled files back from disk
val inMemBuffered = inMemory.buffered
(0 until numPartitions).iterator.map { p =>
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator) // ====> key highlight
if (aggregator.isDefined) {
// Perform partial aggregation across partitions
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined)) // ====> key highlight
} else if (ordering.isDefined) {
// No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
// sort the elements without trying to merge them
(p, mergeSort(iterators, ordering.get)) // ====> key highlight
} else {
(p, iterators.iterator.flatten) // ====> key highlight: simply concatenate and emit
}
}
}
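To see what merging sorted per-partition iterators amounts to, here is a simplified, self-contained k-way merge over already-sorted iterators; it is analogous in spirit to ExternalSorter.mergeSort but is not Spark's code.
import scala.collection.mutable

// Merge k iterators that are each already sorted by key, yielding one sorted iterator.
def mergeSorted[K, V](iterators: Seq[Iterator[(K, V)]], comparator: Ordering[K]): Iterator[(K, V)] = {
  // Buffer each non-empty iterator so its head key can be inspected without consuming it.
  val bufferedHeads = iterators.filter(_.hasNext).map(_.buffered)
  // Priority queue ordered so the iterator with the smallest head key is dequeued first.
  val heap = new mutable.PriorityQueue[BufferedIterator[(K, V)]]()(
    Ordering.by[BufferedIterator[(K, V)], K](_.head._1)(comparator.reverse))
  heap ++= bufferedHeads
  new Iterator[(K, V)] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): (K, V) = {
      val it = heap.dequeue()
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it)   // re-insert so its new head takes part in the ordering
      kv
    }
  }
}

// Usage: merging two sorted runs, e.g. the in-memory data and one spilled file for a partition.
val merged = mergeSorted(
  Seq(Iterator("a" -> 1, "c" -> 3), Iterator("b" -> 2, "d" -> 4)),
  Ordering.String)
merged.foreach(println)   // emits the pairs in key order: a, b, c, d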
After a Spark map task finishes, its output has to be persisted, which leads to the two cases discussed above: in-memory data only, or in-memory data plus spill files.
After several drafts this article is finally done, though it still needs further polish.
Qin Kaixin, Shenzhen