Spark 解决数据倾斜的几种经常使用方法

数据倾斜是大数据计算中一个最棘手的问题,出现数据倾斜后,Spark 做业的性能会比指望值差不少。数据倾斜的调优,就是利用各类技术方案解决不一样类型的数据倾斜问题,保证 Spark 做业的性能。web

一,数据倾斜原理

一个 Spark 做业,会根据其内部的 Action 操做划分红多个 job,每一个 job 内部又会根据 shuffle 操做划分红多个 stage,而后每一个 stage 会分配多个 task 去执行任务。每一个 task 会领取一个 partition 的数据处理。sql

同一个 stage 内的 task 是能够并行处理数据的,具备依赖关系的不一样 stage 之间是串行处理的。因为这个处理机制,假设某个 Spark 做业中的某个 job 有两个 stage,分别为 stage0 和 stage1,那么 stage1 必需要等待 stage0 处理结束才能进行。若是 stage0 内部分配了 n 个 task 进行计算任务,其中有一个 task 领取的 partition 数据过大,执行了 1 个小时还没结束,其他的 n-1 个 task 在半小时内就执行结束了,都在等这最后一个 task 执行结束才能进入下一个 stage。这种因为某个 stage 内部的 task 领取的数据量过大的现象就是数据倾斜。session

下图就是一个例子:hello 这个 key 对应了 7 条数据,映射到了同一个 task 去处理了,剩余的 2 个 task 分别只处理了一个数据。ide

数据倾斜

二,数据倾斜发生的现象

1,绝大多数task执行得都很是快,但个别task执行极慢。好比,总共有1000个task,997个task都在1分钟以内执行完了,可是剩余两三个task却要一两个小时。这种状况很常见。性能

2,本来可以正常执行的Spark做业,某天忽然报出OOM(内存溢出)异常,观察异常栈,是咱们写的业务代码形成的。这种状况比较少见。大数据

三,如何定位数据倾斜的代码

数据倾斜只会发生在 shuffle 过程当中。经常使用而且可能会触发 shuffle 操做的算子有:distinct,groupByKey,reduceByKey,aggregateByKey,join,cogroup,repartition 等。出现数据倾斜,颇有可能就是使用了这些算子中的某一个致使的。spa

若是咱们是 yarn-client 模式提交,咱们能够在本地直接查看 log,在 log 中定位到当前运行到了哪一个 stage;若是用的 yarn-cluster 模式提交的话,咱们能够经过 spark web UI 来查看当前运行到了哪一个 stage。不管用的哪一种模式咱们均可以在 spark web UI 上面查看到当前这个 stage 的各个 task 的数据量和运行时间,从而可以进一步肯定是否是 task 的数据分配不均致使的数据倾斜。3d

当肯定了发生数据倾斜的 stage 后,咱们能够找出会触发 shuffle 的算子,推算出发生倾斜的那个 stage 对应代码。触发 shuffle 操做的除了上面提到的那些算子外,还要注意使用 spark sql 的某些 sql 语句,好比 group by 等。日志

四,解决方法

解决数据倾斜的思路就是保证每一个 stage 内部的 task 领取的数据足够均匀。一个是想办法让数据源在 Spark 内部计算粒度的这个维度上划分足够均匀,若是作不到这个,就要相办法将读取的数据源进行加工,尽可能保证均匀。大体有如下几种方案。cdn

1,聚合源数据和过滤致使倾斜的 key

a,聚合源数据

假设咱们的某个 Spark 做业的数据源是天天 ETL 存储到 Hive 中的数据,这些数据主要是电商平台天天用户的操做日志。Spark 做业中分析的粒度是 session,那么咱们能够在往 Hive 中写数据的时候就保证每条数据对应一个 session 的全部信息,也就是以 session 为粒度的将数据写入到 Hive 中。

这样咱们能够保证一点,咱们 Spark 做业中就没有必要作一些 groupByKey + map 的操做了,能够直接对每一个 key 的 value 进行 map 操做,计算咱们须要的数据。省去了 shuffle 操做,避免了 shuffle 时的数据倾斜。

可是,当咱们 Spark 做业中分析粒度不止一个粒度,好比除了 session 这个粒度外,还有 date 的粒度,userId 的粒度等等。这时候是没法保证这些全部粒度的数据都能聚合到一条数据上的。这时候咱们能够作个妥协,选择一个相对比较大的粒度,进行聚合数据。好比咱们按照原来的存储方式可能有 100W 条数据,但按照某个粒度,好比 date 这个粒度,进行聚合后存储,这样的话咱们的数据能够降到 50W 条,能够作到减轻数据倾斜的现象。

b,过滤致使倾斜的 key

好比说咱们的 Hive 中数据,共有 100W 个 key,其中有 5 个 key 对应的数据量很是的大,可能有几十万条数据(这种状况在电商平台上发生恶意刷单时候会出现),其它的 key 对应数据量都只有几十。若是咱们业务上面可以接受这 5 个 key 对应的数据能够舍弃。这种状况下咱们能够在用 sql 从 Hive 中取数据时过滤掉这个 5 个 key。从而避免了数据倾斜。

2,shuffle 操做提升 reduce 端的并行度

Spark 在作 shuffle 操做时,默认使用的是 HashPartitioner 对数据进行分区。若是 reduce 端的并行度设置的不合适,极可能形成大量不一样的 key 被分配到同一个 task 上去,形成某个 task 上处理的数据量大于其余 task,形成数据倾斜。

若是调整 reduce 端的并行度,可让 reduce 端的每一个 task 处理的数据减小,从而缓解数据倾斜。

设置方法:在对 RDD 执行 shuffle 算子时,给 shuffle 算子传入一个参数,好比 reduceByKey(1000) ,该参数就设置了这个 shuffle 算子执行时 shuffle read task 的数量。对于 Spark SQL 中的 shuffle 类语句,好比 group by、join 等,须要设置一个参数,即 spark.sql.shuffle.partitions ,该参数表明了 shuffle read task 的并行度,该值默认是 200 ,对于不少场景来讲都有点太小。

3,使用随机 key 进行双重聚合

在 Spark 中使用 groupByKey 和 reduceByKey 这两个算子会进行 shuffle 操做。这时候若是 map 端的文件每一个 key 的数据量误差很大,很容易会形成数据倾斜。

咱们能够先对须要操做的数据中的 key 拼接上随机数进行打散分组,这样原来是一个 key 的数据可能会被分到多个 key 上,而后进行一次聚合,聚合完以后将原来拼在 key 上的随机数去掉,再进行聚合,这样对数据倾斜会有比较好的效果。

具体能够看下图:

随机 key 双重聚合

这种方案对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 Spark SQL 中使用 group by 语句进行分组聚合时,比较适用这种方案。

4, 将 reduce-side join 转换成 map-side join

两个 RDD 在进行 join 时会有 shuffle 操做,若是每一个 key 对应的数据分布不均匀也会有数据倾斜发生。

这种状况下,若是两个 RDD 中某个 RDD 的数据量不大,能够将该 RDD 的数据提取出来,而后作成广播变量,将数据量大的那个 RDD 作 map 算子操做,而后在 map 算子内和广播变量进行 join,这样能够避免了 join 过程当中的 shuffle,也就避免了 shuffle 过程当中可能会出现的数据倾斜现象。

5,采样倾斜 key 并拆分 join 操做

当碰到这种状况:两个 RDD 进行 join 的时候,其中某个 RDD 中少数的几个 key 对应的数据量很大,致使了数据倾斜,而另一个 RDD 数据相对分布均匀。这时候咱们能够采用这种方法。

1,对包含少数几个数据量过大的 key 的那个 RDD,经过 sample 算子采样出一份样原本,而后统计一下每一个 key 的数量,计算出来数据量最大的是哪几个 key。

2,而后将这几个 key 对应的数据从原来的 RDD 中拆分出来,造成一个单独的 RDD,并给每一个 key 都打上 n 之内的随机数做为前缀,而不会致使倾斜的大部分 key 造成另一个 RDD。

3,接着将须要 join 的另外一个 RDD,也过滤出来那几个倾斜 key 对应的数据并造成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀,不会致使倾斜的大部分 key 也造成另一个 RDD。

4,再将附加了随机前缀的独立 RDD 与另外一个膨胀 n 倍的独立 RDD 进行 join,此时就能够将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了。

5, 而另外两个普通的 RDD 就照常 join 便可。 最后将两次 join 的结果使用 union 算子合并起来便可,就是最终的 join 结果。

具体能够看下图:

采样倾斜 key 分别 join

6,使用随机前缀和扩容 RDD 进行 join

若是在进行 join 操做时,RDD 中有大量的 key 致使数据倾斜,那么进行分拆 key 也没什么意义,这时候能够采起这种方案。

该方案的实现思路基本和上一种相似,首先查看 RDD 中数据分布状况,找到那个形成数据倾斜的 RDD,好比有多个 key 映射数据都很大。而后咱们将该 RDD 没调数据都打上一个 n 之内的随机前缀。同时对另外一个 RDD 进行扩容,将其没调数据扩容成 n 条数据,扩容出来的每条数据依次打上 0~n 的前缀。而后将这两个 RDD 进行 join。

这种方案和上一种比,少了取样的过程,由于上一种是针对某个 RDD 中只有很是少的几个 key 发生数据倾斜,须要针对这几个 key 特殊处理。而这个方案是针对某个 RDD 有大量的 key 发生数据倾斜,这时候就不必取样了。

相关文章
相关标签/搜索