数据仓库是公司数据发展到必定规模后必然须要提供的一种基础服务,也是“数据智能”建设的基础环节。迅速获取数据反馈不只有利于改善产品及用户体验,更有利于公司的科学决策,所以获取数据的实时性尤其重要。html
目前企业的数仓建设大可能是离线一套,实时一套。业务要求低延时的使用实时数仓;业务复杂的使用离线数仓。架构十分复杂,须要使用不少系统和计算框架,这就要求企业储备多方面的人才,致使人才成本较高,且出了问题难以排查,终端用户也须要熟悉多种语法。本文分析目前的数仓架构,探索离线和实时数仓是否能放在一块儿考虑,探索Flink的统一架构是否能解决大部分问题。mysql
文末有福利,可下载电子书。sql
数据仓库能够分为三层:ODS(原始数据层)、DW(数据仓库层)、ADS(应用数据层)。数据库
从日志或者业务DB传输过来的原始数据,传统的离线数仓作法也有直接用CDC (Change Data Capture) 工具周期同步到数仓里面。用一套统一的Kafka来承接这个角色,可让数据更实时的落入数仓,也能够在这一层统一实时和离线的。apache
DW层通常也分为DWD层和DWS层:架构
和DWS不一样的是,这一层直接面向用户的数据服务,不须要再次计算,已是最终须要的数据。并发
主要分为两条链路:app
主流的数仓架构仍然是Lambda架构,Lambda架构虽然复杂,可是它能覆盖业务上须要的场景,对业务来讲,是最灵活的方式。框架
Lambda架构分为两条链路:分布式
上图标出了1-9条边,每条边表明数据的转换,就是大数据的计算,本文后续将分析这些边,探索Flink在其中能够发挥的做用。
先说下元数据的管理,离线数仓有Hive metastore来管理元数据,可是单纯的Kafka不具有元数据管理的能力,这里推荐两种作法:
搭建起schema registry服务后,经过confluent的url便可获取到表的schema信息,对于上百个字段的表,它能够省编写Flink做业时的不少事,后续Flink也正在把它的schema推断功能结合Confluent schema registry。可是它仍然省不掉建立表的过程,用户也须要填写Confluent对应的URL。
目前Flink内置已提供了HiveCatalog,Kafka的表能够直接集成到Hive metastore中,用户在SQL中能够直接使用这些表。可是Kafka的start-offset一些场景须要灵活的配置,为此,Flink也正在提供 LIKE [1] 和 Table Hints [2] 等手段来解决。
Flink中离线数仓和实时数仓都使用Hive Catalog:
use catalog my_hive; -- build streaming database and tables; create database stream_db; use stream_db; create table order_table ( id long, amount double, user_id long, status string, ts timestamp, … -- 可能还有几十个字段 ts_day string, ts_hour string ) with ( ‘connector.type’ = ‘kafka’, … -- Kafka table相关配置 ); -- build batch database and tables; create database batch_db; use batch_db; create table order_table like stream_db.order_table (excluding options) partitioned by (ts_day, ts_hour) with ( ‘connector.type’ = ‘hive’, … -- Hive table相关配置 );
使用Catalog,后续的计算能够彻底复用批和流,提供相同的体验。
计算①和⑤分别是实时数仓的导入和离线数仓的导入,近来,更加实时的离线数仓导入愈来愈成为数据仓库的常规作法,Flink的导入可让离线数仓的数据更实时化。
之前主要经过DataStream + StreamingFileSink的方式进行导入,可是不支持ORC和没法更新HMS。
Flink streaming integrate Hive后,提供Hive的streaming sink [3],用SQL的方式会更方便灵活,使用SQL的内置函数和UDF,并且流和批能够复用,运行两个流计算做业。
insert into [stream_db.|batch_db.]order_table select … from log_table;
计算②和⑥分别是实时数仓和离线数仓的中间数据处理,这里面主要有三种计算:
与离线计算不一样,离线计算只用关心某个时间点的维表数据,而Streaming的做业持续运行,因此它关注的不能只是静态数据,须要是动态的维表。
另外为了Join的效率,streaming做业每每是join一个数据库表,而不只仅是Hive表。
例子:
-- stream 维表 use stream_db; create table user_info ( user_id long, age int, address, primary key(user_id) ) with ( ‘connector.type’ = ‘jdbc’, ... ); -- 将离线数仓的维表导入实时数仓中 insert into user_info select * from batch_db.user_info; -- 维表Join,SQL批流复用 insert into order_with_user_age select * from order_table join user_info for system_time as of order_table.proctime on user_info.user_id = user_info.user_id;
这里有个很是麻烦的事情,那就是在实时数仓中,须要按时周期调度更新维表到实时维表数据库中,那能不能直接Join离线数仓的Hive维表呢?目前社区也正在开发Hive维表,它有哪些挑战:
例子:
select age, avg(amount) from order_with_user_age group by age;
一句简单的聚合SQL,它在批计算和流计算的执行模式是彻底不一样的。
Streaming的聚合和离线计算的聚合最大的不一样在于它是一个动态表[4],它的输出是在持续变化的。动态表的概念简单来讲,一个streaming的count,它的输出是由输入来驱动的,而不是像batch同样,获取所有输入后才会输出,因此,它的结果是动态变化的:
有状态计算后的输出:
例子:
-- batch:计算完成后,一次性输出到mysql中,同key只有一个数据 -- streaming:mysql里面的数据不断更新,不断变化 insert into mysql_table select age, avg(amount) from order_with_user_age group by age; -- batch: 同key只有一个数据,append便可 insert into hive_table select age, avg(amount) from order_with_user_age group by age; -- streaming: kafka里面的数据不断append,而且多出一列,来表示这是upsert的消息,后续的Flink消费会自动作出机制来处理upsert insert into kafka_table select age, avg(amount) from order_with_user_age group by age;
离线数仓能够进行计算⑨,对明细数据或者汇总数据均可以进行ad-hoc的查询,可让数据分析师进行灵活的查询。
目前实时数仓一个比较大的缺点是不能Ad-hoc查询,由于它自己没有保存历史数据,Kafka可能能够保存3天以上的数据,可是一是存储成本高、二是查询效率也很差。
一个思路是提供OLAP数据库的批流统一Sink组件:
本文从目前的Lambda架构出发,分析了Flink一栈式数仓计算方案的能力,本文中一些Flink新功能还在快速迭代演进中,随着不断的探索和实践,但愿朝着计算一体化的方向逐渐推动,未来的数仓架构但愿能真正统一用户的离线和实时,提供统一的体验:
[1]https://cwiki.apache.org/conf...
[2]https://cwiki.apache.org/conf...
[3]https://cwiki.apache.org/conf...
[4]https://ci.apache.org/project...
[5]https://cwiki.apache.org/conf...
从容应对生产环境中的技术难题,《Apache Flink 十大技术难点实战》电子书免费下载!
<p style="text-align:center"><font size=5>点击免费下载
《Apache Flink 电子书合辑》>>></font>
本书由 Apache Flink 核心贡献者及一线大厂生产环境使用者总结分享,内容全面丰富,涵盖原理解析、应用实践、demo演示、生产环境常见问题排查与解法、Flink 1.10 生态应用原理与实践,助力大数据开发者真正解决Flink生产应用难题!