在工做中使用hive比较多,也写了不少HiveQL。这里从三个方面对 Hive 经常使用的一些性能优化进行了总结。算法
分区表 是在某一个或者几个维度上对数据进行分类存储,一个分区对应一个目录。若是筛选条件里有分区字段,那么 Hive 只须要遍历对应分区目录下的文件便可,不须要遍历全局数据,使得处理的数据量大大减小,从而提升查询效率。sql
当一个 Hive 表的查询大多数状况下,会根据某一个字段进行筛选时,那么很是适合建立为分区表。apache
指定桶的个数后,存储数据时,根据某一个字段进行哈希后,肯定存储在哪一个桶里,这样作的目的和分区表相似,也是使得筛选时不用全局遍历全部的数据,只须要遍历所在桶就能够了。性能优化
Apache Hive 支持 Apache Hadoop 中使用的几种熟悉的文件格式。网络
TextFile 默认格式,若是建表时不指定默认为此格式。架构
存储方式:行存储。并发
每一行都是一条记录,每行都以换行符\n
结尾。数据不作压缩时,磁盘会开销比较大,数据解析开销也比较大。app
可结合 Gzip、Bzip2 等压缩方式一块儿使用(系统会自动检查,查询时会自动解压),但对于某些压缩算法 hive 不会对数据进行切分,从而没法对数据进行并行操做。负载均衡
SequenceFilejvm
一种Hadoop API 提供的二进制文件,使用方便、可分割、个压缩的特色。
支持三种压缩选择:NONE、RECORD、BLOCK。RECORD压缩率低,通常建议使用BLOCK压缩。
RCFile
存储方式:数据按行分块,每块按照列存储 。
ORC
存储方式:数据按行分块,每块按照列存储
Hive 提供的新格式,属于 RCFile 的升级版,性能有大幅度提高,并且数据能够压缩存储,压缩快,快速列存取。
Parquet
存储方式:列式存储
Parquet 对于大型查询的类型是高效的。对于扫描特定表格中的特定列查询,Parquet特别有用。Parquet通常使用 Snappy、Gzip 压缩。默认 Snappy。
Parquet 支持 Impala 查询引擎。
表的文件存储格式尽可能采用 Parquet 或 ORC,不只下降存储量,还优化了查询,压缩,表关联等性能;
Hive 语句最终是转化为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在与 网络IO 和 磁盘IO,要解决性能瓶颈,最主要的是 减小数据量,对数据进行压缩是个好方式。压缩虽然是减小了数据量,可是压缩过程要消耗CPU,可是在Hadoop中,每每性能瓶颈不在于CPU,CPU压力并不大,因此压缩充分利用了比较空闲的CPU。
经常使用压缩算法对比
如何选择压缩方式
支持分割的文件能够并行的有多个 mapper 程序处理大数据文件,大多数文件不支持可分割是由于这些文件只能从头开始读。
Hive 在读数据的时候,能够只读取查询中所须要用到的列,而忽略其余的列。这样作能够节省读取开销,中间表存储开销和数据整合开销。
set hive.optimize.cp = true; -- 列裁剪,取数只取查询中须要用到的列,默认为真
复制代码
在查询的过程当中只选择须要的分区,能够减小读入的分区数目,减小读入的数据量。
set hive.optimize.pruner=true; // 默认为true
复制代码
在执行 MapReduce 程序的时候,通常状况是一个文件须要一个 mapper 来处理。可是若是数据源是大量的小文件,这样岂不是会启动大量的 mapper 任务,这样会浪费大量资源。能够将输入的小文件进行合并,从而减小mapper任务数量。详细分析
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端输入、合并文件以后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端输入,不合并
复制代码
大量的小文件会给 HDFS 带来压力,影响处理效率。能够经过合并 Map 和 Reduce 的结果文件来消除影响。
set hive.merge.mapfiles=true; -- 是否合并Map输出文件, 默认值为真
set hive.merge.mapredfiles=true; -- 是否合并Reduce 端输出文件,默认值为假
set hive.merge.size.per.task=25610001000; -- 合并文件的大小,默认值为 256000000
复制代码
减小 mapper 数能够经过合并小文件来实现 增长 mapper 数能够经过控制上一个 reduce
默认的 mapper 个数计算方式
输入文件总大小:total_size
hdfs 设置的数据块大小:dfs_block_size
default_mapper_num = total_size/dfs_block_size
复制代码
MapReduce 中提供了以下参数来控制 map 任务个数:
set mapred.map.tasks=10;
复制代码
从字面上看,貌似是能够直接设置 mapper 个数的样子,可是很遗憾不行,这个参数设置只有在大于default_mapper_num
的时候,才会生效。
那若是咱们须要减小 mapper 数量,可是文件大小是固定的,那该怎么办呢?
能够经过mapred.min.split.size
设置每一个任务处理的文件的大小,这个大小只有在大于dfs_block_size
的时候才会生效
split_size=max(mapred.min.split.size, dfs_block_size)
split_num=total_size/split_size
compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks))
复制代码
这样就能够减小mapper数量了。
总结一下控制 mapper 个数的方法:
mapred.map.tasks
为一个较大的值maperd.min.split.size
为一个较大的值hive.input.format
合并小文件若是想要调整 mapper 个数,在调整以前,须要肯定处理的文件大概大小以及文件的存在形式(是大量小文件,仍是单个大文件),而后再设置合适的参数。
若是 reducer 数量过多,一个 reducer 会产生一个结数量果文件,这样就会生成不少小文件,那么若是这些结果文件会做为下一个 job 的输入,则会出现小文件须要进行合并的问题,并且启动和初始化 reducer 须要耗费和资源。
若是 reducer 数量过少,这样一个 reducer 就须要处理大量的数据,而且还有可能会出现数据倾斜的问题,使得整个查询耗时长。 默认状况下,hive 分配的 reducer 个数由下列参数决定:
hive.exec.reducers.bytes.per.reducer
(默认1G)hive.exec.reducers.max
(默认为999)reducer的计算公式为:
N = min(参数2, 总输入数据量/参数1)
复制代码
能够经过改变上述两个参数的值来控制reducer的数量。 也能够经过
set mapred.map.tasks=10;
复制代码
直接控制reducer个数,若是设置了该参数,上面两个参数就会忽略。
尽可能减小每一个阶段的数据量,对于分区表能用上分区字段的尽可能使用,同时只选择后面须要使用到的列,最大限度的减小参与 join 的数据量。
小表 join 大表的时应遵照小表 join 大表原则,缘由是 join 操做的 reduce 阶段,位于 join 左边的表内容会被加载进内存,将条目少的表放在左边,能够有效减小发生内存溢出的概率。join 中执行顺序是从左到右生成 Job,应该保证连续查询中的表的大小从左到右是依次增长的。
在 hive 中,当对 3 个或更多张表进行 join 时,若是 on 条件使用相同字段,那么它们会合并为一个 MapReduce Job,利用这种特性,能够将相同的 join on 的放入一个 job 来节省执行时间。
mapjoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操做,这样就不用进行 reduce 步骤,从而提升了速度。只有 join 操做才能启用 mapjoin。
set hive.auto.convert.join = true; -- 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
set hive.mapjoin.smalltable.filesize = 2500000; -- 刷入内存表的大小(字节)
set hive.mapjoin.maxsize=1000000; -- Map Join所处理的最大的行数。超过此行数,Map Join进程会异常退出
复制代码
尽可能避免一个SQL包含复杂的逻辑,可使用中间表来完成复杂的逻辑。
当两个分桶表 join 时,若是 join on的是分桶字段,小表的分桶数是大表的倍数时,能够启用 mapjoin 来提升效率。
set hive.optimize.bucketmapjoin = true; -- 启用桶表 map join
复制代码
默认状况下,Map阶段同一个Key的数据会分发到一个Reduce上,当一个Key的数据过大时会产生 数据倾斜。进行group by
操做时能够从如下两个方面进行优化:
1. Map端部分聚合
事实上并非全部的聚合操做都须要在 Reduce 部分进行,不少聚合操做均可以先在 Map 端进行部分聚合,而后在 Reduce 端的得出最终结果。
set hive.map.aggr=true; -- 开启Map端聚合参数设置
set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端进行聚合操做的条目数目
复制代码
2. 有数据倾斜时进行负载均衡
set hive.groupby.skewindata = true; -- 有数据倾斜的时候进行负载均衡(默认是false)
复制代码
当选项设定为 true 时,生成的查询计划有两个 MapReduce 任务。在第一个 MapReduce 任务中,map 的输出结果会随机分布到 reduce 中,每一个 reduce 作部分聚合操做,并输出结果,这样处理的结果是相同的group by key
有可能分发到不一样的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处理的数据结果按照group by key
分布到各个 reduce 中,最后完成最终的聚合操做。
order by
只能是在一个reduce进程中进行,因此若是对一个大数据集进行order by
,会致使一个reduce进程中处理的数据至关大,形成查询执行缓慢。
order by
,不要在中间的大数据集上进行排序。若是最终结果较少,能够在一个reduce上进行排序时,那么就在最后的结果集上进行order by
。distribute by
和sort by
在各个reduce上进行排序后前N条,而后再对各个reduce的结果集合合并后在一个reduce中全局排序,再取前N条,由于参与全局排序的order by
的数据量最可能是reduce个数 * N
,因此执行效率很高。-- 优化前(只有一个reduce,先去重再count负担比较大):
select count(distinct id) from tablename;
-- 优化后(启动两个job,一个job负责子查询(能够有多个reduce),另外一个job负责count(1)):
select count(1) from (select distinct id from tablename) tmp;
复制代码
有些场景是从一张表读取数据后,要屡次利用,这时可使用multi insert
语法:
from sale_detail
insert overwrite table sale_detail_multi partition (sale_date='2010', region='china' )
select shop_name, customer_id, total_price where .....
insert overwrite table sale_detail_multi partition (sale_date='2011', region='china' )
select shop_name, customer_id, total_price where .....;
复制代码
说明:
- 通常状况下,单个SQL中最多能够写128路输出,超过128路,则报语法错误。
- 在一个multi insert中:
- 对于分区表,同一个目标分区不容许出现屡次。
- 对于未分区表,该表不能出现屡次。
- 对于同一张分区表的不一样分区,不能同时有
insert overwrite
和insert into
操做,不然报错返回。
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
复制代码
中间数据压缩就是对 hive 查询的多个 job 之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。能够采用snappy
压缩算法,该算法的压缩和解压效率都很是高。
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
复制代码
最终的结果数据(Reducer输出数据)也是能够进行压缩的,能够选择一个压缩效果比较好的,能够减小数据的大小和数据的磁盘读写时间; 注:经常使用的gzip,snappy压缩算法是不支持并行处理的,若是数据源是gzip/snappy压缩文件大文件,这样只会有有个mapper来处理这个文件,会严重影响查询效率。 因此若是结果数据须要做为其余查询任务的数据源,能够选择支持splitable的LZO
算法,这样既能对结果文件进行压缩,还能够并行的处理,这样就能够大大的提升job执行的速度了。关于如何给Hadoop集群安装LZO压缩库能够查看这篇文章。
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
复制代码
Hadoop集群支持一下算法:
Hive 从 HDFS 中读取数据,有两种方式:启用 MapReduce 读取、直接抓取。
直接抓取数据比 MapReduce 方式读取数据要快的多,可是只有少数操做可使用直接抓取方式。
能够经过hive.fetch.task.conversion
参数来配置在什么状况下采用直接抓取方式:
select *
、在分区字段上 where
过滤、有 limit
这三种场景下才启用直接抓取方式。select
、where
筛选、limit
时,都启用直接抓取方式。set hive.fetch.task.conversion=more; -- 启用fetch more模式
复制代码
Hive 在集群上查询时,默认是在集群上多台机器上运行,须要多个机器进行协调运行,这种方式很好的解决了大数据量的查询问题。可是在Hive查询处理的数据量比较小的时候,其实没有必要启动分布式模式去执行,由于以分布式方式执行设计到跨网络传输、多节点协调等,而且消耗资源。对于小数据集,能够经过本地模式,在单台机器上处理全部任务,执行时间明显被缩短。
set hive.exec.mode.local.auto=true; -- 打开hive自动判断是否启动本地模式的开关
set hive.exec.mode.local.auto.input.files.max=4; -- map任务数最大值
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- map输入文件最大大小
复制代码
Hive 语句最终会转换为一系列的 MapReduce 任务,每个MapReduce 任务是由一系列的Map Task 和 Reduce Task 组成的,默认状况下,MapReduce 中一个 Map Task 或者 Reduce Task 就会启动一个 JVM 进程,一个 Task 执行完毕后,JVM进程就会退出。这样若是任务花费时间很短,又要屡次启动 JVM 的状况下,JVM的启动时间会变成一个比较大的消耗,这时,能够经过重用 JVM 来解决。
set mapred.job.reuse.jvm.num.tasks=5;
复制代码
JVM也是有缺点的,开启JVM重用会一直占用使用到的 task 的插槽,以便进行重用,直到任务完成后才会释放。若是某个
不平衡的job
中有几个 reduce task 执行的时间要比其余的 reduce task 消耗的时间要多得多的话,那么保留的插槽就会一直空闲却没法被其余的 job 使用,直到全部的 task 都结束了才会释放。
有的查询语句,hive会将其转化为一个或多个阶段,包括:MapReduce 阶段、抽样阶段、合并阶段、limit 阶段等。默认状况下,一次只执行一个阶段。可是,若是某些阶段不是互相依赖,是能够并行执行的。多阶段并行是比较耗系统资源的。
set hive.exec.parallel=true; -- 能够开启并发执行。
set hive.exec.parallel.thread.number=16; -- 同一个sql容许最大并行度,默认为8。
复制代码
在分布式集群环境下,由于程序Bug(包括Hadoop自己的bug),负载不均衡或者资源分布不均等缘由,会形成同一个做业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其余任务(好比一个做业的某个任务进度只有50%,而其余全部任务已经运行完毕),则这些任务会拖慢做业的总体执行进度。为了不这种状况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据必定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最早成功运行完成任务的计算结果做为最终结果。
set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;
复制代码
建议:
若是用户对于运行时的误差很是敏感的话,那么能够将这些功能关闭掉。若是用户由于输入数据量很大而须要执行长时间的map或者Reduce task的话,那么启动推测执行形成的浪费是很是巨大大。
扩展阅读