最近几回被问到关于数据倾斜的问题,这里找了些资料也结合一些本身的理解.html
在并行计算中咱们总但愿分配的每个task 都能以差很少的粒度来切分而且完成时间相差不大,可是集群中可能硬件不一样,应用的类型不一样和切分的数据大小不一致总会致使有部分任务极大的拖慢了整个任务 的完成时间,硬件不一样就不说了,应用的类型不一样其中就好比page rank 或者data mining 里面一些计算,它的每条记录消耗的成本不太同样,这里只讨论关于关系型运算的(通常能用SQL表述的) 数据切分上的数据倾斜问题.算法
hadoop 中数据倾斜会极大影响性能的一个背景是mapreduce 框架中老是不分条件的进行sort . 在通用状况下map sort + partition +reduce sort 能够获得结果,可是这个过程不必定是最优的. 对于关系型计算,其中数据倾斜影响最大的地方在reduce 的sort , reduce 处理的数据量的大小若是超过给定的reduce jvm 的大小的2倍不到的阈值的时候(这个阈值是我猜想的,具体以实际监控运行状况为准),reduce 端会发生multi-pass merge sort 的状况, 这个时候观察这些运行较慢的reduce task 的metrics 会发现reduce 跟IO 相关的metrics 会比其余reduce 大不少. 具体的细节参考今年hadoop summit 上Todd 的performance tuning 的ppt (26页):sql
http://www.slideshare.net/cloudera/mr-perfapache
这种在reduce 端不分条件的排序只是hadoop 是这种实现,并非mapreduce 框架必定须要排序,其余的mapreduce 实现或者其余的分布式计算框架可能在reduce 上的这种瓶颈会小一些, 好比shark 里面的group by 就是基于hash 而不是sort 的.app
对于关系型的计算中常见的数据倾斜有两种:group by 和 join , 其余有可能的有:框架
in或exists 的操做尤为是in或exists 做为subquery 的返回(in 或exists 有时候会变成left semi join),dom
有相同输入源的union 或union all 也许也会有(其余集合类型的操做intersect 之类也许也是). jvm
hive 中的udtf 也算一种.分布式
这里只讨论最多见的group by 和join 的状况.ide
数据分布:
正常的数据分布理论上都是倾斜的,就是咱们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不一样的数据字段可能的数据倾斜通常有两种状况:
一种是惟一值很是少,极少数值有很是多的记录值(惟一值少于几千)
一种是惟一值比较多,这个字段的某些值有远远多于其余值的记录数,可是它的占比也小于百分之一或千分之一
分区:
常见的mapreduce分区方式为hash 和range ,
hash partition 的好处是比较弹性,跟数据类型无关,实现简单(设定reduce个数就好,通常不须要本身实现)
range partition 须要实现者本身了解数据分布, 有时候须要手工作sample取样. 同时也不够弹性, 表如今几个方面,1. 对同一个表的不一样字段都须要实现不一样的range partition, 对于时间这种字段根据查询类型的不一样或者过滤条件的不一样切分range 的大小都不必定.
2 .有时候可能设计使用多个字段组合的状况, 这时候又不能使用以前单个字段的partition 类, 而且多个字段组合之间有可能有隐含的联系,好比出生日期和星座,商品和季节.
3. 手工作sample 很是耗时间,须要使用者对查询使用的数据集的分布有领域知识.
4. 分配方式是死的,reduce 个数是肯定的,一旦某种状况下发生倾斜,调整参数
其余的分区类型还有hbase 的hregionpartitioner 或者totalorder partitioner 等.
可以想到的关于数据倾斜的一些解决方式(欢迎补充,尤为是有没有作搜索或者数据挖掘的朋友有碰到相似问题):
1. 增长reduce 的jvm内存
2. 增长reduce 个数
3. customer partition
4. 其余优化的讨论.
5. reduce sort merge排序算法的讨论
6. 正在实现中的hive skewed join.
7. pipeline
8. distinct
9. index 尤为是bitmap index
方式1:既然reduce 自己的计算须要以合适的内存做为支持,在硬件环境允许的状况下,增长reduce 的内存大小显然有改善数据倾斜的可能,这种方式尤为适合数据分布第一种状况,单个值有大量记录, 这种值的全部纪录已经超过了分配给reduce 的内存,不管你怎么样分区这种状况都不会改变. 固然这种状况的限制也很是明显, 1.内存的限制存在,2.可能会对集群其余任务的运行产生不稳定的影响.
方式2: 这个对于数据分布第二种状况有效,惟一值较多,单个惟一值的记录数不会超过度配给reduce 的内存. 若是发生了偶尔的数据倾斜状况,增长reduce 个数能够缓解偶然状况下的某些reduce 不当心分配了多个较多记录数的状况. 可是对于第一种数据分布无效.
方式3: 一种状况是某个领域知识告诉你数据分布的显著类型,好比hadoop definitive guide 里面的温度问题,一个固定的组合(观测站点的位置和温度) 的分布是固定的, 对于特定的查询若是前面两种方式都没用,实现本身的partitioner 也许是一个好的方式.
方式4: 目前有的一些针对数据倾斜的优化好比pig 的skewed join
http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Skewed+Joins
pig 文档上面说是根据数据输入的统计信息来肯定分区(也就是range partition?),另外不清楚这个行为是不是动态运行时候才决定的,也就是运行以前有一步pig 自动作sample 的工做,由于pig 是没有统计信息这一说的.
hive 中的group by
<property>
<name>hive.groupby.skewindata</name>
<value>false</value>
<description>Whether there is skew in data to optimize group by queries</description>
</property>
<property>
<name>hive.optimize.groupby</name>
<value>true</value>
<description>Whether to enable the bucketed group by from bucketed partitions / tables.</description>
</property>
<property>
<name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name>
<value>0.3</value>
<description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description>
</property>
<property>
<name>hive.groupby.mapaggr.checkinterval</name>
<value>100000</value>
<description>Number of rows after which size of the grouping keys/aggregation classes is performed</description>
</property>
其中最后一个参数hive.groupby.mapaggr.checkinterval 的思路跟in-memory combiner 类似, in-memeory combiner 是发生在mapper 端sort 以前,而不是如今的combiner发生在mapper sort 以后甚至在写入磁盘以后从新读磁盘而后排序合并. in-memeory combiner 最先好像是《Data-Intensive Text Processing with MapReduce》,mapr 去年的介绍ppt 里面好像提到它们也有这个优化. mapper 端减小数据的机会比reduce 端的要大,因此通常不会看到reduce 端的combiner 的讨论,可是这种思路也有,好比google tenzing 的join 讨论里面有一个prev-next 的小优化就是基于reduce 端的combiner, 但那个前提是基于block shuffle 实现的基础上,数据已经排过序了,因此join 时候前一条数据跟后一条数据相同的几率很大.
hive 中的skewed join : 以前的文章已经介绍过两表join 中hive 的几个优化,其中的skewed join 的相似思路就是上面介绍的skewed 的第二种:增长reduce 的个数,hive 中是经过判断阈值若是大于一个reduce 须要处理的数据量,从新起额外的task 来处理这些超额的reduce 自己须要处理的数据, 这是一种较晚的补救措施,自己hive 开始分区的时候已经倾斜(partition 的方式不合理), 当运行的时候经过运行时监控reduce 发现倾斜的特殊key 而后额外的起task 去处理,效果比较通常,感兴趣的同窗能够参考HIVE-3086 里面我和facebook 团队对这种优化思路的讨论. 第六节我会讨论一下我所认为的思路和facebook 正在作的思路之间的差异.
方式5 : reduce 分配的内存远小于处理的数据量时,会产生multi-pass sort 的状况是瓶颈,那么就要问
1. 这种排序是有必要的嘛?
2. 是否有其余排序算法或优化能够根据特定状况下降他瓶颈的阈值?
3. map reduce 适合处理这种状况嘛?
关于问题1. 若是是group by , 那么对于数据分布状况1 ,hash 比sort 好很是多,即便某一个reduce 比其余reduce 处理多的多的数据,hash 的计算方式也不会差距太大.
问题2. 一个是若是实现block shuffle 确定会极大的减小排序自己的成本, 另外,若是分区以后的reduce 不是使用copy –> sort-merge –> reduce 的计算方式, 在copy 以后将每一个block 的头部信息保存在内存中,不用sort – merge 也能够直接计算reduce, 只不过这时候变成了随机访问,而不是如今的sort-merge 以后的顺序访问. block shuffle 的实现有两种类型,一种是当hadoop 中真正有了列数据格式的时候,数据有更大的机会已经排过序而且按照block 来切分,通常block 为1M ( 能够关注avro-806 ) , 这时候的mapper 什么都不作,甚至连计算分区的开销都小了不少倍,直接进入reduce 最后一步,第二种类型为没有列数据格式的支持,须要mapper 排序获得以后的block 的最大最小值,reduce 端在内存中保存最大最小值,copy 完成后直接用这个值来作随机读而后进行reduce. ( block shuffle 的实现能够关注 MAPREDUCE-4039 , hash 计算能够关注 MAPREDUCE-1639)
问题3 . map reduce 只有两个函数,一个map 一个 reduce, 一旦发生数据倾斜就是partition 失效了,对于join 的例子,某一个key 分配了过多的记录数,对于只有一次partittion的机会,分配错了数据倾斜的伤害就已经形成了,这种状况很难调试,可是若是你是基于map- reduce-reduce 的方式计算,那么对于同一个key 不须要分配到同一个reduce 中,在第一个reduce 中获得的结果能够在第二个reduce 才汇总去重,第二个reduce 不须要sort – merge 的步骤,由于前一个reduce 已经排过序了,中间的reduce 处理的数据不用关心partition 怎么分,处理的数据量都是同样大,而第二个reduce 又不使用sort-merge 来排序,不会遇到如今的内存大小的问题,对于skewed join 这种状况瓶颈天然小不少.
方式6: 目前hive 有几个正在开发中的处理skewed join 状况的jira case, HIVE-3086 , HIVE-3286 ,HIVE-3026 . 简单介绍一下就是facebook 但愿经过手工处理提早枚举的方式列出单个倾斜的值,在join 的时候将这些值特殊列出看成map join 来处理,对于其余值使用原来的方式. 我我的以为这太不伸缩了,值自己没有考虑应用过滤条件和优化方式以后的数据量大小问题,他们提早列出的值都是基于整个分区的. join key 若是为组合key 的状况也应该没有考虑,对metastore 的储存问题有限制,对输入的大表和小表都会scan 两次( 一次处理非skew key , 一次处理skew key 作map join), 对输出表也会scan 两次(将两个结果进行merge) , skew key 必须提早手工列出这又存在额外维护的成本,目前由于尚未完整的开发完到可以投入生产的状况,因此等全部特性处理完了有了文档在看看这个处理方式是否有 效,我我的认为的思路应该是接着bucked map join 的思路往下走,只不过不用提早处理cluster key 的问题, 这时候cluster key 的选择应该是join key + 某个能分散join key 的列, 这等于将大表的同一个key 的值分散到了多个不一样的reduce 中,而小表的join key 也必须cluster 到跟大表对应的同一个key , join 中对于数据分布第二种状况不用太难,增长reduce 个数就好,主要是第一种,须要大表的join key 可以分散,对于一样join key 的小表又可以匹配到全部大表中的记录. 这种思路就是不用扫描大表两遍或者结果输出表,不须要提早手工处理,数据是动态sample 的应用了过滤条件以后的数据,而不是提早基于统计数据的不许确结果. 这个基本思路跟tenzing 里面描述的distributed hash join 是同样的,想办法切成合适的大小而后用hash 和 map join .
方式7: 当同时出现join 和group 的时候, 那么这两个操做应该是以pipeline (管道) 的方式执行. 在join 的时候就能够直接使用group 的操做符减小大量的数据,而不是等待join 完成,而后写入磁盘,group 又读取磁盘作group操做. HIVE-2206 正在作这个优化. hive 里面是没有pipeline 这个概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有这种概念的.
方式8: distinct 自己就是group by 的一种简写,我原先觉得count(distinct x)这种跟group by 是同样的,可是发现hive 里面distinct 明显比group by 要慢,可能跟group by 会有map 端的combiner有关, 另外观察到hive 在预估count(distinct x) 的reduce 个数比group by 的个数要少 , 因此hive 中使用count(distinct x) , 要么尽可能把reduce 个数设置大,直接设置reduce 个数或者hive.exec.reducers.bytes.per.reducer 调小,我我的比较喜欢调后面一个,hive 目前的reduce 个数没有统计信息的状况下就是用map端输入以前的数值, 若是你是join 以后还用count(distinct x) 的话,这个默认值通常都会悲剧,若是有where 条件并能过滤必定数量的数据,那么默认reduce 个数可能就还好一点. 无论怎样,多浪费一点reduce slot 总比等十几甚至几十分钟要好, 或者转换成group by 的写法也不错,写成group by 的时候distributed by 也颇有帮助.
方式9: hive 中的index 就是物化视图,对于group by 和distinct 的状况等于变成了map 端在作计算,天然不存在倾斜. 尤为是bitmap index , 对于惟一值比较少的列优点更大,不过index 麻烦的地方在于须要判断你的sql 是否是经常使用sql , 另外若是create index 的时候没有选你查询的时候用的字段,这个index 是不能用的( hive 中是永远不可能有DBMS中的用index 去lookup 或者join 原始表这种概念的)
其余建议:
网上能找到的另一份很好的描述数据倾斜的资料是
http://nuage.cs.washington.edu/pubs/opencirrus2011.pdf
里面的map side skew 和expensive record 都不是关系型计算中的问题,因此不是这篇文章关注点. 对于关系型计算,其中数据倾斜影响最大的地方在reduce 的sort. 这篇文章里面最后总结的5点好的建议值得参考,
其中第三条须要你知道应用combiner 和特殊优化方式是否带来了性能的提高,hive 的map aggr 在数据分布状况1效果会比较好,数据分布状况2效果就不大,还有combiner 应用的时候是消耗了系统资源的,确认这种消耗是否值得而不是任何状况下都使用combiner.
对于第四点关系型计算中map 倾斜状况不太常见. 一种能够举出来的例子是分区不合理,或者hive 中的cluster by 的key 选择不合理(都是使用目录的方式分区, 目录是最小处理单元了).
另一份是淘宝的数据倾斜总结:
http://www.alidata.org/archives/2109
不过我我的以为帮助不是太大,里面第一个解决方式空值产生的影响第一个Union All 的方式我的是极力反对的,同一个表尤为是大表扫描两遍这额外的成本跟收益太不匹配,不推荐,第二个将特殊值变成random 的方式, 这个产生的结果是正确的嘛? 尤为是在各类状况下输出结果是正确的嘛?里面背景好像是那个小表users 的主键为userid, 而后userid 又是join key , 并且还不为空? 不太推荐,背景条件和输出的正确性与否存疑.
第二个数据类型不一样的问题我以为跟HIVE-3445 都算是数据建模的问题,提早修改好是同样的.
第三个是由于淘宝的hadoop 版本中没有map side hash aggr 的参数吧. 并且写成distinct 还多了一个MR 步骤,不太推荐.
数据倾斜在MPP 中也是一个课题,这也设计到一个数据重分配的问题,可是相对于MPP 中有比较成熟的机制,一个是mpp 在处理数据初始分布的时候老是会指定segmented by 或者distributed by 这种显示分配到不一样物理机器上的建表语句. 还有就是统计信息会帮助执行引擎选择合适的从新分布.可是统计信息也不是万能的,好比
1:统计信息的粒度和更新问题.
2: 应用了过滤条件以后的数据也许不符合原始指望的数据分布.
3: 统计信息是基于采样的,总于真实全部数据存在偏差.
4: 统计信息是基于partittion 的, 对于查询没有涉及到partition 字段的切分就不能使用各partition 只和来表示整体的统计信息.
5. 临时表或者多步骤查询的中间过程数据没有统计信息的状况.
6. 各类其余的算法优化好比in-mapper combiner 或者google Tenzing 的prev – next combine 都会影响统计信息对于算法选择的不一样.
总结:
数据倾斜没有一劳永逸的方式能够解决,了解你的数据集的分布状况,而后了解你所使用计算框架的运行机制和瓶颈,针对特定的状况作特定的优化,作多种尝试,观察是否有效.