【SQL系列】深刻浅出数据仓库中SQL性能优化之Hive篇

公众号: SAP Technical
本文做者: matinal
 

 

前言部分

你们能够关注个人公众号,公众号里的排版更好,阅读更温馨。html

正文部分

一个Hive查询生成多个Map Reduce Job,一个Map Reduce Job又有Map,Reduce,Spill,Shuffle,Sort等多个阶段,因此针对Hive查询的优化能够大体分为针对MR中单个步骤的优化(其中又会有细分),针对MR全局的优化,和针对整个查询(多MR Job)的优化,下文会分别阐述。算法

 

在开始以前,先把MR的流程图帖出来(摘自Hadoop权威指南),方便后面对照。另外要说明的是,这个优化只是针对Hive 0.9版本,而不是后来Hortonwork发起Stinger项目以后的版本。相对应的Hadoop版本是1.x而非2.x。sql

 

Map阶段的优化(Map phase)

Map阶段的优化,主要是肯定合适的Map数。那么首先要了解Map数的计算公式:数据库

 

[js]  view plain copy
 
 
 
  1. num_Map_tasks = max[${Mapred.min.split.size},  
  2.                 min(${dfs.block.size}, ${Mapred.max.split.size})]  

 

 

  • Mapred.min.split.size指的是数据的最小分割单元大小。
  • Mapred.max.split.size指的是数据的最大分割单元大小。
  • dfs.block.size指的是HDFS设置的数据块大小。

 

通常来讲dfs.block.size这个值是一个已经指定好的值,并且这个参数Hive是识别不到的: apache

 

[js]  view plain copy
 
 
 
  1. Hive> set dfs.block.size;  
  2. dfs.block.size is undefined  

 

因此实际上只有Mapred.min.split.size和Mapred.max.split.size这两个参数(本节内容后面就以min和max指代这两个参数)来决定Map数量。在Hive中min的默认值是1B,max的默认值是256MB: 性能优化

 

[js]  view plain copy
 
 
 
  1. Hive> set Mapred.min.split。size;  
  2. Mapred.min.split.size=1  
  3. Hive> set Mapred.max.split。size;  
  4. Mapred.max.split.size=256000000  

 

因此若是不作修改的话,就是1个Map task处理256MB数据,咱们就以调整max为主。经过调整max能够起到调整Map数的做用,减少max能够增长Map数,增大max能够减小Map数。须要提醒的是,直接调整Mapred.Map.tasks这个参数是没有效果的。架构

调整大小的时机根据查询的不一样而不一样,总的来说能够经过观察Map task的完成时间来肯定是否须要增长Map资源。若是Map task的完成时间都是接近1分钟,甚至几分钟了,那么每每增长Map数量,使得每一个Map task处理的数据量减小,可以让Map task更快完成;而若是Map task的运行时间已经不多了,好比10-20秒,这个时候增长Map不太可能让Map task更快完成,反而可能由于Map须要的初始化时间反而让Job整体速度变慢,这个时候反而须要考虑是否能够把Map的数量减小,这样能够节省更多资源给其余Job。app

Reduce阶段的优化(Reduce phase)

这里说的Reduce阶段,是指前面流程图中的Reduce phase(实际的Reduce计算)而非图中整个Reduce task。Reduce阶段优化的主要工做也是选择合适的Reduce task数量,跟上面的Map优化相似。jvm

与Map优化不一样的是,Reduce优化时,能够直接设置Mapred。Reduce。tasks参数从而直接指定Reduce的个数。固然直接指定Reduce个数虽然比较方便,可是不利于自动扩展。Reduce数的设置虽然相较Map更灵活,可是也能够像Map同样设定一个自动生成规则,这样运行定时Job的时候就不用担忧原来设置的固定Reduce数会因为数据量的变化而不合适。分布式

Hive估算Reduce数量的时候,使用的是下面的公式:

 

[js]  view plain copy
 
 
 
  1. num_Reduce_tasks = min[${Hive.exec.Reducers.max},   
  2.                       (${input.size} / ${ Hive.exec.Reducers.bytes.per.Reducer})]  

 

也就是说,根据输入的数据量大小来决定Reduce的个数,默认Hive.exec.Reducers.bytes.per.Reducer为1G,并且Reduce个数不能超过一个上限参数值,这个参数的默认取值为999。因此咱们能够调整Hive.exec.Reducers.bytes.per.Reducer来设置Reduce个数。

设置Reduce数一样也是根据运行时间做为参考调整,而且能够根据特定的业务需求、工做负载类型总结出经验,因此再也不赘述。

Map与Reduce之间的优化(Spill, copy, Sort phase)

Map phase和Reduce phase之间主要有3道工序。首先要把Map输出的结果进行排序后作成中间文件,其次这个中间文件就能分发到各个Reduce,最后Reduce端在执行Reduce phase以前把收集到的排序子文件合并成一个排序文件。这个部分能够调的参数挺多,可是通常都是不要调整的,没必要重点关注。

Spill 与 Sort

在Spill阶段,因为内存不够,数据可能没办法在内存中一次性排序完成,那么就只能把局部排序的文件先保存到磁盘上,这个动做叫Spill,而后Spill出来的多个文件能够在最后进行merge。若是发生Spill,能够经过设置io.Sort.mb来增大Mapper输出buffer的大小,避免Spill的发生。另外合并时能够经过设置io.Sort.factor来使得一次性可以合并更多的数据。调试参数的时候,一个要看Spill的时间成本,一个要看merge的时间成本,还须要注意不要撑爆内存(io.Sort.mb是算在Map的内存里面的)。Reduce端的merge也是同样能够用io.Sort.factor。通常状况下这两个参数不多须要调整,除非很明确知道这个地方是瓶颈。

Copy

copy阶段是把文件从Map端copy到Reduce端。默认状况下在5%的Map完成的状况下Reduce就开始启动copy,这个有时候是很浪费资源的,由于Reduce一旦启动就被占用,一直等到Map所有完成,收集到全部数据才能够进行后面的动做,因此咱们能够等比较多的Map完成以后再启动Reduce流程,这个比例能够通Mapred.Reduce.slowstart.completed.Maps去调整,他的默认值就是5%。若是以为这么作会减慢Reduce端copy的进度,能够把copy过程的线程增大。tasktracker.http.threads能够决定做为server端的Map用于提供数据传输服务的线程,Mapred.Reduce.parallel.copies能够决定做为client端的Reduce同时从Map端拉取数据的并行度(一次同时从多少个Map拉数据),修改参数的时候这两个注意协调一下,server端能处理client端的请求便可。

文件格式的优化

文件格式方面有两个问题,一个是给输入和输出选择合适的文件格式,另外一个则是小文件问题。小文件问题在目前的Hive环境下已经获得了比较好的解决,Hive的默认配置中就能够在小文件输入时自动把多个文件合并给1个Map处理,输出时若是文件很小也会进行一轮单独的合并,因此这里就不专门讨论了。相关的参数能够在这里找到。

关于文件格式,Hive0.9版本有3种,textfile,sequencefile和rcfile。整体上来讲,rcfile的压缩比例和查询时间稍好一点,因此推荐使用。

关于使用方法,能够在建表结构时能够指定格式,而后指定压缩插入:

 

[js]  view plain copy
 
 
 
  1. create table rc_file_test( col int ) stored as rcfile;  
  2. set Hive.exec.compress.output = true;  
  3. insert overwrite table rc_file_test  
  4. select * from source_table;  

 

另外时也能够指定输出格式,也能够经过Hive。default。fileformat来设定输出格式,适用于create table as select的状况: 

 

[js]  view plain copy
 
 
 
  1. set Hive.default.fileformat = SequenceFile;  
  2. set Hive.exec.compress.output = true;   
  3. /*对于sequencefile,有record和block两种压缩方式可选,block压缩比更高*/  
  4. set Mapred.output.compression.type = BLOCK;   
  5. create table seq_file_test  
  6. as select * from source_table;  

 

上面的文件格式转换,实际上是由Hive完成的(也就是插入动做)。可是也能够由外部直接导入纯文本(能够按照这里的作法预先压缩),或者是由MapReduce Job生成的数据。

值得注意的是,Hive读取sequencefile的时候,是把key忽略的,也就是直接读value而且按照指定分隔符分隔字段。可是若是Hive的数据来源是从mr生成的,那么写sequencefile的时候,key和value都是有意义的,key不能被忽略,而是应该当成第一个字段。为了解决这种不匹配的状况,有两种办法。一种是要求凡是结果会给Hive用的mr Job输出value的时候带上key。可是这样的话对于开发是一个负担,读写数据的时候都要注意这个状况。因此更好的方法是第二种,也就是把这个源自于Hive的问题交给Hive解决,写一个InputFormat包装一下,把value输出加上key便可。如下是核心代码,修改了RecordReader的next方法:

 

[js]  view plain copy
 
 
 
  1. public synchronized boolean next(K key, V value) throws IOException   
  2. {  
  3.     Text tKey = (Text) key;  
  4.     Text tValue = (Text) value;  
  5.     if (!super.next(innerKey, innerValue))   
  6.         return false;  
  7.   
  8.     Text inner_key = (Text) innerKey; //在构造函数中用createKey()生成  
  9.     Text inner_value = (Text) innerValue; //在构造函数中用createValue()生成  
  10.   
  11.     tKey.set(inner_key);  
  12.     tValue.set(inner_key.toString() + '\t' + inner_value.toString()); // 分隔符注意本身定义  
  13.     return true;  
  14. }  

 

Job总体优化

有一些问题必须从Job的总体角度去观察。这里讨论几个问题:Job执行模式(本地执行v.s.分布式执行)、JVM重用、索引、Join算法、数据倾斜。

Job执行模式

Hadoop的Map Reduce Job能够有3种模式执行,即本地模式,伪分布式,还有真正的分布式。本地模式和伪分布式都是在最初学习Hadoop的时候每每被说成是作单机开发的时候用到。可是实际上对于处理数据量很是小的Job,直接启动分布式Job会消耗大量资源,而真正执行计算的时间反而很是少。这个时候就应该使用本地模式执行mr Job,这样执行的时候不会启动分布式Job,执行速度就会快不少。好比通常来讲启动分布式Job,不管多小的数据量,执行时间通常不会少于20s,而使用本地mr模式,10秒左右就能出结果。

设置执行模式的主要参数有三个,一个是Hive.exec.mode.local.auto,把他设为true就可以自动开启local mr模式。可是这还不足以启动local mr,输入的文件数量和数据量大小必需要控制,这两个参数分别为Hive.exec.mode.local.auto.tasks.max和Hive.exec.mode.local.auto.inputbytes.max,默认值分别为4和128MB,即默认状况下,Map处理的文件数不超过4个而且总大小小于128MB就启用local mr模式。

JVM重用

正常状况下,MapReduce启动的JVM在完成一个task以后就退出了,可是若是任务花费时间很短,又要屡次启动JVM的状况下(好比对很大数据量进行计数操做),JVM的启动时间就会变成一个比较大的overhead。在这种状况下,可使用jvm重用的参数:

 

[js]  view plain copy
 
 
 
  1. set Mapred.Job.reuse.jvm.num.tasks = 5;  

 

他的做用是让一个jvm运行屡次任务以后再退出。这样一来也能节约很多JVM启动时间。

索引

整体上来讲,Hive的索引目前仍是一个不太适合使用的东西,这里只是考虑到叙述完整性,对其进行基本的介绍。

Hive中的索引架构开放了一个接口,容许你根据这个接口去实现本身的索引。目前Hive本身有一个参考的索引实现(CompactIndex),后来在0.8版本中又加入位图索引。这里就讲讲CompactIndex。

CompactIndex的实现原理相似一个lookup table,而非传统数据库中的B树。若是你对table A的col1作了索引,索引文件自己就是一个table,这个table会有3列,分别是col1的枚举值,每一个值对应的数据文件位置,以及在这个文件位置中的偏移量。经过这种方式,能够减小你查询的数据量(偏移量能够告诉你从哪一个位置开始找,天然只须要定位到相应的block),起到减小资源消耗的做用。可是就其性能来讲,并无很大的改善,极可能还不如构建索引须要花的时间。因此在集群资源充足的状况下,没有太大必要考虑索引。

CompactIndex的还有一个缺点就是使用起来不友好,索引建完以后,使用以前还须要根据查询条件作一个一样剪裁才能使用,索引的内部结构彻底暴露,并且还要花费额外的时间。具体看看下面的使用方法就了解了:

 

[js]  view plain copy
 
 
 
  1. /*在index_test_table表的id字段上建立索引*/  
  2. create index idx on table index_test_table(id)    
  3. as 'org.apache.Hadoop.Hive.ql.index.compact.CompactIndexHandler' with deferred rebuild;  
  4. alter index idx on index_test_table rebuild;  
  5.       
  6. /*索引的剪裁。找到上面建的索引表,根据你最终要用的查询条件剪裁一下。*/  
  7. /*若是你想跟RDBMS同样建完索引就用,那是不行的,会直接报错,这也是其麻烦的地方*/  
  8. create table my_index  
  9. as select _bucketname, `_offsets`  
  10. from default__index_test_table_idx__ where id = 10;  
  11.       
  12. /*如今能够用索引了,注意最终查询条件跟上面的剪裁条件一致*/  
  13. set Hive.index.compact.file = /user/Hive/warehouse/my_index;   
  14. set Hive.input.format = org.apache.Hadoop.Hive.ql.index.compact.HiveCompactIndexInputFormat;  
  15. select count(*) from index_test_table where id = 10;  

 

Join算法

处理分布式join,通常有两种方法:

 

  • replication join:把其中一个表复制到全部节点,这样另外一个表在每一个节点上面的分片就能够跟这个完整的表join了;
  • repartition join:把两份数据按照join key进行hash重分布,让每一个节点处理hash值相同的join key数据,也就是作局部的join。

 

这两种方式在M/R Job中分别对应了Map side join和Reduce side join。在一些MPP DB中,数据能够按照某列字段预先进行hash分布,这样在跟这个表以这个字段为join key进行join的时候,该表确定不须要作数据重分布了,这种功能是以HDFS做为底层文件系统的Hive所没有的。

在默认状况下,Hive的join策略是进行Reduce side join。当两个表中有一个是小表的时候,就能够考虑用Map join了,由于小表复制的代价会好过大表Shuffle的代价。使用Map join的配置方法有两种,一种直接在sql中写hint,语法是/*+MapJOIN (tbl)*/,其中tbl就是你想要作replication的表。另外一种方法是设置Hive.auto.convert.join = true,这样Hive会自动判断当前的join操做是否合适作Map join,主要是找join的两个表中有没有小表。至于多大的表算小表,则是由Hive.smalltable.filesize决定,默认25MB。

可是有的时候,没有一个表足够小到可以放进内存,可是仍是想用Map join怎么办?这个时候就要用到bucket Map join。其方法是两个join表在join key上都作hash bucket,而且把你打算复制的那个(相对)小表的bucket数设置为大表的倍数。这样数据就会按照join key作hash bucket。小表依然复制到全部节点,Map join的时候,小表的每一组bucket加载成hashtable,与对应的一个大表bucket作局部join,这样每次只须要加载部分hashtable就能够了。

而后在两个表的join key都具备惟一性的时候(也就是可作主键),还能够进一步作Sort merge bucket Map join。作法仍是两边要作hash bucket,并且每一个bucket内部要进行排序。这样一来当两边bucket要作局部join的时候,只须要用相似merge Sort算法中的merge操做同样把两个bucket顺序遍历一遍便可完成,这样甚至都不用把一个bucket完整的加载成hashtable,这对性能的提高会有很大帮助。

而后这里以一个完整的实验说明这几种join算法如何操做。

首先建表要带上bucket:

 

[js]  view plain copy
 
 
 
  1. create table Map_join_test(id int)  
  2. clustered by (id) Sorted by (id) into 32 buckets  
  3. stored as textfile;  

 

而后插入咱们准备好的800万行数据,注意要强制划分红bucket(也就是用Reduce划分hash值相同的数据到相同的文件): 

 

[js]  view plain copy
 
 
 
  1. set Hive.enforce.bucketing = true;  
  2. insert overwrite table Map_join_test  
  3. select * from Map_join_source_data;  

 

这样这个表就有了800万id值(且里面没有重复值,因此能够作Sort merge),占用80MB左右。

接下来咱们就能够一一尝试Map join的算法了。首先是普通的Map join:

 

[js]  view plain copy
 
 
 
  1. select /*+Mapjoin(a) */count(*)  
  2. from Map_join_test a  
  3. join Map_join_test b on a.id = b.id;  

 

而后就会看到分发hash table的过程: 

 

[js]  view plain copy
 
 
 
  1. 2013-08-31 09:08:43     Starting to launch local task to process Map join;      maximum memory = 1004929024  
  2. 2013-08-31 09:08:45     Processing rows:   200000  Hashtable size: 199999  Memory usage:   38823016        rate:   0.039  
  3. 2013-08-31 09:08:46     Processing rows:   300000  Hashtable size: 299999  Memory usage:   56166968        rate:   0.056  
  4. ……  
  5. 2013-08-31 09:12:39     Processing rows:  4900000 Hashtable size: 4899999 Memory usage:   896968104       rate:   0.893  
  6. 2013-08-31 09:12:47     Processing rows:  5000000 Hashtable size: 4999999 Memory usage:   922733048       rate:   0.918  
  7. Execution failed with exit status: 2  
  8. Obtaining error information  
  9.   
  10. Task failed!  
  11. Task ID:  
  12.   Stage-4  

 

不幸的是,竟然内存不够了,直接作Map join失败了。可是80MB的大小为什么用1G的heap size都放不下?观察整个过程就会发现,平均一条记录须要用到200字节的存储空间,这个overhead太大了,对于Map join的小表size必定要好好评估,若是有几十万记录数就要当心了。虽然不太清楚其中的构造原理,可是在互联网上也能找到其余的例证,好比这里和这里,平均一行500字节左右。这个明显比通常的表一行占用的数据量要大。不过Hive也在作这方面的改进,争取缩小hash table,好比Hive-6430。

因此接下来咱们就用bucket Map join,以前分的bucket就派上用处了。只须要在上述sql的前面加上以下的设置:

 

[js]  view plain copy
 
 
 
  1. set Hive。optimize。bucketMapjoin = true;  

 

而后仍是会看到hash table分发: 

 

[js]  view plain copy
 
 
 
  1. 2013-08-31 09:20:39     Starting to launch local task to process Map join;      maximum memory = 1004929024  
  2. 2013-08-31 09:20:41     Processing rows:   200000  Hashtable size: 199999  Memory usage:   38844832        rate:   0.039  
  3. 2013-08-31 09:20:42     Processing rows:   275567  Hashtable size: 275567  Memory usage:   51873632        rate:   0.052  
  4. 2013-08-31 09:20:42     Dump the hashtable into file: file:/tmp/Hadoop/Hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0。hashtable  
  5. 2013-08-31 09:20:46     Upload 1 File to: file:/tmp/Hadoop/Hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0。hashtable File size: 11022975  
  6. 2013-08-31 09:20:47     Processing rows:   300000  Hashtable size: 24432   Memory usage:   8470976 rate:   0.008  
  7. 2013-08-31 09:20:47     Processing rows:   400000  Hashtable size: 124432  Memory usage:   25368080        rate:   0.025  
  8. 2013-08-31 09:20:48     Processing rows:   500000  Hashtable size: 224432  Memory usage:   42968080        rate:   0.043  
  9. 2013-08-31 09:20:49     Processing rows:   551527  Hashtable size: 275960  Memory usage:   52022488        rate:   0.052  
  10. 2013-08-31 09:20:49     Dump the hashtable into file: file:/tmp/Hadoop/Hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000001_0。hashtable  
  11. ……  

 

此次就会看到每次构建完一个hash table(也就是所对应的对应一个bucket),会把这个hash table写入文件,从新构建新的hash table。这样一来因为每一个hash table的量比较小,也就不会有内存不足的问题,整个sql也能成功运行。不过光光是这个复制动做就要花去3分半的时间,因此若是整个Job原本就花不了多少时间的,那这个时间就不可小视。    

最后咱们试试Sort merge bucket Map join,在bucket Map join的基础上加上下面的设置便可:

 

[js]  view plain copy
 
 
 
  1. set Hive.optimize.bucketMapjoin.Sortedmerge = true;  
  2. set Hive.input.format = org.apache.Hadoop.Hive.ql.io.BucketizedHiveInputFormat;  

 

Sort merge bucket Map join是不会产生hash table复制的步骤的,直接开始作实际Map端join操做了,数据在join的时候边作边读。跳过复制的步骤,外加join算法的改进,使得Sort merge bucket Map join的效率要明显好于bucket Map join。

关于join的算法虽然有这么些选择,可是我的以为,对于平常使用,掌握默认的Reduce join和普通的(无bucket)Map join已经能解决大多数问题。若是小表不能彻底放内存,可是小表相对大表的size量级差异也很是大的时候也能够试试bucket Map join,不过其hash table分发的过程会浪费很多时间,须要评估下是否可以比Reduce join更高效。而Sort merge bucket Map join虽然性能不错,可是把数据作成bucket自己也须要时间,另外其发动条件比较特殊,就是两边join key必须都惟一(不少介绍资料中都不提这一点。强调下必须都是惟一,哪怕只有一个表不惟一,出来的结果也是错的。固然,其实这点彻底能够根据其算法原理推敲出来)。这样的场景相对比较少见,“用户基本表 join 用户扩展表”以及“用户今天的数据快照 join 用户昨天的数据快照”这类场景可能比较合适。

这里顺便说个题外话,在数据仓库中,小表每每是维度表,而小表Map join这件事情其实用udf代替还会更快,由于不用单独启动一轮Job,因此这也是一种可选方案。固然前提条件是维度表是固定的天然属性(好比日期),只增长不修改(好比网站的页面编号)的状况也能够考虑。若是维度有更新,要作缓慢变化维的,固然仍是维表好维护。至于维表本来的一个主要用途OLAP,以Hive目前的性能是无法实现的,也就不须要多虑了。

数据倾斜

所谓数据倾斜,说的是因为数据分布不均匀,个别值集中占据大部分数据量,加上Hadoop的计算模式,致使计算资源不均匀引发性能降低。下图就是一个例子:

 

仍是拿网站的访问日志说事吧。假设网站访问日志中会记录用户的user_id,而且对于注册用户使用其用户表的user_id,对于非注册用户使用一个user_id=0表明。那么鉴于大多数用户是非注册用户(只看不写),因此user_id=0占据了绝大多数。而若是进行计算的时候若是以user_id做为group by的维度或者是join key,那么个别Reduce会收到比其余Reduce多得多的数据——由于它要接收全部user_id=0的记录进行处理,使得其处理效果会很是差,其余Reduce都跑完好久了它还在运行。

倾斜分红group by形成的倾斜和join形成的倾斜,须要分开看。

group by形成的倾斜有两个参数能够解决,一个是Hive.Map.aggr,默认值已经为true,意思是会作Map端的combiner。因此若是你的group by查询只是作count(*)的话,实际上是看不出倾斜效果的,可是若是你作的是count(distinct),那么仍是会看出一点倾斜效果。另外一个参数是Hive.groupby. skewindata。这个参数的意思是作Reduce操做的时候,拿到的key并非全部相同值给同一个Reduce,而是随机分发,而后Reduce作聚合,作完以后再作一轮MR,拿前面聚合过的数据再算结果。因此这个参数其实跟Hive.Map.aggr作的是相似的事情,只是拿到Reduce端来作,并且要额外启动一轮Job,因此其实不怎么推荐用,效果不明显。

若是说要改写SQL来优化的话,能够按照下面这么作:

 

[js]  view plain copy
 
 
 
  1. /*改写前*/  
  2. select a, count(distinct b) as c from tbl group by a;  
  3. /*改写后*/  
  4. select a, count(*) as c  
  5. from (select distinct a, b from tbl) group by a;  

 

join形成的倾斜,就好比上面描述的网站访问日志和用户表两个表join: 

 

[js]  view plain copy
 
 
 
  1. select a.* from logs a join users b on a。user_id = b.user_id;  

 

Hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在Reduce端计算掉,而是先写入hdfs,而后启动一轮Map join专门作这个特殊值的计算,指望能提升计算这部分值的处理速度。固然你要告诉Hive这个join是个skew join,即:

 

[js]  view plain copy
 
 
 
  1. set Hive.optimize.skewjoin = true;  

 

还有要告诉Hive如何判断特殊值,根据Hive.skewjoin.key设置的数量Hive能够知道,好比默认值是100000,那么超过100000条记录的值就是特殊值。

skew join的流程能够用下图描述:

 

另外对于特殊值的处理每每跟业务有关系,因此也能够从业务角度重写sql解决。好比前面这种倾斜join,能够把特殊值隔离开来(从业务角度说,users表应该不存在user_id = 0的状况,可是这里仍是假设有这个值,使得这个写法更加具备通用性): 

 

[js]  view plain copy
 
 
 
  1. select a.* from   
  2. (  
  3. select a.*  
  4. from (select * from logs where user_id = 0)  a   
  5. join (select * from users where user_id = 0) b   
  6. on a。user_id =  b。user_id  
  7. union all  
  8. select a.*   
  9. from logs a join users b  
  10. on a。user_id <> 0 and a。user_id = b.user_id  
  11. )t;  

 

数据倾斜不只仅是Hive的问题,实际上是share nothing架构下必然会碰到的数据分布问题,对此学界也有专门的研究,好比skewtune。

SQL总体优化

前面对于单个Job如何作优化已经作过详细讨论,可是Hive查询会生成多个Job,针对多个Job,有什么地方须要优化?

Job间并行

首先,在Hive生成的多个Job中,在有些状况下Job之间是能够并行的,典型的就是子查询。当须要执行多个子查询union all或者join操做的时候,Job间并行就可使用了。好比下面的代码就是一个能够并行的场景示意:

 

[js]  view plain copy
 
 
 
  1. select * from   
  2. (  
  3.    select count(*) from logs   
  4.    where log_date = 20130801 and item_id = 1  
  5.    union all   
  6.    select count(*) from logs   
  7.    where log_date = 20130802 and item_id = 2  
  8.    union all   
  9.    select count(*) from logs   
  10.    where log_date = 20130803 and item_id = 3  
  11. )t  

 

设置Job间并行的参数是Hive.exec.parallel,将其设为true便可。默认的并行度为8,也就是最多容许sql中8个Job并行。若是想要更高的并行度,能够经过Hive.exec.parallel. thread.number参数进行设置,但要避免设置过大而占用过多资源。

减小Job数

另外在实际开发过程当中也发现,一些实现思路会致使生成多余的Job而显得不够高效。好比这个需求:查询某网站日志中访问过页面a和页面b的用户数量。低效的思路是面向明细的,先取出看过页面a的用户,再取出看过页面b的用户,而后取交集,代码以下:

 

[js]  view plain copy
 
 
 
  1. select count(*)   
  2. from   
  3. (select distinct user_id   
  4. from logs where page_name = ‘a’) a  
  5. join   
  6. (select distinct user_id   
  7. from logs where blog_owner = ‘b’) b   
  8. on a.user_id = b.user_id;  

 

这样一来,就要产生2个求子查询的Job,一个用于关联的Job,还有一个计数的Job,一共有4个Job。

可是咱们直接用面向统计的方法去计算的话(也就是用group by替代join),则会更加符合M/R的模式,并且生成了一个彻底不带子查询的sql,只须要用一个Job就能跑完:

 

[js]  view plain copy
 
 
 
  1. select count(*)   
  2. from logs group by user_id  
  3. having (count(case when page_name = ‘a’ then 1 end) > 0  
  4.     and count(case when page_name = ‘b’ then 1 end) > 0)  

 

第一种查询方法符合思考问题的直觉,是工程师和分析师在实际查数据中最早想到的写法,可是若是在目前Hive的query planner不是那么智能的状况下,想要更加快速的跑出结果,懂一点工具的内部机理也是必须的。(做者:孙逸 / 审校:刘亚琼)

相关文章
相关标签/搜索