Hive调优

Hive调优原则

 

Hive是将符合SQL语法的字符串解析生成能够在Hadoop上执行的MapReduce的工具。理解Hadoop的核心能力,是Hive 优化的根本。使用Hive尽可能按照分布式计算的一些特色来设计SQL,Hive的调优原则主要包括如下几点:node

 

原子化操做

尽可能原子化操做,避免一个SQL包含复杂逻辑,可使用中间表来完成复杂的逻辑,从而提升总体执行效率。通常来讲,单个SQL所起的JOB个数尽可能控制在5个如下。若是Union All的部分个数大于2,或者每一个Union部分数据量大,应该拆分红多个Insert Into语句apache

 

充分利用服务器资源

让服务器尽量的多作事情,充分利用服务器的计算资源,以实现最高的系统吞吐量为目标。好比一个做业能作完的事情就不要拆开两个做业来完成。Reduce个数过少不能真正发挥Hadoop并行计算的优点,但Reduce个数过多,会形成大量小文件问题,因此须要根据数据量和资源状况找到一个折衷点。服务器

Hive能够将没有依赖关系的屡次MR过程(例如Union All语义中的多个子查询)并发提交。使用hive.exec.parallel参数控制在同一个SQL中的不一样做业是否能够同时运行,从而提升做业的并发,充分利用服务器的资源:session

SET hive.exec.parallel=true;并发

SET hive.exec.parallel.thread.number=最大并发job数;app

 

选择最优的执行路径

让服务器尽可能少作事情,选择最优的执行路径,以资源消耗最少为目标。好比负载均衡

注意Join的使用jvm

两表关联时,若其中有一个表很小,则使用Map Join,不然使用普通的Reduce Join分布式

SET hive.auto.convert.join=true ;函数

SET hive.smalltable.filesize=25000000L(默认是25M);

 

注意小文件的问题

在Hive里有两种比较常见的处理办法。第一是使用Combinefileinputformat,将多个小文件打包做为一个总体的Inputsplit,减小Map任务数;

SET mapred.max.split.size=128000000;

SET mapred.min.split.size=128000000;

SET mapred.min.split.size.per.node=128000000;

SET mapred.min.split.size.per.rack=128000000;

SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

 

第二是文件数目小,容易在文件存储端形成瓶颈,给 HDFS 带来压力,影响处理效率。对此,能够经过合并Map和Reduce的结果文件来消除这样的影响。

设置Hive参数,将额外启动一个MR Job打包小文件。

用于设置合并属性的参数有:

是否合并Map输出文件:SET hive.merge.mapfiles=true(默认值为真)

是否合并Reduce 端输出文件:SET hive.merge.mapredfiles= true(默认值为假)

合并文件的大小:SET hive.merge.size.per.task=256*1000*1000(默认值为 256000000)

 

注意执行过程当中的数据倾斜问题

在Hive调优中比较经常使用的处理办法有两种:

第一,经过hive.groupby.skewindata = true控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到Reduce作次预汇总,减小某些key值条数过多或太小形成的数据倾斜问题。

第二,经过hive.map.aggr = true(默认为true)在Map端作Combiner,假如map各条数据基本上不同,聚合没什么意义,作Combiner反而多此一举,Hive会经过如下两个参数:

hive.groupby.mapaggr.checkinterval = 100000 (默认)

hive.map.aggr.hash.min.reduction = 0.5(默认)

预先取100000条数据聚合,若是聚合后的条数/100000>0.5,则再也不作聚合。

 

合理设置Map与Reduce的个数

增长Map数

同时可执行的Map数是有限的,一般状况下,做业会经过Input的目录产生一个或者多个Map任务,而主要的决定因素是Input的文件总个数和Input的文件大小。

若是表A只有一个文件,且大小超过100M,包含上千万记录,任务较为耗时,能够考虑用多个Map任务完成,有效提高性能

合理设置Reduce数

增长map数能够经过控制一个做业的Reduce数来加以控制。Reduce个数的设定会极大影响执行效率,通常基于如下参数设定:

hive.exec.reducers.bytes.per.reducer(每一个reduce任务处理的数据量,如1G)

hive.exec.reducers.max(每一个任务最大的reduce数)

若是Reduce的输入(Map的输出)总大小不超过1G,那么只会有一个Reduce任务。能够根据实际状况,经过缩小每一个Reduce任务处理的数据量来提升执行性能。

 

Hive SQL调优

Hive查询生成多个Map Reduce做业,一个Map Reduce做业又有map,reduce,spill,shuffle,sort等多个阶段,因此针对Hive SQL的优化能够大体分为针对MR中单个步骤的优化,针对MR全局的优化以及针对整个查询的优化。Hive SQL的调优贯穿全部阶段,主要是解决运行过程当中数据倾斜问题,减小做业数,对小文件进行合并处理,合理设置Map Reduce的任务数等,根据数据量和模型状况,经过迭代调测来有效提高性能。如下是常见的Hive SQL调优方法:

 

Hive join优化

减小没必要要的关联

Hive SQL和其余SQL同样,是一种功能强大的说明性语言,对于同一个业务功能,能够经过不一样的写法来实现,而不一样的写法会产生不一样的性能特色。

例如一个点击率表,包括了访问时间、SessionID、网址和来源IP地址共四个字段:

CREATE TABLE clicks (

timestamp date, sessionID string, url string, source_ip string)

STORED as ORC tblproperties (“orc.compress” = “SNAPPY”);

假设咱们须要查找每一个SessionID最后一次的访问网址:

方法一:

SELECT clicks.* FROM clicks inner join

(select sessionID, max(timestamp) as max_ts from clicks group by sessionID) latest

ON clicks.sessionID = latest.sessionID and clicks.timestamp = latest.max_ts;

该方法使用了子查询方法去收集每一个SessionID的最后访问时间,而后经过Inner Join自关联去排除掉以前的点击访问记录,效率较低。

方法二:

SELECT * FROM (SELECT *, RANK() over (partition by sessionID,order by timestamp desc) as rank FROM clicks) ranked_clicks

WHERE ranked_clicks.rank=1;

这里使用了OLAP的排位函数去实现相同的业务查询,关键不须要表关联,仅为单表操做,删除没必要要的关联在大数据开发上意义重大,能大幅提升性能。

 

带表分区的Join

Hive是先关联再进行Where条件判断,若是在右表b中找不到左表a表的记录,b表中的因此列都会显示为NULL,这种状况下left outer的查询结果与where子句无关,解决办法是将Where条件放在JOIN ON的关联判断条件中。

SQL编写须要尽早过滤数据,以减小各个阶段的数据量,只选择须要用到的字段。

 

Skew Join优化

优化Skewed Join Key为Map Join。开启hive.optimize.skewjoin=true可优化处理过程当中倾斜的数据。但须要注意Skew Join优化须要额外的Map Join操做,且不能节省Shuffle的代价。

 

利用随机数减小数据倾斜

大表之间Join容易由于空值而产生数据倾斜,除了经过过滤方法排除空值,还能够利用随机数分散到不一样的Reduce上,例如:

select a.uid from big_table_a a left outer join big_table_b b on b.uid = case when a.uid is null or length(a.uid)=0 then concat('rd_sid',rand()) else a.uid end;

把空值的Key变成一个字符串加上随机数,就能把倾斜的数据分到不一样的Reduce上,解决数据倾斜问题。由于空值不参与关联,即便分到不一样的Reduce上,也不影响最终的结果,并且IO减小,做业数也减小了,执行效率更优。

 

Group by

Group By是在Reduce阶段的操做,防止数据倾斜:

Map端聚合,提早一部分计算:Hive.map.aggr = true

hive.groupby.skewindata为ture的时候,生成的查询计划会有两个MRJob:

第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每一个Reduce作部分聚合操做,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不一样的Reduce中,从而达到负载均衡的目的。

第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程能够保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操做。

Count Distinct

数据量小的时候无所谓,数据量大的状况下,因为COUNT DISTINCT操做须要用一个Reduce Task来完成,这一个Reduce须要处理的数据量太大,就会致使整个Job很难完成,通常COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:

SELECT day,

COUNT(DISTINCT id)AS uv

FROM test

GROUP BY day

能够转换成:

SELECT day,

COUNT(id) AS uv

FROM (SELECTday,id FROM test GROUP BY day,id) a

GROUP BY day;

虽然会多用一个Job来完成,但在数据量大的状况下,这个绝对是值得的。

Order by VS Sort by

Order by是在全局的排序,只用一个Reduce去运行,因此在SET Hive.mapred.mode=strict 模式下,不容许执行如下查询:

没有limit限制的order by语句

分区表上没有指定分区

笛卡尔积(JOIN时没有ON语句)。

而Sort by是在每一个Reduce内排序,只保证同一个Reduce下排序正确。

 

通用Hive SQL优化方法

一、尽可能利用分区,好比按时间进行分区。

二、关联条件不能忽略,避免Select *。

三、关联字段的类型保持一致,避免字段的强制转换。

四、避免使用LIKE 进行模糊匹配的查询。

五、对查询进行优化,要尽可能避免全表扫描

 

 

经常使用参数

(经常使用的一些可设置参数,具体数值按照须要进行调整!)

SET hive.optimize.skewjoin = true;

SET hive.skewjoin.key = 100000;

SET hive.exec.dynamic.partition.mode = nonstrict;

SET mapred.reducer.tasks = 50;

 

// Hive中间结果压缩和压缩输出

SET hive.exec.compress.output = true; -- 默认false

SET hive.exec.compress.intermediate = true; -- 默认false

SET mapred.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec; -- 默认org.apache.hadoop.io.compress.DefaultCodec

SET mapred.output.compression.type = BLOCK; -- 默认BLOCK

 

// 输出合并小文件

SET hive.merge.mapfiles = true; -- 默认true,在map-only任务结束时合并小文件

SET hive.merge.mapredfiles = true; -- 默认false,在map-reduce任务结束时合并小文件

SET hive.merge.size.per.task = 268435456; -- 默认256M

SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge

 

// 设置map和reduce数量

SET mapred.max.split.size = 256000000;

SET mapred.min.split.size = 64000000;

SET mapred.min.split.size.per.node = 64000000;

SET mapred.min.split.size.per.rack = 64000000;

SET hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

SET hive.exec.reducers.bytes.per.reducer = 256000000; -- 默认64M,每一个reducer处理的数据量大小

 

// 设置数据倾斜和并行化

SET hive.exec.parallel = true; -- 并行执行

SET hive.exec.parallel.thread.number = 16;

SET mapred.job.reuse.jvm.num.tasks = 10;

SET hive.exec.dynamic.partition = true;

SET hive.optimize.cp = true; -- 列裁剪

SET hive.optimize.pruner = true; -- 分区裁剪

SET hive.groupby.skewindata = true; -- groupby数据倾斜

SET hive.exec.mode.local.auto = true; --本地执行

SET hive.exec.mode.local.auto.input.files.max = 10; //map数默认是4,当map数小于10就会启动任务本地执行

SET hive.exec.mode.local.auto.inputbytes.max = 128000000 --默认是128M

 

//关闭如下两个参数来完成关闭Hive任务的推测执行

SET mapred.map.tasks.speculative.execution=false;

SET mapred.reduce.tasks.speculative.execution=false;

相关文章
相关标签/搜索