hive.apache.orgjava
Hive是什么?node
Hive是Facebook开源的用于解决海量结构化日志的数据统计,是基于Hadoop的一个数据仓库工具,能够将结构化的数据文件映射为一张表,而且提供类SQL查询功能,本质是将HQL转化成MapReduce程序。mysql
数据存储在HDFS,分析数据底层实现默认是MapReduce,执行程序运行在Yarn上。sql
若是没有Hiveshell
想象一下数据统计的时候写大量的MapReduce程序,那会是多么痛苦。若是是写SQL就开心多了,尤为是离线数据仓库方面普遍应用。数据库
因为 Hive 采用了相似SQL 的查询语言 HQL(hive query language),所以很容易将 Hive 理解为数据库。其实从结构上来看,Hive 和数据库除了拥有相似的查询语言,再无相似之处。express
Hive经过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用本身的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。apache
mysql字段类型 | hive:ods字段类型 | hive:dwd字段类型 |
---|---|---|
tinyint | tinyint | tinyint |
int | int | int |
bigint | bigint | bigint |
varchar | string | string |
datetime | bigint | string |
bit | boolean | int |
double | double | double |
hiveserver2
beeline
!connect jdbc:hive2://cdh01.cm:10000
CREATE DATABASE 数据库名字
show databases
drop database 数据库名字 drop database 数据库名字 cascade
desc database 数据库名字 desc database extended 数据库名字
use 数据库名字
show tables
desc formatted 表名
建表语法浏览器
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name [(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION hdfs_path]
建表经常使用参数安全
EXTERNAL:外部表关键字,Hive份内部表(元数据和数据会被一块儿删除,通常不过重要的表如:中间临时表)和外部表(只删除元数据,不删除数据,通常而言都采用外部表,由于数据安全些),通常而言采用外部表。
PARTITIONED BY:分区指定字段为'dt'。
PARTITIONED BY ( `dt` String COMMENT 'partition' )
row format delimited fields terminated by '\t' :数据使用什么作切分,这里使用制表符。
stored as parquet:文件存储格式,推荐parquet(spark自然支持parquet)。
location '/warehouse/层名/库名/表名' :文件存储位置。
tblproperties ("parquet.compression"="snappy") :文件压缩策略,推荐snappy。
CLUSTERED BY建立分桶表(抽样场景使用)
SORTED BY排序(通常状况查询语句都有排序,故不经常使用)
建表例子
#删除表 drop table if exists dwd.student #建立表(库名.表名) CREATE EXTERNAL TABLE `dwd.student`( `ID` bigint COMMENT '', `CreatedBy` string COMMENT '建立人', `CreatedTime` string COMMENT '建立时间', `UpdatedBy` string COMMENT '更新人', `UpdatedTime` string COMMENT '更新时间', `Version` int COMMENT '版本号', `name` string COMMENT '姓名' ) COMMENT '学生表' PARTITIONED BY ( `dt` String COMMENT 'partition' ) row format delimited fields terminated by '\t' location '/warehouse/dwd/test/student/' stored as parquet tblproperties ("parquet.compression"="snappy")
建立内部表就是去掉EXTERNAL关键字,使用插入数据案例测试一下删除表后从新创建的效果吧。
INSERT INTO TABLE dwd.student partition(dt='2020-04-05') VALUES(1,"heaton","2020-04-05","","","1","zhangsan") INSERT INTO TABLE dwd.student partition(dt='2020-04-06') VALUES(2,"heaton","2020-04-06","","","1","lisi")
SELECT * FROM dwd.student SELECT * FROM dwd.student WHERE dt="2020-04-05" SELECT * FROM dwd.student WHERE dt="2020-04-06"
LOAD DATA INPATH '/warehouse/dwd/test/student/dt=2020-04-05/000000_0' INTO TABLE dwd.student1 partition(dt='2020-04-05')
TRUNCATE TABLE dwd.student1
insert into table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
因为student1表2020-04-05分区中已经有一条张三数据,这时插入李四数据,查询结果为2条数据
insert overwrite table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
由上面的例子使student1表2020-04-05分区中有2条数据,这时候在插入一条,发现只有一条,由于数据被复写。
with s1 as ( select id,createdby,createdtime,version,name from dwd.student where dt="2020-04-05" ), s2 as ( select id,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06" ) insert overwrite table dwd.student1 partition(dt='2020-04-06') select s1.id as id, s1.createdby as createdby, s1.createdtime as createdtime, "" as updatedby, "0" as updatedtime, s1.version as version, s1.name as name from s1 union all select s2.id as id, "" as createdby, "0" as createdtime, s2.updatedby as updatedby, s2.updatedtime as updatedtime, s2.version as version, s2.name as name from s2
hive -e “select * from xx”:hive命令行SQL语句
hive -f xx.hql:hive执行sql文件
CLUSTERED BY(字段名) into 分桶数 buckets
指定分桶条件。set hive.enforce.bucketing=true; set mapreduce.job.reduces=-1;
y必须是table总bucket数的倍数或者因子,根据Y值决定抽样的比例,如:table共4个桶,当y=2时,抽取4/2=2个桶数据,等y=8时,抽取4/8=1/2的bucket数据。
x是从哪一个桶开始抽,而后依次抽取x+y的桶数据,如:table共4个桶,当y=2,x=1时,抽取4/2=2个桶数据,桶数为1号,1+2=3号。
#建表sql CREATE EXTERNAL TABLE `dwd.student_buck`( `ID` bigint COMMENT '', `name` string COMMENT '姓名' ) COMMENT '学生表' CLUSTERED BY(ID) into 4 buckets row format delimited fields terminated by '\t' stored as parquet location '/warehouse/dwd/test/student_buck/' tblproperties ("parquet.compression"="snappy") #插入数据 INSERT INTO TABLE dwd.student_buck VALUES(1,"zhangsan"),(2,"lisi") ,(3,"wangwu") ,(4,"zhaoliu") ,(5,"haqi") ,(6,"xiba") ,(7,"heijiu") ,(8,"washi") #抽样查询 SELECT * from dwd.student_buck TABLESAMPLE(BUCKET 1 OUT OF 4 ON id)
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
Order By 全局排序(全局一个Reducer)
Group By 分组(会根据跟的字段,发到不一样的Reducer中)
Distribute By 相似MR中的partition进行分区,需结合Sort By使用,并且要写在前面,如:根据日期分区,在根据年龄排序
#多reduce才有效果 select * from student distribute by createtime sort by age descSort By每一个Reducer内部排序,对全局结果来讲不是排序,结合Distribute By使用。(多Reducer区内排序)
Cluster By 当分区字段和区内排序字段相同,也就是distribute by和sort by须要字段相同可使用其代替,可是只能升序排列。
NVL(string,替换值),若是string为NULL,则返回替换值,不然返回string值
# 1 格式化时间-> 2020-04-05 00:00:00 SELECT date_format("2020-04-05","yyyy-MM-dd HH:mm:ss") # 2 时间天数相加-> 2020-04-05 ->2020-04-01 SELECT date_add("2020-04-05",4) SELECT date_add("2020-04-05",-4) # 3 时间天数相减->2020-04-01(通常直接用date_add了,这个不怎么用) SELECT date_sub("2020-04-05",4) # 4 两个时间相减-> 4 SELECT datediff("2020-04-09","2020-04-05") # 5 时间转时间戳-> 1586016000 SELECT unix_timestamp("2020-04-05","yyyy-MM-dd") # 6 时间戳转时间-> 2020-04-05 SELECT from_unixtime(cast("1586016000" as BIGINT),"yyyy-MM-dd") # 7 将日期转为星期-> 1 (星期一) SELECT pmod(datediff("2020-04-06","1920-01-01")-3,7)
财务部 | 男 |
---|---|
财务部 | 男 |
财务部 | 女 |
科技部 | 女 |
人事部 | 女 |
人事部 | 男 |
如上图假设表 emp 中有6我的,求每一个部门下面的男女个数。
#使用CASE WHEN select department, sum(case sex when '男' then 1 else 0 end) man_count, sum(case sex when '女' then 1 else 0 end) woman_count, from emp group by department #还可使用IF select department, sum(if(sex='男',1,0)) man_count, sum(if(sex='女',1,0)) woman_count, from emp group by department
# 1 concat拼接-> 2020-04-05 SELECT concat("2020","-","04","-","05") # 2 concat_ws拼接-> 2020-04-05 注意 concat_ws能够传collect_set,须要字段为string SELECT concat_ws("-","2020","04","05") # 3 汇总成Array类型字段-> [8,4,5,1,6,2,7,3] SELECT collect_set(id) from dwd.student_buck
EXPLODE(字段名):将一列复杂的array或者map结构拆分红多行
炸裂函数必须配合侧写视图函数:LATERAL VIEW udtf(expression) 侧写视图别名 AS 侧写结果列别名
# 建表 CREATE EXTERNAL TABLE `dwd.hobby`( `id` bigint COMMENT '', `name` string COMMENT '姓名', `hobby_name` array<string> COMMENT '爱好名字' ) COMMENT '爱好表' PARTITIONED BY ( `dt` String COMMENT 'partition' ) row format delimited fields terminated by '\t' collection items terminated by ',' stored as parquet location '/warehouse/dwd/test/hobby/' tblproperties ("parquet.compression"="snappy") # 插入3条数据 INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') SELECT 1,"zhangsan",array("篮球","足球","羽毛球","恶做剧") INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') SELECT 2,"lisi",array("足球","恶做剧") INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') SELECT 3,"wangwu",array("羽毛球","恶做剧") # 使用侧写视图将炸裂结果做为临时视图,聚合须要的结果 select name,hobby_name_col from dwd.hobby lateral view explode(hobby_name) hobby_tmp as hobby_name_col
主要解决1行和n行数据没法聚合在一块儿展现的问题。
#建表 CREATE EXTERNAL TABLE `dwd.order`( `name` string COMMENT '姓名', `order_date` string COMMENT '购买日期', `price` bigint COMMENT '消费金额' ) COMMENT '订单表' PARTITIONED BY ( `dt` String COMMENT 'partition' ) row format delimited fields terminated by '\t' stored as parquet location '/warehouse/dwd/test/order/' tblproperties ("parquet.compression"="snappy") #插入数据 INSERT INTO TABLE `dwd.order` partition(dt='2020-04-05') VALUES("zhangsan","2020-01-01",15),("lisi","2020-01-02",22),("zhangsan","2020-04-01",34),("lisi","2020-04-01",15),("zhangsan","2020-04-04",42),("zhangsan","2020-03-01",24),("lisi","2020-02-01",65),("wangwu","2020-04-01",33),("zhangsan","2020-05-01",43),("zhangsan","2020-07-01",12),("wangwu","2020-02-05",32),("zhangsan","2020-03-06",22),("lisi","2020-04-07",14)
SELECT name,count(*) OVER() FROM dwd.`order` WHERE order_date BETWEEN "2020-04-01" AND "2020-04-30" GROUP BY name
SELECT *,sum(price) OVER() FROM dwd.`order`
SELECT *,sum(price) OVER(ORDER BY order_date) FROM dwd.`order` SELECT *,sum(price) OVER(ORDER BY order_date DESC) FROM dwd.`order`
over函数传参,仍是给全部结果集进行开窗,可是根据参数限定窗口大小,上面sql的意思为:
1号zhangsan数据窗口内只含有 1号张三
2号lisi数据窗口内含有 1号张三,2号李四
3号lisi数据窗口内含有 1号张三,2号李四,3号李四
依次类推
SELECT *,sum(price) OVER(distribute by name) FROM dwd.`order`
SELECT *,sum(price) OVER(distribute by name sort by order_date) FROM dwd.`order`
SELECT *,sum(price) OVER(distribute by name rows between UNBOUNDED PRECEDING and CURRENT ROW) FROM dwd.`order`
SELECT *,lag(order_date,1,"1970-01-01") OVER(distribute by name sort by order_date) FROM dwd.`order`
SELECT *,ntile(5) OVER() FROM dwd.`order`
SELECT * FROM (SELECT *,ntile(5) OVER(ORDER BY order_date) ntile_5 FROM dwd.`order`) t1 WHERE t1.ntile_5=1
SELECT *,row_number() OVER(order by order_date) FROM dwd.`order`
SELECT *,rank() OVER(order by order_date) FROM dwd.`order`
SELECT *,dense_rank() OVER(order by order_date) FROM dwd.`order`
https://cwiki.apache.org/confluence/display/Hive/HivePlugins
必须有返回值,能够返回null,可是类型不能是void。
须要继承UDF类,而且方法名为evaluate
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.1</version> </dependency>
import org.apache.hadoop.hive.ql.exec.UDF; //模拟concat public class MyUDF extends UDF { public String evaluate(String... args) { StringBuilder s = new StringBuilder(); for (String arg : args) { s.append(arg); } return s.toString(); } }
# 1 添加jar包,注意服务器本地路径(hive客户端关闭则消失,须要从新添加,若是不想从新添加,能够直接使用4将jar包放入hive/lib下) add jar /root/test/function-1.0-SNAPSHOT.jar # 2 添加函数,注意hive中的函数别名,还有jar包全类名,以下为永久建立和临时建立temporary(hive客户端关闭则消失) create function myconcat as "com.hive.function.udf.MyUDF" create temporary function myconcat as "com.hive.function.udf.MyUDF" # 3 加载jar包的第二种方式,上传jar包至hdfs集群 # create temporary function myconcat as "com.hive.function.udf.MyUDF" using jar 'hdfs:///hive_jar/function-1.0-SNAPSHOT.jar'; # 4 加载jar包的第三种方式,直接放在hive的lib目录下,启动hive,使用2添加函数便可。下面是cdh环境的jars包路径,配置软链接到hive/lib下 # ln -s /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/function-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hive/lib # 5 删除函数 # drop function myconcat
SELECT myconcat("a","-","b","-","c")
用户自定义UDAF必须继承UDAF,必须提供一个实现了UDAFEvaluator接口的内部类
import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; //模拟avg public class MyUDAF extends UDAF { public static class AvgState { private long mCount; private double mSum; } public static class AvgEvaluator implements UDAFEvaluator { AvgState state; public AvgEvaluator() { super(); state = new AvgState(); init(); } /** * init函数相似于构造函数,用于UDAF的初始化 */ public void init() { state.mSum = 0; state.mCount = 0; } /** * iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean * * @param o * @return */ public boolean iterate(Double o) { if (o != null) { state.mSum += o; state.mCount++; } return true; } /** * terminatePartial无参数,其为iterate函数遍历结束后,返回轮转数据, * terminatePartial相似于hadoop的Combiner * * @return */ public AvgState terminatePartial() { // combiner return state.mCount == 0 ? null : state; } /** * merge接收terminatePartial的返回结果,进行数据merge操做,其返回类型为boolean * * @param o * @return */ public boolean merge(AvgState avgState) { if (avgState != null) { state.mCount += avgState.mCount; state.mSum += avgState.mSum; } return true; } /** * terminate返回最终的汇集函数结果 * * @return */ public Double terminate() { return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount); } } }
用户自定义UDAF必须继承GenericUDTF,重写initialize(),process(),close()方法。
import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.util.ArrayList; import java.util.List; //模拟EXPLODE和split public class MyUDTF extends GenericUDTF { private List<String> dataList = new ArrayList<>(); //初始化方法,返回对象结构校验器 @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { //列名,会被用户传递的覆盖 List<String> fieldNames = new ArrayList<>(); fieldNames.add("word1"); //返回列以什么格式输出,这里是string,添加几个就是几个列,和上面的名字个数对应个数。 List<ObjectInspector> fieldOIs = new ArrayList<>(); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] objects) throws HiveException { //获取数据 String data = objects[0].toString(); //获取分隔符 String splitKey = objects[1].toString(); //切分数据 String[] words = data.split(splitKey); //遍历写出 for (String word : words) { //将数据放入集合 dataList.clear(); dataList.add(word); //写出数据到缓冲区 forward(dataList); } } @Override public void close() throws HiveException { //没有流操做 } }
hive.execution.engine配置为spark
Fetch抓取修改成more,可使全局查找,字段查找,limit查找等都不走计算引擎,而是直接读取表对应储存目录下的文件,大大普通查询速度。
hive.fetch.task.conversion配置为more
hive能够经过本地模式在单台机器上处理全部的任务,对于小的数据集,执行时间能够明显被缩短。
hive-site.xml调整下面3个参数,开启本地模式,文件不超过50M,个数不超过10个。
hive.exec.mode.local.auto=true
hive.exec.mode.local.auto.inputbytes.max=50000000 (50M左右,默认128M->134217728,机器资源足建议使用默认值)
hive.exec.mode.local.auto.input.files.max=10 (模式4个)
<property><name>hive.exec.mode.local.auto</name><value>true</value></property> <property><name>hive.exec.mode.local.auto.inputbytes.max</name><value>50000000</value></property> <property><name>hive.exec.mode.local.auto.input.files.max</name><value>10</value></property>
在join问题上,让小表放在左边 去左连接(left join)大表,这样能够有效的减小内存溢出错误发生的概率。
hive.auto.convert.join开启。(默认开启)
hive.mapjoin.smalltable.filesize(默认25000000->接近24M,若是机器内存足能够适当调大,需在hive-site.xml中设置,如7.3)
大表和大表join,空key会打到同一个reduce上,形成数据倾斜,任务缓慢,内存泄漏。(reduce任务某个很是慢,其余很快,及发生数据倾斜)
- 空key过滤:使用子查询过滤掉空key,能够有效的提高查询速率。
- 空key转换:附随机值,使其能够随机均匀的分布在不一样的reduce上,使用case when then或if判断null值,赋予rand()函数。
默认状况下map阶段同一个key发送给一个reduce,当一个key数据过大时就发生数据倾斜。
那么把某些聚合操做提到Map端进行部分聚合,最后在reduce端得出最终结果,也能够有效的提高执行效率。
hive.map.aggr开启。(默认开启)
hive.groupby.mapaggr.checkinterval=100000(默认100000条,在map端进行聚合操做的条目数目,需在hive-site.xml中设置,如7.3)
hive.groupby.skewindata=true(默认false,有数据倾斜时进行负载均衡,需在hive-site.xml中设置,如7.3)
hive.groupby.skewindata当选项设置为true时,生成的查询计划会有两个MR Job,第一个MR Job会将key加随机数均匀的分布到Reduce中,作部分聚合操做(预处理),第二个MR Job在根据预处理结果还原原始key,按照Group By Key分布到Reduce中进行聚合运算,完成最终操做。
数据量小的时候不要紧,大数据量下,因为Count Distinct操做须要用一个Reduce任务来完成,这一个Reduce须要处理的数据量太大,会致使Job缓慢,可使用子查询Group By再Count的方式替换。
列处理:不使用Select *,使用什么字段就写什么字段,就算是全部字段,也要一一列出,养成好习惯。
行处理:在join操做中,不要直接关联表后使用where条件,这样会使全表关联后在过滤,使用子查询过滤后在join来替换,使查询效率提升。以下
#错误写法 select id from student s left join class c on s.cid=c.id where c.id<=10 #正确写法 select id from student s left join (select id from class where id<=10 ) c on s.cid=c.id
当数据会根据状况变化的时候,先有数据,再想分区的状况。
设计表时尽可能避免动态分区,速度比静态分区(也就是直接指定分区慢不少)。
开启动态分区参数
必要参数
相关参数
建表方式同静态分区,插入方式要注意,查询结果最后一个字段即为分区字段(无论名字是否同样,会将最后一个字段的值,直接拿来分区)
#错误写法:会将name值直接插入dt分区字段。 insert into table dwd.student1 partition(dt) select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student #正确写法 insert into table dwd.student1 partition(dt) select id,createdby,createdtime,updatedby,updatedtime,version,name,dt from dwd.student
hive.exec.parallel=true;默认false。需在hive-site.xml中设置,如7.3
hive.exec.parallel.thread.number=16;默认8,同一个sql容许的最大并行度,针对集群资源适当增长。需在hive-site.xml中设置,如7.3
在执行的查询sql前加上Explain指令,查询分析sql执行过程。