【转载】Spark性能优化指南——高级篇

前言

继基础篇讲解了每一个Spark开发人员都必须熟知的开发调优与资源调优以后,本文做为《Spark性能优化指南》的高级篇,将深刻分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。html

数据倾斜调优

调优概述

有的时候,咱们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark做业的性能会比指望差不少。数据倾斜调优,就是使用各类技术方案解决不一样类型的数据倾斜问题,以保证Spark做业的性能。
算法

数据倾斜发生时的现象

  • 绝大多数task执行得都很是快,但个别task执行极慢。好比,总共有1000个task,997个task都在1分钟以内执行完了,可是剩余两三个task却要一两个小时。这种状况很常见。
  • 本来可以正常执行的Spark做业,某天忽然报出OOM(内存溢出)异常,观察异常栈,是咱们写的业务代码形成的。这种状况比较少见。

数据倾斜发生的原理

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,好比按照key进行聚合或join等操做。此时若是某个key对应的数据量特别大的话,就会发生数据倾斜。好比大部分key对应10条数据,可是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,而后1秒钟就运行完了;可是个别task可能分配到了100万数据,要运行一两个小时。所以,整个Spark做业的运行进度是由运行时间最长的那个task决定的。sql

所以出现数据倾斜的时候,Spark做业看起来会运行得很是缓慢,甚至可能由于某个task处理的数据量过大致使内存溢出。shell

下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,因此另外两个task只要分别处理1条数据便可。此时第一个task的运行时间多是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。后端

如何定位致使数据倾斜的代码

数据倾斜只会发生在shuffle过程当中。这里给你们罗列一些经常使用的而且可能会触发shuffle操做的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所致使的。性能优化

某个task执行特别慢的状况

首先要看的,就是数据倾斜发生在第几个stage中。网络

若是是用yarn-client模式提交,那么本地是直接能够看到log的,能够在log中找到当前运行到了第几个stage;若是是用yarn-cluster模式提交,则能够经过Spark Web UI来查看当前运行到了第几个stage。此外,不管是使用yarn-client模式仍是yarn-cluster模式,咱们均可以在Spark Web UI上深刻看一下当前这个stage各个task分配的数据量,从而进一步肯定是否是task分配的数据不均匀致使了数据倾斜。数据结构

好比下图中,倒数第三列显示了每一个task的运行时间。明显能够看到,有的task运行特别快,只须要几秒钟就能够运行完;而有的task运行特别慢,须要几分钟才能运行完,此时单从运行时间上看就已经可以肯定发生数据倾斜了。此外,倒数第一列显示了每一个task处理的数据量,明显能够看到,运行时间特别短的task只须要处理几百KB的数据便可,而运行时间特别长的task须要处理几千KB的数据,处理的数据量差了10倍。此时更加可以肯定是发生了数据倾斜。
app

知道数据倾斜发生在哪个stage以后,接着咱们就须要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中确定会有一个shuffle类算子。精准推算stage与代码的对应关系,须要对Spark的源码有深刻的理解,这里咱们能够介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会致使shuffle的语句(好比group by语句),那么就能够断定,以那个地方为界限划分出了先后两个stage。dom

这里咱们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大体推算出一个stage对应的代码。以下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,所以就能够认为,以这个算子为界限,会划分出先后两个stage。

  • stage0,主要是执行从textFile到map操做,以及执行shuffle write操做。shuffle write操做,咱们能够简单理解为对pairs RDD中的数据进行分区操做,每一个task处理的数据中,相同的key会写入同一个磁盘文件内。
  • stage1,主要是执行从reduceByKey到collect操做,stage1的各个task一开始运行,就会首先执行shuffle read操做。执行shuffle read操做的task,会从stage0的各个task所在节点拉取属于本身处理的那些key,而后对同一个key进行全局性的聚合或join等操做,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子以后,就计算出了最终的wordCounts RDD,而后会执行collect算子,将全部数据拉取到Driver上,供咱们遍历和打印输出。
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

经过对单词计数程序的分析,但愿可以让你们了解最基本的stage划分的原理,以及stage划分后shuffle操做是如何在两个stage的边界处执行的。而后咱们就知道如何快速定位出发生数据倾斜的stage对应代码的哪个部分了。好比咱们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,断定stage1出现了数据倾斜,那么就能够回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就能够肯定是由educeByKey算子致使的数据倾斜问题。好比某个单词出现了100万次,其余单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

某个task莫名其妙内存溢出的状况

这种状况下去定位出问题的代码就比较容易了。咱们建议直接看yarn-client模式下本地log的异常栈,或者是经过YARN查看yarn-cluster模式下的log中的异常栈。通常来讲,经过异常栈信息就能够定位到你的代码中哪一行发生了内存溢出。而后在那行代码附近找找,通常也会有shuffle类算子,此时极可能就是这个算子致使了数据倾斜。

可是你们要注意的是,不能单纯靠偶然的内存溢出就断定发生了数据倾斜。由于本身编写的代码的bug,以及偶然出现的数据异常,也可能会致使内存溢出。所以仍是要按照上面所讲的方法,经过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能肯定是不是因为数据倾斜才致使了此次内存溢出。

查看致使数据倾斜的key的数据分布状况

知道了数据倾斜发生在哪里以后,一般须要分析一下那个执行了shuffle操做而且致使了数据倾斜的RDD/Hive表,查看一下其中key的分布状况。这主要是为以后选择哪种技术方案提供依据。针对不一样的key分布与不一样的shuffle算子组合起来的各类状况,可能须要选择不一样的技术方案来解决。

此时根据你执行操做的状况不一样,能够有不少种查看key分布的方式:

  • 若是是Spark SQL中的group by、join语句致使的数据倾斜,那么就查询一下SQL中使用的表的key分布状况。
  • 若是是对Spark RDD执行shuffle算子致使的数据倾斜,那么能够在Spark做业中加入查看key分布的代码,好比RDD.countByKey()。而后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就能够看到key的分布状况。

举例来讲,对于上面所说的单词计数程序,若是肯定了是stage1的reduceByKey算子致使了数据倾斜,那么就应该看看进行reduceByKey操做的RDD中的key分布状况,在这个例子中指的就是pairs RDD。以下示例,咱们能够先对pairs采样10%的样本数据,而后使用countByKey算子统计出每一个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

方案适用场景:致使数据倾斜的是Hive表。若是该Hive表中的数据自己很不均匀(好比某个key对应了100万数据,其余key才对应了10条数据),并且业务场景须要频繁使用Spark对Hive表执行某个分析操做,那么比较适合使用这种技术方案。

方案实现思路:此时能够评估一下,是否能够经过Hive来进行数据预处理(即经过Hive ETL预先对数据按照key进行聚合,或者是预先和其余表进行join),而后在Spark做业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时因为数据已经预先进行过聚合或join操做了,那么在Spark做业中也就不须要使用原先的shuffle类算子执行这类操做了。

方案实现原理:这种方案从根源上解决了数据倾斜,由于完全避免了在Spark中执行shuffle类算子,那么确定就不会有数据倾斜的问题了。可是这里也要提醒一下你们,这种方式属于治标不治本。由于毕竟数据自己就存在分布不均匀的问题,因此Hive ETL中进行group by或者join等shuffle操做时,仍是会出现数据倾斜,致使Hive ETL的速度很慢。咱们只是把数据倾斜的发生提早到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优势:实现起来简单便捷,效果还很是好,彻底规避掉了数据倾斜,Spark做业的性能会大幅度提高。

方案缺点:治标不治本,Hive ETL中仍是会发生数据倾斜。

方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark做业的场景,并且对Spark做业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提早到上游的Hive ETL,天天仅执行一次,只有那一次是比较慢的,而以后每次Java调用Spark做业时,执行速度都会很快,可以提供更好的用户体验。

项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是容许用户经过Java Web系统提交数据分析统计任务,后端经过Java提交Spark做业进行数据分析统计。要求Spark做业速度必需要快,尽可能在10分钟之内,不然速度太慢,用户体验会不好。因此咱们将有些Spark做业的shuffle操做提早到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽量地减小Spark的shuffle操做,大幅度提高了性能,将部分做业的性能提高了6倍以上。

解决方案二:过滤少数致使倾斜的key

方案适用场景:若是发现致使倾斜的key就少数几个,并且对计算自己的影响并不大的话,那么很适合使用这种方案。好比99%的key就对应10条数据,可是只有一个key对应了100万数据,从而致使了数据倾斜。

方案实现思路:若是咱们判断那少数几个数据量特别多的key,对做业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。好比,在Spark SQL中可使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。若是须要每次做业执行时,动态断定哪些key的数据量最多而后再进行过滤,那么可使用sample算子对RDD进行采样,而后计算出每一个key的数量,取数据量最多的key过滤掉便可。

方案实现原理:将致使数据倾斜的key给过滤掉以后,这些key就不会参与计算了,天然不可能产生数据倾斜。

方案优势:实现简单,并且效果也很好,能够彻底规避掉数据倾斜。

方案缺点:适用场景很少,大多数状况下,致使倾斜的key仍是不少的,并非只有少数几个。

方案实践经验:在项目中咱们也采用过这种方案解决数据倾斜。有一次发现某一天Spark做业在运行的时候忽然OOM了,追查以后发现,是Hive表中的某一个key在那天数据异常,致使数据量暴增。所以就采起每次执行前先进行采样,计算出样本中数据量最大的几个key以后,直接在程序中将那些key给过滤掉。

解决方案三:提升shuffle操做的并行度

方案适用场景:若是咱们必需要对数据倾斜迎难而上,那么建议优先使用这种方案,由于这是处理数据倾斜最简单的一种方案。

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,好比reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,好比group by、join等,须要设置一个参数,即spark.sql.shuffle.partitions,该参数表明了shuffle read task的并行度,该值默认是200,对于不少场景来讲都有点太小。

方案实现原理:增长shuffle read task的数量,可让本来分配给一个task的多个key分配给多个task,从而让每一个task处理比原来更少的数据。举例来讲,若是本来有5个key,每一个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增长了shuffle read task之后,每一个task就分配到一个key,即每一个task就处理10条数据,那么天然每一个task的执行时间都会变短了。具体原理以下图所示。

方案优势:实现起来比较简单,能够有效缓解和减轻数据倾斜的影响。

方案缺点:只是缓解了数据倾斜而已,没有完全根除问题,根据实践经验来看,其效果有限。

方案实践经验:该方案一般没法完全解决数据倾斜,由于若是出现一些极端状况,好比某个key对应的数据量有100万,那么不管你的task数量增长到多少,这个对应着100万数据的key确定仍是会分配到一个task中去处理,所以注定仍是会发生数据倾斜的。因此这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其余方案结合起来使用。

解决方案四:两阶段聚合(局部聚合+全局聚合)

方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每一个key都打上一个随机数,好比10之内的随机数,此时原先同样的key就变成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操做,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。而后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操做,就能够获得最终结果了,好比(hello, 4)。

方案实现原理:将本来相同的key经过附加随机前缀的方式,变成多个不一样的key,就可让本来被一个task处理的数据分散到多个task上去作局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就能够获得最终的结果。具体原理见下图。

方案优势:对于聚合类的shuffle操做致使的数据倾斜,效果是很是不错的。一般均可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark做业的性能提高数倍以上。

方案缺点:仅仅适用于聚合类的shuffle操做,适用范围相对较窄。若是是join类的shuffle操做,还得用其余的解决方案。

// 第一步,给RDD中的每一个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第三步,去除RDD中每一个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

解决方案五:将reduce join转为map join

方案适用场景:在对RDD使用join类操做,或者是在Spark SQL中使用join语句时,并且join操做中的一个RDD或表的数据量比较小(好比几百M或者一两G),比较适用此方案。

方案实现思路:不使用join算子进行链接操做,而使用Broadcast变量与map类算子实现join操做,进而彻底规避掉shuffle类的操做,完全避免数据倾斜的发生和出现。将较小RDD中的数据直接经过collect算子拉取到Driver端的内存中来,而后对其建立一个Broadcast变量;接着对另一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照链接key进行比对,若是链接key相同的话,那么就将两个RDD的数据用你须要的方式链接起来。

方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就至关于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。可是若是一个RDD是比较小的,则能够采用广播小RDD全量数据+map算子来实现与join一样的效果,也就是map join,此时就不会发生shuffle操做,也就不会发生数据倾斜。具体原理以下图所示。

方案优势:对join操做致使的数据倾斜,效果很是好,由于根本就不会发生shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,由于这个方案只适用于一个大表和一个小表的状况。毕竟咱们须要将小表进行广播,此时会比较消耗内存资源,driver和每一个Executor内存中都会驻留一份小RDD的全量数据。若是咱们广播出去的RDD数据比较大,好比10G以上,那么就可能发生内存溢出了。所以并不适合两个都是大表的状况。

// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 而后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每一个Executor就只有一份RDD的数据。
// 能够尽量节省内存空间,而且减小网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 对另一个RDD执行map类操做,而再也不是join类操做。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函数中,经过广播变量,获取到本地Executor中的rdd1数据。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 能够将rdd1的数据转换为一个Map,便于后面进行join操做。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 获取当前RDD数据的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 从rdd1数据Map中,根据key获取到能够join到的数据。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// 这里得提示一下。
// 上面的作法,仅仅适用于rdd1中的key没有重复,所有是惟一的场景。
// 若是rdd1中有多个相同的key,那么就得用flatMap类的操做,在进行join的时候不能用map,而是得遍历rdd1全部数据进行join。
// rdd2中每条数据均可能会返回多条join后的数据。

解决方案六:采样倾斜key并分拆join操做

方案适用场景:两个RDD/Hive表进行join的时候,若是数据量都比较大,没法采用“解决方案五”,那么此时能够看一下两个RDD/Hive表中的key分布状况。若是出现数据倾斜,是由于其中某一个RDD/Hive表中的少数几个key的数据量过大,而另外一个RDD/Hive表中的全部key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路

对包含少数几个数据量过大的key的那个RDD,经过sample算子采样出一份样原本,而后统计一下每一个key的数量,计算出来数据量最大的是哪几个key。
而后将这几个key对应的数据从原来的RDD中拆分出来,造成一个单独的RDD,并给每一个key都打上n之内的随机数做为前缀,而不会致使倾斜的大部分key造成另一个RDD。
接着将须要join的另外一个RDD,也过滤出来那几个倾斜key对应的数据并造成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会致使倾斜的大部分key也造成另一个RDD。
再将附加了随机前缀的独立RDD与另外一个膨胀n倍的独立RDD进行join,此时就能够将原先相同的key打散成n份,分散到多个task中去进行join了。
而另外两个普通的RDD就照常join便可。
最后将两次join的结果使用union算子合并起来便可,就是最终的join结果。
方案实现原理:对于join致使的数据倾斜,若是只是某几个key致使了倾斜,能够将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

方案优势:对于join致使的数据倾斜,若是只是某几个key致使了倾斜,采用该方式能够用最有效的方式打散key进行join。并且只须要针对少数倾斜key对应的数据进行扩容n倍,不须要对全量数据进行扩容。避免了占用过多内存。

方案缺点:若是致使倾斜的key特别多的话,好比成千上万个key都致使数据倾斜,那么这种方式也不适合。

// 首先从包含了少数几个致使数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 对样本数据RDD统计出每一个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由你们本身决定,咱们这里就取1个做为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 从rdd1中分拆出致使数据倾斜的key,造成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 从rdd1中分拆出不致使数据倾斜的普通key,造成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });

// rdd2,就是那个全部key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }

        });

// 将rdd1中分拆出来的致使倾斜的key的独立rdd,每条数据都打上100之内的随机前缀。
// 而后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });

// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

解决方案七:使用随机前缀和扩容RDD进行join

方案适用场景:若是在进行join操做时,RDD中有大量的key致使数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路
该方案的实现思路基本和“解决方案六”相似,首先查看RDD/Hive表中的数据分布状况,找到那个形成数据倾斜的RDD/Hive表,好比有多个key都对应了超过1万条数据。
而后将该RDD的每条数据都打上一个n之内的随机前缀。
同时对另一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
最后将两个处理后的RDD进行join便可。
方案实现原理:将原先同样的key经过附加随机前缀变成不同的key,而后就能够将这些处理后的“不一样key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不一样之处就在于,上一种方案是尽可能只对少数倾斜key对应的数据进行特殊处理,因为处理过程须要扩容RDD,所以上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的状况,无法将部分key拆分出来进行单独处理,所以只能对整个RDD进行数据扩容,对内存资源要求很高。

方案优势:对join类型的数据倾斜基本均可以处理,并且效果也相对比较显著,性能提高效果很是不错。

方案缺点:该方案更多的是缓解数据倾斜,而不是完全避免数据倾斜。并且须要对整个RDD进行扩容,对内存资源要求很高。

方案实践经验:曾经开发一个数据需求的时候,发现一个join致使了数据倾斜。优化以前,做业的执行时间大约是60分钟左右;使用该方案优化以后,执行时间缩短到10分钟左右,性能提高了6倍。

// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// 其次,将另外一个有数据倾斜key的RDD,每条数据都打上100之内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 将两个处理后的RDD进行join便可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

解决方案八:多种方案组合使用

在实践中发现,不少状况下,若是只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就能够解决。可是若是要处理一个较为复杂的数据倾斜场景,那么可能须要将多种方案组合起来使用。好比说,咱们针对出现了多个数据倾斜环节的Spark做业,能够先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次能够对某些shuffle操做提高并行度,优化其性能;最后还能够针对不一样的聚合或join操做,选择一种方案来优化其性能。你们须要对这些方案的思路和原理都透彻理解以后,在实践中根据各类不一样的状况,灵活运用多种方案,来解决本身的数据倾斜问题。

shuffle调优

调优概述

大多数Spark做业的性能主要就是消耗在了shuffle环节,由于该环节包含了大量的磁盘IO、序列化、网络数据传输等操做。所以,若是要让做业的性能更上一层楼,就有必要对shuffle过程进行调优。可是也必须提醒你们的是,影响一个Spark做业性能的因素,主要仍是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。所以你们务必把握住调优的基本原则,千万不要舍本逐末。下面咱们就给你们详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

ShuffleManager发展概述

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得愈来愈先进。

在Spark 1.2之前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个很是严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操做影响了性能。

所以在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来讲,有了必定的改进。主要就在于,每一个Task在进行shuffle操做时,虽然也会产生较多的临时磁盘文件,可是最后会将全部的临时文件合并(merge)成一个磁盘文件,所以每一个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取本身的数据时,只要根据索引读取每一个磁盘文件中的部分数据便可。

下面咱们详细分析一下HashShuffleManager和SortShuffleManager的原理。

HashShuffleManager运行原理

未经优化的HashShuffleManager

下图说明了未经优化的HashShuffleManager的原理。这里咱们先明确一个假设前提:每一个Executor只有1个CPU core,也就是说,不管这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

咱们先从shuffle write开始提及。shuffle write阶段,主要就是在一个stage结束计算以后,为了下一个stage能够执行shuffle类的算子(好比reduceByKey),而将每一个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘以前,会先将数据写入内存缓冲中,当内存缓冲填满以后,才会溢写到磁盘文件中去。

那么每一个执行shuffle write的task,要为下一个stage建立多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每一个task就要建立多少份磁盘文件。好比下一个stage总共有100个task,那么当前stage的每一个task都要建立100份磁盘文件。若是当前stage有50个task,总共有10个Executor,每一个Executor执行5个Task,那么每一个Executor上总共就要建立500个磁盘文件,全部Executor上会建立5000个磁盘文件。因而可知,未经优化的shuffle write操做所产生的磁盘文件的数量是极其惊人的。

接着咱们来讲说shuffle read。shuffle read,一般就是一个stage刚开始时要作的事情。此时该stage的每个task就须要将上一个stage的计算结果中的全部相同key,从各个节点上经过网络都拉取到本身所在的节点上,而后进行key的聚合或链接等操做。因为shuffle write的过程当中,task给下游stage的每一个task都建立了一个磁盘文件,所以shuffle read的过程当中,每一个task只要从上游stage的全部task所在节点上,拉取属于本身的那一个磁盘文件便可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每一个shuffle read task都会有一个本身的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,而后经过内存中的一个Map进行聚合等操做。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操做。以此类推,直到最后将全部数据到拉取完,并获得最终的结果。

优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指咱们能够设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true便可开启优化机制。一般来讲,若是咱们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制以后,在shuffle write过程当中,task就不是为下游stage的每一个task建立一个磁盘文件了。此时会出现shuffleFileGroup的概念,每一个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就能够并行执行多少个task。而第一批并行执行的每一个task都会建立一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用以前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。所以,consolidate机制容许不一样的task复用同一批磁盘文件,这样就能够有效将多个task的磁盘文件进行必定程度上的合并,从而大幅度减小磁盘文件的数量,进而提高shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共仍是有10个Executor,每一个Executor执行5个task。那么本来使用未经优化的HashShuffleManager时,每一个Executor会产生500个磁盘文件,全部Executor会产生5000个磁盘文件的。可是此时通过优化以后,每一个Executor建立的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每一个Executor此时只会建立100个磁盘文件,全部Executor只会建立1000个磁盘文件。

SortShuffleManager运行原理

SortShuffleManager的运行机制主要分红两种,一种是普通运行机制,另外一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不一样的shuffle算子,可能选用不一样的数据结构。若是是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边经过Map进行聚合,一边写入内存;若是是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构以后,就会判断一下,是否达到了某个临界阈值。若是达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。

在溢写到磁盘文件以前,会先根据key对内存数据结构中已有的数据进行排序。排序事后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是经过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢以后再一次写入磁盘文件中,这样能够减小磁盘IO次数,提高性能。

一个task将全部数据写入内存数据结构的过程当中,会发生屡次磁盘溢写操做,也就会产生多个临时文件。最后会将以前全部的临时磁盘文件都进行合并,这就是merge过程,此时会将以前全部临时磁盘文件中的数据读取出来,而后依次写入最终的磁盘文件之中。此外,因为一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,所以还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager因为有一个磁盘文件merge的过程,所以大大减小了文件数量。好比第一个stage有50个task,总共有10个Executor,每一个Executor执行5个task,而第二个stage有100个task。因为每一个task最终只有一个磁盘文件,所以此时每一个Executor上只有5个磁盘文件,全部Executor只有50个磁盘文件。

bypass运行机制

下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件以下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(好比reduceByKey)。
    此时task会为每一个下游task都建立一个临时磁盘文件,并将数据按key进行hash而后根据key的hash值,将key写入对应的磁盘文件之中。固然,写入磁盘文件时也是先写入内存缓冲,缓冲写满以后再溢写到磁盘文件的。最后,一样会将全部临时磁盘文件都合并成一个磁盘文件,并建立一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是如出一辙的,由于都要建立数量惊人的磁盘文件,只是在最后会作一个磁盘文件的合并而已。所以少许的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来讲,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不一样在于:第一,磁盘写机制不一样;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程当中,不须要进行数据的排序操做,也就节省掉了这部分的性能开销。

shuffle相关参数调优

如下是Shffule过程当中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件以前,会先写入buffer缓冲中,待缓冲写满以后,才会溢写到磁盘。
  • 调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比64k),从而减小shuffle write过程当中溢写磁盘文件的次数,也就能够减小磁盘IO次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次可以拉取多少数据。
    调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比96m),从而减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于本身的数据时,若是由于网络异常致使拉取失败,是会自动进行重试的。该参数就表明了能够重试的最大次数。若是在指定次数以内拉取仍是没有成功,就可能会致使做业执行失败。
    调优建议:对于那些包含了特别耗时的shuffle操做的做业,建议增长重试最大次数(好比60次),以免因为JVM的full gc或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度提高稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数表明了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(好比60s),以增长shuffle操做的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数表明了Executor内存中,分配给shuffle read task进行聚合操做的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。若是内存充足,并且不多使用持久化操做,建议调高这个比例,给shuffle read的聚合操做更多内存,以免因为内存不足致使聚合过程当中频繁读写磁盘。在实践中发现,合理调节该参数能够将性能提高10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5之后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默认选项,可是Spark 1.2以及以后的版本默认都是SortShuffleManager了。tungsten-sort与sort相似,可是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:因为SortShuffleManager默认会对数据进行排序,所以若是你的业务逻辑中须要该排序机制的话,则使用默认的SortShuffleManager就能够;而若是你的业务逻辑不须要对数据进行排序,那么建议参考后面的几个* * 参数调优,经过bypass机制或优化的HashShuffleManager来避免排序操做,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,由于以前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,若是shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程当中不会进行排序操做,而是直接按照未经优化的HashShuffleManager的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减小了排序的性能开销。可是这种方式下,依然会产生大量的磁盘文件,所以shuffle write性能有待提升。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:若是使用HashShuffleManager,该参数有效。若是设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的状况下,这种方法能够极大地减小磁盘IO开销,提高性能。
  • 调优建议:若是的确不须要SortShuffleManager的排序机制,那么除了使用bypass机制,还能够尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

写在最后的话

本文分别讲解了开发过程当中的优化原则、运行前的资源参数设置调优、运行中的数据倾斜的解决方案、为了精益求精的shuffle调优。但愿你们可以在阅读本文以后,记住这些性能调优的原则以及方案,在Spark做业开发、测试以及运行的过程当中多尝试,只有这样,咱们才能开发出更优的Spark做业,不断提高其性能。

转自: http://tech.meituan.com/spark-tuning-pro.html?from=timeline

相关文章
相关标签/搜索