Apache Hive做为处理大数据量的大数据领域数据建设核心工具,数据量每每不是影响Hive执行效率的核心因素,数据倾斜、job数分配的不合理、磁盘或网络I/O太高、MapReduce配置的不合理等等才是影响Hive性能的关键。node
Hive在执行任务时,一般会将Hive SQL转化为MapReduce job进行处理。所以对Hive的调优,除了对Hive语句自己的优化,也要考虑Hive配置项以及MapReduce相关的优化。从更底层思考如何优化性能,而不是仅仅局限于代码/SQL的层面。
列裁剪和分区裁剪
Hive在读数据的时候,只读取查询中所须要用到的列,而忽略其它列。例如,如有如下查询:面试
SELECT age, name FROM people WHERE age > 30;
在实施此项查询中,people表有3列(age,name,address),Hive只读取查询逻辑中真正须要的两列age、name,而忽略列address;这样作节省了读取开销,中间表存储开销和数据整合开销。sql
同理,对于Hive分区表的查询,咱们在写SQL时,经过指定实际须要的分区,能够减小没必要要的分区数据扫描【当Hive表中列不少或者数据量很大时,若是直接使用select * 或者不指定分区,效率会很低下(全列扫描和全表扫描)】。shell
Hive中与列裁剪和分区裁剪优化相关的配置参数分别为:hive.optimize.cp和hive.optimize.pruner,默认都是true。数据库
谓词下推
apache
在关系型数据库如MySQL中,也有谓词下推(Predicate Pushdown,PPD)的概念。它就是将SQL语句中的where谓词逻辑都尽量提早执行,减小下游处理的数据量。微信
以下Hive SQL语句:网络
select a.*, b.* from a join b on (a.id = b.id)where a.id > 15 and b.num > 16;
若是没有谓词下推,上述SQL须要在完成join处理以后才会执行where条件过滤。在这种状况下,参与join的数据可能会很是多,从而影响执行效率。架构
使用谓词下推,那么where条件会在join以前被处理,参与join的数据量减小,提高效率。app
在Hive中,能够经过将参数hive.optimize.ppd设置为true,启用谓词下推。与它对应的逻辑优化器是PredicatePushDown。该优化器就是将OperatorTree中的FilterOperator向上提,见下图:
Hive join优化
关于Hive join,参考文章:《Hive join优化》。
hive.fetch.task.conversion
虽然Hive底层能够将Hive SQL转化为MapReduce执行,但有些状况不使用MapReduce处理效率跟高。好比对于以下SQL:
SELECT name FROM people;
在这种状况下,Hive能够简单地读取people对应的存储目录下的文件,而后返回数据。
在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改成more之后,在全局查找、字段查找、limit查找等都不走mapreduce。
<property> <name>hive.fetch.task.conversion</name> <value>more</value> <description> Expects one of [none, minimal, more]. Some select queries can be converted to single FETCH task minimizing latency. Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins. 0. none : disable hive.fetch.task.conversion 1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only 2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns) </description> </property>
将hive.fetch.task.conversion设置成none,在Hive shell中执行以下语句,都会执行MapReduce程序。
hive> set hive.fetch.task.conversion=none; hive> select name from people; hive> select * from people;
把hive.fetch.task.conversion设置成more,而后执行以下语句,以下查询方式都不会执行MapReduce程序。
hive> set hive.fetch.task.conversion=more; hive> select name from people; hive> select * from people;
经过参数说明发现当把hive.fetch.task.conversion设置成none时,全部的程序都走mapreduce程序会耗费必定的时间。但就算设置成more,也只有部分sql语句会不走MapReduce程序,那有没有什么办法能够优化这个问题呢?这就不得不提本地模式了。
group by
1)map端预聚合
经过在map端进行一次预聚合(起一个combiner),能够有效减小shuffle的数据量,而后再在reduce端获得最终结果。
预聚合的配置参数为hive.map.aggr,默认值true。
此外,经过hive.groupby.mapaggr.checkinterval参数能够设置map端预聚合的条数阈值,超过该值就会分拆job,默认值100000。
2)数据倾斜时进行负载均衡处理
当group by时,若是某些key对应的数据量过大,会致使数据倾斜。
经过将参数hive.groupby.skewindata(默认false)设置为true,那么在进行group by时,会启动两个MR job。第一个job会将map端数据随机输入reducer,每一个reducer作部分聚合操做,相同的group by key会分布在不一样的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。
可是,相对于正常的任务执行,该参数配置为true时会多启动一个MR job,会增长开销,单纯依赖它解决数据倾斜并不能从根本上解决问题。所以,建议分析数据、Hive SQL语句等,了解产生数据倾斜的根本缘由进行解决。
count(distinct)
count(distinct)采用很是少的reducer进行数据处理。数据量小时对执行效率影响不明显,可是当数据量大时,效率会很低,尤为是数据倾斜的时候。
能够经过group by代替count(distinct)使用。示例以下:
原SQL:SELECT count(DISTINCT id) FROM people; group by替换后:SELECT count(id) FROM (SELECT id FROM people GROUP BY id) tmp;
注意:上述group by替换后,会启动两个MR job(只是distinct只会启动一个),因此要确保启动job的开销远小于计算耗时,才考虑这种方法。不然当数据集很小或者key的倾斜不明显时,group by还可能会比count(distinct)还慢。
此外,如何用group by方式同时统计多个列?下面提供一种SQL方案:
select tmp.a, sum(tmp.b), count(tmp.c), count(tmp.d) from ( select a, b, null c, null d from some_table union all select a, 0 b, c, null d from some_table group by a,c union all select a, 0 b, null c, d from some_table group by a,d ) tmp;
笛卡尔积
除非业务须要,在生产中要极力避免笛卡尔积,好比在join语句中不指定on链接条件,或者无效的on链接条件,Hive只能使用1个reducer来完成笛卡尔积。
本地模式
对于处理小数据量的任务,咱们不须要经过集群模式进行处理(由于为该任务实际触发的job执行等开销可能比实际任务的执行时间还要长),Hive能够经过本地模式在单台机器上处理全部的任务。
能够经过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。
set hive.exec.mode.local.auto=true;
设置本地MR的最大输入数据量,当输入数据量小于这个值时采用本地MR的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=51234560;
设置本地MR的最大输入文件个数,当输入文件个数小于这个值时采用本地MR的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
left semi join替代in/exsits
left semi join是in、exists的高效实现。好比,对于以下SQL
select t1.id, t1.name from t1 where t1.id in (select t2.id from t2);
改成left semi join执行:
select t1.id, t1.name from t1 left semi join t2 on t1.id = t2.id;
MapReduce相关的优化
1. mapper和reducer个数
关于MapReduce中mapper和reducer个数的决定机制,建议阅读文章:《详解MapReduce》
2. 合并小文件
1)输入阶段合并
设置参数hive.input.format为org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。(默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat)。
此外还需配置两个参数:mapred.min.split.size.per.node(单节点上的最小split大小)和mapred.min.split.size.per.rack(单机架上的最小split大小)。
若是有split大小小于这两个值,则会进行合并。
2)输出阶段合并
将hive.merge.mapfiles和hive.merge.mapredfiles都设为true,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。
此外,hive.merge.size.per.task能够指定每一个task输出后合并文件大小的指望值,hive.merge.size.smallfiles.avgsize能够指定全部输出文件大小的均值阈值。若是平均大小不足的话,就会另外启动一个任务来进行合并。
3. 启用压缩
压缩job的中间结果数据和输出数据,能够用少许CPU时间节省不少空间,压缩方式通常选择Snappy。(关于Hadoop支持的压缩格式,参考文章:《 Hadoop支持的压缩格式对比和应用场景以及Hadoop native库 》)
要启用中间压缩,须要设定hive.exec.compress.intermediate为true,同时指定压缩方式hive.intermediate.compression.codec为org.apache.hadoop.io.compress.SnappyCodec。
另外,参数hive.intermediate.compression.type能够选择对块(BLOCK)仍是记录(RECORD)压缩,BLOCK的压缩率比较高。
输出压缩的配置基本相同,打开hive.exec.compress.output便可。
4. JVM重用
在MR job中,默认是每执行一个task就启动一个JVM。能够经过配置参数mapred.job.reuse.jvm.num.tasks来进行JVM重用。
例如将这个参数设成5,那么就表明同一个MR job中顺序执行的5个task能够重复使用一个JVM,减小启动和关闭的开销。但它对不一样MR job中的task无效。
采用合适的存储格式
在HiveQL的create table语句中,可使用stored as ...指定表的存储格式。Hive目前支持的存储格式有TextFile、SequenceFile、RCFile、avro、orc、parquet等。
固然,咱们也能够采用alter table … [PARTITION partition_spec] set fileformat,修改具体表的文件格式
parquet和orc是企业中经常使用的两种数据存储格式,具体能够参考官网:
https://parquet.apache.org/和https://orc.apache.org/。
推测执行
在分布式集群环境下,因为负载不均衡或者资源分布不均等缘由,会形成同一个做业的多个job之间运行速度不一致,有些job的运行速度可能明显慢于其余任务,则这些job会拖慢整个做业的执行进度。为了不这种状况发生,Hadoop采用了推测执行(Speculative Execution)机制。
"推测执行"机制,根据必定的规则推测出"拖后腿"的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最早成功运行完成任务的计算结果做为最终结果。
Hive一样能够开启推测执行。设置开启推测执行参数(在配置文件mapred-site.xml中进行配置)
<property> <name>mapreduce.map.speculative</name> <value>true</value> <description>If true, then multiple instances of some map tasks may be executed in parallel.</description> </property> <property> <name>mapreduce.reduce.speculative</name> <value>true</value> <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description> </property>
hive自己也提供了配置项来控制reduce-side的推测执行:
<property> <name>hive.mapred.reduce.tasks.speculative.execution</name> <value>true</value> <description>Whether speculative execution for reducers should be turned on. </description> </property>
关于调优这些推测执行变量,目前还很难给出一个具体建议。若是用户对于运行时的误差很是敏感的话,那么能够将这些功能关闭掉。若是用户由于输入数据量很大而须要执行长时间的map或者reduce task的话,那么启动推测执行形成的浪费是很是巨大。
推荐文章:
一次Java内存泄漏的排查
经典的SparkSQL/Hive-SQL/MySQL面试-练习题
关注微信公众号:大数据学习与分享,获取更对技术干货