spak数据倾斜解决方案

数据倾斜解决方案

数据倾斜的解决,跟以前讲解的性能调优,有一点殊途同归之妙。sql

性能调优中最有效最直接最简单的方式就是加资源加并行度,并注意RDD架构(复用同一个RDD,加上cache缓存)。相对于前面,shuffle、jvm等是次要的。缓存

6.一、原理以及现象分析

6.1.一、数据倾斜怎么出现的

在执行shuffle操做的时候,是按照key,来进行values的数据的输出、拉取和聚合的。架构

同一个key的values,必定是分配到一个reduce task进行处理的。jvm

多个key对应的values,好比一共是90万。可能某个key对应了88万数据,被分配到一个task上去面去执行。函数

另外两个task,可能各分配到了1万数据,多是数百个key,对应的1万条数据。性能

这样就会出现数据倾斜问题。spa

想象一下,出现数据倾斜之后的运行的状况。很糟糕!对象

其中两个task,各分配到了1万数据,可能同时在10分钟内都运行完了。另一个task有88万条,88 * 10 =  880分钟 = 14.5个小时。blog

你们看,原本另外两个task很快就运行完毕了(10分钟),可是因为一个拖后腿的家伙,第三个task,要14.5个小时才能运行完,就致使整个spark做业,也得14.5个小时才能运行完。内存

数据倾斜,一旦出现,是否是性能杀手?!

6.1.二、发生数据倾斜之后的现象

Spark数据倾斜,有两种表现:

一、你的大部分的task,都执行的特别特别快,(你要用client模式,standalone client,yarn client,本地机器一执行spark-submit脚本,就会开始打印log),task175 finished,剩下几个task,执行的特别特别慢,前面的task,通常1s能够执行完5个,最后发现1000个task,998,999 task,要执行1个小时,2个小时才能执行完一个task。

出现以上loginfo,就代表出现数据倾斜了。

这样还算好的,由于虽然老牛拉破车同样很是慢,可是至少还能跑。

二、另外一种状况是,运行的时候,其余task都执行完了,也没什么特别的问题,可是有的task,就是会忽然间报了一个OOM,JVM Out Of Memory,内存溢出了,task failed,task lost,resubmitting task。反复执行几回都到了某个task就是跑不通,最后就挂掉。

某个task就直接OOM,那么基本上也是由于数据倾斜了,task分配的数量实在是太大了!因此内存放不下,而后你的task每处理一条数据,还要建立大量的对象,内存爆掉了。

这样也代表出现数据倾斜了。

这种就不太好了,由于你的程序若是不去解决数据倾斜的问题,压根儿就跑不出来。

做业都跑不完,还谈什么性能调优这些东西?!

6.1.三、定位数据倾斜出现的缘由与出现问题的位置

根据log去定位

出现数据倾斜的缘由,基本只多是由于发生了shuffle操做,在shuffle的过程当中,出现了数据倾斜的问题。由于某个或者某些key对应的数据,远远的高于其余的key。

一、你在本身的程序里面找找,哪些地方用了会产生shuffle的算子,groupByKey、countByKey、reduceByKey、join

二、看log

log通常会报是在你的哪一行代码,致使了OOM异常。或者看log,看看是执行到了第几个stage。spark代码,是怎么划分红一个一个的stage的。哪个stage生成的task特别慢,就可以本身用肉眼去对你的spark代码进行stage的划分,就可以经过stage定位到你的代码,到底哪里发生了数据倾斜。

 一、使用Hive ETL预处理数据

方案适用场景:

若是致使数据倾斜的是Hive表。若是该Hive表中的数据自己很不均匀(好比某个key对应了100万数据,其余key才对应了10条数据),并且业务场景须要频繁使用Spark对Hive表执行某个分析操做,那么比较适合使用这种技术方案。

方案实现思路:

此时能够评估一下,是否能够经过Hive来进行数据预处理(即经过Hive ETL预先对数据按照key进行聚合,或者是预先和其余表进行join),而后在Spark做业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时因为数据已经预先进行过聚合或join操做了,那么在Spark做业中也就不须要使用原先的shuffle类算子执行这类操做了。

方案实现原理:

这种方案从根源上解决了数据倾斜,由于完全避免了在Spark中执行shuffle类算子,那么确定就不会有数据倾斜的问题了。可是这里也要提醒一下你们,这种方式属于治标不治本。由于毕竟数据自己就存在分布不均匀的问题,因此Hive ETL中进行group by或者join等shuffle操做时,仍是会出现数据倾斜,致使Hive ETL的速度很慢。咱们只是把数据倾斜的发生提早到了Hive ETL中,避免Spark程序发生数据倾斜而已。

 

二、过滤少数致使倾斜的key

方案适用场景:

若是发现致使倾斜的key就少数几个,并且对计算自己的影响并不大的话,那么很适合使用这种方案。好比99%的key就对应10条数据,可是只有一个key对应了100万数据,从而致使了数据倾斜。

方案实现思路:

若是咱们判断那少数几个数据量特别多的key,对做业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。好比,在Spark SQL中可使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。若是须要每次做业执行时,动态断定哪些key的数据量最多而后再进行过滤,那么可使用sample算子对RDD进行采样,而后计算出每一个key的数量,取数据量最多的key过滤掉便可。

方案实现原理:

将致使数据倾斜的key给过滤掉以后,这些key就不会参与计算了,天然不可能产生数据倾斜。

 

三、提升shuffle操做的并行度

方案实现思路:

在对RDD执行shuffle算子时,给shuffle算子传入一个参数,好比reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,好比group by、join等,须要设置一个参数,即spark.sql.shuffle.partitions,该参数表明了shuffle read task的并行度,该值默认是200,对于不少场景来讲都有点太小。

方案实现原理:

增长shuffle read task的数量,可让本来分配给一个task的多个key分配给多个task,从而让每一个task处理比原来更少的数据。举例来讲,若是本来有5个不一样的key,每一个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增长了shuffle read task之后,每一个task就分配到一个key,即每一个task就处理10条数据,那么天然每一个task的执行时间都会变短了。

 

四、双重聚合

方案适用场景:

对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:

这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每一个key都打上一个随机数,好比10之内的随机数,此时原先同样的key就变成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操做,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。而后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操做,就能够获得最终结果了,好比(hello, 4)。

方案实现原理:

将本来相同的key经过附加随机前缀的方式,变成多个不一样的key,就可让本来被一个task处理的数据分散到多个task上去作局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就能够获得最终的结果。若是一个RDD中有一个key致使数据倾斜,同时还有其余的key,那么通常先对数据集进行抽样,而后找出倾斜的key,再使用filter对原始的RDD进行分离为两个RDD,一个是由倾斜的key组成的RDD1,一个是由其余的key组成的RDD2,那么对于RDD1可使用加随机前缀进行多分区多task计算,对于另外一个RDD2正常聚合计算,最后将结果再合并起来。

随机前缀加几,ReduceByKey分几个区。

 

五、将reduce join转为map join(完全避免数据倾斜)

BroadCast+filter(或者map)

方案适用场景:

在对RDD使用join类操做,或者是在Spark SQL中使用join语句时,并且join操做中的一个RDD或表的数据量比较小(好比几百M或者一两G),比较适用此方案。

方案实现思路:

不使用join算子进行链接操做,而使用Broadcast变量与map类算子实现join操做,进而彻底规避掉shuffle类的操做,完全避免数据倾斜的发生和出现。将较小RDD中的数据直接经过collect算子拉取到Driver端的内存中来,而后对其建立一个Broadcast变量;接着对另一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照链接key进行比对,若是链接key相同的话,那么就将两个RDD的数据用你须要的方式链接起来。

方案实现原理:

普通的join是会走shuffle过程的,而一旦shuffle,就至关于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。可是若是一个RDD是比较小的,则能够采用广播小RDD全量数据+map算子来实现与join一样的效果,也就是map join,此时就不会发生shuffle操做,也就不会发生数据倾斜。

 

六、采样倾斜key并分拆join操做

方案适用场景:

两个RDD/Hive表进行join的时候,若是数据量都比较大,没法采用“解决方案五”,那么此时能够看一下两个RDD/Hive表中的key分布状况。若是出现数据倾斜,是由于其中某一个RDD/Hive表中的少数几个key的数据量过大,而另外一个RDD/Hive表中的全部key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

对包含少数几个数据量过大的key的那个RDD,经过sample算子采样出一份样原本,而后统计一下每一个key的数量,计算出来数据量最大的是哪几个key。而后将这几个key对应的数据从原来的RDD中拆分出来,造成一个单独的RDD,并给每一个key都打上n之内的随机数做为前缀,而不会致使倾斜的大部分key造成另一个RDD。接着将须要join的另外一个RDD,也过滤出来那几个倾斜key对应的数据并造成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会致使倾斜的大部分key也造成另一个RDD。再将附加了随机前缀的独立RDD与另外一个膨胀n倍的独立RDD进行join,此时就能够将原先相同的key打散成n份,分散到多个task中去进行join了。而另外两个普通的RDD就照常join便可。最后将两次join的结果使用union算子合并起来便可,就是最终的join结果 。

 

七、使用随机前缀和扩容RDD进行join

 

方案适用场景:

若是在进行join操做时,RDD中有大量的key致使数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

该方案的实现思路基本和“解决方案六”相似,首先查看RDD/Hive表中的数据分布状况,找到那个形成数据倾斜的RDD/Hive表,好比有多个key都对应了超过1万条数据。而后将该RDD的每条数据都打上一个n之内的随机前缀。同时对另一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join便可。

相关文章
相关标签/搜索