在历史的发展中,为何 Spark 最终仍是选择放弃了 HashShuffle 而使用了 Sorted-Based Shuffle,并且做为后起之秀的 Tungsten-based Shuffle 它到底在什么样的背景下产生的。Tungsten-Sort Shuffle 已经并入了 Sorted-Based Shuffle,Spark 的引擎会自动识别程序须要原生的 Sorted-Based Shuffle 仍是用 Tungsten-Sort Shuffle,那识别的依据是什么,其实 Spark 会检查相对的应用程序有没有 Aggregrate 的操做。文章的后续部份会介绍 Tungsten-Sort Shuffle 是如何管理内存和CPU。其实 Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必需要先进行排序 (注意,这里没有说对计算结果进行排序),因此致使它排序的速度有点慢。而 Tungsten-Sort Shuffle 对它的排序算法进行了改进,优化了排序的速度。但愿这篇文章能为读者带出如下的启发:算法
为何 Spark 用 Sorted-Based Shuffle 而放弃了 Hash-Based Shuffle?在 Spark 里为何最终是 Sorted-Based Shuffle 成为了核心,有基本了解过 Spark 的学习者都会知道,Spark会根据宽依赖把它一系列的算子划分红不一样的 Stage,Stage 的内部会进行 Pipeline,Stage 与 Stage 之间进行 Shuffle,Shuffle 的过程包含三部份。数组
第一部份是 Shuffle 的 Write;第二部份是网络的传输;第三部份就是 Shuffle 的 Read,这三大部份设置了内存操做、磁盘IO、网络IO以及 JVM 的管理。而这些东西影响了 Spark 应用程序在 95%以上效率的惟一缘由,假设你程序代码的质素自己是很是好的状况下,你性能的95%都消耗在 Shuffle 阶段的本地写磁盘文件,网络传输数据以及抓取数据这样的生命周期中。缓存
在 Shuffle 写数据的时候,内存中有一个缓存区叫 Buffer,你能够想像成一个Map,同时在本地磁盘有相对应的本地文件。若是本地磁盘有文件你在内存中确定也须要有相对应的管理句柄。也就是说,单是从 ShuffleWrite 内存占用的角度讲,已经有一部分内存空间是用在存储 Buffer 数据的,另外一部份的内存空间是用来管理文件句柄的,回顾 HashShuffle 所产生小文件的个数是 Mapper 分片数量 x Reducer 分片数量 (MxR)。好比Mapper端有1千个数据分片,Reducer端也有1千过数据分片,在 HashShuffle 的机制下,它在本地内存空间中会产生 1000 * 1000 = 1,000,000 个小文件,可想而知的结果会是什么,这么多的 IO,这么多的内存消耗、这么容易产生 OOM、以及这么沉重的 CG 负担。再说,若是Reducer端去读取 Mapper端的数据时,Mapper 端有这么多的小文件,要打开不少网络通道去读数据,打开 1,000,000 端口不是一件很轻松的事。这会致使一个很是经典的错误:Reducer 端也就是下一个 Stage 经过 Driver 去抓取上一个 Stage 属于它本身的数据的时候,说文件找不到。其实这个时候不是真的是磁盘上文件找不到,而是程序不响应,由于它在进行垃圾回收 (GC) 操做。网络
由于 Spark 想完成一体化多样化的数据处理中心或者叫一统大数据领域的一个好梦,确定不甘心于本身只是一个只能处理中小规模的数据计算平台,因此Spark最根本要优化和逼切要解决的问题是:减小 Mapper 端 ShuffleWriter 所产生的文件数量,这样即可以能让 Spark 从几百台集群的规模中瞬间变成能够支持几千台甚至几万台集群的规模。(一个Task背后多是一个Core去运行、也多是多个Core去运行,但默认状况下是用一个Core去运行一个Task)。 数据结构
减小Mapper端的小文件所带来的好处是:并发
Sorted-Based Shuffle 的出现,最显著的优点就是把 Spark 从只能处理中小规模的数据平台,变成能够处理无限大规模的数据平台。可能你会问规模真是这么重要吗?固然有,集群规模意为著它处理数据的规模,也意为著它的运算能力。app
Sorted-Based Shuffle 不会为每一个Reducer 中的Task 生产一个单独的文件,相反Sorted-Based Shuffle 会把Mapper 中每一个ShuffleMapTask 全部的输出数据Data 只写到一个文件中,由于每一个ShuffleMapTask 中的数据会被分类,因此Sort-based Shuffle 使用了index 文件存储具体ShuffleMapTask 输出数据在同一个Data 文件中是如何分类的信息。因此说基于 Sort-based Shuffle 会在 Mapper 中的每个 ShuffleMapTask 中产生两个文件 (并发度的个数 x 2)!!!框架
它会产生一个 Data 文件和一个 Index 文件,其中 Data 文件是存储当前 Task 的 Shuffle 输出的, 而 Index 文件则存储了 Data 文件中的数据经过 Partitioner 的分类信息,此时下一个阶段的 Stage 中的 Task 就是根据这个 Index 文件获取本身所须要抓取的上一个 Stage 中 ShuffleMapTask 所产生的数据;oop
假设如今 Mapper 端有 1000 个数据分片,Reducer 端也有 1000 个数据分片,它的并发度是100,使用 Sorted-Based Shuffle 会产生多少个 Mapper端的小文件,答案是 100 x 2 = 200 个。它的 MapTask 会独自运行,每一个 MapTask 在运行的时候写2个文件,运行成功后就不须要这个 MapTask 的文件句柄,不管是文件自己的句柄仍是索引的句柄都不须要,因此若是它的并发度是 100 个 Core,每次运行 100 个任务的话,它最终只会占用 200 个文件句柄,这跟 HashShuffle 的机制不同,HashShuffle 最差的状况是 Hashed 句柄存储在内存中的。性能
Sorted-Based Shuffle 主要是在Mapper阶段,这个跟Reducer端没有任何关系,在Mapper阶段它要进行排序,你能够认为是二次排序,它的原理是有2个Key进行排序,第一个是 PartitionId进行排序,第二个是就是自己数据的Key进行排序。看下图,它会把 PartitionId 分红3个,分别是索引为 0、一、2,这个在Mapper端进行排序的过程实际上是让Reducer去抓取数据的时候变得更高效,好比说第一个Reducer,它会到Mappr端的索引为 0 的数据分片中抓取数据。
具体而言,Reducer 首先找 Driver 去获取父 Stage 中每一个 ShuffleMapTask 输出的位置信息,跟据位置信息获取 Index 文件,解析 Index 文件,从解析的 Index 文件中获取 Data 文件中属于本身的那部分内容。
一个Mapper任务除了有一个数据文件之外,它也会有一个索引文件,Map Task 把数据写到文件磁盘是顺序根据自身的Key写进去的,也同时按照 Partition写进去的,由于它是顺序写数据,记录每一个 Partition 的大小。
Sort-Based Shuffle 的弱点:
1
|
spark.shuffle.spill
=
false
|
若是设置了这个运行模式,在生产环境下建义对内存的数据做2份备份,由于在默认状况下内存数据只有1份,它不像HDFS那样,自然有3份备份。使用 ExternalAppendOnlyMap 的方式时,若是内存占用率达到必定的临界值后会首先尝试在内存中扩大 ExternalAppendOnlyMap (内部有实现算法),若是不能扩容的话才会 spill 到磁盘。
第三大问题:Shuffle 的数据在 Mapper 端如何存储,在 Reducer 端如何知道数据具体在那里的?在Spark的实现上每个Stage (里面是 ShuffleMapTask) 中的 Task 在 Stage 的最后一个 RDD 上必定会注册给 Driver 上的 MapOutputTrackerMaster,Mapper 经过和 MapOutputTrackerMaster 来汇报 ShuffleMapTask 具体输出数据的位置 (具体的输出文件及内容是和 Reducer 有关的),Reducer 是向 Driver 中的 MapOutputTrackerMaster 请求数据的元数据信息,而后和 Mapper 所在的 Executor 进行通讯。
人们会对 Sorted-Based Shuffle 有一种误解,就是它产出的结果是有序的,这一节会讲解 Sorted-Based Shuffle 是如何工做的并配合源码看看它具体的实现,Sorted-Based Shuffle 的核心是借助于 ExternalSorter 把每一个 ShuffleMapTask 的输出排序到一个文件中 (FileSegmentGroup),为了区分下一个阶段 Reducer Task 不一样的内容,它还须要有一个索引文件 (Index) 来告诉下游 Stage 的并行任务,那一部份是属于你的。
上图在 Reducer 端有4个Reducer Task,它会产生一组 File Group 和 一个索引文件,File Group 里的 FileSegement 会进行排序,下游的 Task 能够很容易跟据索引 (index) 定位到这个 Fie 中的那一部份 FileSegement 是属于下游的,它至关于一个指针,下游的 Task 要向 Driver 去碓定文件在那里,而后到了这个 File 文件所在的地方,实际上会跟 BlockManager 进行沟通,BlockManager 首先会读一个 Index 文件,根据它的命名则进行解析,好比说下一个阶段的第一个 Task,通常就是抓取第一个 Segment,这是一个指针定位的过程。
再次强调 Sort-Based Shuffle 最大的意义是减小临时文件的输出数量,且只会产生两个文件:一个是包含不一样内容划分红不一样 FileSegment 构成的单一文件 File,另一个是索引文件 Index。上图在 Sort-Based Shuffle 的介绍中看见了一个 Sort and Spill 的过程 (它是 Spill 到磁盘的时候再进行排序的),如今咱们从源码的角度去看看到底它这个排序其实是在干什么的。
Sorted-Based Shuffle 的诞生和出现意味著 Spark 从只能处理〝中小规模数据的数据处理平台"重新定位为可以处理〝大规模数据的数据处理平台",更进一步巩故它在数据处理领域的龙头地位。它最大的优化就是:减小了由于 HashShuffle 机制不管是原生 HashShuffle、仍是 Consolidated Shuffle 在 Mapper 端所产生的海量小文件 (这是应用程序运行时的一个中间过程),中间文件数量从 M x R 个数 变成 2M 的小文件。数据量愈大,这个所优化带来的效果便越来越强烈,这是优化的第一步。
虽然 Sort-Based Shuffle 已经大大提高了序程运行时的效率,但若是 Mapper 端并行度的数据分片过多的话,也会致使大量内存消耗和GC的巨大负担,形成系统缓慢甚至崩溃。基于这观点 Spark 再一次突破本身,推出了 Tungsten-Based Shuffle,提高了在Mapper端进行排序的速度,充分利用了的 CPU 等资源。