基于 Hive 的离线数仓每每是企业大数据生产系统中不可缺乏的一环。Hive 数仓有很高的成熟度和稳定性,但因为它是离线的,延时很大。在一些对延时要求比较高的场景,须要另外搭建基于 Flink 的实时数仓,将链路延时下降到秒级。可是一套离线数仓加一套实时数仓的架构会带来超过两倍的资源消耗,甚至致使重复开发。 数据库
想要搭建流式链路就必须得抛弃现有的 Hive 数仓吗?并非,借助 Flink 能够实现已有的 Hive 离线数仓准实时化。本文整理自 Apache Flink Committer、阿里巴巴技术专家李劲松的分享,文章将分析当前离线数仓实时化的难点,详解 Flink 如何解决 Hive 流批一体准实时数仓的难题,实现更高效、合理的资源配置。文章大纲以下:缓存
上图是一个典型的离线数仓,假设如今公司有一个需求,目前公司的数据量很大,须要天天出一个报表且输出到业务数据库中。首先是刚入库的业务数据,大体分为两种,一种是 MySQL 的 binlog,另一种是业务系统中的业务打点,这个日志打点信息能够经过 Flume 等工具去采集,再离线入库到数仓中。而后随着业务愈来愈多,业务中的各个表能够作一些抽象,抽象的好处是更好的管理和更高效的数据复用和计算复用。因此数仓就分红了多层 (明细层、中间层、服务层等等),每一层存的是数据表,数据表之间经过 HiveSQL 的计算来实现 ETL 转换。数据结构
不止是 HiveSQL ,Hive 只是静态的批计算,而业务天天都要出报表,这意味着天天都要进行计算,这种状况下会依赖于调度工具和血缘管理:架构
当任务十分庞大的时候,咱们得出结果每每须要很长的一段时间,也就是咱们常说的 T+1,H+1 ,这就是离线数仓的问题。并发
上面说过,离线数仓不只仅是简单的 Hive 计算,它还依赖了其它的第三方工具,好比:框架
不管是离线数仓仍是第三方工具,其实主要的问题仍是“慢”,如何解决慢的问题,此时就该实时数仓出场了。运维
实时数仓实际上是从 Hive+HDFS 的组合换成了 Kafka,ETL 的功能经过 Flink 的流式处理解决。此时就不存在调度和血缘管理的问题了,经过实时不断的增量更新,最终输出到业务的 DB 中。分布式
虽然延时下降了,但此时咱们会面临另一些问题:工具
因此此时不少人就会选择一套实时一套离线的作法,互不干扰,根据任务是否须要走实时的需求来对需求进行分离。学习
这套架构看似解决了全部问题,但实际带来的问题也是很是多。首先,Lambda 架构形成了离线和实时的割裂问题,它们解决的业务问题都是同样的,可是两套方案让一样的数据源产生了不一样的计算结果。不一样层级的表结构可能不一致,而且当数据产生不一致的问题时,还须要去进行比对排查。
随着这套 Lambda 架构越走越远,开发团队、表结构表依赖、计算模型等均可能会被割裂开,越到后面越会发现,成本愈来愈高,而统一的代价愈来愈大。
那么问题来了,实时数仓会耗费如此大的资源,且还不能保留历史数据,Lambda 架构存在如此多的问题,有什么方案能够解决呢?
数据湖拥有很多的优势,原子性可让咱们作到准实时的批流一体,而且支持已有数据的修改操做。可是毕竟数据湖是新一代数仓存储架构,各方面都还不是很完美,目前已有的数据湖都强依赖于 Spark(固然 Flink 也正在拥抱数据湖),将数据迁移到数据湖须要团队对迁移成本和人员学习成本进行考量。
若是没有这么大的决心迁移数据湖,那有没有一个稍微缓和一些的方案加速已有的离线数仓呢?
Flink 一直持续致力于离线和实时的统一,首先是统一元数据。简单来讲就是把 Kafka 表的元数据信息存储到 HiveMetaStore 中,作到离线和实时的表 Meta 的统一。(目前开源的实时计算并无一个较为完善的持久化 MetaStore,Hive MetaStore 不只能保存离线表,也能够承担实时计算的 MetaStore 能力)。
一样的元数据以后,实时和离线的表结构和层次能够设计成同样,接下来就是能够共用:
分析了元数据和计算引擎的统一,更进一步,是否能统一实时和离线的数据,避免数据的不一致,避免数据的重复存储和重复计算。ETL 计算是否能统一呢?既然实时表设计上能够和离线表如出一辙,是否能够干脆只有实时表的 ETL 计算,离线表从实时表里获取数据?
而且,经过实时链路能够加速离线链路的数据准备,批计算能够把调度换成流输入。
Flink Hive/File Streaming Sink 即为解决这个问题,实时 Kafka 表能够实时的同步到对于的离线表中:
此时离线的批计算也能够交由实时调度,在实时任务处理中某个契机 (Partition Commit 见后续) 自行调度离线那块的任务进行数据同步操做。
此时实时和离线的表已经基本统一,那么问题来了,Kafka 中的表和 Hive 中的表可否就共用一张表呢?个人想法是以后可能会出现如下状况,在数仓中定义一张表,分别对应着 Kafka 和 Hive+HDFS 两种物理存储:
Flink 1.11 前已经有了 StreamingFileSink,在 1.11 中不但把它集成到 SQL 中,让这个 Hive Streaming Sink 能够像离线的 Hive SQL 那样,全部的业务逻辑都由 SQL 去处理,并且带来了进一步的增量。
接下来介绍下 Hive/File Streaming Sink,分为两个组件,FileWriter 和 PartitionCommitter:
由于流式做业是不间断的在运行的,如何设置分区提交的时间,某个分区何时提交它呢?
若是当前时间 Current time > 分区产生的时间 + commitDelay 延时,便是能够开始进行分区提交的时间。一个简单的例子是小时分区,好比当前已经 12 点过 1 分了,已通过了 11 点的分区 + 一个小时,因此咱们能够说不会再有 11 点分区的数据过来了,就能够提交 11 点的分区。(要是有 LateEvent 怎么办?因此也要求分区的提交是幂等的。)
接下来介绍分区的提交具体做用,最直接的就是写 SuccessFile 和 Add partition 到 Hive metastore。
Flink 内置支持了 Hive-MetaStore 和 SuccessFile,只要配置"sink.partition-commit.policy.kind" 为 "metastore,success-file",便可作到在 commit 分区的时候自动 add 分区到 Hive 中,并且写 SuccessFile,当 add 操做完成的时候,这个 partition 才真正的对 Hive 可见。
Custom 机制容许自定义一个 Partition Commit Policy 的类,实现这个类能够作到在这个分区的任务处理完成后:好比触发下游的调度、Statistic Analysis、又或者触发 Hive 的小文件合并。(固然触发 Hive 的小文件合并不但须要启动另外一个做业,并且作不到一致性保证,后续 Flink 也会有进一步的探索,在 Flink 做业中,主动完成小文件的合并)。
不止是准实时的数据摄入,Flink 也带来了维表关联 Hive 表和流实时消费 Hive 表。
咱们知道 Flink 是支持维表关联查询 MySQL 和 HBase 的,在计算中维护一个 LRU 的缓存,未命中查询 MySQL 或 HBase。可是没有 Lookup 的能力怎么办呢?数据通常是放在离线数仓中的,因此业务上咱们通常采用 Hive Table 按期同步到 HBase 或者 MySQL。Flink 也能够容许直接维表关联 Hive 表,目前的实现很简单,须要在每一个并发中全量 Load Hive 表的全部数据,只能针对小表的关联。
传统的 Hive Table 只支持按照批的方式进行读取计算,可是咱们如今可使用流的方式来监控 Hive 里面的分区 / 文件生成,也就是每一条数据过来,均可以实时的进行消费计算,它也是彻底复用 Flink Streaming SQL 的方式,能够和 HBase、MySQL、Hive Table 进行 Join 操做,最后再经过 FileWriter 实时写入到 Hive Table 中。
案例以下:经过 Flume 采集日志打点 Logs,计算各年龄层的 PV,此时咱们存在两条链路:
这里就是咱们刚刚提到的,虽然是对应两个 database:realtime_db 和 offline_db,可是它们共用一份元数据。
对于 Hive 表咱们能够经过 Flink SQL 提供的 Hive dialect 语法,而后经过 Hive 的 DDL 语法来在 Flink 中建立 Hive 表,这里设置 PARTITION BY 天和小时,是与实时链路的不一样之处,由于实时链路是没有分区概念的。
如何在表结构里避免分区引发的 Schema 差别?一个能够解决的方案是考虑引入 Hidden Partition 的定义,Partition 的字段能够是某个字段的 Computed Column,这也能够与实际常见的状况作对比,如天或小时是由时间字段计算出的,以后是下面的三个参数:
以后设置回默认的 Flink dialect,建立 Kafka 的实时表,经过 insert into 将 Kafka 中的数据同步到 Hive 之中。
这部分是关于 Kafka 中的表如何经过 Dim join 的方式,拿到 User 表的年龄字段。图中须要关心的是 lookup.join.cache.ttl 这个参数,咱们会将 user 这张表用相似于 broadcast 的方式,广播到每个 task 中,可是这个过程当中可能出现 Hive 中的 table 存在更新操做,这里的 1h 就说明,数据有效期仅为 1 小时。建立 view 的目的是将 Dim join 所须要的 process time 加上(Dim Join 须要定义 Process time 是个不太天然的过程,后续也在考虑如何在不破坏 SQL 语义的同时,简化 DimJoin 的语法。)
经过实时 Pipeline 的手段消费 Hive Table,而不是经过调度或者以往手动触发的 batch 做业,第一个参数 streaming-source.enable,打开流处理机制,而后使用 start-offset 参数指定从哪一个分区 / 文件开始消费。此时,整个流批一体准实时数仓应用基本算是完成啦。
Hive 做为分区级别管理的 Table Format 在一些方便有比较大的限制,若是是新型的 Table Format 好比 Iceberg 会有更好的支持,将来 Flink 会在下面几个方面增强:
做者介绍:
李劲松,花名之信,阿里巴巴技术专家,Apache Flink Committer。2014 年起专一于阿里内部 Galaxy 流计算框架;2017 年起开始 Flink 研发,主要专一于 Batch 计算、数据结构与类型。