本套系列博客从真实商业环境抽取案例进行总结和分享,并给出大数据商业实战指导,请持续关注本套博客。版权声明:本套大数据商业实战系列归做者(秦凯新)全部,禁止转载,欢迎学习。git
STAGE 层做为数据缓冲层,主要负责采集不一样类型的业务系统数据并保存必定期限内的相关业务数据,完成不一样类型数据源的统一临时存储,同时避免 ETL 操做对业务系统性能形成影响,STAGE 层数据在数据结构、数据之间的逻辑关系上都与业务系统基本保持一致。github
ODS(Operational Data Store)层数据来源于 STAGE 层,它的数据通过了对 STAGE 层数据的清洗,包括编码表去重、去空、垃圾数据过滤、数据类型规则化等。ODS存储了运营系统(如OLTP(联机事务处理)系统)近实时的详细数据。数据库
另外 ODS 做为 DW 和 STAGE 层的桥梁,也能够实现指标一致性的管理,将不一样系统不一样部门相同指标的定义及指标数据按照业务规则取其一,保证不一样源数据的数据一致性,也能够知足用户对明细数据的查询要求,直接从ODS层获取明细数据进行分析。数组
DWD(Data Warehouse Detail)层数据是将 ODS 层数据根据数据清洗规则,通过质量检查、数据清洗、转换、标准化后,造成符合质量要求的公共数据中心,有的也称为 ODS层,是业务层与数据仓库的隔离层数据结构
把 ODS 数据表结构改变成项目主题数据仓库的表结构,对 DWD 层的全部表添加了代理键,标准化了业务系统编码类型不统一的问题,创建了数据仓库维度表和事实表的关联体系,也为缓慢变化维的实现奠基了基础。并发
DWC(Data Warehouse Center)层主要管理固化报表的数据存储,数据主要来源于 DWD 层,根据前台所需数据创建物理模型,使用 ETL 抽取 DWD 层数据推送给 DWC 层,这样显著减小前台应用直接关联 DWD 层查询明细数据的成本,提升平台数据获取的速度。高并发
DWB数据层(data warehouse base)表示 基础数据层,存储的是客观数据,通常用做中间层,能够认为是大量指标的数据层。post
DWS数据层 (data warehouse service)表示 服务数据层,基于DWB上的基础数据,整合汇总成分析某一个主题域的服务数据,通常是宽表。性能
DM(Data Mart)层即数据集市,将指标与维度创建物理模型组成数据集市,这是 OLAP 的数据基础。该层实现了合并不一样系统的数据源来知足面向主题的业务需求,它的建模是终端用户驱动的,也是由业务需求驱动的。按主题,维度及 KPI 指标对 DM 层进行模型设计、建模,DM 层数据是将 DWD 层数据进行进一步整合、转换、汇总、计算等 ETL 操做处理获取的。学习
主要是执行基本平常的事务处理,好比数据库记录的增删查改。好比在银行的一笔交易记录,就是一个典型的事务。
联机分析处理OLAP(On-Line Analytical Processing) 是数据仓库系统的主要应用,支持复杂的分析操做,侧重决策支持,而且提供直观易懂的查询结果。典型的应用就是复杂的动态的报表系统。
OLAP是数据仓库的核心部心,所谓数据仓库是对于大量已经由OLTP造成的数据的一种分析型的数据库,用于处理商业智能、决策支持等重要的决策信息;数据仓库是在数据库应用到必定程序以后而对历史数据的加工与分析,读取较多,更新较少。
DROP TABLE IF EXISTS tbl_stg;
CREATE TABLE tbl_stg (
id INT,
name STRING,
cty STRING,
st STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
复制代码
DROP TABLE IF EXISTS tbl_dim;
CREATE TABLE tbl_dim (
sk INT,
id INT,
name STRING,
cty STRING,
st STRING,
version INT,
effective_date DATE,
expiry_date DATE)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
复制代码
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY tbl_stg.id) + t2.sk_max,
tbl_stg.*,
1,
CAST('1900-01-01' AS DATE),
CAST('2200-01-01' AS DATE)
from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
复制代码
drop table tbl_stg;
LOAD DATA LOCAL INPATH '/root/opendir/test-data/aa.txt' INTO TABLE tbl_stg;
复制代码
SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);
复制代码
1,张三,US,CA -->scd2 scd1
2,李四,US,CB -->删除
3,王五,CA,BB -->没变
4,赵六,CA,BC -->scd1
5,老刘,AA,AA -->scd2
1,张,U,C
3,王五,CA,BB
4,赵六,AC,CB
5,刘,AA,AA
6,老杨,DD,DD
UPDATE tbl_dim
SET expiry_date = ${hivevar:pre_date}
WHERE sk IN
(SELECT a.sk FROM (
SELECT sk,id,name FROM tbl_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN tbl_stg b ON a.id=b.id
WHERE b.id IS NULL OR a.name<>b.name);
tbl_dim.sk tbl_dim.id tbl_dim.name tbl_dim.cty tbl_dim.st tbl_dim.version tbl_dim.effective_date tbl_dim.expiry_date
1 1 张三 US CA 1 1900-01-01 2018-09-03 -->过时
2 2 李四 US CB 1 1900-01-01 2018-09-03 -->过时(记录删除)
3 3 王五 CA BB 1 1900-01-01 2200-01-01
4 4 赵六 CA BC 1 1900-01-01 2200-01-01
5 5 老刘 AA AA 1 1900-01-01 2018-09-03 -->过时
复制代码
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,
t1.id,
t1.name,
t1.cty,
t1.st,
t1.version,
t1.effective_date,
t1.expiry_date
FROM
(
SELECT
t2.id id,
t2.name name,
t2.cty cty,
t2.st st,
t1.version + 1 version,
${hivevar:pre_date} effective_date,
${hivevar:max_date} expiry_date
FROM tbl_dim t1 INNER JOIN tbl_stg t2
ON t1.id=t2.id AND t1.name<>t2.name AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN tbl_dim t3 ON t1.id = t3.id AND t3.expiry_date = ${hivevar:max_date}
WHERE t3.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
tbl_dim.sk tbl_dim.id tbl_dim.name tbl_dim.cty tbl_dim.st tbl_dim.version tbl_dim.effective_date tbl_dim.expiry_date
1 1 张三 US CA 1 1900-01-01 2018-09-03
6 1 张 U C 2 2018-09-03 2200-01-01 --->新增行
2 2 李四 US CB 1 1900-01-01 2018-09-03
3 3 王五 CA BB 1 1900-01-01 2200-01-01
4 4 赵六 CA BC 1 1900-01-01 2200-01-01
5 5 老刘 AA AA 1 1900-01-01 2018-09-03
7 5 刘 AA AA 2 2018-09-03 2200-01-01 --->新增行
复制代码
用先delete再insert代替update,由于SCD1自己就不保存历史数据,因此这里更新维度表里的全部cty或st改变的记录,而不是仅仅更新当前版本的记录
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.sk,a.id,a.name,b.cty,b.st,a.version,a.effective_date,a.expiry_date FROM tbl_dim a, tbl_stg b
WHERE a.id=b.id AND (a.cty <> b.cty OR a.st <> b.st);
DELETE FROM tbl_dim WHERE sk IN (SELECT sk FROM tmp);
INSERT INTO tbl_dim SELECT * FROM tmp;
原始数据
1,张三,US,CA -->scd2 scd1
2,李四,US,CB -->删除
3,王五,CA,BB -->没变
4,赵六,CA,BC -->scd1
5,老刘,AA,AA -->scd2
过分业务数据
1,张,U,C
3,王五,CA,BB
4,赵六,AC,CB
5,刘,AA,AA
6,老杨,DD,DD
复制代码
修改了第4条数据的cty列和st列(按SCD1处理)
6 1 张 U C 2 2018-09-03 2200-01-01 -->scd2 scd1
1 1 张三 U C 1 1900-01-01 2018-09-03 -->scd2 scd1
2 2 李四 US CB 1 1900-01-01 2018-09-03 -->删除
3 3 王五 CA BB 1 1900-01-01 2200-01-01 -->没变
4 4 赵六 AC CB 1 1900-01-01 2200-01-01 -->scd1
5 5 老刘 AA AA 1 1900-01-01 2018-09-03 -->scd2
7 5 刘 AA AA 2 2018-09-03 2200-01-01 -->scd2
复制代码
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,
t1.id,
t1.name,
t1.cty,
t1.st,
1,
${hivevar:pre_date},
${hivevar:max_date}
FROM
(
SELECT t1.* FROM tbl_stg t1 LEFT JOIN tbl_dim t2 ON t1.id = t2.id
WHERE t2.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
1,张三,US,CA -->scd2 scd1
2,李四,US,CB -->删除
3,王五,CA,BB -->没变
4,赵六,CA,BC -->scd1
5,老刘,AA,AA -->scd2
1,张,U,C
3,王五,CA,BB
4,赵六,AC,CB
5,刘,AA,AA
6,老杨,DD,DD
6 1 张 U C 2 2018-09-03 2200-01-01 -->scd2 scd1
1 1 张三 U C 1 1900-01-01 2018-09-03 -->scd2 scd1
2 2 李四 US CB 1 1900-01-01 2018-09-03 -->删除
3 3 王五 CA BB 1 1900-01-01 2200-01-01 -->没变
4 4 赵六 AC CB 1 1900-01-01 2200-01-01 -->scd1
5 5 老刘 AA AA 1 1900-01-01 2018-09-03 -->scd2
7 5 刘 AA AA 2 2018-09-03 2200-01-01 -->scd2
8 6 老杨 DD DD 1 2018-09-03 2200-01-01 -->新增
复制代码
drop table if exists articles;
create external table articles(
article_id STRING,
url STRING,
kws array< STRING >
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
LOCATION '/hivepeixun/articles';
复制代码
drop table if exists user_actions;
复合数组元素,拆分split
create external table user_actions(
user_id STRING,
article_id STRING,
ts STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/hivepeixun/users';
collect_set去重功能:
select user_id, collect_set(article_id) from user_actions group by user_id;
collect_list没有去重功能:用户分组后,进行文章汇总成集合:
select user_id, collect_list(article_id) from user_actions group by user_id;
:
select user_id,sort_array(collect_list(article_id)) from user_actions group by user_id;
拆开某条字段,一条数据变成多条记录,没有关联时为空的就过滤掉了:
select article_id,kw from articles2 lateral view explode(kws) t as kw;
select a.* , b.kw from user_actions as a left outer join (select article_id, kw from articles lateral view explode(kws) t as kw) b on (a.article_id =b.article_id)
复制代码
select a.user_id , b.kw , count(1) as weight from user_actions as a
left outer join (select article_id, kw from articles lateral view explode(kws) t as kw)
b on (a.article_id =b.article_id) group by a.user_id ,b.kw order by a.user_id ,weight desc;
复制代码
未完待续
本节内容主要探讨了数据仓库模型与缓慢变化维度技术深度剖析,本文参考了经典案例,并进行从新编排,内容有所变化,可能部分截图来自github公开源码,部分是个人测试案例,若有雷同某位大神私有内容,请直接留言于我,我来从新修正案例。
秦凯新 于深圳