数据仓库之Hive快速入门 - 离线&实时数仓架构

数据仓库VS数据库

数据仓库的定义:java

  • 数据仓库是将多个数据源的数据通过ETL(Extract(抽取)、Transform(转换)、Load(加载))理以后,按照必定的主题集成起来提供决策支持和联机分析应用的结构化数据环境

数据仓库VS数据库:python

  • 数据库是面向事务的设计,数据仓库是面向主题设计的
  • 数据库通常存储在线交易数据,数据仓库存储的通常是历史数据
  • 数据库设计是避免冗余,采用三范式的规则来设计,数据仓库在设计是有意引入冗余,采用反范式的方式来设计

OLTP VS OLAP:算法

  • 联机事务处理OLTP是传统的关系型数据库的主要应用,主要是基本的、平常的事务处理,例如银行交易
  • 联机分析处理OLAP是数据仓库系统的主要应用,支持复杂的分析操做,侧重决策支持,而且提供直观易懂的查询结果

常规的数仓架构:
数据仓库之Hive快速入门 - 离线&实时数仓架构sql

为何建设数据仓库:数据库

  • 各个业务数据存在不一致,数据关系混乱
  • 业务系统通常针对于OLTP,而数据仓库能够实现OLAP分析
  • 数据仓库是多源的复杂环境,能够对多个业务的数据进行统一分析

数据仓库建设目标:apache

  • 集成多源数据,数据来源和去向可追溯,梳理血缘关系
  • 减小重复开发,保存通用型中间数据,避免重复计算
  • 屏蔽底层业务逻辑,对外提供一致的、 结构清晰的数据

如何实现:编程

  • 实现通用型数据ETL工具
  • 根据业务创建合理的数据分层模型

数据仓库分层建设

数仓建设背景:json

  • 数据建设刚起步,大部分数据通过粗暴的数据接入后直接对接业务
  • 数据建设发展到必定阶段,发现数据的使用杂乱无章,各类业务都是从原始数据直接计算而得。
  • 各类重复计算,严重浪费了计算资源,须要优化性能

为何进行数仓分层:bash

  • 清晰数据结构:每一个数据分层都有对应的做用域
  • 数据血缘追踪:对各层之间的数据表转换进行跟踪,创建血缘关系
  • 减小重复开发:规范数据分层,开发通用的中间层数据
  • 屏蔽原始数据的异常:经过数据分层管控数据质量
  • 屏蔽业务的影响:没必要改一次业务就须要从新接入数据
  • 复杂问题简单化:将复杂的数仓架构分解成多个数据层来完成

常见的分层含义:
数据仓库之Hive快速入门 - 离线&实时数仓架构服务器

STG层

  • 原始数据层:存储原始数据,数据结构与采集数据一致
  • 存储周期:保存所有数据
  • 表命名规范:stg_主题_表内容_分表规则

ODS层

  • 数据操做层:对STG层数据进行初步处理,如去除脏数据,去除无用字段.
  • 存储周期:默认保留近30天数据
  • 表命名规范:ods_主题_表内容_分表规则

DWD层

  • 数据明细层:数据处理后的宽表,目标为知足80%的业务需求
  • 存储周期:保留历史至今全部的数据
  • 表命名规范:dwd_业务描述时间粒度

DWS层

  • 数据汇总层:汇总数据,解决数据汇总计算和数据完整度问题
  • 存储周期:保留历史至今全部的数据
  • 表命名规范:dws_业务描述_时间粒度_sum

DIM层

  • 公共维度层:存储公共的信息数据,用于DWD、DWS的数据关联
  • 存储周期:按需存储,通常保留历史至今全部的数据
  • 表命名规范:dim_维度描述

DM层

  • 数据集市层:用于BI、多维分析、标签、数据挖掘等
  • 存储周期:按需存储,--般保留历史至今全部的数据
  • 表命名规范:dm_主题_表内容_分表规则

分层之间的数据流转:
数据仓库之Hive快速入门 - 离线&实时数仓架构


Hive是什么

Hive简介:

  • Hive是基于Hadoop的数据仓库工具,提供类SQL语法(HiveQL)
  • 默认以MR做为计算引擎(也支持其余计算引擎,例如tez)、HDFS 做为存储系统,提供超大数据集的计算/扩展能力
  • Hive是将数据映射成数据库和一张张的表,库和表的元数据信息通常存在关系型数据库

Hive的简单架构图:
数据仓库之Hive快速入门 - 离线&实时数仓架构

Hive VS Hadoop:

  • Hive数据存储:Hive的数据是存储在HDFS.上的,Hive的库和表是对HDFS.上数据的映射
  • Hive元数据存储:元数据存储通常在外部关系库( Mysql )与Presto Impala等共享
  • Hive语句的执行过程:将HQL转换为MapReduce任务运行

Hive与关系数据库Mysql的区别

产品定位

Hive是数据仓库,为海量数据的离线分析设计的,不支持OLTP(联机事务处理所需的关键功能ACID,而更接近于OLAP(联机分析技术)),适给离线处理大数据集。而MySQL是关系型数据库,是为实时业务设计的。

可扩展性

Hive中的数据存储在HDFS(Hadoop的分布式文件系统),metastore元数据一 般存储在独立的关系型数据库中,而MySQL则是服务器本地的文件系统。所以Hive具备良好的可扩展性,数据库因为ACID语义的严格限制,扩展性十分有限。

读写模式

Hive为读时模式,数据的验证则是在查询时进行的,这有利于大数据集的导入,读时模式使数据的加载很是迅速,数据的加载仅是文件复制或移动。MySQL为写时模式,数据在写入数据库时对照模式检查。写时模式有利于提高查询性能,由于数据库能够对列进行索引。

数据更新

Hive是针对数据仓库应用设计的,而数仓的内容是读多写少的,Hive中不支持对数据进行改写,全部数据都是在加载的时候肯定好的。而数据库中的数据一般是须要常常进行修改的。

索引

Hive支持索引,可是Hive的索引与关系型数据库中的索引并不相同,好比,Hive不支持主键或者外键。Hive提供了有限的索引功能,能够为-些字段创建索引,一张表的索引数据存储在另一张表中。因为数据的访问延迟较高,Hive不适合在线数据查询。数据库在少星的特定条件的数据访问中,索引能够提供较低的延迟。

计算模型

Hive默认使用的模型是MapReduce(也能够on spark、on tez),而MySQL使用的是本身设计的Executor计算模型

数据仓库之Hive快速入门 - 离线&实时数仓架构


Hive安装部署

参考:


Hive基本使用(上)Hive数据类型/分区/基础语法

Hive数据类型:

  • 基本数据类型:int、 float、 double、 string、 boolean、 bigint等
  • 复杂类型:array、map、 struct

Hive分区:

  • Hive将海量数据按某几个字段进行分区,查询时没必要加载所有数据
  • 分区对应到HDFS就是HDFS的目录.
  • 分区分为静态分区和动态分区两种

Hive经常使用基础语法:

  • USE DATABASE_NAME
  • CREATE DATABASE IF NOT EXISTS DB NAME
  • DESC DATABASE DB NAME
  • CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE
  • SELECT * FROM TABLE NAME
  • ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME

写个Python脚本生成一些测试数据:

import json
import random
import uuid

name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')
hobby = ('reading', 'play', 'dancing', 'sing')
subject = ('math', 'chinese', 'english', 'computer')

data = []
for item in name:
    scores = {key: random.randint(60, 100) for key in subject}
    data.append("|".join([uuid.uuid4().hex, item, ','.join(
        random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])]))

with open('test.csv', 'w') as f:
    f.write('\n'.join(data))

执行该脚本,生成测试数据文件:

[root@hadoop01 ~/py-script]# python3 gen_data.py
[root@hadoop01 ~/py-script]# ll -h
...
-rw-r--r--. 1 root root  745 11月  9 11:09 test.csv
[root@hadoop01 ~/py-script]#

咱们能够看一下生成的数据:

[root@hadoop01 ~/py-script]# cat test.csv 
f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77

...
  • 数据以 | 符进行分割,前两个字段都是string类型,第三个字段是array类型,第四个字段是map类型

建立测试用的数据库:

0: jdbc:hive2://localhost:10000> create database hive_test;
No rows affected (0.051 seconds)
0: jdbc:hive2://localhost:10000> use hive_test;
No rows affected (0.06 seconds)
0: jdbc:hive2://localhost:10000>

建立测试表:

CREATE TABLE test(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将本地数据加载到Hive中:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;
No rows affected (0.785 seconds)
0: jdbc:hive2://localhost:10000>

查询数据:
数据仓库之Hive快速入门 - 离线&实时数仓架构

Hive将HQL转换为MapReduce的流程

了解了Hive中的SQL基本操做以后,咱们来看看Hive是如何将SQL转换为MapReduce任务的,整个转换过程分为六个阶段:

  1. Antr定义SQL的语法规则,完成SQL词法,语法解析,将SQL 转化为抽象语法树AST Tree
  2. 遍历AST Tree,抽象出查询的基本组成单元QueryBlock
  3. 遍历QueryBlock,翻译为执行操做树OperatorTree
  4. 逻辑层优化器进行OperatorTree变换,合并没必要要的ReduceSinkOperator,减小shufle数据量
  5. 遍历OperatorTree,翻译为MapReduce任务
  6. 物理层优化器进行MapReduce任务的变换,生成最终的执行计划

数据仓库之Hive快速入门 - 离线&实时数仓架构

与普通SQL同样,咱们能够经过在HQL前面加上explain关键字查看HQL的执行计划:

explain select * from test where id > 10 limit 1000

Hive会将这条语句解析成一个个的Operator,Operator就是Hive解析以后的最小单元,每一个Operator其实都是对应一个MapReduce任务。例如,上面这条语句被Hive解析后,就是由以下Operator组成:
数据仓库之Hive快速入门 - 离线&实时数仓架构

同时,Hive实现了优化器对这些Operator的顺序进行优化,帮助咱们提高查询效率。Hive中的优化器主要分为三类:

  • RBO(Rule-Based Optimizer):基于规则的优化器
  • CBO(Cost-Based Optimizer):基于代价的优化器,这是默认的优化器
  • 动态CBO:在执行计划生成的过程当中动态优化的方式

Hive基本使用(中)内部表/外部表/分区表/分桶表

内部表:

和传统数据库的Table概念相似,对应HDFS上存储目录,删除表时,删除元数据和表数据。内部表的数据,会存放在HDFS中的特定的位置中,能够经过配置文件指定。当删除表时,数据文件也会一并删除。适用于临时建立的中间表。

外部表:

指向已经存在的HDFS数据,删除时只删除元数据信息。适用于想要在Hive以外使用表的数据的状况,当你删除External Table时,只是删除了表的元数据,它的数据并无被删除。适用于数据多部门共享。建表时使用create external table。指定external关键字便可。

分区表:

Partition对应普通数据库对Partition列的密集索引,将数据按照Partition列存储到不一样目录,便于并行分析,减小数据量。分区表建立表的时候须要指定分区字段。

分区字段与普通字段的区别:分区字段会在HDFS表目录下生成一个分区字段名称的目录,而普通字段则不会,查询的时候能够当成普通字段来使用,通常不直接和业务直接相关。

分桶表:

对数据进行hash,放到不一样文件存储,方便抽样和join查询。能够将内部表,外部表和分区表进一步组织成桶表,能够将表的列经过Hash算法进一步分解成不一样的文件存储。

对于内部表和外部表的概念和应用场景咱们很容易理解,咱们须要重点关注一下分区表和分桶表。 咱们为何要创建分区表和分桶表呢?HQL经过where子句来限制条件提取数据,那么与其遍历一张大表,不如将这张大表拆分红多个小表,并经过合适的索引来扫描表中的一小部分,分区和分桶都是采用了这种理念。

分区会建立物理目录,而且能够具备子目录(一般会按照时间、地区分区),目录名以 分区名=值 形式命名,例如:create_time=202011。分区名会做为表中的伪列,这样经过where字句中加入分区的限制能够在仅扫描对应子目录下的数据。经过 partitioned by (feld1 type, ...) 建立分区列。

分桶能够继续在分区的基础上再划分小表,分桶根据哈希值来肯定数据的分布(即MapReducer中的分区),好比分区下的一部分数据能够根据分桶再分为多个桶,这样在查询时先计算对应列的哈希值并计算桶号,只须要扫描对应桶中的数据便可。分桶经过clustered by(field) into n buckets建立。

接下来简单演示下这几种表的操做,首先将上一小节生成的测试数据文件上传到hdfs中:

[root@hadoop01 ~]# hdfs dfs -mkdir /test
[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test
[root@hadoop01 ~]# hdfs dfs -ls /test
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /test/test.csv
[root@hadoop01 ~]#

内部表

建表SQL:

CREATE TABLE test_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将hdfs数据加载到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;
No rows affected (0.169 seconds)
0: jdbc:hive2://localhost:10000>

查看建立的表存储在hdfs的哪一个目录下:

0: jdbc:hive2://localhost:10000> show create table test_table;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_table`(                         |
|   `user_id` string,                                |
|   `user_name` string,                              |
|   `hobby` array<string>,                           |
|   `scores` map<string,int>)                        |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'collection.delim'=',',                          |
|   'field.delim'='|',                               |
|   'line.delim'='\n',                               |
|   'mapkey.delim'=':',                              |
|   'serialization.format'='|')                      |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.mapred.TextInputFormat'       |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION                                           |
|   'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'transient_lastDdlTime'='1604893190')            |
+----------------------------------------------------+
22 rows selected (0.115 seconds)
0: jdbc:hive2://localhost:10000>

在hdfs中能够查看到数据文件:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv
[root@hadoop01 ~]#

删除表:

0: jdbc:hive2://localhost:10000> drop table test_table;
No rows affected (0.107 seconds)
0: jdbc:hive2://localhost:10000>

查看hdfs会发现该表所对应的存储目录也一并被删除了:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table
drwxr-xr-x   - root supergroup          0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test
[root@hadoop01 ~]#

外部表

建表SQL,与内部表的区别就在于external关键字:

CREATE external TABLE external_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将数据文件加载到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;
No rows affected (0.182 seconds)
0: jdbc:hive2://localhost:10000>

此时会发现hdfs中的数据文件会被移动到hive的目录下:

[root@hadoop01 ~]# hdfs dfs -ls /test
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#

删除表:

0: jdbc:hive2://localhost:10000> drop table external_table;
No rows affected (0.112 seconds)
0: jdbc:hive2://localhost:10000>

查看hdfs会发现该表所对应的存储目录仍然存在:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#

分区表

建表语句:

CREATE TABLE partition_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
PARTITIONED BY (create_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将数据文件加载到Hive中,并指定分区:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');
No rows affected (0.747 seconds)
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');
No rows affected (0.347 seconds)
0: jdbc:hive2://localhost:10000>

执行以下sql,能够从不一样的分区统计结果:

0: jdbc:hive2://localhost:10000> select count(*) from partition_table;
+------+
| _c0  |
+------+
| 16   |
+------+
1 row selected (15.881 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (14.639 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (15.555 seconds)
0: jdbc:hive2://localhost:10000>

分区表在hdfs中的存储结构:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
drwxr-xr-x   - root supergroup          0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv
[root@hadoop01 ~]#

分桶表

建表语句:

CREATE TABLE bucket_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
clustered by (user_name) sorted by (user_name) into 2 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

test表中的数据插入到bucket_table中:

0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;
No rows affected (17.393 seconds)
0: jdbc:hive2://localhost:10000>

抽样查询:
数据仓库之Hive快速入门 - 离线&实时数仓架构

分桶表在hdfs的存储目录以下:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table
Found 2 items
-rw-r--r--   1 root supergroup        465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0
-rw-r--r--   1 root supergroup        281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0
[root@hadoop01 ~]#

Hive基本使用(下)内置函数/自定义函数/实现UDF

Hive常见内置函数:

  • 字符串类型:concat、substr、 upper、 lower
  • 时间类型:year、month、 day
  • 复杂类型:size、 get_json_object

查询引擎都自带了一部分函数来帮助咱们解决查询过程中一些复杂的数据计算或者数据转换操做,可是有时候自带的函数功能不能知足业务的须要。这时候就须要咱们本身开发自定义的函数来辅助完成了,这就是所谓的用户自定义函数UDF(User-Defined Functions)。Hive支持三类自定义函数:

  • UDF:普通的用户自定义函数。用来处理输入一行,输出一行的操做,相似Map操做。如转换字符串大小写,获取字符串长度等
  • UDAF:用户自定义聚合函数(User-defined aggregate function),用来处理输入多行,输出一行的操做,相似Reduce操做。好比MAX、COUNT函数。
  • UDTF:用户自定义表产生函数(User defined table-generating function),用来处理输入一行,输出多行(即一个表)的操做, 不是特别经常使用

UDF函数其实就是一段遵循必定接口规范的程序。在执行过程当中Hive将SQL转换为MapReduce程序,在执行过程中在执行咱们的UDF函数。

本小节简单演示下自定义UDF函数,首先建立一个空的Maven项目,而后添加hive-exec依赖,版本与你安装的Hive版本需对应上。完整的pom文件内容以下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hive-udf-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

首先建立一个继承UDF的类,咱们实现的这个自定义函数功能就是简单的获取字段的长度:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class StrLen extends UDF {

    public int evaluate(final Text col) {
        return col.getLength();
    }
}

以上这种自定义函数只能支持处理普通类型的数据,若是要对复杂类型的数据作处理则须要继承GenericUDF,并实现其抽象方法。例如,咱们实现一个对测试数据中的scores字段求平均值的函数:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.text.DecimalFormat;

public class AvgScore extends GenericUDF {

    /**
     * 函数的名称
     */
    private static final String FUNC_NAME = "AVG_SCORE";

    /**
     * 函数所做用的字段类型,这里是map类型
     */
    private transient MapObjectInspector mapOi;

    /**
     * 控制精度只返回两位小数
     */
    DecimalFormat df = new DecimalFormat("#.##");

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // 在此方法中能够作一些前置的校验,例如检测函数参数个数、检测函数参数类型
        mapOi = (MapObjectInspector) objectInspectors[0];
        // 指定函数的输出类型
        return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // 函数的核心逻辑,取出map中的value进行求平均值,并返回一个Double类型的结果值
        Object o = deferredObjects[0].get();
        double v = mapOi.getMap(o).values().stream()
                .mapToDouble(a -> Double.parseDouble(a.toString()))
                .average()
                .orElse(0.0);

        return Double.parseDouble(df.format(v));
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "func(map)";
    }
}

对项目进行打包,并上传到服务器中:

[root@hadoop01 ~/jars]# ls
hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#

将jar包上传到hdfs中:

[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs
[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs
[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs
Found 1 items
-rw-r--r--   1 root supergroup       4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#

在Hive中添加该jar包:

0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;
No rows affected (0.022 seconds)
0: jdbc:hive2://localhost:10000>

而后注册临时函数,临时函数只会在当前的session中生效:

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000>

使用自定义函数处理:

0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;
+------------+---------+------------+
| user_name  | length  | avg_score  |
+------------+---------+------------+
| Tom        | 3       | 80.25      |
| Jerry      | 5       | 77.5       |
| Jim        | 3       | 83.75      |
| Angela     | 6       | 84.5       |
| Ann        | 3       | 90.0       |
| Bella      | 5       | 69.25      |
| Bonnie     | 6       | 76.5       |
| Caroline   | 8       | 84.5       |
+------------+---------+------------+
8 rows selected (0.083 seconds)
0: jdbc:hive2://localhost:10000>

删除已注册的临时函数:

0: jdbc:hive2://localhost:10000> drop temporary function strlen;
No rows affected (0.01 seconds)
0: jdbc:hive2://localhost:10000> drop temporary function avg_score;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000>

临时函数只会在当前的session中生效,若是须要注册成永久函数则只须要把TEMPORARY关键字给去掉便可。以下所示:

0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.049 seconds)
0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>

删除永久函数也是把TEMPORARY关键字给去掉便可。以下所示:

0: jdbc:hive2://localhost:10000> drop function strlen;
No rows affected (0.031 seconds)
0: jdbc:hive2://localhost:10000> drop function avg_score;
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>

Hive存储结构 - OrcFile

Hive支持的存储格式:
数据仓库之Hive快速入门 - 离线&实时数仓架构

  • TextFile是默认的存储格式,经过简单的分隔符能够对csv等类型的文件进行解析。但实际应用中一般都是使用OrcFile格式,由于ORCFile是列式存储格式,更加适合大数据查询的场景。

咱们都知道关系型数据库基本是使用行式存储做为存储格式,而大数据领域更多的是采用列式存储,由于大数据分析场景中一般须要读取大量行,可是只须要少数的几个列。这也是为何一般使用OrcFile做为Hive的存储格式的缘由。因而可知,大数据的绝大部分应用场景都是OLAP场景。

OLAP场景的特色

读多于写

不一样于事务处理(OLTP)的场景,好比电商场景中加购物车、下单、支付等须要在原地进行大量insert、update、delete操做,数据分析(OLAP)场景一般是将数据批量导入后,进行任意维度的灵活探索、BI工具洞察、报表制做等。

数据一次性写入后,分析师须要尝试从各个角度对数据作挖掘、分析,直到发现其中的商业价值、业务变化趋势等信息。这是一个须要反复试错、不断调整、持续优化的过程,其中数据的读取次数远多于写入次数。这就要求底层数据库为这个特色作专门设计,而不是盲目采用传统数据库的技术架构。

大宽表,读大量行可是少许列,结果集较小

在OLAP场景中,一般存在一张或是几张多列的大宽表,列数高达数百甚至数千列。对数据分析处理时,选择其中的少数几列做为维度列、其余少数几列做为指标列,而后对全表或某一个较大范围内的数据作聚合计算。这个过程会扫描大量的行数据,可是只用到了其中的少数列。而聚合计算的结果集相比于动辄数十亿的原始数据,也明显小得多。

数据批量写入,且数据不更新或少更新

OLTP类业务对于延时(Latency)要求更高,要避免让客户等待形成业务损失;而OLAP类业务,因为数据量很是大,一般更加关注写入吞吐(Throughput),要求海量数据可以尽快导入完成。一旦导入完成,历史数据每每做为存档,不会再作更新、删除操做。

无需事务,数据一致性要求低

OLAP类业务对于事务需求较少,一般是导入历史日志数据,或搭配一款事务型数据库并实时从事务型数据库中进行数据同步。多数OLAP系统都支持最终一致性。

灵活多变,不适合预先建模

分析场景下,随着业务变化要及时调整分析维度、挖掘方法,以尽快发现数据价值、更新业务指标。而数据仓库中一般存储着海量的历史数据,调整代价十分高昂。预先建模技术虽然能够在特定场景中加速计算,可是没法知足业务灵活多变的发展需求,维护成本太高。

行式存储和列式存储

行式存储和列式存储的对比图:
数据仓库之Hive快速入门 - 离线&实时数仓架构

与行式存储将每一行的数据连续存储不一样,列式存储将每一列的数据连续存储。相比于行式存储,列式存储在分析场景下有着许多优良的特性:

  1. 如前所述,分析场景中每每须要读大量行可是少数几个列。在行存模式下,数据按行连续存储,全部列的数据都存储在一个block中,不参与计算的列在IO时也要所有读出,读取操做被严重放大。而列存模式下,只须要读取参与计算的列便可,极大的减低了IO cost,加速了查询。
  2. 同一列中的数据属于同一类型,压缩效果显著。列存每每有着高达十倍甚至更高的压缩比,节省了大量的存储空间,下降了存储成本。
  3. 更高的压缩比意味着更小的data size,从磁盘中读取相应数据耗时更短。
  4. 自由的压缩算法选择。不一样列的数据具备不一样的数据类型,适用的压缩算法也就不尽相同。能够针对不一样列类型,选择最合适的压缩算法。
  5. 高压缩比,意味着同等大小的内存可以存放更多数据,系统cache效果更好。

OrcFile

OrcFile存储格式:
数据仓库之Hive快速入门 - 离线&实时数仓架构

Orc列式存储优势:

  • 查询时只须要读取查询所涉及的列,下降IO消耗,同时保存每一列统计信息,实现部分谓词下推
  • 每列数据类型一致,可针对不一样的数据类型采用其高效的压缩算法
  • 列式存储格式假设数据不会发生改变,支持分片、流式读取,更好的适应分布式文件存储的特性

除了Orc外,Parquet也是经常使用的列式存储格式。Orc VS Parquet:

  • OrcFile和Parquet都是Apache的顶级项目
  • Parquet不支持ACID、不支持更新,Orc支持有限的ACID和更新
  • Parquet的压缩能力较高,Orc的查询效率较高

离线数仓VS实时数仓

数据仓库之Hive快速入门 - 离线&实时数仓架构

离线数仓:

  • 离线数据仓库主要基于Hive等技术来构建T+1的离线数据
  • 经过定时任务天天拉取增量数据导入到Hive表中
  • 建立各个业务相关的主题维度数据,对外提供T+1的数据查询接口

离线数仓架构:

  • 数据源经过离线的方式导入到离线数仓中
  • 数据分层架构:ODS、DWD、 DM
  • 下游应用根据业务需求选择直接读取DM

实时数仓:

  • 实时数仓基于数据采集工具,将原始数据写入到Kafka等数据通道
  • 数据最终写入到相似于HBase这样支持快速读写的存储系统
  • 对外提供分钟级别、甚至秒级别的查询方案

实时数仓架构:

  • 业务实时性要求的不断提升,实时处理从次要部分变成了主要部分
  • Lambda架构:在离线大数据架构基础上加了一个加速层,使用流处理技术完成实时性较高的指标计算
  • Kappa架构:以实时事件处理为核心,统一数据处理

图解Lambda架构数据流程

Lambda 架构(Lambda Architecture)是由 Twitter 工程师南森·马茨(Nathan Marz)提出的大数据处理架构。这一架构的提出基于马茨在 BackType 和 Twitter 上的分布式数据处理系统的经验。

Lambda 架构使开发人员可以构建大规模分布式数据处理系统。它具备很好的灵活性和可扩展性,也对硬件故障和人为失误有很好的容错性。

Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。
数据仓库之Hive快速入门 - 离线&实时数仓架构

在 Lambda 架构中,每层都有本身所肩负的任务。批处理层存储管理主数据集(不可变的数据集)和预先批处理计算好的视图。批处理层使用可处理大量数据的分布式处理系统预先计算结果。它经过处理全部的已有历史数据来实现数据的准确性。这意味着它是基于完整的数据集来从新计算的,可以修复任何错误,而后更新现有的数据视图。输出一般存储在只读数据库中,更新则彻底取代现有的预先计算好的视图。

速度处理层会实时处理新来的数据。速度层经过提供最新数据的实时视图来最小化延迟。速度层所生成的数据视图可能不如批处理层最终生成的视图那样准确或完整,但它们几乎在收到数据后当即可用。而当一样的数据在批处理层处理完成后,在速度层的数据就能够被替代掉了。

本质上,速度层弥补了批处理层所致使的数据视图滞后。好比说,批处理层的每一个任务都须要 1 个小时才能完成,而在这 1 个小时里,咱们是没法获取批处理层中最新任务给出的数据视图的。而速度层由于可以实时处理数据给出结果,就弥补了这 1 个小时的滞后。

全部在批处理层和速度层处理完的结果都输出存储在服务层中,服务层经过返回预先计算的数据视图或从速度层处理构建好数据视图来响应查询。

全部的新用户行为数据均可以同时流入批处理层和速度层。批处理层会永久保存数据而且对数据进行预处理,获得咱们想要的用户行为模型并写入服务层。而速度层也同时对新用户行为数据进行处理,获得实时的用户行为模型。

而当“应该对用户投放什么样的广告”做为一个查询(Query)来到时,咱们从服务层既查询服务层中保存好的批处理输出模型,也对速度层中处理的实时行为进行查询,这样咱们就能够获得一个完整的用户行为历史了。

一个查询就以下图所示,既经过批处理层兼顾了数据的完整性,也能够经过速度层弥补批处理层的高延时性,让整个查询具备实时性。
数据仓库之Hive快速入门 - 离线&实时数仓架构


Kappa 架构 VS Lambda

Lambda 架构的不足

虽然 Lambda 架构使用起来十分灵活,而且能够适用于不少的应用场景,但在实际应用的时候,Lambda 架构也存在着一些不足,主要表如今它的维护很复杂。

使用 Lambda 架构时,架构师须要维护两个复杂的分布式系统,而且保证他们逻辑上产生相同的结果输出到服务层中。举个例子吧,咱们在部署 Lambda 架构的时候,能够部署 Apache Hadoop 到批处理层上,同时部署 Apache Flink 到速度层上。

咱们都知道,在分布式框架中进行编程实际上是十分复杂的,尤为是咱们还会针对不一样的框架进行专门的优化。因此几乎每个架构师都认同,Lambda 架构在实战中维护起来具备必定的复杂性。

那要怎么解决这个问题呢?咱们先来思考一下,形成这个架构维护起来如此复杂的根本缘由是什么呢?

维护 Lambda 架构的复杂性在于咱们要同时维护两套系统架构:批处理层和速度层。咱们已经说过了,在架构中加入批处理层是由于从批处理层获得的结果具备高准确性,而加入速度层是由于它在处理大规模数据时具备低延时性。

那咱们能不能改进其中某一层的架构,让它具备另一层架构的特性呢?例如,改进批处理层的系统让它具备更低的延时性,又或者是改进速度层的系统,让它产生的数据视图更具准确性和更加接近历史数据呢?

另一种在大规模数据处理中经常使用的架构——Kappa 架构(Kappa Architecture),即是在这样的思考下诞生的。

Kappa 架构

Kappa 架构是由 LinkedIn 的前首席工程师杰伊·克雷普斯(Jay Kreps)提出的一种架构思想。克雷普斯是几个著名开源项目(包括 Apache Kafka 和 Apache Samza 这样的流处理系统)的做者之一,也是如今 Confluent 大数据公司的 CEO。

克雷普斯提出了一个改进 Lambda 架构的观点:

咱们能不能改进 Lambda 架构中速度层的系统性能,使得它也能够处理好数据的完整性和准确性问题呢?咱们能不能改进 Lambda 架构中的速度层,使它既可以进行实时数据处理,同时也有能力在业务逻辑更新的状况下从新处理之前处理过的历史数据呢?

他根据自身多年的架构经验发现,咱们是能够作到这样的改进的。咱们知道像 Apache Kafka 这样的流处理平台是具备永久保存数据日志的功能的。经过Kafka的这一特性,咱们能够从新处理部署于速度层架构中的历史数据。

下面我就以 Kafka 为例来介绍整个全新架构的过程。

第一步,部署 Kafka,并设置数据日志的保留期(Retention Period)。

这里的保留期指的是你但愿可以从新处理的历史数据的时间区间。例如,若是你但愿从新处理最多一年的历史数据,那就能够把 Apache Kafka 中的保留期设置为 365 天。若是你但愿可以处理全部的历史数据,那就能够把 Apache Kafka 中的保留期设置为“永久(Forever)”。

第二步,若是咱们须要改进现有的逻辑算法,那就表示咱们须要对历史数据进行从新处理。咱们须要作的就是从新启动一个 Kafka 做业实例(Instance)。这个做业实例将重头开始,从新计算保留好的历史数据,并将结果输出到一个新的数据视图中。

咱们知道 Kafka 的底层是使用 Log Offset 来判断如今已经处理到哪一个数据块了,因此只须要将 Log Offset 设置为 0,新的做业实例就会重头开始处理历史数据。

第三步,当这个新的数据视图处理过的数据进度遇上了旧的数据视图时,咱们的应用即可以切换到重新的数据视图中读取。

第四步,中止旧版本的做业实例,并删除旧的数据视图。

这个架构就如同下图所示。
数据仓库之Hive快速入门 - 离线&实时数仓架构

与 Lambda 架构不一样的是,Kappa 架构去掉了批处理层这一体系结构,而只保留了速度层。你只须要在业务逻辑改变又或者是代码更改的时候进行数据的从新处理。Kappa 架构统一了数据的处理方式,再也不维护离线和实时两套代码逻辑。

Kappa 架构的不足

Kappa 架构也是有着它自身的不足的。由于 Kappa 架构只保留了速度层而缺乏批处理层,在速度层上处理大规模数据可能会有数据更新出错的状况发生,这就须要咱们花费更多的时间在处理这些错误异常上面。若是需求发生变化或历史数据须要从新处理都得经过上游重放来完成。而且从新处理历史的吞吐能力会低于批处理。

还有一点,Kappa 架构的批处理和流处理都放在了速度层上,这致使了这种架构是使用同一套代码来处理算法逻辑的。因此 Kappa 架构并不适用于批处理和流处理代码逻辑不一致的场景。

Lambda VS Kappa

数据仓库之Hive快速入门 - 离线&实时数仓架构


主流大公司的实时数仓架构

阿里菜鸟实时数仓

数据仓库之Hive快速入门 - 离线&实时数仓架构
数据仓库之Hive快速入门 - 离线&实时数仓架构

美团实时数仓

数据仓库之Hive快速入门 - 离线&实时数仓架构

实时数仓建设特征

  • 总体架构设计经过分层设计为OLAP查询分担压力
  • 复杂的计算统一在实时计算层作,避免给OLAP查询带来过大的压力
  • 汇总计算经过OLAP数据查询引擎进行
  • 整个架构中实时计算通常 是Spark+Flink配合
  • 消息队列Kafka一家独大,配合HBase、ES、 Mysq|进行数据落盘
  • OLAP领域Presto、Druid、 Clickhouse、 Greenplum等等层出不穷
相关文章
相关标签/搜索