第4节 hive调优:一、二、fetch抓取和表的优化

hive的调优:
第一个调优:fetch抓取,可以避免使用mr的,就尽可能不要用mr,由于mr太慢了
set hive.fetch.task.conversion=more 表示咱们的全局查找,字段查找,limit查找都不走mr
这个属性配置有三个取值 more minimal none 若是配置成none,全部的都要走mr程序

hive的本地模式:
set hive.exec.mode.local.auto=true 开启本地模式,解决多个小文件输入的时候,分配资源时间超过数据的计算时间
set hive.exec.mode.local.auto.inputbytes.max=51234560; 设置输入的数据临界值,若是小于这值都认为是小任务模式,启动本地模式来执行
set hive.exec.mode.local.auto.input.files.max=10; 设置输入文件个数的临界值,若是小于这个数量,那么也认为是小任务模式node

注:mapreduce中还有一个小任务模式:linux

hadoop当中的小任务模式 ubermode:
启动一个maptask,分配资源花了30s,而后maptask去处理一个小文件,花了3s
hadoop当中的小任务模式:
mapreduce.job.ubertask.enable 设置咱们须要开启hadoop任务的小任务模式 小任务模式能够根据咱们输入的数据量作判断,若是输入的数据量比较小
输入10个文件,每一个文件2KB,输入的数据总量也就是20KB,10个小文件,占用10个block块,每一个block块对应要启动一个maptask
能够考虑使用ubermode 小任务模式来实现全部的数据就在一个maptask里面去处理。sql

set mapreduce.job.ubertask.enable; //查看该参数的值。数据库

set mapreduce.job.ubertask.enable=true; //设置该参数的值数据结构



第二个优化:hive表的优化
去重的优化:
select count(distinct s_id) from score;这种写法全部的去重数据都会在一个reduce当中去,形成数据处理比较慢
select count(1) from (select s_id from score group by s_id) bysid; 这种写法,使用了一个嵌套子查询,先对数据进行group by去重,而后再进行统计
尽可能避免大sql,能够将一个很大的sql拆成多段,分步的去执行

大表join大表的优化:
空key的过滤

不过滤:
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
结果:
No rows affected (152.135 seconds)负载均衡

过滤:过滤掉咱们全部的为null的id,使得咱们的输入数据量变少
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
结果:
No rows affected (141.585 seconds)函数

空key的转换
若是规定这些空key过滤不调,那么咱们能够对空key进行转换
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id;
若是空key比较多,那么就会将大量的空key转换成 hive,那么就会遇到一个问题,数据倾斜
数据倾斜的表现形式:有一个reduce处理的数据量远远比其余reduce处理的数据量要大,形成其余的reduce数据都处理完了,这个还没处理完

怎么发现的数据倾斜,如何出现的数据倾斜,怎么解决的数据倾斜

空key的打散
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;
经过将空key打散成不一样的随机字符串,就能够解决咱们hive的数据倾斜的问题oop

hive第三个优化:map端的join
hive已经开启了自动的map端的join功能,不论是咱们的大表join小表,仍是小表join大表,都会将咱们的小表加载到内存当中来
首先第一步:启动一个local的task,寻找哪一个表的数据是小表数据


hive的group by优化:能在map端聚合的数据,就尽可能在map端进行聚合
多加一层mr的程序,让咱们的数据实现均衡的负载,避免数据的倾斜
测试

count(distinct)的优化:
这种写法效率低下:SELECT count(DISTINCT id) FROM bigtable;
能够准换成这种写法:SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;
fetch

笛卡尔积:任什么时候候都要避免笛卡尔积,避免无效的on条件
select from A left join B -- on A.id = B.id

使用分区裁剪,列裁剪:
分区裁剪:若是是咱们的分区表,那么查询的时候,尽可能带上咱们的分区条件
列裁剪:尽可能避免使用select * ,须要查询哪些列,就选择哪些列

动态分区调整:
分区表的数据加载两种方式:
load data inpath '/export/xxx' into table xxx partition(month = 'xxx')
insert overwrite table xxx partition (month = 'xxx') select xxx

使用动态分区动态的添加数据
若是要使用动态分区添加数据,最后一个字段必定要是咱们的分区字段
INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)
SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time
FROM ori_partitioned;

==========================================================

9、调优

9.1 Fetch抓取(Hive能够避免进行MapReduce)

Hive中对某些状况的查询能够没必要使用MapReduce计算。例如:SELECT * FROM employees;在这种状况下,Hive能够简单地读取employee对应的存储目录下的文件,而后输出查询结果到控制台。

在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改成more之后,在全局查找、字段查找、limit查找等都不走mapreduce。

<property>

    <name>hive.fetch.task.conversion</name>

    <value>more</value>

    <description>

      Expects one of [none, minimal, more].

      Some select queries can be converted to single FETCH task minimizing latency.

      Currently the query should be single sourced not having any subquery and should not have

      any aggregations or distincts (which incurs RS), lateral views and joins.

      0. none : disable hive.fetch.task.conversion

      1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only 全部的都要走MR

      2. more    : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)

    </description>

  </property>

案例实操:

       1)把hive.fetch.task.conversion设置成none,而后执行查询语句,都会执行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=none;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

2)把hive.fetch.task.conversion设置成more,而后执行查询语句,以下查询方式都不会执行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=more;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

9.2 本地模式

大多数的Hadoop Job是须要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是很是小的。在这种状况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种状况,Hive能够经过本地模式在单台机器上处理全部的任务。对于小数据集,执行时间能够明显被缩短。

用户能够经过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。

set hive.exec.mode.local.auto=true;  //开启本地mr

//设置local mr的最大输入数据量,当输入数据量小于这个值时采用local  mr的方式,默认为134217728,即128M

set hive.exec.mode.local.auto.inputbytes.max=51234560;

//设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4

set hive.exec.mode.local.auto.input.files.max=10;

案例实操:

1)开启本地模式,并执行查询语句

hive (default)> set hive.exec.mode.local.auto=true; 

hive (default)> select * from score cluster by s_id;

18 rows selected (1.568 seconds)

2)关闭本地模式,并执行查询语句

hive (default)> set hive.exec.mode.local.auto=false; 

hive (default)> select * from score cluster by s_id;

18 rows selected (11.865 seconds)

9.2 表的优化

9.2.1 Join

Join原则:

1)小表Join大表,

将key相对分散,而且数据量小的表放在join的左边,这样能够有效减小内存溢出错误发生的概率;再进一步,可使用Group让小的维度表(1000条如下的记录条数)先进内存。在map端完成reduce。

select  count(distinct s_id)  from score;

select count(s_id) from score group by s_id; 在map端进行聚合,效率更高

select count(1) from (select s_id from score group by s_id) t;

 

 

2)多个表关联时,最好分拆成小段,避免大sql(没法控制中间Job)

3)大表Join大表

(1)空KEY过滤

有时join超时是由于某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而致使内存不够。此时咱们应该仔细分析这些异常的key,不少状况下,这些key对应的数据是异常数据,咱们须要在SQL语句中进行过滤。例如key对应的字段为空,操做以下:

环境准备:

create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

load data local inpath '/export/servers/hivedatas/hive_big_table/*' into table ori;

load data local inpath '/export/servers/hivedatas/hive_have_null_id/*' into table nullidtable;

 

不过滤:

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;

结果:

No rows affected (152.135 seconds)

 

过滤:

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL) a JOIN ori b ON a.id = b.id;

结果:

No rows affected (141.585 seconds)

 (2)空key转换

有时虽然某个key为空对应的数据不少,可是相应的数据不是异常数据,必需要包含在join的结果中,此时咱们能够表a中key为空的字段赋一个随机的值,使得数据随机均匀地分布到不一样的reducer上。例如:

不随机分布:

set hive.exec.reducers.bytes.per.reducer=32123456;

set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable

SELECT a.*

FROM nullidtable a

LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id;

No rows affected (41.668 seconds)   52.477

结果:这样的后果就是全部为null值的id所有都变成了相同的字符串,及其容易形成数据的倾斜(全部的key相同,相同key的数据会到同一个reduce当中去)

 (3)空key打散

为了解决这种状况,咱们能够经过hiverand函数,随机地给每个为空的id赋上一个随机值,这样就不会形成数据倾斜

随机分布:

set hive.exec.reducers.bytes.per.reducer=32123456;

set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable

SELECT a.*

FROM nullidtable a

LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;

No rows affected (42.594 seconds)

 

 

 

 

4)案例实操

(0)需求:测试大表JOIN小表和小表JOIN大表的效率 (新的版本当中已经没有区别了,旧的版本当中须要使用小表)

(1)建大表、小表和JOIN后表的语句

create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table jointable2(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

(2)分别向大表和小表中导入数据

hive (default)> load data local inpath '/export/servers/hivedatas/big_data' into table bigtable;

hive (default)>load data local inpath '/export/servers/hivedatas/small_data' into table smalltable;

(3)关闭mapjoin功能(默认是打开的)

set hive.auto.convert.join = false;

(4)执行小表JOIN大表语句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM smalltable s

left JOIN bigtable  b

ON b.id = s.id;

Time taken: 67.411 seconds 

 

(5)执行大表JOIN小表语句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM bigtable  b

left JOIN smalltable  s

ON s.id = b.id;

Time taken: 69.376seconds

能够看出大表join小表或者小表join大表,就算是关闭map端join的状况下,在新的版本当中基本上没有区别了(hive为了解决数据倾斜的问题,会自动进行过滤)

9.2.2 MapJoin

若是不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操做转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。能够用MapJoin把小表所有加载到内存在map端进行join,避免reducer处理。

1)开启MapJoin参数设置:

(1)设置自动选择Mapjoin

set hive.auto.convert.join = true; 默认为true

(2)大表小表的阈值设置(默认25M如下认为是小表):

set hive.mapjoin.smalltable.filesize=25123456;

2)MapJoin工做机制

 

首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,以后将该文件加载到DistributeCache中。

接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。

因为MapJoin没有Reduce,因此由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。

案例实操:

(1)开启Mapjoin功能

set hive.auto.convert.join = true; 默认为true

(2)执行小表JOIN大表语句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM smalltable s

JOIN bigtable  b

ON s.id = b.id;

Time taken: 31.814 seconds

 

(3)执行大表JOIN小表语句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM bigtable  b

JOIN smalltable  s

ON s.id = b.id;

Time taken: 28.46 seconds

9.2.3 Group By

默认状况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。

    并非全部的聚合操做都须要在Reduce端完成,不少聚合操做均可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。

1)开启Map端聚合参数设置

       (1)是否在Map端进行聚合,默认为True

set hive.map.aggr = true;

(2)在Map端进行聚合操做的条目数目

    set hive.groupby.mapaggr.checkinterval = 100000;

(3)有数据倾斜的时候进行负载均衡(默认是false)

    set hive.groupby.skewindata = true;

    当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每一个Reduce作部分聚合操做,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不一样的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程能够保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操做。

9.2.4 Count(distinct)

数据量小的时候无所谓,数据量大的状况下,因为COUNT DISTINCT操做须要用一个Reduce Task来完成,这一个Reduce须要处理的数据量太大,就会致使整个Job很难完成,通常COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:

环境准备:

create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

load data local inpath '/home/admin/softwares/data/100万条大表数据(id除以10取整)/bigtable' into table bigtable;

 

set hive.exec.reducers.bytes.per.reducer=32123456;

SELECT count(DISTINCT id) FROM bigtable;

结果:

c0

10000

Time taken: 35.49 seconds, Fetched: 1 row(s)

能够转换成:

set hive.exec.reducers.bytes.per.reducer=32123456;

SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;

结果:

Stage-Stage-1: Map: 1  Reduce: 4   Cumulative CPU: 13.07 sec   HDFS Read: 120749896 HDFS Write: 464 SUCCESS

Stage-Stage-2: Map: 3  Reduce: 1   Cumulative CPU: 5.14 sec   HDFS Read: 8987 HDFS Write: 7 SUCCESS

_c0

10000

Time taken: 51.202 seconds, Fetched: 1 row(s)

虽然会多用一个Job来完成,但在数据量大的状况下,这个绝对是值得的。

9.2.5 笛卡尔积

尽可能避免笛卡尔积,即避免join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。

9.2.6 使用分区剪裁、列剪裁

在SELECT中,只拿须要的列,若是有,尽可能使用分区过滤,少用SELECT *。

在分区剪裁中,当使用外关联时,若是将副表的过滤条件写在Where后面,那么就会先全表关联,以后再过滤,好比:

环境准备:

create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

load data local inpath '/home/admin/softwares/data/加递增id的原始数据/ori' into table ori;

 

load data local inpath '/home/admin/softwares/data/100万条大表数据(id除以10取整)/bigtable' into table bigtable;

先关联再Where:

SELECT a.id

FROM bigtable a

LEFT JOIN ori b ON a.id = b.id

WHERE b.id <= 10;

正确的写法是写在ON后面:先Where再关联

SELECT a.id

FROM ori a

LEFT JOIN bigtable b ON (a.id <= 10 AND a.id = b.id);

或者直接写成子查询:

SELECT a.id

FROM bigtable a

RIGHT JOIN (SELECT id

FROM ori

WHERE id <= 10

) b ON a.id = b.id;

9.2.7 动态分区调整

关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了相似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,须要进行相应的配置。

 

说白了就是以第一个表的分区规则,来对应第二个表的分区规则,将第一个表的全部分区,所有拷贝到第二个表中来,第二个表在加载数据的时候,不须要指定分区了,直接用第一个表的分区便可

 

 

1)开启动态分区参数设置

(1)开启动态分区功能(默认true,开启)

set hive.exec.dynamic.partition=true;

(2)设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示容许全部的分区字段均可以使用动态分区。)

set hive.exec.dynamic.partition.mode=nonstrict;

(3)在全部执行MR的节点上,最大一共能够建立多少个动态分区。

set  hive.exec.max.dynamic.partitions=1000;

       (4)在每一个执行MR的节点上,最大能够建立多少个动态分区。该参数须要根据实际的数据来设定。好比:源数据中包含了一年的数据,即day字段有365个值,那么该参数就须要设置成大于365,若是使用默认值100,则会报错。

set hive.exec.max.dynamic.partitions.pernode=100

(5)整个MR Job中,最大能够建立多少个HDFS文件。

       在linux系统当中,每一个linux用户最多能够开启1024个进程,每个进程最多能够打开2048个文件,即持有2048个文件句柄,下面这个值越大,就能够打开文件句柄越大

set hive.exec.max.created.files=100000;

(6)当有空分区生成时,是否抛出异常。通常不须要设置。

set hive.error.on.empty.partition=false;

2)案例实操

需求:将ori中的数据按照时间(如:20111231234568),插入到目标表ori_partitioned的相应分区中。

(1)准备数据原表

create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string)

PARTITIONED BY (p_time bigint)

row format delimited fields terminated by '\t';

 

load data local inpath '/export/servers/hivedatas/small_data' into  table ori_partitioned partition (p_time='20111230000010');

 

load data local inpath '/export/servers/hivedatas/small_data' into  table ori_partitioned partition (p_time='20111230000011');

(2)建立分区表

create table ori_partitioned_target(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';

(3)分析

若是按照以前介绍的往指定一个分区中Insert数据,那么这个需求很不容易实现。这时候就须要使用动态分区来实现。

set hive.exec.dynamic.partition = true;

set hive.exec.dynamic.partition.mode = nonstrict;

set hive.exec.max.dynamic.partitions = 1000;

set hive.exec.max.dynamic.partitions.pernode = 100;

set hive.exec.max.created.files = 100000;

set hive.error.on.empty.partition = false;

 

INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)

SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time

FROM ori_partitioned;

注意:在PARTITION (month,day)中指定分区字段名便可;

在SELECT子句的最后几个字段,必须对应前面PARTITION (month,day)中指定的分区字段,包括顺序。

查看分区

hive> show partitions ori_partitioned_target;

OK

p_time=20111230000010

p_time=20111230000011

9.2.8 分桶

参见分桶表

相关文章
相关标签/搜索