Spark性能优化:数据倾斜调优

前言

   继Spark性能优化:开发调优篇》和《Spark性能优化:资源调优篇》讲解了每一个Spark开发人员都必须熟知的开发调优与资源调优以后,本文做为《Spark性能优化指南》的高级篇,将深刻分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。html

1.数据倾斜调优

调优概述

      有的时候,咱们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark做业的性能会比指望差不少。数据倾斜调优,就是使用各类技术方案解决不一样类型的数据倾斜问题,以保证Spark做业的性能。java

数据倾斜发生时的现象

  • 绝大多数task执行得都很是快,但个别task执行极慢。好比,总共有1000个task,997个task都在1分钟以内执行完了,可是剩余两三个task却要一两个小时。这种状况很常见。sql

  • 本来可以正常执行的Spark做业,某天忽然报出OOM(内存溢出)异常,观察异常栈,是咱们写的业务代码形成的。这种状况比较少见。后端

数据倾斜发生的原理

      数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,好比按照key进行聚合或join等操做。此时若是某个key对应的数据量特别大的话,就会发生数据倾斜。好比大部分key对应10条数据,可是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,而后1秒钟就运行完了;可是个别task可能分配到了100万数据,要运行一两个小时。所以,整个Spark做业的运行进度是由运行时间最长的那个task决定的。性能优化

      所以出现数据倾斜的时候,Spark做业看起来会运行得很是缓慢,甚至可能由于某个task处理的数据量过大致使内存溢出。网络

      下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,因此另外两个task只要分别处理1条数据便可。此时第一个task的运行时间多是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。app

数据倾斜原理

如何定位致使数据倾斜的代码

      数据倾斜只会发生在shuffle过程当中。这里给你们罗列一些经常使用的而且可能会触发shuffle操做的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所致使的。dom

某个task执行特别慢的状况

      首先要看的,就是数据倾斜发生在第几个stage中。ide

      若是是用yarn-client模式提交,那么本地是直接能够看到log的,能够在log中找到当前运行到了第几个stage;若是是用yarn-cluster模式提交,则能够经过Spark Web UI来查看当前运行到了第几个stage。此外,不管是使用yarn-client模式仍是yarn-cluster模式,咱们均可以在Spark Web UI上深刻看一下当前这个stage各个task分配的数据量,从而进一步肯定是否是task分配的数据不均匀致使了数据倾斜。函数

      好比下图中,倒数第三列显示了每一个task的运行时间。明显能够看到,有的task运行特别快,只须要几秒钟就能够运行完;而有的task运行特别慢,须要几分钟才能运行完,此时单从运行时间上看就已经可以肯定发生数据倾斜了。此外,倒数第一列显示了每一个task处理的数据量,明显能够看到,运行时间特别短的task只须要处理几百KB的数据便可,而运行时间特别长的task须要处理几千KB的数据,处理的数据量差了10倍。此时更加可以肯定是发生了数据倾斜。

      知道数据倾斜发生在哪个stage以后,接着咱们就须要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中确定会有一个shuffle类算子。精准推算stage与代码的对应关系,须要对Spark的源码有深刻的理解,这里咱们能够介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会致使shuffle的语句(好比group by语句),那么就能够断定,以那个地方为界限划分出了先后两个stage。

      这里咱们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大体推算出一个stage对应的代码。以下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,所以就能够认为,以这个算子为界限,会划分出先后两个stage。

  • stage0,主要是执行从textFile到map操做,以及执行shuffle write操做。shuffle write操做,咱们能够简单理解为对pairs RDD中的数据进行分区操做,每一个task处理的数据中,相同的key会写入同一个磁盘文件内。
  • stage1,主要是执行从reduceByKey到collect操做,stage1的各个task一开始运行,就会首先执行shuffle read操做。执行shuffle read操做的task,会从stage0的各个task所在节点拉取属于本身处理的那些key,而后对同一个key进行全局性的聚合或join等操做,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子以后,就计算出了最终的wordCounts RDD,而后会执行collect算子,将全部数据拉取到Driver上,供咱们遍历和打印输出。
    [plain]  view plain  copy
     
     print?在CODE上查看代码片派生到个人代码片
    1. val conf = new SparkConf()  
    2. val sc = new SparkContext(conf)  
    3.   
    4. val lines = sc.textFile("hdfs://...")  
    5. val words = lines.flatMap(_.split(" "))  
    6. val pairs = words.map((_, 1))  
    7. val wordCounts = pairs.reduceByKey(_ + _)  
    8.   
    9. wordCounts.collect().foreach(println(_))  

      经过对单词计数程序的分析,但愿可以让你们了解最基本的stage划分的原理,以及stage划分后shuffle操做是如何在两个stage的边界处执行的。而后咱们就知道如何快速定位出发生数据倾斜的stage对应代码的哪个部分了。好比咱们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,断定stage1出现了数据倾斜,那么就能够回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就能够肯定是由educeByKey算子致使的数据倾斜问题。好比某个单词出现了100万次,其余单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

某个task莫名其妙内存溢出的状况

      这种状况下去定位出问题的代码就比较容易了。咱们建议直接看yarn-client模式下本地log的异常栈,或者是经过YARN查看yarn-cluster模式下的log中的异常栈。通常来讲,经过异常栈信息就能够定位到你的代码中哪一行发生了内存溢出。而后在那行代码附近找找,通常也会有shuffle类算子,此时极可能就是这个算子致使了数据倾斜。

      可是你们要注意的是,不能单纯靠偶然的内存溢出就断定发生了数据倾斜。由于本身编写的代码的bug,以及偶然出现的数据异常,也可能会致使内存溢出。所以仍是要按照上面所讲的方法,经过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能肯定是不是因为数据倾斜才致使了此次内存溢出。

查看致使数据倾斜的key的数据分布状况

      知道了数据倾斜发生在哪里以后,一般须要分析一下那个执行了shuffle操做而且致使了数据倾斜的RDD/Hive表,查看一下其中key的分布状况。这主要是为以后选择哪种技术方案提供依据。针对不一样的key分布与不一样的shuffle算子组合起来的各类状况,可能须要选择不一样的技术方案来解决。

      此时根据你执行操做的状况不一样,能够有不少种查看key分布的方式:

  1. 若是是Spark SQL中的group by、join语句致使的数据倾斜,那么就查询一下SQL中使用的表的key分布状况。
  2. 若是是对Spark RDD执行shuffle算子致使的数据倾斜,那么能够在Spark做业中加入查看key分布的代码,好比RDD.countByKey()。而后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就能够看到key的分布状况。

      举例来讲,对于上面所说的单词计数程序,若是肯定了是stage1的reduceByKey算子致使了数据倾斜,那么就应该看看进行reduceByKey操做的RDD中的key分布状况,在这个例子中指的就是pairs RDD。以下示例,咱们能够先对pairs采样10%的样本数据,而后使用countByKey算子统计出每一个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

[plain]  view plain  copy
 
 print?在CODE上查看代码片派生到个人代码片
  1. val sampledPairs = pairs.sample(false, 0.1)  
  2. val sampledWordCounts = sampledPairs.countByKey()  
  3. sampledWordCounts.foreach(println(_))  

 

2.数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

      方案适用场景:致使数据倾斜的是Hive表。若是该Hive表中的数据自己很不均匀(好比某个key对应了100万数据,其余key才对应了10条数据),并且业务场景须要频繁使用Spark对Hive表执行某个分析操做,那么比较适合使用这种技术方案。

      方案实现思路:此时能够评估一下,是否能够经过Hive来进行数据预处理(即经过Hive ETL预先对数据按照key进行聚合,或者是预先和其余表进行join),而后在Spark做业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时因为数据已经预先进行过聚合或join操做了,那么在Spark做业中也就不须要使用原先的shuffle类算子执行这类操做了。

      方案实现原理:这种方案从根源上解决了数据倾斜,由于完全避免了在Spark中执行shuffle类算子,那么确定就不会有数据倾斜的问题了。可是这里也要提醒一下你们,这种方式属于治标不治本。由于毕竟数据自己就存在分布不均匀的问题,因此Hive ETL中进行group by或者join等shuffle操做时,仍是会出现数据倾斜,致使Hive ETL的速度很慢。咱们只是把数据倾斜的发生提早到了Hive ETL中,避免Spark程序发生数据倾斜而已。

      方案优势:实现起来简单便捷,效果还很是好,彻底规避掉了数据倾斜,Spark做业的性能会大幅度提高。

      方案缺点:治标不治本,Hive ETL中仍是会发生数据倾斜。

      方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark做业的场景,并且对Spark做业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提早到上游的Hive ETL,天天仅执行一次,只有那一次是比较慢的,而以后每次Java调用Spark做业时,执行速度都会很快,可以提供更好的用户体验。

      项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是容许用户经过Java Web系统提交数据分析统计任务,后端经过Java提交Spark做业进行数据分析统计。要求Spark做业速度必需要快,尽可能在10分钟之内,不然速度太慢,用户体验会不好。因此咱们将有些Spark做业的shuffle操做提早到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽量地减小Spark的shuffle操做,大幅度提高了性能,将部分做业的性能提高了6倍以上。

解决方案二:过滤少数致使倾斜的key

      方案适用场景:若是发现致使倾斜的key就少数几个,并且对计算自己的影响并不大的话,那么很适合使用这种方案。好比99%的key就对应10条数据,可是只有一个key对应了100万数据,从而致使了数据倾斜。

      方案实现思路:若是咱们判断那少数几个数据量特别多的key,对做业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。好比,在Spark SQL中可使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。若是须要每次做业执行时,动态断定哪些key的数据量最多而后再进行过滤,那么可使用sample算子对RDD进行采样,而后计算出每一个key的数量,取数据量最多的key过滤掉便可。

      方案实现原理:将致使数据倾斜的key给过滤掉以后,这些key就不会参与计算了,天然不可能产生数据倾斜。

      方案优势:实现简单,并且效果也很好,能够彻底规避掉数据倾斜。

      方案缺点:适用场景很少,大多数状况下,致使倾斜的key仍是不少的,并非只有少数几个。

      方案实践经验:在项目中咱们也采用过这种方案解决数据倾斜。有一次发现某一天Spark做业在运行的时候忽然OOM了,追查以后发现,是Hive表中的某一个key在那天数据异常,致使数据量暴增。所以就采起每次执行前先进行采样,计算出样本中数据量最大的几个key以后,直接在程序中将那些key给过滤掉。

解决方案三:提升shuffle操做的并行度

      方案适用场景:若是咱们必需要对数据倾斜迎难而上,那么建议优先使用这种方案,由于这是处理数据倾斜最简单的一种方案。

      方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,好比reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,好比group by、join等,须要设置一个参数,即spark.sql.shuffle.partitions,该参数表明了shuffle read task的并行度,该值默认是200,对于不少场景来讲都有点太小。

      方案实现原理:增长shuffle read task的数量,可让本来分配给一个task的多个key分配给多个task,从而让每一个task处理比原来更少的数据。举例来讲,若是本来有5个key,每一个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增长了shuffle read task之后,每一个task就分配到一个key,即每一个task就处理10条数据,那么天然每一个task的执行时间都会变短了。具体原理以下图所示。

      方案优势:实现起来比较简单,能够有效缓解和减轻数据倾斜的影响。

      方案缺点:只是缓解了数据倾斜而已,没有完全根除问题,根据实践经验来看,其效果有限。

      方案实践经验:该方案一般没法完全解决数据倾斜,由于若是出现一些极端状况,好比某个key对应的数据量有100万,那么不管你的task数量增长到多少,这个对应着100万数据的key确定仍是会分配到一个task中去处理,所以注定仍是会发生数据倾斜的。因此这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其余方案结合起来使用。

解决方案四:两阶段聚合(局部聚合+全局聚合)

      方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

      方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每一个key都打上一个随机数,好比10之内的随机数,此时原先同样的key就变成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操做,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。而后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操做,就能够获得最终结果了,好比(hello, 4)。

      方案实现原理:将本来相同的key经过附加随机前缀的方式,变成多个不一样的key,就可让本来被一个task处理的数据分散到多个task上去作局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就能够获得最终的结果。具体原理见下图。

      方案优势:对于聚合类的shuffle操做致使的数据倾斜,效果是很是不错的。一般均可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark做业的性能提高数倍以上。

      方案缺点:仅仅适用于聚合类的shuffle操做,适用范围相对较窄。若是是join类的shuffle操做,还得用其余的解决方案。

[plain]  view plain  copy
 
 print?在CODE上查看代码片派生到个人代码片
  1. <span style="font-size:12px;">// 第一步,给RDD中的每一个key都打上一个随机前缀。  
  2. JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(  
  3.         new PairFunction<Tuple2<Long,Long>, String, Long>() {  
  4.             private static final long serialVersionUID = 1L;  
  5.             @Override  
  6.             public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)  
  7.                     throws Exception {  
  8.                 Random random = new Random();  
  9.                 int prefix = random.nextInt(10);  
  10.                 return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);  
  11.             }  
  12.         });  
  13.   
  14. // 第二步,对打上随机前缀的key进行局部聚合。  
  15. JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(  
  16.         new Function2<Long, Long, Long>() {  
  17.             private static final long serialVersionUID = 1L;  
  18.             @Override  
  19.             public Long call(Long v1, Long v2) throws Exception {  
  20.                 return v1 + v2;  
  21.             }  
  22.         });  
  23.   
  24. // 第三步,去除RDD中每一个key的随机前缀。  
  25. JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(  
  26.         new PairFunction<Tuple2<String,Long>, Long, Long>() {  
  27.             private static final long serialVersionUID = 1L;  
  28.             @Override  
  29.             public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)  
  30.                     throws Exception {  
  31.                 long originalKey = Long.valueOf(tuple._1.split("_")[1]);  
  32.                 return new Tuple2<Long, Long>(originalKey, tuple._2);  
  33.             }  
  34.         });  
  35.   
  36. // 第四步,对去除了随机前缀的RDD进行全局聚合。  
  37. JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(  
  38.         new Function2<Long, Long, Long>() {  
  39.             private static final long serialVersionUID = 1L;  
  40.             @Override  
  41.             public Long call(Long v1, Long v2) throws Exception {  
  42.                 return v1 + v2;  
  43.             }  
  44.         });</span>  

 

解决方案五:将reduce join转为map join

      方案适用场景:在对RDD使用join类操做,或者是在Spark SQL中使用join语句时,并且join操做中的一个RDD或表的数据量比较小(好比几百M或者一两G),比较适用此方案。

      方案实现思路:不使用join算子进行链接操做,而使用Broadcast变量与map类算子实现join操做,进而彻底规避掉shuffle类的操做,完全避免数据倾斜的发生和出现。将较小RDD中的数据直接经过collect算子拉取到Driver端的内存中来,而后对其建立一个Broadcast变量;接着对另一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照链接key进行比对,若是链接key相同的话,那么就将两个RDD的数据用你须要的方式链接起来。

      方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就至关于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。可是若是一个RDD是比较小的,则能够采用广播小RDD全量数据+map算子来实现与join一样的效果,也就是map join,此时就不会发生shuffle操做,也就不会发生数据倾斜。具体原理以下图所示。

      方案优势:对join操做致使的数据倾斜,效果很是好,由于根本就不会发生shuffle,也就根本不会发生数据倾斜。

      方案缺点:适用场景较少,由于这个方案只适用于一个大表和一个小表的状况。毕竟咱们须要将小表进行广播,此时会比较消耗内存资源,driver和每一个Executor内存中都会驻留一份小RDD的全量数据。若是咱们广播出去的RDD数据比较大,好比10G以上,那么就可能发生内存溢出了。所以并不适合两个都是大表的状况。

 

[plain]  view plain  copy
 
 print?在CODE上查看代码片派生到个人代码片
  1. <span style="font-size:12px;">// 首先将数据量比较小的RDD的数据,collect到Driver中来。    
  2. List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()    
  3. // 而后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每一个Executor就只有一份RDD的数据。    
  4. // 能够尽量节省内存空间,而且减小网络传输性能开销。    
  5. final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);    
  6.     
  7. // 对另一个RDD执行map类操做,而再也不是join类操做。    
  8. JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(    
  9.         new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {    
  10.             private static final long serialVersionUID = 1L;    
  11.             @Override    
  12.             public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)    
  13.                     throws Exception {    
  14.                 // 在算子函数中,经过广播变量,获取到本地Executor中的rdd1数据。    
  15.                 List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();    
  16.                 // 能够将rdd1的数据转换为一个Map,便于后面进行join操做。    
  17.                 Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();    
  18.                 for(Tuple2<Long, Row> data : rdd1Data) {    
  19.                     rdd1DataMap.put(data._1, data._2);    
  20.                 }    
  21.                 // 获取当前RDD数据的key以及value。    
  22.                 String key = tuple._1;    
  23.                 String value = tuple._2;    
  24.                 // 从rdd1数据Map中,根据key获取到能够join到的数据。    
  25.                 Row rdd1Value = rdd1DataMap.get(key);    
  26.                 return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));    
  27.             }    
  28.         });    
  29.     
  30. // 这里得提示一下。    
  31. // 上面的作法,仅仅适用于rdd1中的key没有重复,所有是惟一的场景。    
  32. // 若是rdd1中有多个相同的key,那么就得用flatMap类的操做,在进行join的时候不能用map,而是得遍历rdd1全部数据进行join。    
  33. // rdd2中每条数据均可能会返回多条join后的数据。</span>  


 

解决方案六:采样倾斜key并分拆join操做

      方案适用场景:两个RDD/Hive表进行join的时候,若是数据量都比较大,没法采用“解决方案五”,那么此时能够看一下两个RDD/Hive表中的key分布状况。若是出现数据倾斜,是由于其中某一个RDD/Hive表中的少数几个key的数据量过大,而另外一个RDD/Hive表中的全部key都分布比较均匀,那么采用这个解决方案是比较合适的。

      方案实现思路:

  • 对包含少数几个数据量过大的key的那个RDD,经过sample算子采样出一份样原本,而后统计一下每一个key的数量,计算出来数据量最大的是哪几个key。
  • 而后将这几个key对应的数据从原来的RDD中拆分出来,造成一个单独的RDD,并给每一个key都打上n之内的随机数做为前缀,而不会致使倾斜的大部分key造成另一个RDD。
  • 接着将须要join的另外一个RDD,也过滤出来那几个倾斜key对应的数据并造成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会致使倾斜的大部分key也造成另一个RDD。
  • 再将附加了随机前缀的独立RDD与另外一个膨胀n倍的独立RDD进行join,此时就能够将原先相同的key打散成n份,分散到多个task中去进行join了。
  • 而另外两个普通的RDD就照常join便可。
  • 最后将两次join的结果使用union算子合并起来便可,就是最终的join结果。

      方案实现原理:对于join致使的数据倾斜,若是只是某几个key致使了倾斜,能够将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

      方案优势:对于join致使的数据倾斜,若是只是某几个key致使了倾斜,采用该方式能够用最有效的方式打散key进行join。并且只须要针对少数倾斜key对应的数据进行扩容n倍,不须要对全量数据进行扩容。避免了占用过多内存。

      方案缺点:若是致使倾斜的key特别多的话,好比成千上万个key都致使数据倾斜,那么这种方式也不适合。

 

[plain]  view plain  copy
 
 print?在CODE上查看代码片派生到个人代码片

// 首先从包含了少数几个致使数据倾斜key的rdd1中,采样10%的样本数据。    

  1. JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);    
  2. // 对样本数据RDD统计出每一个key的出现次数,并按出现次数降序排序。    
  3. // 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。    
  4. // 具体取出多少个数据量最多的key,由你们本身决定,咱们这里就取1个做为示范。    
  5. JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(    
  6. new PairFunction<Tuple2<Long,String>, Long, Long>() {    
  7. private static final long serialVersionUID = 1L;    
  8. @Override    
  9. public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)    
  10. throws Exception {    
  11. return new Tuple2<Long, Long>(tuple._1, 1L);    
  12. }    
  13. });    
  14. JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(    
  15. new Function2<Long, Long, Long>() {    
  16. private static final long serialVersionUID = 1L;    
  17. @Override    
  18. public Long call(Long v1, Long v2) throws Exception {    
  19. return v1 + v2;    
  20. }    
  21. });    
  22. JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(    
  23. new PairFunction<Tuple2<Long,Long>, Long, Long>() {    
  24. private static final long serialVersionUID = 1L;    
  25. @Override    
  26. public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)    
  27. throws Exception {    
  28. return new Tuple2<Long, Long>(tuple._2, tuple._1);    
  29. }    
  30. });    
  31. //take(1)取1个致使数据倾斜的数量最多的key做为示范
  32. final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;    
  33. // 从rdd1中分拆出致使数据倾斜的key,造成独立的RDD。    
  34. JavaPairRDD<Long, String> skewedRDD = rdd1.filter(    
  35. new Function<Tuple2<Long,String>, Boolean>() {    
  36. private static final long serialVersionUID = 1L;    
  37. @Override    
  38. public Boolean call(Tuple2<Long, String> tuple) throws Exception {    
  39. return tuple._1.equals(skewedUserid);    
  40. }    
  41. });    
  42. // 从rdd1中分拆出不致使数据倾斜的普通key,造成独立的RDD。    
  43. JavaPairRDD<Long, String> commonRDD = rdd1.filter(    
  44. new Function<Tuple2<Long,String>, Boolean>() {    
  45. private static final long serialVersionUID = 1L;    
  46. @Override    
  47. public Boolean call(Tuple2<Long, String> tuple) throws Exception {    
  48. return !tuple._1.equals(skewedUserid);    
  49. }    
  50. });    
  51. // rdd2,就是那个全部key的分布相对较为均匀的rdd。    
  52. // 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。    
  53. // 对扩容的每条数据,都打上0~100的前缀。    
  54. JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(    
  55. new Function<Tuple2<Long,Row>, Boolean>() {    
  56. private static final long serialVersionUID = 1L;    
  57. @Override    
  58. public Boolean call(Tuple2<Long, Row> tuple) throws Exception {    
  59. return tuple._1.equals(skewedUserid);    
  60. }    
  61. }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {    
  62. private static final long serialVersionUID = 1L;    
  63. @Override    
  64. public Iterable<Tuple2<String, Row>> call(    
  65. Tuple2<Long, Row> tuple) throws Exception {    
  66. Random random = new Random();    
  67. List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();    
  68. for(int i = 0; i < 100; i++) {    
  69. list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));    
  70. }    
  71. return list;    
  72. }    
  73. });    
  74. // 将rdd1中分拆出来的致使倾斜的key的独立rdd,每条数据都打上100之内的随机前缀。    
  75. // 而后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。    
  76. JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(    
  77. new PairFunction<Tuple2<Long,String>, String, String>() {    
  78. private static final long serialVersionUID = 1L;    
  79. @Override    
  80. public Tuple2<String, String> call(Tuple2<Long, String> tuple)    
  81. throws Exception {    
  82. Random random = new Random();    
  83. int prefix = random.nextInt(100);    
  84. return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);    
  85. }    
  86. })    
  87. .join(skewedUserid2infoRDD)    
  88. .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {    
  89. private static final long serialVersionUID = 1L;    
  90. @Override    
  91. public Tuple2<Long, Tuple2<String, Row>> call(    
  92. Tuple2<String, Tuple2<String, Row>> tuple)    
  93. throws Exception {    
  94. long key = Long.valueOf(tuple._1.split("_")[1]);    
  95. return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);    
  96. }    
  97. });    
  98. // 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。    
  99. JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);    
  100. // 将倾斜key join后的结果与普通key join后的结果,uinon起来。    
  101. // 就是最终的join结果。    
  102. JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);


 

解决方案七:使用随机前缀和扩容RDD进行join

      方案适用场景:若是在进行join操做时,RDD中有大量的key致使数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

  • 该方案的实现思路基本和“解决方案六”相似,首先查看RDD/Hive表中的数据分布状况,找到那个形成数据倾斜的RDD/Hive表,好比有多个key都对应了超过1万条数据。
  • 而后将该RDD的每条数据都打上一个n之内的随机前缀。
  • 同时对另一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  • 最后将两个处理后的RDD进行join便可。

      方案实现原理:将原先同样的key经过附加随机前缀变成不同的key,而后就能够将这些处理后的“不一样key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不一样之处就在于,上一种方案是尽可能只对少数倾斜key对应的数据进行特殊处理,因为处理过程须要扩容RDD,所以上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的状况,无法将部分key拆分出来进行单独处理,所以只能对整个RDD进行数据扩容,对内存资源要求很高。

      方案优势:对join类型的数据倾斜基本均可以处理,并且效果也相对比较显著,性能提高效果很是不错。

      方案缺点:该方案更多的是缓解数据倾斜,而不是完全避免数据倾斜。并且须要对整个RDD进行扩容,对内存资源要求很高。

      方案实践经验:曾经开发一个数据需求的时候,发现一个join致使了数据倾斜。优化以前,做业的执行时间大约是60分钟左右;使用该方案优化以后,执行时间缩短到10分钟左右,性能提高了6倍。

 

[plain]  view plain  copy
 
 print?在CODE上查看代码片派生到个人代码片
  1. // 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。  
  2. JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(  
  3.         new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {  
  4.             private static final long serialVersionUID = 1L;  
  5.             @Override  
  6.             public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)  
  7.                     throws Exception {  
  8.                 List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();  
  9.                 for(int i = 0; i < 100; i++) {  
  10.                     list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));  
  11.                 }  
  12.                 return list;  
  13.             }  
  14.         });  
  15.   
  16. // 其次,将另外一个有数据倾斜key的RDD,每条数据都打上100之内的随机前缀。  
  17. JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(  
  18.         new PairFunction<Tuple2<Long,String>, String, String>() {  
  19.             private static final long serialVersionUID = 1L;  
  20.             @Override  
  21.             public Tuple2<String, String> call(Tuple2<Long, String> tuple)  
  22.                     throws Exception {  
  23.                 Random random = new Random();  
  24.                 int prefix = random.nextInt(100);  
  25.                 return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);  
  26.             }  
  27.         });  
  28.   
  29. // 将两个处理后的RDD进行join便可。  
  30. JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);  

 

解决方案八:多种方案组合使用

      在实践中发现,不少状况下,若是只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就能够解决。可是若是要处理一个较为复杂的数据倾斜场景,那么可能须要将多种方案组合起来使用。好比说,咱们针对出现了多个数据倾斜环节的Spark做业,能够先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次能够对某些shuffle操做提高并行度,优化其性能;最后还能够针对不一样的聚合或join操做,选择一种方案来优化其性能。你们须要对这些方案的思路和原理都透彻理解以后,在实践中根据各类不一样的状况,灵活运用多种方案,来解决本身的数据倾斜问题。

本文转载自:http://tech.meituan.com/spark-tuning-basic.html 

相关文章
相关标签/搜索