数据仓库的定义:java
数据仓库VS数据库:python
OLTP VS OLAP:算法
常规的数仓架构:sql
为何建设数据仓库:数据库
数据仓库建设目标:apache
如何实现:编程
数仓建设背景:json
为何进行数仓分层:bash
常见的分层含义:服务器
STG层
stg_主题_表内容_分表规则
ODS层
ods_主题_表内容_分表规则
DWD层
dwd_业务描述时间粒度
DWS层
dws_业务描述_时间粒度_sum
DIM层
dim_维度描述
DM层
dm_主题_表内容_分表规则
分层之间的数据流转:
Hive简介:
Hive的简单架构图:
Hive VS Hadoop:
产品定位
Hive是数据仓库,为海量数据的离线分析设计的,不支持OLTP(联机事务处理所需的关键功能ACID,而更接近于OLAP(联机分析技术)),适给离线处理大数据集。而MySQL是关系型数据库,是为实时业务设计的。
可扩展性
Hive中的数据存储在HDFS(Hadoop的分布式文件系统),metastore元数据一 般存储在独立的关系型数据库中,而MySQL则是服务器本地的文件系统。所以Hive具备良好的可扩展性,数据库因为ACID语义的严格限制,扩展性十分有限。
读写模式
Hive为读时模式,数据的验证则是在查询时进行的,这有利于大数据集的导入,读时模式使数据的加载很是迅速,数据的加载仅是文件复制或移动。MySQL为写时模式,数据在写入数据库时对照模式检查。写时模式有利于提高查询性能,由于数据库能够对列进行索引。
数据更新
Hive是针对数据仓库应用设计的,而数仓的内容是读多写少的,Hive中不支持对数据进行改写,全部数据都是在加载的时候肯定好的。而数据库中的数据一般是须要常常进行修改的。
索引
Hive支持索引,可是Hive的索引与关系型数据库中的索引并不相同,好比,Hive不支持主键或者外键。Hive提供了有限的索引功能,能够为-些字段创建索引,一张表的索引数据存储在另一张表中。因为数据的访问延迟较高,Hive不适合在线数据查询。数据库在少星的特定条件的数据访问中,索引能够提供较低的延迟。
计算模型
Hive默认使用的模型是MapReduce(也能够on spark、on tez),而MySQL使用的是本身设计的Executor计算模型
参考:
Hive数据类型:
Hive分区:
Hive经常使用基础语法:
USE DATABASE_NAME
CREATE DATABASE IF NOT EXISTS DB NAME
DESC DATABASE DB NAME
CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE
SELECT * FROM TABLE NAME
ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME
写个Python脚本生成一些测试数据:
import json import random import uuid name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline') hobby = ('reading', 'play', 'dancing', 'sing') subject = ('math', 'chinese', 'english', 'computer') data = [] for item in name: scores = {key: random.randint(60, 100) for key in subject} data.append("|".join([uuid.uuid4().hex, item, ','.join( random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])])) with open('test.csv', 'w') as f: f.write('\n'.join(data))
执行该脚本,生成测试数据文件:
[root@hadoop01 ~/py-script]# python3 gen_data.py [root@hadoop01 ~/py-script]# ll -h ... -rw-r--r--. 1 root root 745 11月 9 11:09 test.csv [root@hadoop01 ~/py-script]#
咱们能够看一下生成的数据:
[root@hadoop01 ~/py-script]# cat test.csv f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77 ...
|
符进行分割,前两个字段都是string
类型,第三个字段是array
类型,第四个字段是map
类型建立测试用的数据库:
0: jdbc:hive2://localhost:10000> create database hive_test; No rows affected (0.051 seconds) 0: jdbc:hive2://localhost:10000> use hive_test; No rows affected (0.06 seconds) 0: jdbc:hive2://localhost:10000>
建立测试表:
CREATE TABLE test( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
将本地数据加载到Hive中:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test; No rows affected (0.785 seconds) 0: jdbc:hive2://localhost:10000>
查询数据:
了解了Hive中的SQL基本操做以后,咱们来看看Hive是如何将SQL转换为MapReduce任务的,整个转换过程分为六个阶段:
与普通SQL同样,咱们能够经过在HQL前面加上explain
关键字查看HQL的执行计划:
explain select * from test where id > 10 limit 1000
Hive会将这条语句解析成一个个的Operator,Operator就是Hive解析以后的最小单元,每一个Operator其实都是对应一个MapReduce任务。例如,上面这条语句被Hive解析后,就是由以下Operator组成:
同时,Hive实现了优化器对这些Operator的顺序进行优化,帮助咱们提高查询效率。Hive中的优化器主要分为三类:
内部表:
和传统数据库的Table概念相似,对应HDFS上存储目录,删除表时,删除元数据和表数据。内部表的数据,会存放在HDFS中的特定的位置中,能够经过配置文件指定。当删除表时,数据文件也会一并删除。适用于临时建立的中间表。
外部表:
指向已经存在的HDFS数据,删除时只删除元数据信息。适用于想要在Hive以外使用表的数据的状况,当你删除External Table时,只是删除了表的元数据,它的数据并无被删除。适用于数据多部门共享。建表时使用
create external table
。指定external
关键字便可。
分区表:
Partition对应普通数据库对Partition列的密集索引,将数据按照Partition列存储到不一样目录,便于并行分析,减小数据量。分区表建立表的时候须要指定分区字段。
分区字段与普通字段的区别:分区字段会在HDFS表目录下生成一个分区字段名称的目录,而普通字段则不会,查询的时候能够当成普通字段来使用,通常不直接和业务直接相关。
分桶表:
对数据进行hash,放到不一样文件存储,方便抽样和join查询。能够将内部表,外部表和分区表进一步组织成桶表,能够将表的列经过Hash算法进一步分解成不一样的文件存储。
对于内部表和外部表的概念和应用场景咱们很容易理解,咱们须要重点关注一下分区表和分桶表。 咱们为何要创建分区表和分桶表呢?HQL经过where
子句来限制条件提取数据,那么与其遍历一张大表,不如将这张大表拆分红多个小表,并经过合适的索引来扫描表中的一小部分,分区和分桶都是采用了这种理念。
分区会建立物理目录,而且能够具备子目录(一般会按照时间、地区分区),目录名以 分区名=值
形式命名,例如:create_time=202011
。分区名会做为表中的伪列,这样经过where
字句中加入分区的限制能够在仅扫描对应子目录下的数据。经过 partitioned by (feld1 type, ...)
建立分区列。
分桶能够继续在分区的基础上再划分小表,分桶根据哈希值来肯定数据的分布(即MapReducer中的分区),好比分区下的一部分数据能够根据分桶再分为多个桶,这样在查询时先计算对应列的哈希值并计算桶号,只须要扫描对应桶中的数据便可。分桶经过clustered by(field) into n buckets
建立。
接下来简单演示下这几种表的操做,首先将上一小节生成的测试数据文件上传到hdfs中:
[root@hadoop01 ~]# hdfs dfs -mkdir /test [root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test [root@hadoop01 ~]# hdfs dfs -ls /test Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /test/test.csv [root@hadoop01 ~]#
建表SQL:
CREATE TABLE test_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
将hdfs数据加载到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table; No rows affected (0.169 seconds) 0: jdbc:hive2://localhost:10000>
查看建立的表存储在hdfs的哪一个目录下:
0: jdbc:hive2://localhost:10000> show create table test_table; +----------------------------------------------------+ | createtab_stmt | +----------------------------------------------------+ | CREATE TABLE `test_table`( | | `user_id` string, | | `user_name` string, | | `hobby` array<string>, | | `scores` map<string,int>) | | ROW FORMAT SERDE | | 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' | | WITH SERDEPROPERTIES ( | | 'collection.delim'=',', | | 'field.delim'='|', | | 'line.delim'='\n', | | 'mapkey.delim'=':', | | 'serialization.format'='|') | | STORED AS INPUTFORMAT | | 'org.apache.hadoop.mapred.TextInputFormat' | | OUTPUTFORMAT | | 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' | | LOCATION | | 'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' | | TBLPROPERTIES ( | | 'bucketing_version'='2', | | 'transient_lastDdlTime'='1604893190') | +----------------------------------------------------+ 22 rows selected (0.115 seconds) 0: jdbc:hive2://localhost:10000>
在hdfs中能够查看到数据文件:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv [root@hadoop01 ~]#
删除表:
0: jdbc:hive2://localhost:10000> drop table test_table; No rows affected (0.107 seconds) 0: jdbc:hive2://localhost:10000>
查看hdfs会发现该表所对应的存储目录也一并被删除了:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/ Found 2 items drwxr-xr-x - root supergroup 0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table drwxr-xr-x - root supergroup 0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test [root@hadoop01 ~]#
建表SQL,与内部表的区别就在于external
关键字:
CREATE external TABLE external_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
将数据文件加载到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table; No rows affected (0.182 seconds) 0: jdbc:hive2://localhost:10000>
此时会发现hdfs中的数据文件会被移动到hive的目录下:
[root@hadoop01 ~]# hdfs dfs -ls /test [root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv [root@hadoop01 ~]#
删除表:
0: jdbc:hive2://localhost:10000> drop table external_table; No rows affected (0.112 seconds) 0: jdbc:hive2://localhost:10000>
查看hdfs会发现该表所对应的存储目录仍然存在:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv [root@hadoop01 ~]#
建表语句:
CREATE TABLE partition_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) PARTITIONED BY (create_time string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
将数据文件加载到Hive中,并指定分区:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011'); No rows affected (0.747 seconds) 0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012'); No rows affected (0.347 seconds) 0: jdbc:hive2://localhost:10000>
执行以下sql,能够从不一样的分区统计结果:
0: jdbc:hive2://localhost:10000> select count(*) from partition_table; +------+ | _c0 | +------+ | 16 | +------+ 1 row selected (15.881 seconds) 0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011'; +------+ | _c0 | +------+ | 8 | +------+ 1 row selected (14.639 seconds) 0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012'; +------+ | _c0 | +------+ | 8 | +------+ 1 row selected (15.555 seconds) 0: jdbc:hive2://localhost:10000>
分区表在hdfs中的存储结构:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table Found 2 items drwxr-xr-x - root supergroup 0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011 drwxr-xr-x - root supergroup 0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012 [root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011 Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv [root@hadoop01 ~]#
建表语句:
CREATE TABLE bucket_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) clustered by (user_name) sorted by (user_name) into 2 buckets ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
将test
表中的数据插入到bucket_table
中:
0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test; No rows affected (17.393 seconds) 0: jdbc:hive2://localhost:10000>
抽样查询:
分桶表在hdfs的存储目录以下:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table Found 2 items -rw-r--r-- 1 root supergroup 465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0 -rw-r--r-- 1 root supergroup 281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0 [root@hadoop01 ~]#
Hive常见内置函数:
查询引擎都自带了一部分函数来帮助咱们解决查询过程中一些复杂的数据计算或者数据转换操做,可是有时候自带的函数功能不能知足业务的须要。这时候就须要咱们本身开发自定义的函数来辅助完成了,这就是所谓的用户自定义函数UDF(User-Defined Functions)。Hive支持三类自定义函数:
UDF函数其实就是一段遵循必定接口规范的程序。在执行过程当中Hive将SQL转换为MapReduce程序,在执行过程中在执行咱们的UDF函数。
本小节简单演示下自定义UDF函数,首先建立一个空的Maven项目,而后添加hive-exec
依赖,版本与你安装的Hive版本需对应上。完整的pom
文件内容以下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>hive-udf-test</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>
首先建立一个继承UDF
的类,咱们实现的这个自定义函数功能就是简单的获取字段的长度:
package com.example.hive.udf; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; public class StrLen extends UDF { public int evaluate(final Text col) { return col.getLength(); } }
以上这种自定义函数只能支持处理普通类型的数据,若是要对复杂类型的数据作处理则须要继承GenericUDF
,并实现其抽象方法。例如,咱们实现一个对测试数据中的scores
字段求平均值的函数:
package com.example.hive.udf; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.text.DecimalFormat; public class AvgScore extends GenericUDF { /** * 函数的名称 */ private static final String FUNC_NAME = "AVG_SCORE"; /** * 函数所做用的字段类型,这里是map类型 */ private transient MapObjectInspector mapOi; /** * 控制精度只返回两位小数 */ DecimalFormat df = new DecimalFormat("#.##"); @Override public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException { // 在此方法中能够作一些前置的校验,例如检测函数参数个数、检测函数参数类型 mapOi = (MapObjectInspector) objectInspectors[0]; // 指定函数的输出类型 return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; } @Override public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { // 函数的核心逻辑,取出map中的value进行求平均值,并返回一个Double类型的结果值 Object o = deferredObjects[0].get(); double v = mapOi.getMap(o).values().stream() .mapToDouble(a -> Double.parseDouble(a.toString())) .average() .orElse(0.0); return Double.parseDouble(df.format(v)); } @Override public String getDisplayString(String[] strings) { return "func(map)"; } }
对项目进行打包,并上传到服务器中:
[root@hadoop01 ~/jars]# ls hive-udf-test-1.0-SNAPSHOT.jar [root@hadoop01 ~/jars]#
将jar包上传到hdfs中:
[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs [root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs [root@hadoop01 ~/jars]# hdfs dfs -ls /udfs Found 1 items -rw-r--r-- 1 root supergroup 4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar [root@hadoop01 ~/jars]#
在Hive中添加该jar包:
0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar; No rows affected (0.022 seconds) 0: jdbc:hive2://localhost:10000>
而后注册临时函数,临时函数只会在当前的session中生效:
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen"; No rows affected (0.026 seconds) 0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore"; No rows affected (0.008 seconds) 0: jdbc:hive2://localhost:10000>
使用自定义函数处理:
0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test; +------------+---------+------------+ | user_name | length | avg_score | +------------+---------+------------+ | Tom | 3 | 80.25 | | Jerry | 5 | 77.5 | | Jim | 3 | 83.75 | | Angela | 6 | 84.5 | | Ann | 3 | 90.0 | | Bella | 5 | 69.25 | | Bonnie | 6 | 76.5 | | Caroline | 8 | 84.5 | +------------+---------+------------+ 8 rows selected (0.083 seconds) 0: jdbc:hive2://localhost:10000>
删除已注册的临时函数:
0: jdbc:hive2://localhost:10000> drop temporary function strlen; No rows affected (0.01 seconds) 0: jdbc:hive2://localhost:10000> drop temporary function avg_score; No rows affected (0.009 seconds) 0: jdbc:hive2://localhost:10000>
临时函数只会在当前的session中生效,若是须要注册成永久函数则只须要把TEMPORARY
关键字给去掉便可。以下所示:
0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar'; No rows affected (0.049 seconds) 0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar'; No rows affected (0.026 seconds) 0: jdbc:hive2://localhost:10000>
删除永久函数也是把TEMPORARY
关键字给去掉便可。以下所示:
0: jdbc:hive2://localhost:10000> drop function strlen; No rows affected (0.031 seconds) 0: jdbc:hive2://localhost:10000> drop function avg_score; No rows affected (0.026 seconds) 0: jdbc:hive2://localhost:10000>
Hive支持的存储格式:
咱们都知道关系型数据库基本是使用行式存储做为存储格式,而大数据领域更多的是采用列式存储,由于大数据分析场景中一般须要读取大量行,可是只须要少数的几个列。这也是为何一般使用OrcFile做为Hive的存储格式的缘由。因而可知,大数据的绝大部分应用场景都是OLAP场景。
不一样于事务处理(OLTP)的场景,好比电商场景中加购物车、下单、支付等须要在原地进行大量insert、update、delete操做,数据分析(OLAP)场景一般是将数据批量导入后,进行任意维度的灵活探索、BI工具洞察、报表制做等。
数据一次性写入后,分析师须要尝试从各个角度对数据作挖掘、分析,直到发现其中的商业价值、业务变化趋势等信息。这是一个须要反复试错、不断调整、持续优化的过程,其中数据的读取次数远多于写入次数。这就要求底层数据库为这个特色作专门设计,而不是盲目采用传统数据库的技术架构。
在OLAP场景中,一般存在一张或是几张多列的大宽表,列数高达数百甚至数千列。对数据分析处理时,选择其中的少数几列做为维度列、其余少数几列做为指标列,而后对全表或某一个较大范围内的数据作聚合计算。这个过程会扫描大量的行数据,可是只用到了其中的少数列。而聚合计算的结果集相比于动辄数十亿的原始数据,也明显小得多。
OLTP类业务对于延时(Latency)要求更高,要避免让客户等待形成业务损失;而OLAP类业务,因为数据量很是大,一般更加关注写入吞吐(Throughput),要求海量数据可以尽快导入完成。一旦导入完成,历史数据每每做为存档,不会再作更新、删除操做。
OLAP类业务对于事务需求较少,一般是导入历史日志数据,或搭配一款事务型数据库并实时从事务型数据库中进行数据同步。多数OLAP系统都支持最终一致性。
分析场景下,随着业务变化要及时调整分析维度、挖掘方法,以尽快发现数据价值、更新业务指标。而数据仓库中一般存储着海量的历史数据,调整代价十分高昂。预先建模技术虽然能够在特定场景中加速计算,可是没法知足业务灵活多变的发展需求,维护成本太高。
行式存储和列式存储的对比图:
与行式存储将每一行的数据连续存储不一样,列式存储将每一列的数据连续存储。相比于行式存储,列式存储在分析场景下有着许多优良的特性:
OrcFile存储格式:
Orc列式存储优势:
除了Orc外,Parquet也是经常使用的列式存储格式。Orc VS Parquet:
离线数仓:
离线数仓架构:
实时数仓:
实时数仓架构:
Lambda 架构(Lambda Architecture)是由 Twitter 工程师南森·马茨(Nathan Marz)提出的大数据处理架构。这一架构的提出基于马茨在 BackType 和 Twitter 上的分布式数据处理系统的经验。
Lambda 架构使开发人员可以构建大规模分布式数据处理系统。它具备很好的灵活性和可扩展性,也对硬件故障和人为失误有很好的容错性。
Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。
在 Lambda 架构中,每层都有本身所肩负的任务。批处理层存储管理主数据集(不可变的数据集)和预先批处理计算好的视图。批处理层使用可处理大量数据的分布式处理系统预先计算结果。它经过处理全部的已有历史数据来实现数据的准确性。这意味着它是基于完整的数据集来从新计算的,可以修复任何错误,而后更新现有的数据视图。输出一般存储在只读数据库中,更新则彻底取代现有的预先计算好的视图。
速度处理层会实时处理新来的数据。速度层经过提供最新数据的实时视图来最小化延迟。速度层所生成的数据视图可能不如批处理层最终生成的视图那样准确或完整,但它们几乎在收到数据后当即可用。而当一样的数据在批处理层处理完成后,在速度层的数据就能够被替代掉了。
本质上,速度层弥补了批处理层所致使的数据视图滞后。好比说,批处理层的每一个任务都须要 1 个小时才能完成,而在这 1 个小时里,咱们是没法获取批处理层中最新任务给出的数据视图的。而速度层由于可以实时处理数据给出结果,就弥补了这 1 个小时的滞后。
全部在批处理层和速度层处理完的结果都输出存储在服务层中,服务层经过返回预先计算的数据视图或从速度层处理构建好数据视图来响应查询。
全部的新用户行为数据均可以同时流入批处理层和速度层。批处理层会永久保存数据而且对数据进行预处理,获得咱们想要的用户行为模型并写入服务层。而速度层也同时对新用户行为数据进行处理,获得实时的用户行为模型。
而当“应该对用户投放什么样的广告”做为一个查询(Query)来到时,咱们从服务层既查询服务层中保存好的批处理输出模型,也对速度层中处理的实时行为进行查询,这样咱们就能够获得一个完整的用户行为历史了。
一个查询就以下图所示,既经过批处理层兼顾了数据的完整性,也能够经过速度层弥补批处理层的高延时性,让整个查询具备实时性。
虽然 Lambda 架构使用起来十分灵活,而且能够适用于不少的应用场景,但在实际应用的时候,Lambda 架构也存在着一些不足,主要表如今它的维护很复杂。
使用 Lambda 架构时,架构师须要维护两个复杂的分布式系统,而且保证他们逻辑上产生相同的结果输出到服务层中。举个例子吧,咱们在部署 Lambda 架构的时候,能够部署 Apache Hadoop 到批处理层上,同时部署 Apache Flink 到速度层上。
咱们都知道,在分布式框架中进行编程实际上是十分复杂的,尤为是咱们还会针对不一样的框架进行专门的优化。因此几乎每个架构师都认同,Lambda 架构在实战中维护起来具备必定的复杂性。
那要怎么解决这个问题呢?咱们先来思考一下,形成这个架构维护起来如此复杂的根本缘由是什么呢?
维护 Lambda 架构的复杂性在于咱们要同时维护两套系统架构:批处理层和速度层。咱们已经说过了,在架构中加入批处理层是由于从批处理层获得的结果具备高准确性,而加入速度层是由于它在处理大规模数据时具备低延时性。
那咱们能不能改进其中某一层的架构,让它具备另一层架构的特性呢?例如,改进批处理层的系统让它具备更低的延时性,又或者是改进速度层的系统,让它产生的数据视图更具准确性和更加接近历史数据呢?
另一种在大规模数据处理中经常使用的架构——Kappa 架构(Kappa Architecture),即是在这样的思考下诞生的。
Kappa 架构是由 LinkedIn 的前首席工程师杰伊·克雷普斯(Jay Kreps)提出的一种架构思想。克雷普斯是几个著名开源项目(包括 Apache Kafka 和 Apache Samza 这样的流处理系统)的做者之一,也是如今 Confluent 大数据公司的 CEO。
克雷普斯提出了一个改进 Lambda 架构的观点:
咱们能不能改进 Lambda 架构中速度层的系统性能,使得它也能够处理好数据的完整性和准确性问题呢?咱们能不能改进 Lambda 架构中的速度层,使它既可以进行实时数据处理,同时也有能力在业务逻辑更新的状况下从新处理之前处理过的历史数据呢?
他根据自身多年的架构经验发现,咱们是能够作到这样的改进的。咱们知道像 Apache Kafka 这样的流处理平台是具备永久保存数据日志的功能的。经过Kafka的这一特性,咱们能够从新处理部署于速度层架构中的历史数据。
下面我就以 Kafka 为例来介绍整个全新架构的过程。
第一步,部署 Kafka,并设置数据日志的保留期(Retention Period)。
这里的保留期指的是你但愿可以从新处理的历史数据的时间区间。例如,若是你但愿从新处理最多一年的历史数据,那就能够把 Apache Kafka 中的保留期设置为 365 天。若是你但愿可以处理全部的历史数据,那就能够把 Apache Kafka 中的保留期设置为“永久(Forever)”。
第二步,若是咱们须要改进现有的逻辑算法,那就表示咱们须要对历史数据进行从新处理。咱们须要作的就是从新启动一个 Kafka 做业实例(Instance)。这个做业实例将重头开始,从新计算保留好的历史数据,并将结果输出到一个新的数据视图中。
咱们知道 Kafka 的底层是使用 Log Offset 来判断如今已经处理到哪一个数据块了,因此只须要将 Log Offset 设置为 0,新的做业实例就会重头开始处理历史数据。
第三步,当这个新的数据视图处理过的数据进度遇上了旧的数据视图时,咱们的应用即可以切换到重新的数据视图中读取。
第四步,中止旧版本的做业实例,并删除旧的数据视图。
这个架构就如同下图所示。
与 Lambda 架构不一样的是,Kappa 架构去掉了批处理层这一体系结构,而只保留了速度层。你只须要在业务逻辑改变又或者是代码更改的时候进行数据的从新处理。Kappa 架构统一了数据的处理方式,再也不维护离线和实时两套代码逻辑。
Kappa 架构也是有着它自身的不足的。由于 Kappa 架构只保留了速度层而缺乏批处理层,在速度层上处理大规模数据可能会有数据更新出错的状况发生,这就须要咱们花费更多的时间在处理这些错误异常上面。若是需求发生变化或历史数据须要从新处理都得经过上游重放来完成。而且从新处理历史的吞吐能力会低于批处理。
还有一点,Kappa 架构的批处理和流处理都放在了速度层上,这致使了这种架构是使用同一套代码来处理算法逻辑的。因此 Kappa 架构并不适用于批处理和流处理代码逻辑不一致的场景。