spark shuffle写操做三部曲之BypassMergeSortShuffleWriter

前言

再上一篇文章 spark shuffle的写操做之准备工做 中,主要介绍了 spark shuffle的准备工做,本篇文章主要介绍spark shuffle使用BypassMergeSortShuffleWriter写数据详细细节。html

在本篇文章中若是有不了解的术语,也能够参照 spark shuffle的写操做之准备工做  作进一步了解。apache

这种shuffle写数据的方式是最简单的,spark计划在之后会移除这种shuffle机制。数组

 

先上源码,后解释:函数

流程以下:工具

map数据根据分区函数写入分区文件

若是没有数据要写,那么数据文件为空,索引文件中各个segment的大小为0,返回初始化的MapStatus。post

若是有数据要写到各个reducer的文件中,首先初始化序列化工具实例,遍历初始化各个partition的partitionWriter数组中的DiskBlockObjectWriter对象,初始化各个partition的FileSegment数组。ui

而后遍历每个要写入的记录值,而且取出记录的key值,根据Partitioner的getPartition函数肯定其reduce到的目标分区索引,而后根据计算出的索引肯定负责写数据的DiskBlockObjectWriter对象,而后根据该对象将键值对写入到临时分区文件。url

当每个要写入的记录值遍历操做完毕,遍历每个分区,将该分区对应的partitionWriter执行commitAndGet操做,返回该分区的FileSegment对象。spa

其依赖方法commitAndGet源码以下:设计

至此,大多数状况下,reduce的每个partition的数据有被写入到一个单独的文件。明明是FileSegment,为何是单独的文件呢?缘由就在于DiskBlockManager返回的临时ShuffleBlockId是不重复的,org.apache.spark.storage.DiskBlockManager#createTempShuffleBlock源码以下:

又由于建立临时文件,只是建立临时文件的句柄,此时对应的物理文件,并不存在,因此,这个方法不能保证建立的临时文件不重复。因此多个partition数据写入到一个临时文件的几率仍是有的,只不过是小几率事件。

最后小的分区文件会被合并为一个文件。

首先调用ShuffleBlockResolver(它是IndexShuffleBlockResolver实例)的getDataFile方法获取数据文件的句柄File对象,org.apache.spark.util.Utils的tempFileWith获取临时文件,org.apache.spark.util.Utils#tempFileWith源码以下,即得到一个带uuid后缀的文件:

合并分区文件

最后调用org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter的writePartitionedFile方法将多个小文件合并为一个大文件并返回包含每个partition

对应的文件段的大小的数组,源码以下:

更新索引文件

最后更新索引文件,给数据文件重命名后整个写过程就完全结束了,源码再也不作过多解释,在  spark shuffle的写操做之准备工做 中 IndexShuffleBlockResolver类中有说明。

总结

BypassMergeSortShuffleWriter是基于文件作的分区,没有sort操做,最后分区数据被写入一个完整文件,而且有一个索引文件记录文件中每个分区对应的FileSegment的大小。这种设计是比较朴素的,也很简单,易实现。

相关文章
相关标签/搜索