[Spark性能调优] 第二章:完全解密Spark的HashShuffle

本課主題

  • Shuffle 是分布式系统的天敌
  • Spark HashShuffle介绍
  • Spark Consolidated HashShuffle介绍
  • Shuffle 是如何成为 Spark 性能杀手
  • Shuffle 性能调优思考
  • Spark HashShuffle 源码鉴赏

 

引言

Spark HashShuffle 是它之前的版本,如今1.6x 版本默应是 Sort-Based Shuffle,那为何要讲 HashShuffle 呢,由于有分布式就必定会有 Shuffle,并且 HashShuffle 是 Spark之前的版本,亦便是 Sort-Based Shuffle 的前身,由于有 HashShuffle 的不足,才会有后续的 Sorted-Based Shuffle,以及如今的 Tungsten-Sort Shuffle,因此咱们有必要去了解它。html

人们对Spark的印象每每是基于内存进行计算,但实际上来说,Spark能够基于内存、也能够基于磁盘或者是第三方的储存空间进行计算,背后有两层含意,第1、Spark框架的架构设计和设计模式上是倾向于在内存中计算数据的,第2、这也表达了人们对数据处理的一种美好的愿望,就是但愿计算数据的时候,数据就在内存中算法

为何再一次强调 Shuffle 是 Spark 的性能杀手啦,那不就是说,Spark中的 “Shuffle“ 和 “Spark彻底是基于内存计算“ 的愿景是相违背的!!!但愿这篇文章能为读者带出如下的启发:设计模式

  • 了解为何 Shuffle 是分布式系统的天敌
  • 了解 Spark HashShuffle的原理和机制
  • 了解优化后 Spark Consolidated HashShuffle的原理和机制
  • 了解Shuffle 是如何成为 Spark 性能杀手
  • 了解能够从那几方面思考 Spark Shuffle 的性能调优
  • 了解 Spark HashShuffle 在读、写磁盘这个过程的源码鉴赏

 

Shuffle 是分布式系统的天敌

Spark 运行分红两部份,第一部份是 Driver Program,里面的核心是 SparkContext,它驱动著一个程序的开始,负责指挥,另一部份是 Worker 节点上的 Task,它是实际运行任务的,当程序运行时,不间断地由 Driver 与所在的进程进行交互,交互什么,有几点,第1、是让你去干什么,第2、是具体告诉 Task 数据在那里,例如说有三个 Stage,第二个 Task 要拿数据,它就会向 Driver 要数据,因此在整个工做的过程当中,Executor 中的 Task 会不断地与 Driver 进行沟通,这是一个网络传输的过程。缓存

[下图是 Spark 官方网站上的经典Spark架框图]
 性能优化

在这个过程当中一方面是 Driver 跟 Executor 进行网络传输,另外一方面是Task要从 Driver 抓取其余上游的 Task 的数据结果,因此有这个过程当中就不断的产生网络结果。其中,下一个 Stage 向上一个 Stage 要数据这个过程,咱们就称之为 Shuffle。
bash

思考点:上一个 Stage 为何要向下一个 Stage 发数据?假设如今有一个程序,里面有五个 Stage,我把它当作为一个很大的 Stage,在分布式系统中,数据分布在不一样的节点上,每个节点计算一部份数据,若是不对各个节点上独立的部份进行汇聚的话,咱们是计算不到最终的结果。这就是由于咱们须要利用分布式来发挥它自己并行计算的能力,然后续又须要计算各节点上最终的结果,因此须要把数据汇汇集中,这就会致使 Shuffle,这也是说为何 Shuffle 是分布式不可避免的命运。网络

  

Spark 中的 HashShuffle介绍

原始的 HashShuffle 机制

基于 Mapper 和 Reducer 理解的基础上,当 Reducer 去抓取数据时,它的 Key 究竟是怎么分配的,核心思考点是:做为上游数据是怎么去分配给下游数据的。在这张图中你能够看到有4个 Task 在2个 Executors 上面,它们是并行运行的,Hash 自己有一套 Hash算法,能够把数据的 Key 进行从新分类,每一个 Task 对数据进行分类而后把它们不一样类别的数据先写到本地磁盘,而后再通过网络传输 Shuffle,把数据传到下一个 Stage 进行汇聚。架构

下图有3个 Reducer,从 Task 开始那边各自把本身进行 Hash 计算,分类出3个不一样的类别,每一个 Task 都分红3种类别的数据,刚刚提过由于分布式的关系,咱们想把不一样的数据汇聚而后计算出最终的结果,因此下游的 Reducer 会在每一个 Task 中把属于本身类别的数据收集过来,汇聚成一个同类别的大集合,抓过来的时候会首先放在内存中,但内存可能放不下,也有可能放在本地 (这也是一个调优势。能够参考上一章讲过的一些调优参数),每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,因此总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件app

[下图是 Spark 最原始的 Hash-Based Shuffle 概念图]
 负载均衡

HashShuffle 也有它的弱点:

  1. Shuffle前在磁盘上会产生海量的小文件,此时会产生大量耗时低效的 IO 操做 (因為产生过多的小文件)
  2. 内存不够用,因为内存中须要保存海量文件操做句柄和临时信息,若是数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。

 

优化后的 HashShuffle 机制

在刚才 HashShuffle 的基础上思考该如何进行优化,这是优化后的实现:

[下图是 Spark Consolidated Hash-Based Shuffle 概念图]

这里仍是有4个Tasks,数据类别仍是分红3种类型,由于Hash算法会根据你的 Key 进行分类,在同一个进程中,不管是有多少过Task,都会把一样的Key放在同一个Buffer里,而后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据)每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个Mapper Tasks,因此总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。Consoldiated Hash-Shuffle的优化有一个很大的好处就是假设如今有200个Mapper Tasks在同一个进程中,也只会产生3个本地小文件; 若是用原始的 Hash-Based Shuffle 的话,200个Mapper Tasks 会各自产生3个本地小文件,在一个进程已经产生了600个本地小文件。3个对比600已是一个很大的差别了。

这个优化后的 HashShuffle 叫 ConsolidatedShuffle,在实际生产环境下能够调如下参数:

spark.shuffle.consolidateFiles=true

Consolidated HashShuffle 也有它的弱点: 

  1. 若是 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生不少小文件

 

Shuffle是如何成为Spark性能杀手及调优势思考

Shuffle 不能够避免是由于在分布式系统中的基本点就是把一个很大的的任务/做业分红一百份或者是一千份,这一百份和一千份文件在不一样的机器上独自完成各自不一样的部份,咱们是针对整个做业要结果,因此在后面会进行汇聚,这个汇聚的过程的前一阶段到后一阶段以致网络传输的过程就叫 Shuffle。在 Spark 中为了完成 Shuffle 的过程会把真正的一个做业划分为不一样的 Stage,这个Stage 的划分是跟据依赖关系去决定的,Shuffle 是整个 Spark 中最消耗性能的一个地方。试试想一想若是没有 Shuffle 的话,Spark能够完成一个纯内存式的操做。

reduceByKey,它会把每一个 Key 对应的 Value 聚合成一个 value 而后生成新的 RDD

Shuffle 是如何破坏了纯内存操做呢,由于在不一样节点上咱们要进行数据传输,数据在经过网络发送以前,要先存储在内存中,内存达到必定的程度,它会写到本地磁盘,(在之前 Spark 的版本它没有Buffer 的限制,会不断地写入 Buffer 而后等内存满了就写入本地,如今的版本对 Buffer 多少设定了限制,以防止出现 OOM,减小了 IO)

Mapper 端会写入内存 Buffer,这个便关乎到 GC 的问题,而后 Mapper端的 Block 要写入本地,大量的磁盘与IO的操做和磁盘与网络IO的操做,这就构成了分布式的性能杀手。

若是要对最终计算结果进行排序的话,通常会都会进行 sortByKey,若是以最终结果来思考的话,你能够认为是产生了一个很大很大的 partition,你能够用 reduceByKey 的时候指定它的并行度,例如你把 reduceByKey 的并行度变成为1,新 RDD 的数据切片就变成1,排序通常都会在不少节点上,若是你把不少节点变成一个节点而后进行排序,有时候会取得更好的效果,由于数据就在一个节点上,技术层面来说就只须要在一个进程里进行排序。

能够在调用 reduceByKey()接著调用 mapPartition( );
也能够用 repartitionAndSortWithPartitions( ); 

还有一个很危险的地方就是数据倾斜,在咱们谈的 Shuffle 机制中,不断强调不一样机器从Mapper端抓取数据并计算结果,但有没有意会到数据可能会分布不均衡,何时会致使数据倾斜,答案就是 Shuffle 时会导政数据分布不均衡,也就是数据倾斜的问题。数据倾斜的问题会引伸不少其余问题,好比,网络带宽、各重硬件故障、内存过分消耗、文件掉失。由于 Shuffle 的过程当中会产生大量的磁盘 IO、网络 IO、以及压缩、解压缩、序列化和反序列化等等

 

Shuffle 性能调优思考

Shuffle可能面临的问题,运行 Task 的时候才会产生 Shuffle (Shuffle 已经融化在 Spark 的算子中)

  1. 几千台或者是上万台的机器进行汇聚计算,数据量会很是大,网络传输会很大
  2. 数据如何分类其实就是 partition,即如何 Partition、Hash 、Sort 、计算
  3. 负载均衡 (数据倾斜)
  4. 网络传输效率,须要压缩或解压缩之间作出权衡,序列化 和 反序列化也是要考虑的问题

具体的 Task 进行计算的时候尽一切最大可能使得数据具有 Process Locality 的特性,退而求其次是增长数据分片减小每一个 Task 处理的数据量基于Shuffle 和数据倾斜所致使的一系列问题,能够延伸出不少不一样的调优势,好比说:

  • Mapper端的 Buffer 应该设置为多大呢?
  • Reducer端的 Buffer 应该设置为多大呢?若是 Reducer 太少的话,这会限制了抓取多少数据
  • 在数据传输的过程当中是否有压缩以及该用什么方式去压缩,默应是用 snappy 的压缩方式。
  • 网络传输失败重试的次数,每次重试之间间隔多少时间。

 

Spark HashShuffle 源码鉴赏

咱们说 Shuffle 的过程是Mapper和Reducer以及网络传输构成的,Mapper 这一端会把本身的数据写入本地磁盘,Reducer 这一端会经过网络把数据抓取过来。Mapper 会先把数据缓存在内存中,在默应状况下缓存空间是 32K,数据从内存到本地磁盘的一个过程就是写数据的一个过程

这里有两个 Stage,上一个 Stage 叫 ShuffleMapTask,下面的一个 Stage 多是 ShuffleMapTask,也有多是 ResultsTask,取决于它这个任务是否是最后一个Stage所产生的。ShuffleMapTask会把咱们处理的RDD的数据分红苦干个 Bucket,即一个又一个的 Buffer。一个Task怎么去切分具体要看你的 partitioner,ShuffleMapTask确定是属于具体的 Stage。

HashShuffle 写数据的过程

  1. 在一个 Task 中,核心的代码是 runTask,你能够看到里面建立了一个 ShuffleWriter,它是负责把缓存中的数据写入本地磁盘的,但 ShuffleWriter 写入入本地磁盘时,还有一个很是重要的工做,就是要先跟Spark 的Driver 通讯,说我把数据写到了什么地方,这样下一个Stage找上一个Stage的数据的时候,它是找 Driver(blockManagerMaster)去获取数据信息的,Driver(blockManagerMaster) 会告诉下一个Stage中的Task写入的数据在那里。
    [下图是 ShuffleMapTask.scala 中的 runTask 方法]

  2. 而后建立了一个 ShuffleManager,这是从 SparkEnv 中得到到的 ShuffleManager,SparkEnv是运行时的环境,因此在写代码的时候能够配置它。
    [下图是 SparkContext.scala 中的 shuffleManager 变量]

  3. 再往下看ShuffleManager调用了getWriter 方法,在这里咱们主要的是看 HashShuffle 的方式,因此看看它具体子类该怎么实现。
    [下图是 ShuffleManager.scala 中的 ShuffleManager 类]


    [下图是 HashShuffleManager.scala 中的 getWriter 方法]
  4. 从getWriter方式建立了 HashShuffleWriter 的实例对象,因此若是须要看它具体的怎么写数据的话,必须要看 HashShuffleWriter 类,而后它也必需有一个 write 的方法,首先它会判断一下是否有在 Mapper 端进行 aggregrate 的操做,也就是说是否进行的 Mapper 和 Reducer 这种计算模型的 LocalReduce,若是有的话,就基于records 进行聚合,它就会循环遍历Buffer里面的数据。在本地的聚合显现带来的好处是减小的磁盘IO的数据、以及操做磁盘IO的次数、以及网络传输的数据量、以及这个 Reduce Task 抓取 Mapper Task 数据的次数,这个意义确定是很是重大的。
    [下图是 HashShuffleWriter.scala 中的 write 方法]

  5. ShuffleWriterGroup,它会把相应的 Key 合并在同一个文件中,而后它会判断一下是否须要进行一个合并的过程,它构建了一个ShuffleWriterGroup的实体对象,同时呢,它会判断是否启动压缩机制,若是启动了压缩机制,会有一个fileGroup(bucketId),不然的话就getFile(bucketId)。
    [下图是 ShuffleWriterGroup.scala 中的 forMapTask 方法]

    [下图是 ShuffleWriterGroup.scala 中的 forMapTask 方法内部具体的实现]
  6. 最后不管它采用那种状况,最终也会调用 blockManager.getDiskWriter( )来完成写入数据到本地磁盘这个动做
    [下图是 BlockManager.scala 中的 getDiskWriter 方法]

  7. 通过第4步后再回来看看shuffle,第一个参数是 shuffleId,第二个是 mapId,第三个是输出的 Split 个数,第4个是序列化器,第五个是metric 来统计它的一些基本信息
    [下图是 HashShuffleWriter.scala 中的 shuffle 参数]

  8. 这里的writer是代表具体要写到什么地方,bucketId 是经过传入key到partition中的方式,下面调用 write 时有两个参数,elem._1 和 elem._2,因此elem._1是key和elem._2是具体内容自己。当分好bucketId就开始写数据。
    [下图是 HashShuffleWriter.scala 中的 runTask 方法内部具体的实现]

    Spark 的并行度是继承的,若是上游有4个并行任务的话,下游也会有4个
    [下图是 Partitioner.scala 中的 HashPartitioner 类以及它的方法实现]

    [下图是 Utils.scala 中的 nonNegativeMod 方法]
     

HashShuffle 读数据的过程

  1. 在 Reader 中重点是看它的 Read 方法,首先会建立一个 ShuffleBlockFetcherIterator,这里有一个很重要的调优的参数,也就是说一次能最大的抓取多少数据过来,在 Spark1.5.2 默应状况下是 48M,若是你内存足够大以及把内存空间分配足够的状况下,由于Shuffle会占用百分比,能够试试调大这个参数,调大这个参数的好处是减小抓取次数,由于网络IO的开销来创建新的链接其实很耗时的;往下看它再次进行一下判断看看Mapper端的Aggregrator 是否已经定义了。
    [下图是 HashShuffleReader.scala 中的 read 方法]

    [下图是 HashShuffleReader.scala 中的 read 方法的内部实现]
  2. 在Shuffle写数据的过程当中,一开始会建立 ShuffleBlockFetecherIterator 对象实例,而后调用它的 initialize( )方法
    [下图是 ShuffleBlockFetecherIterator.scala 中的 ShuffleBlockFetecherIterator 类]


    [下图是 ShuffleBlockFetecherIterator.scala 中的 localBlocks 和 remoteBlocks 变量]

    [下图是 ShuffleBlockFetecherIterator.scala 中的 initialize 方法]

    在 initialize( )方法转过来会调用 sendRequest( )方法,抓到数据后这里有一个 BlockFetchingListener,它会对数据进行处理,
    [下图是 ShuffleBlockFetecherIterator.scala 中的 sendRequest 方法]

  3. 说明一点就是它底层有一套通讯框架,咱们基于这套通讯框架进行数据的请求和传输
    [下图是 NettyBlockTransferService.scala 中的 fetchBlocks 方法]

咱们从 Reducer端借助了 HashShuffleReader 从远程抓取数据,抓取数据过来以后进行 Aggregrate 操做汇聚,汇聚具体是进行分组或者是什么样的算法是开发者本身决定的。reduceByKey和Hadoop中的mapper与reducer相比,有一个缺点,在 Hadoop 的世界,不管你的数据的什么样的类型你均可以自定义,Mapper和Reducer的业务逻辑能够完成不同。

Reducer端若是内存不够写磁盘的代价是双倍的,在 Mapper端不管内存够不够它都须要先写磁盘,由于Reducer端在计算的时候须要又一次的把数据从磁盘上抓回来,因此实际生产环境下须要适当地把 Shuffle 内存调大一点。

 

总结

由于想利用分布式的计算能力,因此要把数据分散到不一样节点上运行,上游阶段数据是并行运行的,下游阶段要进行汇聚,因此出现Shuffle,若是下游分红三类,上游也须要每一个Task把数据分红三类,虽然有可能有一类是没有数据,这无所谓,只要在实际运行时按照这套规则就能够了,这就是最原始的 Shuffle 过程。

Hash-based Shuffle 默认Mapper 阶段会为Reducer 阶段的每个Task单首创建一个文件来保存该Task中要使用的数据,可是在一些状况下(例如说数据量很是庞大的状况) 会形成大量文件的随机磁盘IO操做且会性成大量的Memory消耗(极易形成OOM)。

  • 原始的 Hash-Shuffle 所产生的小文件: Mapper 端 Task 的个数 x Reduce 端 Task 的数量
  • Consolidated Hash-Shuffle 所产生的小文件: CPU Cores 的个数 x Reduce 端 Task 的数量

Spark Shuffle 说到底都是离不开读文件、写文件、为了高效咱们须要缓存,因为有不少不一样的进程,就须要一个管理者。HashShuffle 适合的埸景是小数据的埸景,对小规模数据的处理效率会比排序后的 Shuffle 高。

 

参考资料

资料来源来至

[1] DT大数据梦工厂 大数据商业案例以及性能调优

第23课:Spark旧版本中性能调优之HashShuffle剖析及调优(内含大数据Shuffle本质及其思考)
第24课:完全解密Shuffle是如何成为Spark性能杀手的及调优势思考
第25课:Spark Hash Shuffle源码解读与剖析

[2] 美团 Spark性能优化指南——高级篇

[3] Spark源码图片取自于 Spark 1.5.2版本

 

 

想了解 JVM 在 Spark 中是如何分配内存空间能够参考:第四章 : Shuffle 中 JVM 内存使用及配置内幕详情

相关文章
相关标签/搜索