- 本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/lqMu6lfk-Ny1ZHYruEeBdA
- 做者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班)。前后从事过电子商务、开放平台、移动浏览器、推荐广告和大数据、人工智能等相关开发和架构。目前在vivo智能平台中心从事 AI中台建设以及广告推荐业务。擅长各类业务形态的业务架构、平台化以及各类业务解决方案。
本文从数据倾斜的危害、现象、缘由等方面,由浅入深阐述Spark数据倾斜及其解决方案。sql
对 Spark/Hadoop 这样的分布式大数据系统来说,数据量大并不可怕,可怕的是数据倾斜。segmentfault
对于分布式系统而言,理想状况下,随着系统规模(节点数量)的增长,应用总体耗时线性降低。若是一台机器处理一批大量数据须要120分钟,当机器数量增长到3台时,理想的耗时为120 / 3 = 40分钟。可是,想作到分布式状况下每台机器执行时间是单机时的1 / N,就必须保证每台机器的任务量相等。不幸的是,不少时候,任务的分配是不均匀的,甚至不均匀到大部分任务被分配到个别机器上,其它大部分机器所分配的任务量只占总得的小部分。好比一台机器负责处理 80% 的任务,另外两台机器各处理 10% 的任务。浏览器
『不患多而患不均』,这是分布式环境下最大的问题。意味着计算能力不是线性扩展的,而是存在短板效应: 一个 Stage 所耗费的时间,是由最慢的那个 Task 决定。性能优化
因为同一个 Stage 内的全部 task 执行相同的计算,在排除不一样计算节点计算能力差别的前提下,不一样 task 之间耗时的差别主要由该 task 所处理的数据量决定。因此,要想发挥分布式系统并行计算的优点,就必须解决数据倾斜问题。微信
当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得总体耗时过大,未能充分发挥分布式系统的并行计算优点。 网络
另外,当发生数据倾斜时,部分任务处理的数据量过大,可能形成内存不足使得任务失败,并进而引进整个应用失败。 架构
当发现以下现象时,十有八九是发生数据倾斜了:并发
TIPS在 Spark streaming 程序中,数据倾斜更容易出现,特别是在程序中包含一些相似 sql 的 join、group 这种操做的时候。由于 Spark Streaming 程序在运行的时候,咱们通常不会分配特别多的内存,所以一旦在这个过程当中出现一些数据倾斜,就十分容易形成 OOM。app
在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,好比按照 key 进行聚合或 join 等操做。此时若是某个 key 对应的数据量特别大的话,就会发生数据倾斜。好比大部分 key 对应10条数据,可是个别 key 却对应了100万条数据,那么大部分 task 可能就只会分配到10条数据,而后1秒钟就运行完了;可是个别 task 可能分配到了100万数据,要运行一两个小时。负载均衡
所以出现数据倾斜的时候,Spark 做业看起来会运行得很是缓慢,甚至可能由于某个 task 处理的数据量过大致使内存溢出。
经过 Spark Web UI 来查看当前运行的 stage 各个 task 分配的数据量(Shuffle Read Size/Records),从而进一步肯定是否是 task 分配的数据不均匀致使了数据倾斜。
知道数据倾斜发生在哪个 stage 以后,接着咱们就须要根据 stage 划分原理,推算出来发生倾斜的那个 stage 对应代码中的哪一部分,这部分代码中确定会有一个 shuffle 类算子。能够经过 countByKey 查看各个 key 的分布。
TIPS数据倾斜只会发生在 shuffle 过程当中。这里给你们罗列一些经常使用的而且可能会触发 shuffle 操做的算子: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所致使的。
也能够经过抽样统计 key 的出现次数验证。
因为数据量巨大,能够采用抽样的方式,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个:
df.select("key").sample(false, 0.1) // 数据采样 .(k => (k, 1)).reduceBykey(_ + _) // 统计 key 出现的次数 .map(k => (k._2, k._1)).sortByKey(false) // 根据 key 出现次数进行排序 .take(10) // 取前 10 个。
若是发现多数数据分布都较为平均,而个别数据比其余数据大上若干个数量级,则说明发生了数据倾斜。
若是致使数据倾斜的 key 是异常数据,那么简单的过滤掉就能够了。
首先要对 key 进行分析,判断是哪些 key 形成数据倾斜。具体方法上面已经介绍过了,这里不赘述。
而后对这些 key 对应的记录进行分析:
解决方案
对于第 1,2 种状况,直接对数据进行过滤便可。
第3种状况则须要特殊的处理,具体咱们下面详细介绍。
Spark 在作 Shuffle 时,默认使用 HashPartitioner(非 Hash Shuffle)对数据进行分区。若是并行度设置的不合适,可能形成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,形成该 Task 所处理的数据远大于其它 Task,从而形成数据倾斜。
若是调整 Shuffle 时的并行度,使得本来被分配到同一 Task 的不一样 Key 发配到不一样 Task 上处理,则可下降原 Task 所需处理的数据量,从而缓解数据倾斜问题形成的短板效应。
(1)操做流程
RDD 操做 可在须要 Shuffle 的操做算子上直接设置并行度或者使用 spark.default.parallelism 设置。若是是 Spark SQL,还可经过 SET spark.sql.shuffle.partitions=[num_tasks] 设置并行度。默认参数由不一样的 Cluster Manager 控制。
dataFrame 和 sparkSql 能够设置 spark.sql.shuffle.partitions=[num_tasks] 参数控制 shuffle 的并发度,默认为200。
(2)适用场景
大量不一样的 Key 被分配到了相同的 Task 形成该 Task 数据量过大。
(3)解决方案
调整并行度。通常是增大并行度,但有时如减少并行度也可达到效果。
(4)优点
实现简单,只须要参数调优。可用最小的代价解决问题。通常若是出现数据倾斜,均可以经过这种方法先试验几回,若是问题未解决,再尝试其它方法。
(5)劣势
适用场景少,只是让每一个 task 执行更少的不一样的key。没法解决个别key特别大的状况形成的倾斜,若是某些 key 的大小很是大,即便一个 task 单独执行它,也会受到数据倾斜的困扰。而且该方法通常只能缓解数据倾斜,没有完全消除问题。从实践经验来看,其效果通常。
TIPS 能够把数据倾斜类比为 hash 冲突。提升并行度就相似于 提升 hash 表的大小。
(1)原理
使用自定义的 Partitioner(默认为 HashPartitioner),将本来被分配到同一个 Task 的不一样 Key 分配到不一样 Task。
例如,咱们在 groupByKey 算子上,使用自定义的 Partitioner:
.groupByKey(new Partitioner() { @Override public int numPartitions() { return 12; } @Override public int getPartition(Object key) { int id = Integer.parseInt(key.toString()); if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) { return (id - 9500000) / 12; } else { return id % 12; } } })
TIPS 这个作法至关于自定义 hash 表的 哈希函数。
(2)适用场景
大量不一样的 Key 被分配到了相同的 Task 形成该 Task 数据量过大。
(3)解决方案
使用自定义的 Partitioner 实现类代替默认的 HashPartitioner,尽可能将全部不一样的 Key 均匀分配到不一样的 Task 中。
(4)优点
不影响原有的并行度设计。若是改变并行度,后续 Stage 的并行度也会默认改变,可能会影响后续 Stage。
(5)劣势
适用场景有限,只能将不一样 Key 分散开,对于同一 Key 对应数据集很是大的场景不适用。效果与调整并行度相似,只能缓解数据倾斜而不能彻底消除数据倾斜。并且须要根据数据特色自定义专用的 Partitioner,不够灵活。
经过 Spark 的 Broadcast 机制,将 Reduce 端 Join 转化为 Map 端 Join,这意味着 Spark 如今不须要跨节点作 shuffle 而是直接经过本地文件进行 join,从而彻底消除 Shuffle 带来的数据倾斜。
from pyspark.sql.functions import broadcast result = broadcast(A).join(B, ["join_col"], "left")
其中 A 是比较小的 dataframe 而且可以整个存放在 executor 内存中。
(1)适用场景
参与Join的一边数据集足够小,可被加载进 Driver 并经过 Broadcast 方法广播到各个 Executor 中。
(2)解决方案
在 Java/Scala 代码中将小数据集数据拉取到 Driver,而后经过 Broadcast 方案将小数据集的数据广播到各 Executor。或者在使用 SQL 前,将 Broadcast 的阈值调整得足够大,从而使 Broadcast 生效。进而将 Reduce Join 替换为 Map Join。
(3)优点
避免了 Shuffle,完全消除了数据倾斜产生的条件,可极大提高性能。
(4)劣势
由于是先将小数据经过 Broadcase 发送到每一个 executor 上,因此须要参与 Join 的一方数据集足够小,而且主要适用于 Join 的场景,不适合聚合的场景,适用条件有限。
NOTES使用Spark SQL时须要经过 SET spark.sql.autoBroadcastJoinThreshold=104857600 将 Broadcast 的阈值设置得足够大,才会生效。
思路很简单,就是将一个 join 拆分红 倾斜数据集 Join 和 非倾斜数据集 Join,最后进行 union:
TIPS
- rightRDD 与倾斜 Key 对应的部分数据,须要与随机前缀集 (1~n) 做笛卡尔乘积 (即将数据量扩大 n 倍),从而保证不管数据倾斜侧倾斜 Key 如何加前缀,都能与之正常 Join。
- skewRDD 的 join 并行度能够设置为 n * k (k 为 topSkewkey 的个数)。
- 因为倾斜Key与非倾斜Key的操做彻底独立,可并行进行。
(1)适用场景
两张表都比较大,没法使用 Map 端 Join。其中一个 RDD 有少数几个 Key 的数据量过大,另一个 RDD 的 Key 分布较为均匀。
(2)解决方案
将有数据倾斜的 RDD 中倾斜 Key 对应的数据集单独抽取出来加上随机前缀,另一个 RDD 每条数据分别与随机前缀结合造成新的RDD(至关于将其数据增到到原来的N倍,N即为随机前缀的总个数),而后将两者Join并去掉前缀。而后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集经过union合并,便可获得所有Join结果。
(3)优点
相对于 Map 则 Join,更能适应大数据集的 Join。若是资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提高明显。且只针对倾斜部分的数据作数据扩展,增长的资源消耗有限。
(4)劣势
若是倾斜 Key 很是多,则另外一侧数据膨胀很是大,此方案不适用。并且此时对倾斜 Key 与非倾斜 Key 分开处理,须要扫描数据集两遍,增长了开销。
若是出现数据倾斜的 Key 比较多,上一种方法将这些大量的倾斜 Key 分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集所有加上随机前缀,而后对另一个不存在严重数据倾斜的数据集总体与随机前缀集做笛卡尔乘积(即将数据量扩大N倍)。
其实就是上一个方法的特例或者简化。少了拆分,也就没有 union。
(1)适用场景
一个数据集存在的倾斜 Key 比较多,另一个数据集数据分布比较均匀。
(2)优点
对大部分场景都适用,效果不错。
(3)劣势
须要将一个数据集总体扩大 N 倍,会增长资源消耗。
在 map 端加个 combiner 函数进行局部聚合。加上 combiner 至关于提早进行 reduce ,就会把一个 mapper 中的相同 key 进行聚合,减小 shuffle 过程当中数据量 以及 reduce 端的计算量。这种方法能够有效的缓解数据倾斜问题,可是若是致使数据倾斜的 key 大量分布在不一样的 mapper 的时候,这种方法就不是颇有效了。
TIPS 使用 reduceByKey 而不是 groupByKey。
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每一个 key 都打上一个 1~n 的随机数,好比 3 之内的随机数,此时原先同样的 key 就变成不同的了,好比 (hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成 (1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行 reduceByKey 等聚合操做,进行局部聚合,那么局部聚合结果,就会变成了 (1_hello, 2) (2_hello, 2) (3_hello, 1)。而后将各个 key 的前缀给去掉,就会变成 (hello, 2) (hello, 2) (hello, 1),再次进行全局聚合操做,就能够获得最终结果了,好比 (hello, 5)。
def antiSkew(): RDD[(String, Int)] = { val SPLIT = "-" val prefix = new Random().nextInt(10) pairs.map(t => ( prefix + SPLIT + t._1, 1)) .reduceByKey((v1, v2) => v1 + v2) .map(t => (t._1.split(SPLIT)(1), t2._2)) .reduceByKey((v1, v2) => v1 + v2) }
不过进行两次 mapreduce,性能稍微比一次的差些。
Hadoop 中直接贴近用户使用的是 Mapreduce 程序和 Hive 程序,虽然说 Hive 最后也是用 MR 来执行(至少目前 Hive 内存计算并不普及),可是毕竟写的内容逻辑区别很大,一个是程序,一个是Sql,所以这里稍做区分。
Hadoop 中的数据倾斜主要表如今 ruduce 阶段卡在99.99%,一直99.99%不能结束。
这里若是详细的看日志或者和监控界面的话会发现:
经验: Hive的数据倾斜,通常都发生在 Sql 中 Group 和 On 上,并且和数据逻辑绑定比较深。
优化方法
这里列出来一些方法和思路,具体的参数和用法在官网看就好了。
set hive.map.aggr=true
set hive.groupby.skewindata=true
说明
hive.map.aggr=true: 在map中会作部分汇集操做,效率更高但须要更多的内存。hive.groupby.skewindata=true: 数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每一个Reduce作部分聚合操做,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不一样的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程能够保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操做。
更多内容敬请关注 vivo 互联网技术 微信公众号
注:转载文章请先与微信号:labs2020 联系。