深刻理解 Hive 分区分桶 (Inceptor)

分区是hive存放数据的一种方式。将列值做为目录来存放数据,就是一个分区。这样查询时使用分区列进行过滤,只需根据列值直接扫描对应目录下的数据,不扫描其余不关心的分区,快速定位,提升查询效率。分动态和静态分区两种:java

1. 静态分区:若分区的值是肯定的,那么称为静态分区。新增分区或者是加载分区数据时,已经指定分区名。node

create table if not exists day_part1(git

uid int,github

uname string数据库

)apache

partitioned by(year int,month int)缓存

row format delimited fields terminated by '\t'服务器

;网络

##加载数据指定分区app

load data local inpath '/root/Desktop/student.txt' into table day_part1 partition(year=2017,month=04);

##新增分区指定分区名

alter table day_part1 add partition(year=2017,month=1) partition(year=2016,month=12);

 1. 动态分区:分区的值是非肯定的,由输入数据来肯定

 2.1 动态分区的相关属性:

hive.exec.dynamic.partition=true :是否容许动态分区

hive.exec.dynamic.partition.mode=strict :分区模式设置

strict:最少须要有一个是静态分区

nostrict:能够所有是动态分区

hive.exec.max.dynamic.partitions=1000 :容许动态分区的最大数量

hive.exec.max.dynamic.partitions.pernode =100 :单个节点上的mapper/reducer容许建立的最大分区

 2.2 动态分区的操做

##建立临时表

create table if not exists tmp(

uid int,

commentid bigint,

recommentid bigint,

year int,

month int,

day int

)

row format delimited fields terminated by '\t';

##加载数据

load data local inpath '/root/Desktop/comm' into table tmp;

##建立动态分区表

create table if not exists dyp1(

uid int,

commentid bigint,

recommentid bigint

)

partitioned by(year int,month int,day int)

row format delimited fields terminated by '\t'

;

##严格模式

insert into table dyp1 partition(year=2016,month,day)

select uid,commentid,recommentid,month,day from tmp;

##非严格模式

##设置非严格模式动态分区

set hive.exec.dynamic.partition.mode=nostrict;

##建立动态分区表

create table if not exists dyp2(

uid int,

commentid bigint,

recommentid bigint

)

partitioned by(year int,month int,day int)

row format delimited fields terminated by '\t';

##为非严格模式动态分区加载数据

insert into table dyp2 partition(year,month,day)

select uid,commentid,recommentid,year,month,day from tmp;

 3.分区注意细节

(1)、尽可能不要是用动态分区,由于动态分区的时候,将会为每个分区分配reducer数量,当分区数量多的时候,reducer数量将会增长,对服务器是一种灾难。

(2)、动态分区和静态分区的区别,静态分区无论有没有数据都将会建立该分区,动态分区是有结果集将建立,不然不建立。

(3)、hive动态分区的严格模式和hive提供的hive.mapred.mode的严格模式。

hive提供咱们一个严格模式:为了阻止用户不当心提交恶意hql

hive.mapred.mode=nostrict : strict

若是该模式值为strict,将会阻止如下三种查询:

(1)、对分区表查询,where中过滤字段不是分区字段。

(2)、笛卡尔积join查询,join查询语句,不带on条件 或者 where条件。

(3)、对order by查询,有order by的查询不带limit语句。

hive中建立分区表没有什么复杂的分区类型(范围分区、列表分区、hash分区、混合分区等)。分区列也不是表中的一个实际的字段,而是一个或者多个伪列。意思是说在表的数据文件中实际上并不保存分区列的信息与数据。
下面的语句建立了一个简单的分区表:

create table partition_test
(member_id string,
name string
)
partitioned by (
stat_date string,
province string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

这个例子中建立了stat_date和province两个字段做为分区列。一般状况下须要先预先建立好分区,而后才能使用该分区,例如:

alter table partition_test add partition(stat_date='20110728',province='zhejiang');

这样就建立好了一个分区。这时咱们会看到hive在HDFS存储中建立了一个相应的文件夹:

$ hadoop fs -ls/user/hive/warehouse/partition_test/stat_date=20110728
Found 1 items
drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728/province=zhejiang

每个分区都会有一个独立的文件夹,下面是该分区全部的数据文件。在这个例子中stat_date是主层次,province是副层次,全部stat_date='20110728',而province不一样的分区都会在/user/hive/warehouse/partition_test/stat_date=20110728下面,而stat_date不一样的分区都会在/user/hive/warehouse/partition_test/下面,如:

$ hadoop fs -ls /user/hive/warehouse/partition_test/
Found 2 items
drwxr-xr-x - admin supergroup 0 2011-07-28 19:46/user/hive/warehouse/partition_test/stat_date=20110526
drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728

注意,由于分区列的值要转化为文件夹的存储路径,因此若是分区列的值中包含特殊值,如 '%', ':', '/','#',它将会被使用%加上2字节的ASCII码进行转义,如:

hive> alter table partition_test add partition(stat_date='2011/07/28',province='zhejiang');
OK
Time taken: 4.644 seconds

$hadoop fs -ls /user/hive/warehouse/partition_test/
Found 3 items
drwxr-xr-x - admin supergroup 0 2011-07-29 10:06/user/hive/warehouse/partition_test/stat_date=2011% 2F07%2F28
drwxr-xr-x - admin supergroup 0 2011-07-28 19:46/user/hive/warehouse/partition_test/stat_date=20110526
drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728

我使用一个辅助的非分区表partition_test_input准备向partition_test中插入数据:

hive> desc partition_test_input;
OK
stat_date string
member_id string
name string
province string

hive> select * from partition_test_input;
OK
20110526 1 liujiannan liaoning
20110526 2 wangchaoqun hubei
20110728 3 xuhongxing sichuan
20110728 4 zhudaoyong henan
20110728 5 zhouchengyu heilongjiang

而后我向partition_test的分区中插入数据:

hive> insert overwrite table partition_testpartition(stat_date='20110728',province='henan') selectmember_id,name from partition_test_input where stat_date='20110728'and province='henan';
Total MapReduce jobs = 2
...
1 Rows loaded to partition_test
OK

还能够同时向多个分区插入数据,0.7版本之后不存在的分区会自动建立,0.6以前的版本官方文档上说必需要预先建立好分区:

hive>
> from partition_test_input
> insert overwrite table partition_test partition(stat_date='20110526',province='liaoning')
> select member_id,name where stat_date='20110526'and province='liaoning'
> insert overwrite table partition_test partition(stat_date='20110728',province='sichuan')
> select member_id,name where stat_date='20110728'and province='sichuan'
> insert overwrite table partition_test partition(stat_date='20110728',province='heilongjiang')
> select member_id,name where stat_date='20110728'and province='heilongjiang';
Total MapReduce jobs = 4
...
3 Rows loaded to partition_test
OK

特别要注意,在其余数据库中,通常向分区表中插入数据时系统会校验数据是否符合该分区,若是不符合会报错。而在hive中,向某个分区中插入什么样的数据彻底是由人来控制的,由于分区键是伪列,不实际存储在文件中,如:


hive> insert overwrite table partition_testpartition(stat_date='20110527',province='liaoning') selectmember_id,name from partition_test_input;
Total MapReduce jobs = 2
...
5 Rows loaded to partition_test
OK

hive> select * from partition_test wherestat_date='20110527' and province='liaoning';
OK
1 liujiannan 20110527 liaoning
2 wangchaoqun 20110527 liaoning
3 xuhongxing 20110527 liaoning
4 zhudaoyong 20110527 liaoning
5 zhouchengyu 20110527 liaoning

能够看到在partition_test_input中的5条数据有着不一样的stat_date和province,可是在插入到partition(stat_date='20110527',province='liaoning')这个分区后,5条数据的stat_date和province都变成相同的了,由于这两列的数据是根据文件夹的名字读取来的,而不是实际从数据文件中读取来的:

$ hadoop fs -cat/user/hive/warehouse/partition_test/stat_date=20110527/province=liaoning/000000_0
1,liujiannan
2,wangchaoqun
3,xuhongxing
4,zhudaoyong
5,zhouchengyu

下面介绍一下动态分区,由于按照上面的方法向分区表中插入数据,若是源数据量很大,那么针对一个分区就要写一个insert,很是麻烦。何况在以前的版本中,必须先手动建立好全部的分区后才能插入,这就更麻烦了,你必须先要知道源数据中都有什么样的数据才能建立分区。
使用动态分区能够很好的解决上述问题。动态分区能够根据查询获得的数据自动匹配到相应的分区中去。 


使用动态分区要先设置hive.exec.dynamic.partition参数值为true,默认值为false,即不容许使用:

hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=false
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=true

动态分区的使用方法很简单,假设我想向stat_date='20110728'这个分区下面插入数据,至于province插入到哪一个子分区下面让数据库本身来判断,那能够这样写:

hive> insert overwrite table partition_testpartition(stat_date='20110728',province)
> select member_id,name,province frompartition_test_input where stat_date='20110728';
Total MapReduce jobs = 2
...
3 Rows loaded to partition_test
OK

stat_date叫作静态分区列,province叫作动态分区列。select子句中须要把动态分区列按照分区的顺序写出来,静态分区列不用写出来。这样stat_date='20110728'的全部数据,会根据province的不一样分别插入到/user/hive/warehouse/partition_test/stat_date=20110728/下面的不一样的子文件夹下,若是源数据对应的province子分区不存在,则会自动建立,很是方便,并且避免了人工控制插入数据与分区的映射关系存在的潜在风险。

注意,动态分区不容许主分区采用动态列而副分区采用静态列,这样将致使全部的主分区都要建立副分区静态列所定义的分区:

hive> insert overwrite table partition_testpartition(stat_date,province='liaoning')
> select member_id,name,province frompartition_test_input where province='liaoning';
FAILED: Error in semantic analysis: Line 1:48 Dynamic partitioncannot be the parent of a static partition 'liaoning'

动态分区能够容许全部的分区列都是动态分区列,可是要首先设置一个参数hive.exec.dynamic.partition.mode:

hive> set hive.exec.dynamic.partition.mode;
hive.exec.dynamic.partition.mode=strict

它的默认值是strick,即不容许分区列所有是动态的,这是为了防止用户有可能原意是只在子分区内进行动态建分区,可是因为疏忽忘记为主分区列指定值了,这将致使一个dml语句在短期内建立大量的新的分区(对应大量新的文件夹),对系统性能带来影响。
因此咱们要设置:
hive> sethive.exec.dynamic.partition.mode=nostrick;

再介绍3个参数:
hive.exec.max.dynamic.partitions.pernode (缺省值100):每个mapreducejob容许建立的分区的最大数量,若是超过了这个数量就会报错
hive.exec.max.dynamic.partitions(缺省值1000):一个dml语句容许建立的全部分区的最大数量
hive.exec.max.created.files (缺省值100000):全部的mapreducejob容许建立的文件的最大数量

当源表数据量很大时,单独一个mapreducejob中生成的数据在分区列上可能很分散,举个简单的例子,好比下面的表要用3个map:
1
1
1
2
2
2
3
3
3

若是数据这样分布,那每一个mapreduce只须要建立1个分区就能够了: 
        |1
map1 --> |1 
        |1 

        |2
map2 --> |2 
        |2 

        |3
map3 --> |3 
        |3
可是若是数据按下面这样分布,那第一个mapreduce就要建立3个分区: 

        |1
map1 --> |2 
        |3 

        |1
map2 --> |2 
        |3 

        |1
map3 --> |2 
        |3

下面给出了一个报错的例子:
hive> sethive.exec.max.dynamic.partitions.pernode=4;
hive> insert overwrite table partition_testpartition(stat_date,province)
> select member_id,name,stat_date,province frompartition_test_input distribute by stat_date,province;
Total MapReduce jobs = 1
...
[Fatal Error] Operator FS_4 (id=4): Number of dynamic partitionsexceeded hive.exec.max.dynamic.partitions.pernode.. Killing thejob.
Ended Job = job_201107251641_0083 with errors
FAILED: Execution Error, return code 2 fromorg.apache.hadoop.hive.ql.exec.MapRedTask

为了让分区列的值相同的数据尽可能在同一个mapreduce中,这样每个mapreduce能够尽可能少的产生新的文件夹,能够借助distributeby的功能,将分区列值相同的数据放到一块儿:

hive> insert overwrite table partition_testpartition(stat_date,province)
> select member_id,name,stat_date,province frompartition_test_input distribute by stat_date,province;
Total MapReduce jobs = 1
...
18 Rows loaded to partition_test
OK

好了,关于hive的分区表先简单介绍到这里,后续版本若是有功能的更新我也会再更新。

 

为什么分区分桶
咱们知道传统的DBMS系统通常都具备表分区的功能,经过表分区可以在特定的区域检索数据,减小扫描成本,在必定程度上提升查询效率,固然咱们还能够经过进一步在分区上创建索引进一步提高查询效率。在此就不赘述了。

在Hive数仓中也有分区分桶的概念,在逻辑上分区表与未分区表没有区别,在物理上分区表会将数据按照分区键的列值存储在表目录的子目录中,目录名=“分区键=键值”。其中须要注意的是分区键的值不必定要基于表的某一列(字段),它能够指定任意值,只要查询的时候指定相应的分区键来查询便可。咱们能够对分区进行添加、删除、重命名、清空等操做。由于分区在特定的区域(子目录)下检索数据,它做用同DNMS分区同样,都是为了减小扫描成本。

分桶则是指定分桶表的某一列,让该列数据按照哈希取模的方式随机、均匀地分发到各个桶文件中。由于分桶操做须要根据某一列具体数据来进行哈希取模操做,故指定的分桶列必须基于表中的某一列(字段)。由于分桶改变了数据的存储方式,它会把哈希取模相同或者在某一区间的数据行放在同一个桶文件中。如此一来即可提升查询效率,如:咱们要对两张在同一列上进行了分桶操做的表进行JOIN操做的时候,只须要对保存相同列值的桶进行JOIN操做便可。同时分桶也能让取样(Sampling)更高效。

 

分区
Hive(Inceptor)分区又分为单值分区、范围分区。单值分区又分为静态分区和动态分区。咱们先看下分区长啥样。以下,假若有一张表名为persionrank表,记录每一个人的评级,有id、name、score字段。咱们即可以建立分区rank(注意rank不是表中的列,咱们能够把它当作虚拟列),并将相应数据导入指定分区(将数据插入指定目录)。

单值分区
单值分区根据插入时是否须要手动指定分区能够分为:单值静态分区:导入数据时须要手动指定分区。单值动态分区:导入数据时,系统能够动态判断目标分区。

单值分区表的建表方式有两种:直接定义列和 CREATE TABLE LIKE。注意,单值分区表不能用 CREATE
TABLE AS SELECT 建表。而范围分区表只能经过直接定义列来建表。

一、静态分区建立

直接在 PARTITIONED BY 后面跟上分区键、类型便可。(分区键不能和任何列重名)

CREATE [EXTERNAL] TABLE <table_name>
    (<col_name> <data_type> [, <col_name> <data_type> ...])
    -- 指定分区键和数据类型
    PARTITIONED BY  (<partition_key> <data_type>, ...) 
    [CLUSTERED BY ...] 
    [ROW FORMAT <row_format>] 
    [STORED AS TEXTFILE|ORC|CSVFILE]
    [LOCATION '<file_path>']    
    [TBLPROPERTIES ('<property_name>'='<property_value>', ...)];
二、静态分区写入

-- 覆盖写入
INSERT OVERWRITE TABLE <table_name> 
    PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...]) 
    SELECT <select_statement>;
 
-- 追加写入
INSERT INTO TABLE <table_name> 
    PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...])
    SELECT <select_statement>;
 
 
三、动态分区建立

建立方式与静态分区表彻底同样,一张表可同时被静态和动态分区键分区,只是动态分区键须要放在静态分区建的后面(由于HDFS上的动态分区目录下不能包含静态分区的子目录),以下 spk 即 static partition key, dpk 即 dynamic partition key。

CREATE TABLE <table_name>
 PARTITIONED BY ([<spk> <data_type>, ... ,] <dpk> <data_type>, [<dpk>
<data_type>,...]);
-- ...略
四、动态分区写入

静态分区键要用 <spk>=<value> 指定分区值;动态分区只须要给出分出分区键名称 <dpk>。

-- 开启动态分区支持,并设置最大分区数
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions=2000;
 
-- <dpk>为动态分区键, <spk>为静态分区键
INSERT (OVERWRITE | INTO) TABLE <table_name>
    PARTITION ([<spk>=<value>, ..., ] <dpk>, [..., <dpk>]) 
    SELECT <select_statement>; 
范围分区
单值分区每一个分区对应于分区键的一个取值,而每一个范围分区则对应分区键的一个区间,只要落在指定区间内的记录都被存储在对应的分区下。分区范围须要手动指定,分区的范围为前闭后开区间 [最小值, 最大值)。最后出现的分区可使用 MAXVALUE 做为上限,MAXVALUE 表明该分区键的数据类型所容许的最大
值。

CREATE [EXTERNAL] TABLE <table_name>
    (<col_name> <data_type>, <col_name> <data_type>, ...)
    PARTITIONED BY RANGE (<partition_key> <data_type>, ...) 
        (PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>), 
            [PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>),
              ...
            ]
            PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>|MAXVALUE) 
        )
    [ROW FORMAT <row_format>] [STORED AS TEXTFILE|ORC|CSVFILE]
    [LOCATION '<file_path>']    
    [TBLPROPERTIES ('<property_name>'='<property_value>', ...)];
eg:多个范围分区键的状况:

DROP TABLE IF EXISTS test_demo;
CREATE TABLE test_demo (value INT)
PARTITIONED BY RANGE (id1 INT, id2 INT, id3 INT)
(
-- id1在(--∞,5]之间,id2在(-∞,105]之间,id3在(-∞,205]之间
PARTITION p5_105_205 VALUES LESS THAN (5, 105, 205),
-- id1在(--∞,5]之间,id2在(-∞,105]之间,id3在(205,215]之间
PARTITION p5_105_215 VALUES LESS THAN (5, 105, 215),
PARTITION p5_115_max VALUES LESS THAN (5, 115, MAXVALUE),
PARTITION p10_115_205 VALUES LESS THAN (10, 115, 205),
PARTITION p10_115_215 VALUES LESS THAN (10, 115, 215),
PARTITION pall_max values less than (MAXVALUE, MAXVALUE, MAXVALUE)
);
 

分桶
说完分区,咱们来继续搞分桶。对Hive(Inceptor)表分桶能够将表中记录按分桶键的哈希值分散进多个文件中,这些小文件称为桶。

建立分桶表
咱们先看一下建立分桶表的建立,分桶表的建表有三种方式:直接建表,CREATE TABLE LIKE 和 CREATE TABLE AS SELECT ,单值分区表不能用 CREATETABLE AS SELECT 建表。这里以直接建表为例:

CREATE [EXTERNAL] TABLE <table_name>
    (<col_name> <data_type> [, <col_name> <data_type> ...])]
    [PARTITIONED BY ...] 
    CLUSTERED BY (<col_name>) 
        [SORTED BY (<col_name> [ASC|DESC] [, <col_name> [ASC|DESC]...])] 
        INTO <num_buckets> BUCKETS  
    [ROW FORMAT <row_format>] 
    [STORED AS TEXTFILE|ORC|CSVFILE]
    [LOCATION '<file_path>']    
    [TBLPROPERTIES ('<property_name>'='<property_value>', ...)];
分桶键只能有一个即<col_name>。表能够同时分区和分桶,当表分区时,每一个分区下都会有<num_buckets> 个桶。咱们也能够选择使用 SORTED BY … 在桶内排序,排序键和分桶键无需相同。ASC 为升序选项,DESC 为降序选项,默认排序方式是升序。<num_buckets> 指定分桶个数,也就是表目录下小文件的个数。

向分桶表写入数据
由于分桶表在建立的时候只会定义Scheme,且写入数据的时候不会自动进行分桶、排序,须要人工先进行分桶、排序后再写入数据。确保目标表中的数据和它定义的分布一致。

目前有两种方式往分桶表中插入数据:

方法一:打开enforce bucketing开关。

SET hive.enforce.bucketing=true; ①
INSERT (INTO|OVERWRITE) TABLE <bucketed_table> SELECT <select_statement>
[SORT BY <sort_key> [ASC|DESC], [<sort_key> [ASC|DESC], ...]]; ②
方法二:将reducer个数设置为目标表的桶数,并在 SELECT 语句中用 DISTRIBUTE BY <bucket_key>对查询结果按目标表的分桶键分进reducer中。

SET mapred.reduce.tasks = <num_buckets>; 
INSERT (INTO|OVERWRITE) TABLE <bucketed_table>
SELECT <select_statement>
DISTRIBUTE BY <bucket_key>, [<bucket_key>, ...] 
[SORT BY <sort_key> [ASC|DESC], [<sort_key> [ASC|DESC], ...]]; 
若是分桶表建立时定义了排序键,那么数据不只要分桶,还要排序
若是分桶键和排序键不一样,且按降序排列,使用Distribute by … Sort by分桶排序
若是分桶键和排序键相同,且按升序排列(默认),使用 Cluster by 分桶排序,即以下:
SET mapred.reduce.tasks = <num_buckets>;
INSERT (INTO|OVERWRITE) TABLE <bucketed_table>
SELECT <select_statement>
CLUSTER BY <bucket_sort_key>, [<bucket_sort_key>, ...];
 

另外补充说明一下,在Hive(Inceptor)中,ORC事务表必须进行分桶(为了提升效率)。每一个桶的文件大小应在100~200MB之间(ORC表压缩后的数据)。一般作法是先分区后分桶。
--------------------- 

使用Hive SQL插入动态分区的Parquet表OOM异常分析

舒适提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github:https://github.com/fayson/cdhproject 提示:代码块部分能够左右滑动查看噢

1.异常描述


当运行“INSERT ... SELECT”语句向Parquet或者ORC格式的表中插入数据时,若是启用了动态分区,你可能会碰到如下错误,而致使做业没法正常执行。

Hive客户端:

 
  1. Task with the most failures(4):

  2. Diagnostic Messages for this Task:

  3. Error: GC overhead limit exceeded

  4. ...

  5. FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

  6. MapReduce Jobs Launched:

  7. Stage-Stage-1: Map: 1 HDFS Read: 0 HDFS Write: 0 FAIL

  8. Total MapReduce CPU Time Spent: 0 msec

(可左右滑动)

YARN的8088中查看具体map task报错:

2017-10-27 17:08:04,317 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded

(可左右滑动)

2.异常分析


Parquet和ORC是列式批处理文件格式。这些格式要求在写入文件以前将批次的行(batches of rows)缓存在内存中。在执行INSERT语句时,动态分区目前的实现是:至少为每一个动态分区目录打开一个文件写入器(file writer)。因为这些缓冲区是按分区维护的,所以在运行时所需的内存量随着分区数量的增长而增长。因此常常会致使mappers或reducers的OOM,具体取决于打开的文件写入器(file writer)的数量。

经过INSERT语句插入数据到动态分区表中,也可能会超过HDFS同时打开文件数的限制。

若是没有join或聚合,INSERT ... SELECT语句会被转换为只有map任务的做业。mapper任务会读取输入记录而后将它们发送到目标分区目录。在这种状况下,每一个mapper必须为遇到的每一个动态分区建立一个新的文件写入器(file writer)。mapper在运行时所需的内存量随着它遇到的分区数量的增长而增长。

3.异常重现与解决

3.1.生成动态分区的几个参数说明


hive.exec.dynamic.partition

默认值:false

是否开启动态分区功能,默认false关闭。

使用动态分区时候,该参数必须设置成true;

hive.exec.dynamic.partition.mode

默认值:strict

动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示容许全部的分区字段均可以使用动态分区。

通常须要设置为nonstrict

hive.exec.max.dynamic.partitions.pernode

默认值:100

在每一个执行MR的节点上,最大能够建立多少个动态分区。

该参数须要根据实际的数据来设定。

好比:源数据中包含了一年的数据,即day字段有365个值,那么该参数就须要设置成大于365,若是使用默认值100,则会报错。

hive.exec.max.dynamic.partitions

默认值:1000

在全部执行MR的节点上,最大一共能够建立多少个动态分区。

同上参数解释。

hive.exec.max.created.files

默认值:100000

整个MR Job中,最大能够建立多少个HDFS文件。

通常默认值足够了,除非你的数据量很是大,须要建立的文件数大于100000,可根据实际状况加以调整。

mapreduce.map.memory.mb

map任务的物理内存分配值,常见设置为1GB,2GB,4GB等。

mapreduce.map.java.opts

map任务的Java堆栈大小设置,通常设置为小于等于上面那个值的75%,这样能够保证map任务有足够的堆栈外内存空间。

mapreduce.input.fileinputformat.split.maxsize

mapreduce.input.fileinputformat.split.minsize

这个两个参数联合起来用,主要是为了方便控制mapreduce的map数量。好比我设置为1073741824,就是为了让每一个map处理1GB的文件。

3.2.一个例子


Fayson在前两天给人调一个使用Hive SQL插入动态分区的Parquet表时,老是报错OOM,也是折腾了好久。如下咱们来看看整个过程。

1.首先咱们看看执行脚本的内容,基本其实就是使用Hive的insert语句将文本数据表插入到另一张parquet表中,固然使用了动态分区。

2.咱们看看原始数据文件,是文本文件,一共120个,每一个30GB大小,总共差很少3.6TB。

3.咱们看看报错

4.由于是一个只有map的mapreduce任务,当咱们从YARN的8088观察这个做业时能够发现,基本没有一个map可以执行成功,所有都是失败的。报上面的错误。

5.把mapreduce.map.memory.mb从2GB增大到4GB,8GB,16GB,相应mapreduce.map.java.opts增大到3GB,6GB,12GB。依旧报错OOM。

6.后面又将mapreduce.input.fileinputformat.split.maxsize从1GB,减小为512MB,256MB,从而增大map数量,缩小单个map处理文件的大小。依旧报错OOM。

7.最后启用hive.optimize.sort.dynamic.partition,增长reduce过程,做业执行成功。

8.最后查看结果文件大约1.2TB,约为输入文件的三分之一。一共1557个分区,最大的分区文件为2GB。

4.异常总结


对于这个异常,咱们建议有如下三种方式来处理:

1.启用hive.optimize.sort.dynamic.partition,将其设置为true。经过这个优化,这个只有map任务的mapreduce会引入reduce过程,这样动态分区的那个字段好比日期在传到reducer时会被排序。因为分区字段是排序的,所以每一个reducer只须要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的全部行后,关闭记录写入器(record writer),从而减少内存压力。这种优化方式在写parquet文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。

 
  1. SET hive.optimize.sort.dynamic.partition=true;

  2. INSERT OVERWRITE TABLE [table] SELECT ...

2.第二种方式就是增长每一个mapper的内存分配,即增大mapreduce.map.memory.mb和mapreduce.map.java.opts,这样全部文件写入器(filewriter)缓冲区对应的内存会更充沛。

3.将查询分解为几个较小的查询,以减小每一个查询建立的分区数量。这样可让每一个mapper打开较少的文件写入器(file writer)。

备注:

默认状况下,Hive为每一个打开的Parquet文件缓冲区(file buffer)分配128MB。这个buffer大小由参数parquet.block.size控制。为得到最佳性能,parquet的buffer size须要与HDFS的block size保持对齐(好比相等),从而使每一个parquet文件在单个HDFS的块中,以便每一个I/O请求均可以读取整个数据文件,而无需经过网络传输访问后续的block。

 
  1. -- set Parquetbuffer size to 256MB (in bytes)

  2. set parquet.block.size=268435456

相关文章
相关标签/搜索