spark shuffle写操做三部曲之SortShuffleWriter

提出问题

1. spark shuffle的预聚合操做是如何作的,其中底层的数据结构是什么?在数据写入到内存中有预聚合,在读溢出文件合并到最终的文件时是否也有预聚合操做?html

2. shuffle数据的排序是如何作的? 分区内的数据是不是有序的?如有序,spark 内部是按照什么排序算法来排序每个分区上的key的?算法

3. shuffle的溢出操做和TaskMemoryManager的关系?apache

4. 在数据溢出阶段,内存中数据的排序是使用算法进行排序的?数组

5. 在溢出文件数据合并阶段,内存中的数据的排序是使用的什么算法?缓存

6. 为何在读取溢出文件到内存中时,返回的结果是迭代器而不是直接的数据结果?数据结构

。。。。。。还有不少的细节。函数

前言

咱们先来回首前几篇文章的关系: spark 源码分析之二十一 -- Task的执行流程 从调度的角度说明了TaskScheduler是如何调度任务的,其中任务的执行目前为止写了三篇文章,分别是 剖析Task运行时内存的管理的 spark 源码分析之二十二-- Task的内存管理,剖析shuffle写操做执行前的准备工做,引出了三种shuffle的写方式,前两篇文章分别介绍了 spark shuffle写操做三部曲之UnsafeShuffleWriter 和 spark shuffle写操做三部曲之BypassMergeSortShuffleWriter 前两种shuffle的写的方式。本篇文章来剖析最后一种 shuffle 写的方式。源码分析

咱们先来看第三种shuffle的相关依赖类。post

SizeTrackingAppendOnlyMap

这个类继承了AppendOnlyMap并实现了SizeTracker trait。fetch

其内部方法以下:

它依赖的类都是其父类,他只是它的两个父类的拼凑,因此要想了解真正的动做,仍是须要去看其父类AppendOnlyMap和trait SizeTracker。

父类AppendOnlyMap

这个类继承了Iterable trait和 Serializable 接口。

其类结构以下:

成员变量

成员变量以下:

LOAD_FACTOR:负载因子,为0.7,实际存储数据占比大于负载因子则须要扩容。

mask的做用:将任意的数映射到[0,mask]的范围内。

data:是真正保存数据的数组。

haveNullValue:是否有null值,由于数组中的null值还有一个做用,那就是表示该索引位置没有元素存在。

nullValue:null值。

destoryed:表示数据是否已经被销毁。

理论最大容量为:512MB

成员方法以下:

根据key获取value

 

解释:

1.若是是null值,则返回null值,由于约定 null值key对应null值value。

2. 首先先把原来的hashcode再求一次hash码,而后和掩码作与操做将其映射到 [0,mask] 范围内。

3. 尝试取出数据若是取出来的key是指定的key,则返回数据,若取出的key是null,表示以前没有保存过,返回null,若取出的数据的key不是当前key,则使用再散列法 先有pos + delta逐步散列,求得下一次的pos,而后再重复第三步,直至找匹配的值或null值后返回。

设置键值对

更新键值思路:跟查找的思路同样,只不过找到以后不返回,是执行更新操做。

在指定key的value上执行函数

更新键值思路:跟查找的思路同样,只不过找到以后不返回,若是找的的值是null值,则执行赋值操做,不然更新value为执行更新函数后的值。

获取未排序的迭代器

本质上是遍历数组,只不过这里的元素是稀疏的,只返回有元素的数据,不作过多说明。

先整理数组,将数组的数据变为紧凑的数据。再按照key来进行排序。最后返回一个迭代器,这个迭代器里的数据是有序的。

rehash

扩容

若是当前使用容量占比大于负载因子,则开始扩容。

新容量是旧容量的一倍。遍历旧的数组中的每个非null元素,将其映射到新的数组中。

父类SizeTracker

A general interface for collections to keep track of their estimated sizes in bytes. We sample with a slow exponential back-off using the SizeEstimator to amortize the time, as each call to SizeEstimator is somewhat expensive (order of a few milliseconds).

集合的通用接口,用于跟踪其估计的大小(以字节为单位)。 咱们使用SizeEstimator以缓慢的指数退避进行采样以分摊时间,由于每次调用SizeEstimator都有点昂贵。

成员变量

SAMPLE_GROWTH_RATE指数增加因子,好比是2,则是 1,2,4,8,16,......

 

核心方法以下:

采样

估算大小

重采样

更新后采样

依赖类 -- SizeEstimator

主要用于数据占用内存的估算。

ExternalAppendOnlyMap

继承关系

其继承关系以下:

其父类是Spillable抽象类。

先来看父类Spillable

超类--Spillable

类说明:当内存不足时,这个类会把内存里的集合溢出到磁盘中。

其成员变量以下,不作过多解释。

主要方法以下:

溢出内存到磁盘

它实现了父类的抽象方法 spill方法,源码以下:

思路:若是consumer不是这个类而且内存模式是堆内内存才支持内存溢出。

其依赖方法以下:

org.apache.spark.util.collection.Spillable#forceSpill源码以下,它是一个抽象方法,没有具体实现。

释放内存方法,其调用了 父类的freeMemory方法:

尝试溢出来释放内存

org.apache.spark.util.collection.Spillable#maybeSpill 源码以下:

其依赖方法spill方法以下,注意这个方法是用来溢出集合的数据到内存的,它是抽象方法,待子类实现。

这个类留给子类两个方法来实现,forceSpill和spill方法。

ExternalAppendOnlyMap这个类里面的是对 SizeTrackingAppendOnlyMap 的进一步封装,下面咱们先看 SizeTrackingAppendOnlyMap。

数据比较器 -- HashComparator

其源码以下:

总之,它是根据哈希码进行比较的。

SpillableIterator

首先,它是org.apache.spark.util.collection.ExternalAppendOnlyMap的内部类,实现了Iterator trait,它是跟ExternalAppendOnlyMap一块儿使用的,也使用了 ExternalAppendOnlyMap 里的方法。

成员变量

其成员变量以下:

SPILL_LOCK是一个对象锁,每次执行溢出操做都会先获取锁再执行溢出操做,执行完毕后释放锁。

cur表示下一个未读的元素。

hasSpilled表示是否有溢出。

核心方法

1.溢出

其源码以下:

2.销毁数据释放内存

其依赖方法 org.apache.spark.util.collection.ExternalAppendOnlyMap#freeCurrentMap 以下:

3. 读取下一个

4. 是否有下一个

5. 获取下一个元素

6. 转换为CompletionIterator

总结

从本质来来讲,它是一个包装类,数据从构造方法以Iterator的形式传递过来,而它本身也是一个Iterator,除了实现了Iterator自己的方法外,还具有了溢出到磁盘、销毁内存数据、转换为CompletionIterator的功能。

DiskMapIterator

这个类就是用来读取文件的数据的,只不过文件被划分为了多个文件段,有一个数组专门记录这多个文件段的段大小,如构造函数所示:

其中file就是要读取的数据文件,blockId表示文件在shuffle系统中对应的blockId,batchSize就是指的每个文件段的大小。

成员变量以下:

 

下面从Iterator的主要方法入手,去剖析整个类。

是否有下一个元素

其依赖方法 org.apache.spark.util.collection.ExternalAppendOnlyMap.DiskMapIterator#readNextItem 源码以下:

思路:首先先读取下一个key-value对,若读取完毕后,发现这个批次的数据已经读取完毕,则调用 nextBatchStream 方法,关闭现有反序列化流,初始化读取下一个文件段的反序列化流。

 

其依赖方法 org.apache.spark.util.collection.ExternalAppendOnlyMap.DiskMapIterator#nextBatchStream 以下:

思路:首先先肯定该批次的数据是否读取完毕,若读取完毕,则作完清理操做后,返回null值,不然先关闭现有的反序列化流,而后获取下一个反序列化流的开始和结束offset,最后初始化一个反序列化流返回给调用端。

其依赖方法 org.apache.spark.util.collection.ExternalAppendOnlyMap.DiskMapIterator#cleanup 方法以下:

思路:首先关闭现有的反序列化流和文件流,最后若是文件存在,则删除之。

读取下一个元素

思路很简单,其中,nextItem已经在是否有下一个元素的时候反序列化出来了。

构造方法

它有两个重载的构造方法:

解释一下其中的参数:

createCombiner:是根据一个原值来建立其combine以后的值的函数。

mergeValue:是根据一个combine以后的值和一个原值求combine以后的值的函数。

mergeCombiner:是根据两个combine以后的值求combine以后的值函数。

本质上这几个函数就是逐步归并聚合的体现。

 

成员变量

serializerBatchSize:表示每次溢出时,写入文件的批次大小,这个批次是指的写入的对象的次数,而不是一般意义上的buffer的缓冲区大小。

_diskBytesSpilled :表示总共溢出的字节大小

fileBufferSize: 文件缓存大小,默认为 32k

_peakMemoryUsedBytes: 表示内存使用峰值

keyComparater:表示内存排序的比较器

核心方法

插入数据

溢出操做

思路:首先先调用currentMap的destructiveSortedIterator方法,先整理其内部的数据成紧凑的数据,而后对数据进行排序,最终有序数据以Iterator的结果返回。而后调用 

将数据溢出到磁盘,最后将溢出的信息记录到spilledMaps中,其依赖方法 org.apache.spark.util.collection.ExternalAppendOnlyMap#spillMemoryIteratorToDisk 源码以下:

思路:建立本地临时block,并获取其writer,最终遍历内存数组的迭代器,将数据都经过writer写入到file中,其中写文件是分批写入的,即每次知足serializerBatchSize大小以后,执行flush写入,最后执行一次flush写入,关闭文件,最终返回DiskMapIterator对象。

强制溢出

 

摧毁迭代器

获取迭代器

 

预聚合类 -- Aggregator

其源码以下:

这个类的两个方法 combineValuesByKey 和 combineCombinersByKey 都依赖于 ExternalAppendOnlyMap类。

下面继续来看ExternalSorter类的内部实现。

支持排序预聚合的sorter -- ExternalSorter

 

类说明

Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then optionally sorts keys within each partition using a custom Comparator. Can output a single partitioned file with a different byte range for each partition, suitable for shuffle fetches. If combining is disabled, the type C must equal V -- we'll cast the objects at the end. Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied to its use in sort-based shuffle (for example, its block compression is controlled by spark.shuffle.compress). We may need to revisit this if ExternalSorter is used in other non-shuffle contexts where we might want to use different configuration settings.

对类型(K,V)的多个键值对进行排序并可能合并,以生成类型(K,C)的键组合对。使用分区程序首先将key分组到分区中,而后能够选择使用自定义Comparator对每一个分区中的key进行排序。能够为每一个分区输出具备不一样字节范围的单个分区文件,适用于随机提取。若是禁用了组合,则类型C必须等于V - 咱们将在末尾转换对象。注意:虽然ExternalSorter是一个至关通用的排序器,但它的一些配置与基于排序的shuffle的使用有关(例如,它的块压缩由spark.shuffle.compress控制)。若是在咱们可能想要使用不一样配置设置的其余非随机上下文中使用ExternalSorter,咱们可能须要从新审视这一点。

下面,先来看其构造方法:

构造方法

参数以下:

aggregator:可选的聚合器,能够用于归并数据

partitioner :可选的分区器,若是有的话,先按分区Id排序,再按key排序

ordering : 可选的排序,它在每个分区内按key进行排序,它也能够是全局排序

serializer :用于溢出内存数据到磁盘的序列化器

 

其成员变量和核心方法,先不作剖析,其方法围绕两个核心展开,一部分是跟数据的插入有关的方法,一部分是跟多个溢出文件的合并操做有关的方法。 

下面来看看它的一些内部类。

 

只读一个分区数据的迭代器 -- IteratorForPartition

这个类实现了Iterator trait,只负责迭代读取一个特定分区的数据,其定义以下:

比较简单,不作过多说明。 

溢出文件的描述 -- SpilledFile

这个类是一个 case class ,它记录了溢出文件的一些关键信息,构造方法的各个字段以下:

file:溢出文件

blockId:溢出文件对应的blockId

serializerBatchSizes:表示每个序列化类对应的batch的大小。

elementsPerPartition:表示每个分区的元素的个数。

比较简单,没有类的方法定义。

读取溢出文件的内容 -- SpillReader

它负责读取一个按分区作文件分区的文件,但愿按分区顺序读取分区文件的内容。

其类结构以下:

成员变量

先看其成员变量:

batchOffsets:序列化类的每个批次的offset

partitionId:分区id

indexInPartition:在分区内的索引信息

batchId:batch的id

indexInBatch:在batch中的索引信息

lastPartitionId:上一个partition ID

nextPartitionToRead:下一个要读取的partition的id

fileStream:文件输入流

deserializeStream:分序列化流

nextItem:下一个键值对

finished:是否读取完毕

下面,来看其核心方法:

获取下一个批次的反序列化流

思路跟DiskMapIterator的获取下一个流的思路很相似,不作过多解释。

读取下一个partition的数据

其返回的是一个迭代器,org.apache.spark.util.collection.ExternalSorter.SpillReader#readNextPartition源码以下:

思路:其返回迭代器中,的hasNext中先去读取下一个item,若是读取到的下一个元素为null,则返回false,表示没有数据能够返回。

其依赖方法 org.apache.spark.util.collection.ExternalSorter.SpillReader#readNextItem 源码以下:

思路:首先该批次数据读取完毕,则关闭掉读取该批次数据的流,继续读取下一个批次的流。

其依赖方法 org.apache.spark.util.collection.ExternalSorter.SpillReader#skipToNextPartition 方法以下:

下面,整理一下思路:

每次读取一个文件的分区,该分区读取完毕,关闭分区文件,读取下一个文件的下一个分区数据。只不过它在读文件的分区的时候,会有batch操做,一个分区可能会对应多个batch,可是一个batch有且只能有一个分区。

SpillableIterator

首先它跟 org.apache.spark.util.collection.ExternalAppendOnlyMap.SpillableIterator 很像, 实现方法也很相似,都是实现了一个Iterator trait,构造方法以一个Iterator对象传入,而且对其作了封装,能够跟上文的 SpillableIterator 对比剖析。

其成员变量以下:

nextUpStream:下一个批次的stream

对Iterator的实现

先来看Iterator的方法实现:

溢出

其源码以下:

思路以下:首先建立内存迭代器,而后遍历内存迭代器,将数据溢出到磁盘中,其关键方法 spillMemoryIteratorToDisk。

 

两种存放溢出前数据的数据结构 

PartitionedAppendOnlyMap

这个类底层是数组,数据按照Map的形式稀疏排列,它还支持多个key的预聚合操做。

它是SizeTrackingAppendOnlyMap和 WritablePartitionPairCollection的子类。

 

其源码以下:

 

PartitionedPairBuffer

这个类底层是数组,数据按数组的形式紧凑排列。不支持多个相同key的预聚合操做。

它是SizeTracker 和 WritablePartitionPairCollection的子类。 

其源码以下:

插入数据

数组扩容

获取排序后的迭代器

获取读取数组数据的迭代器

 

下面来看最后一种shuffle数据写的方式。

使用SortShuffleWriter写数据

这种shuffle方式支持预聚合操做。

其下操做源码以下:

初始化Sorter

若是须要在map段作combine操做,则须要指定 aggragator和 keyOrdering,即map端的数据会作预聚合操做,而且分区内的数据有序,其排序规则是按照hashCode作排序的。

不然这两个参数为null,即map端的数据没有预聚合,而且分区内数据无序。

向sorter插入数据

其源码以下:

org.apache.spark.util.collection.ExternalSorter#insertAll的源码以下:

思路:首先若是数据须要执行map端的combine操做,则使用 PartitionedAppendOnlyMap 类来操做,这个类能够支持数据的combine操做。若是不须要 执行map 端的combine 操做,则使用  PartitionedPairBuffer 来实现,这个类不会对数据进行预聚合。每次数据写入以后,都要查看是否须要执行溢出内存数据到磁盘的操做。

这两个类在上文中已经作了详细的说明。

 

其依赖方法 addElementsRead 源码以下:

溢出内存数据到磁盘的核心方法 maybeSpillCollection 源码以下:

思路:它有一个标志位 usingMap表示是否使用的是map的数据结构,便是否是 PartitionedAppendOnlyMap,其思路几乎同样,只不过在调用 mayBeSpill 方法中传入的参数不同。其中使用的内存的大小,都是通过采样评估计算过的。其依赖方法 org.apache.spark.util.collection.Spillable#maybeSpill 以下:

思路:若是读取的数据是 32 的整数倍而且当前使用的内存比初始内存大,则开始向TaskMemoryManager申请分配内存,若是申请成功,则返回申请的大小,注意:在向TaskMemoryManager申请内存的过程当中,若是内存不够,也会去调用 org.apache.spark.util.collection.Spillable#spill 方法,在其内部也会去调用 org.apache.spark.util.collection.ExternalSorter#forceSpill 方法其源码以下,其中readingIterator是SpillableIterator类型的对象。

 

其依赖方法 org.apache.spark.util.collection.Spillable#logSpillage 会打印一些溢出日志。再也不过多说明。

其依赖方法 org.apache.spark.util.collection.ExternalSorter#spill 源码以下:

思路相对比较简单,主要是先获取排序后集合的迭代器,而后将迭代器传入 org.apache.spark.util.collection.ExternalSorter#spillMemoryIteratorToDisk ,将内存数据溢出到临时的磁盘文件后返回一个SpilledFile对象,将其记录到 spills中,spills这个变量主要记录了内存数据的溢出过程当中的溢出文件的信息。

其溢出磁盘方法 org.apache.spark.util.collection.ExternalSorter#spillMemoryIteratorToDisk 源码以下:

首先获取写序列化文件的writer,而后遍历数据的迭代器,将数据迭代写入到磁盘中,在写入过程当中,不断将每个分区的大小信息以及每个分区内元素的个数记录下来,最终将溢出文件、分区元素个数,以及每个segment的大小信息封装到SpilledFile对象中返回。

 

多文件归并为一个文件

其核心代码以下:

思路:首先先初始化一个临时的最终文件(以uuid做为后缀),而后初始化blockId,最后调用 org.apache.spark.util.collection.ExternalSorter的writePartitionedFile 方法。将数据写入一个临时文件,并将该文件中每个分区对应的FileSegment的大小返回。

其关键方法 org.apache.spark.util.collection.ExternalSorter#writePartitionedFile 源码以下:

思路:首先若是历来没有过溢出文件,则首先先看一下是否须要map端聚合,如果须要,则数据已经被写入到了map中,不然是buffer中。而后调用集合的转成迭代器的方法,将内存的数据排序后输出,最终迭代遍历这个迭代器,将数据不断写入到最终的临时文件中,更新分区大小返回。

若是以前已经有溢出文件了,则先调用  org.apache.spark.util.collection.ExternalSorter的partitionedIterator 方法将数据合并后返回合并后的迭代器。

最终遍历每个分区的数据,将分区的数据写入到最终的临时文件,更新分区大小;最后返回分区大小。

 

下面重点剖析一下合并方法 org.apache.spark.util.collection.ExternalSorter#partitionedIterator,其源码以下:

首先,要说明的是,经过咱们上面的程序分支进入该程序,此时历史溢出文件集合是空的,即它不会执行第一个分支的处理流程,但仍是要作一下简单的说明。

它有三个依赖方法分别以下:

依赖方法 org.apache.spark.util.collection.ExternalSorter#destructiveIterator 源码以下:

思路:首先 isShuffleSort为 true,咱们如今就是走的 shuffle sort的流程,确定是须要走第一个分支的,即它不会返回一个SpillableIterator迭代器。

值得注意的是,这里的comparator跟内存排序使用的comparator是同样的,即排序方式是同样的。

依赖方法 org.apache.spark.util.collection.ExternalSorter#groupByPartition 源码以下:

思路:遍历每个分区返回一个IteratorForPartition的分区迭代器。

注意:因为历史溢出文件集合此时不为空,将不会调用这个方法。

 

依赖方法 org.apache.spark.util.collection.ExternalSorter#merge 源码以下:

 

思路:传给merge方法的有两个参数,一个是表明溢出文件的SpiiledFile集合,一个是表明内存数据的迭代器。

首先遍历每个溢出文件,建立一个读取该溢出文件的SpillReader对象,而后遍历每个分区建立一个IteratorForPartition迭代器,而后读取每个溢出文件的分区的迭代器,最终和 做为参数传入merge 方法的内存迭代器合并到一个迭代器集合中。

若是是须要预聚合的,则调用 mergeWithAggregation 方法,若是是须要排序的,则调用mergeSort 方法,对其进行排序,最后若是不知足前两种状况,调用集合的flatten 方法,将打平到一个迭代器中返回。

它有两个依赖方法,分别以下:

org.apache.spark.util.collection.ExternalSorter#mergeSort 源码以下:

思路:使用堆排序构造优先队列,对数据进行排序,最终返回一个迭代器。每次先从堆中根据partitionID排序,将同一个partition的排到前面,每次取出一个Iterator,而后取出该Iterator中的一个元素,再放入堆中,由于可能取出一个元素后,Iterator的头节点的partitionId改变了,因此须要再次排序,就这样动态的出堆入堆,让不一样Iterator的相同partition的数据老是在一块儿被迭代取出。注意这里的comparator在指定ordering或aggragator的时候,是支持二级排序的,即不只仅支持分区排序,还支持分区内的数据按key进行排序,其排序器源码以下:

若是ordering和aggragator没有指定,则数据排序器为:

即只按分区排序,跟第二种shuffle的最终格式很相似,分区内部数据无序。

 

org.apache.spark.util.collection.ExternalSorter#mergeWithAggregation源码以下:

思路:若是数据总体并不要求有序,则会使用combiner将数据总体进行combine操做,最终相同key的数据被聚合在一块儿。若是数据总体要求有序,则直接对有序的数据按照顺序一边聚合一边迭代输出下一个元素,最终数据是总体有序的。

 

建立索引文件

其关键源码以下:

其思路很简单,能够参考 spark shuffle写操做三部曲之UnsafeShuffleWriter 对应部分的说明。 

总结

在本篇文章中,剖析了spark shuffle的最后一种写方式。溢出前数据使用数组自定义的Map或者是列表来保存,若是指定了aggerator,则使用Map结构,Map数据结构支持map端的预聚合操做,可是列表方式的不支持预聚合。

数据每次溢出数据都进行排序,若是指定了ordering,则先按分区排序,再按每一个分区内的key排序,最终数据溢出到磁盘中的临时文件中,在merge阶段,数据被SpillReader读取出来和未溢出的数据总体排序,最终数据能够总体有序的落到最终的数据文件中。

至此,spark shuffle的三种写方式都剖析完了。以后会有文章来剖析shuffle的读取操做。

不足之处:这篇文章历时比较久,也因为平时工做缘由,用的都是碎片时间,可能有一些部分思路接不上,可能还有部分类没有剖析,望见谅,虽然本文有诸多问题,可是对预总体理解第三种shuffle的写方式来讲,都无足轻重了。

相关文章
相关标签/搜索