(十七)Hive 优化策略

1、Hadoop 框架计算特性

一、数据量大不是问题,数据倾斜是个问题node

二、jobs 数比较多的做业运行效率相对比较低,好比即便有几百行的表,若是屡次关联屡次 汇总,产生十几个 jobs,耗时很长。缘由是 map reduce 做业初始化的时间是比较长的算法

三、sum,count,max,min 等 UDAF,不怕数据倾斜问题,hadoop 在 map 端的汇总合并优化,使 数据倾斜不成问题sql

四、count(distinct userid),在数据量大的状况下,效率较低,若是是多 count(distinct userid,month)效率更低,由于 count(distinct)是按 group by 字段分组,按 distinct 字段排序, 通常这种分布方式是很apache

倾斜的,好比 PV 数据,淘宝一天 30 亿的 pv,若是按性别分组,分 配 2 个 reduce,每一个 reduce 指望处理 15 亿数据,但现实一定是男少女多编程

回到顶部

2、优化经常使用手段

一、好的模型设计事半功倍缓存

二、解决数据倾斜问题服务器

三、减小 job 数网络

四、设置合理的 MapReduce 的 task 数,能有效提高性能。(好比,10w+级别的计算,用 160个 reduce,那是至关的浪费,1 个足够)并发

五、了解数据分布,本身动手解决数据倾斜问题是个不错的选择。这是通用的算法优化,但 算法优化有时不能适应特定业务背景,开发人员了解业务,了解数据,能够经过业务逻辑精 确有效的解决数据倾斜问题负载均衡

六、数据量较大的状况下,慎用 count(distinct),group by 容易产生倾斜问题

七、对小文件进行合并,是行之有效的提升调度效率的方法,假如全部的做业设置合理的文 件数,对云梯的总体调度效率也会产生积极的正向影响

八、优化时把握总体,单个做业最优不如总体最优

回到顶部

3、排序选择

cluster by:对同一字段分桶并排序,不能和 sort by 连用

distribute by + sort by:分桶,保证同一字段值只存在一个结果文件当中,结合 sort by 保证 每一个 reduceTask 结果有序

sort by:单机排序,单个 reduce 结果有序

order by:全局排序,缺陷是只能使用一个 reduce

必定要区分这四种排序的使用方式和适用场景

回到顶部

4、怎样作笛卡尔积

当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不容许在 HQL 语句中出现笛卡尔积, 这实际说明了 Hive 对笛卡尔积支持较弱。由于找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。

固然也可使用 limit 的办法来减小某个表参与 join 的数据量,但对于须要笛卡尔积语义的 需求来讲,常常是一个大表和一个小表的 Join 操做,结果仍然很大(以致于没法用单机处 理),这时 MapJoin才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操做。 这须要将 Join 操做的一个或多个表彻底读入内存。

PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表作笛卡尔积时,规避笛卡尔积的 方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条 目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。

精髓就在于复制几倍,最后就有几个 reduce 来作,并且大表的数据是前面小表扩张 key 值 范围里面随机出来的,因此复制了几倍 n,就至关于这个随机范围就有多大 n,那么相应的, 大表的数据就被随机的分为了 n 份。而且最后处理所用的 reduce 数量也是 n,并且也不会 出现数据倾斜。

回到顶部

5、怎样写 in/exists 语句

虽然通过测验,hive1.2.1 也支持 in/exists 操做,但仍是推荐使用 hive 的一个高效替代方案:left semi join

好比说:

select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);

应该转换成:

select a.id, a.name from a left semi join b on a.id = b.id;
回到顶部

6、设置合理的 maptask 数量

Map 数过大

  Map 阶段输出文件过小,产生大量小文件

  初始化和建立 Map 的开销很大

Map 数过小

  文件处理或查询并发度小,Job 执行时间过长

  大量做业时,容易堵塞集群 

在 MapReduce 的编程案例中,咱们得知,一个MR Job的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit()决定的。一个输入分片对应一个 MapTask, 而输入分片是由三个参数决定的:

输入分片大小的计算是这么计算出来的:

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

默认状况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启 用一个 MapTask 进行处理,这样作的好处是避免了服务器节点之间的数据传输,提升 job 处 理效率

两种经典的控制 MapTask 的个数方案:减小 MapTask 数或者增长 MapTask 数

一、 减小 MapTask 数是经过合并小文件来实现,这一点主要是针对数据源

二、 增长 MapTask 数能够经过控制上一个 job 的 reduceTask 个数 

由于 Hive 语句最终要转换为一系列的 MapReduce Job 的,而每个 MapReduce Job 是由一 系列的 MapTask 和 ReduceTask 组成的,默认状况下, MapReduce 中一个 MapTask 或者一个 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后, JVM 进程就退出。这样若是任 务花费时间很短,又要屡次启动 JVM 的状况下,JVM 的启动时间会变成一个比较大的消耗, 这个时候,就能够经过重用 JVM 来解决:

 set mapred.job.reuse.jvm.num.tasks=5 

回到顶部

7、小文件合并

文件数目过多,会给 HDFS 带来压力,而且会影响处理效率,能够经过合并 Map 和 Reduce 的 结果文件来消除这样的影响:

set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件

set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件

set hive.merge.size.per.task = 256*1000*1000 ##合并文件的大小

set mapred.max.split.size=256000000; ##每一个 Map 最大分割大小

set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并

回到顶部

8、设置合理的 reduceTask 的数量

Hadoop MapReduce 程序中,reducer 个数的设定极大影响执行效率,这使得 Hive 怎样决定 reducer 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 reducer 个数的情 况下,Hive 会猜想肯定一个 reducer 个数,基于如下两个设定:

一、hive.exec.reducers.bytes.per.reducer(默认为 256000000)

二、hive.exec.reducers.max(默认为 1009)

三、mapreduce.job.reduces=-1(设置一个常量 reducetask 数量)

计算 reducer 数的公式很简单: N=min(参数 2,总输入数据量/参数 1) 一般状况下,有必要手动指定 reducer 个数。考虑到 map 阶段的输出数据量一般会比输入有 大幅减小,所以即便不设定 reducer 个数,重设参数 2 仍是必要的。

依据 Hadoop 的经验,能够将参数 2 设定为 0.95*(集群中 datanode 个数)。 

回到顶部

9、合并 MapReduce 操做

Multi-group by 是 Hive 的一个很是好的特性,它使得 Hive 中利用中间结果变得很是方便。 例如:

FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid =
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查询语句使用了 multi-group by 特性连续 group by 了 2 次数据,使用不一样的 group by key。 这一特性能够减小一次 MapReduce 操做

回到顶部

10、合理利用分桶:Bucketing 和 Sampling

Bucket 是指将数据以指定列的值为 key 进行 hash,hash 到指定数目的桶中。这样就能够支 持高效采样了。以下例就是以 userid 这一列为 bucket 的依据,共设置 32 个 buckets

CREATE TABLE page_view(viewTime INT, userid BIGINT,
 page_url STRING, referrer_url STRING,
 ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '1'
 COLLECTION ITEMS TERMINATED BY '2'
 MAP KEYS TERMINATED BY '3'
 STORED AS SEQUENCEFILE;

一般状况下,Sampling 在全体数据上进行采样,这样效率天然就低,它要去访问全部数据。 而若是一个表已经对某一列制做了 bucket,就能够采样全部桶中指定序号的某个桶,这就 减小了访问量。

以下例所示就是采样了 page_view 中 32 个桶中的第三个桶的所有数据:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);

以下例所示就是采样了 page_view 中 32 个桶中的第三个桶的一半数据:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);

 

回到顶部

11、合理利用分区:Partition 

 Partition 就是分区。分区经过在建立表时启用 partitioned by 实现,用来 partition 的维度并不 是实际数据的某一列,具体分区的标志是由插入内容时给定的。当要查询某一分区的内容时 能够采用 where 语句,形似 where tablename.partition_column = a 来实现。

建立含分区的表

CREATE TABLE page_view(viewTime INT, userid BIGINT,
 page_url STRING, referrer_url STRING,
 ip STRING COMMENT 'IP Address of the User')
PARTITIONED BY(date STRING, country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '1'
STORED AS TEXTFILE;

载入内容,并指定分区标志

load data local inpath '/home/hadoop/pv_2008-06-08_us.txt' into table page_view
partition(date='2008-06-08', country='US');

查询指定标志的分区内容

SELECT page_views.* FROM page_views
 WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' AND
page_views.referrer_url like '%xyz.com';

 

回到顶部

12、Join 优化

整体原则:

  一、 优先过滤后再进行 Join 操做,最大限度的减小参与 join 的数据量

  二、 小表 join 大表,最好启动 mapjoin

  三、 Join on 的条件相同的话,最好放入同一个 job,而且 join 表的排列顺序从小到大 

在使用写有 Join 操做的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操做 符的左边。缘由是在 Join 操做的 Reduce 阶段,位于 Join 操做符左边的表的内容会被加 载进内存,将条目少的表放在左边,能够有效减小发生 OOM 错误的概率。对于一条语句 中有多个 Join 的状况,若是 Join 的条件相同,好比查询

INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);

 

若是 Join 的 key 相同,无论有多少个表,都会则会合并为一个 Map-Reduce 任务,而不 是”n”个,在作 OUTER JOIN 的时候也是同样

若是 join 的条件不相同,好比:

INSERT OVERWRITE TABLE pv_users
 SELECT pv.pageid, u.age FROM page_view p
 JOIN user u ON (pv.userid = u.userid)
 JOIN newuser x on (u.age = x.age);

Map-Reduce 的任务数目和 Join 操做的数目是对应的,上述查询和如下查询是等价的

--先 page_view 表和 user 表作连接
INSERT OVERWRITE TABLE tmptable
 SELECT * FROM page_view p JOIN user u ON (pv.userid = u.userid);
-- 而后结果表 temptable 和 newuser 表作连接
INSERT OVERWRITE TABLE pv_users
 SELECT x.pageid, x.age FROM tmptable x JOIN newuser y ON (x.age = y.age); 

在编写 Join 查询语句时,若是肯定是因为 join 出现的数据倾斜,那么请作以下设置:

set hive.skewjoin.key=100000; // 这个是 join 的键对应的记录条数超过这个值则会进行 分拆,值根据具体数据量设置

set hive.optimize.skewjoin=true; // 若是是 join 过程出现倾斜应该设置为 true 

回到顶部

十3、Group By 优化

 

 一、Map 端部分聚合

并非全部的聚合操做都须要在 Reduce 端完成,不少聚合操做均可以先在 Map 端进 行部分聚合,最后在 Reduce 端得出最终结果。

MapReduce 的 combiner 组件参数包括:

set hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True

set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操做的条目数目

 

二、使用 Group By 有数据倾斜的时候进行负载均衡

 set hive.groupby.skewindata = true

当 sql 语句使用 groupby 时数据出现倾斜时,若是该变量设置为 true,那么 Hive 会自动进行 负载均衡。策略就是把 MR 任务拆分红两个:第一个先作预汇总,第二个再作最终汇总

在 MR 的第一个阶段中,Map 的输出结果集合会缓存到 maptaks 中,每一个 Reduce 作部分聚 合操做,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不一样的 Reduce 中, 从而达到负载均衡的目的;第二个阶段 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程能够保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操做。

回到顶部

十4、合理利用文件存储格式 

建立表时,尽可能使用 orc、parquet 这些列式存储格式,由于列式存储的表,每一列的数据在 物理上是存储在一块儿的,Hive 查询时会只遍历须要列数据,大大减小处理的数据量。

回到顶部

十5、本地模式执行 MapReduce

Hive 在集群上查询时,默认是在集群上 N 台机器上运行, 须要多个机器进行协调运行,这 个方式很好地解决了大数据量的查询问题。可是当 Hive 查询处理的数据量比较小时,其实 没有必要启动分布式模式去执行,由于以分布式方式执行就涉及到跨网络传输、多节点协调 等,而且消耗资源。这个时间能够只使用本地模式来执行 mapreduce job,只在一台机器上 执行,速度会很快。启动本地模式涉及到三个参数:

set hive.exec.mode.local.auto=true 是打开 hive 自动判断是否启动本地模式的开关,可是只 是打开这个参数并不能保证启动本地模式,要当 map 任务数不超过

hive.exec.mode.local.auto.input.files.max 的个数而且 map 输入文件大小不超过

hive.exec.mode.local.auto.inputbytes.max 所指定的大小时,才能启动本地模式。

回到顶部

十6、并行化处理

一个 hive sql 语句可能会转为多个 mapreduce Job,每个 job 就是一个 stage,这些 job 顺序 执行,这个在 cli 的运行日志中也能够看到。可是有时候这些任务之间并非是相互依赖的, 若是集群资源容许的话,可让多个并不相互依赖 stage 并发执行,这样就节约了时间,提 高了执行速度,可是若是集群资源匮乏时,启用并行化反却是会致使各个 job 相互抢占资源 而致使总体执行性能的降低。启用并行化:

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=8; //同一个 sql 容许并行任务的最大线程数

回到顶部

十7、设置压缩存储

 

一、压缩的缘由

Hive 最终是转为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减小数据量,对数据进行压缩是个好的方式。压缩 虽然是减小了数据量,可是压缩过程要消耗 CPU 的,可是在 Hadoop 中, 每每性能瓶颈不 在于 CPU,CPU 压力并不大,因此压缩充分利用了比较空闲的 CPU

 

二、经常使用压缩方法对比

各个压缩方式所对应的 Class 类:

 

三、压缩方式的选择

压缩比率

压缩解压缩速度

是否支持 Split

 

四、压缩使用

Job 输出文件按照 block 以 GZip 的方式进行压缩:

set mapreduce.output.fileoutputformat.compress=true // 默认值是 false

set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默认值是 Record

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默认值是 org.apache.hadoop.io.compress.DefaultCodec

Map 输出结果也以 Gzip 进行压缩:

set mapred.map.output.compress=true

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默认值是 org.apache.hadoop.io.compress.DefaultCodec 

对 Hive 输出结果和中间都进行压缩:

set hive.exec.compress.output=true // 默认值是 false,不压缩

set hive.exec.compress.intermediate=true // 默认值是 false,为 true 时 MR 设置的压缩才启用

相关文章
相关标签/搜索