Flink 如何实时分析 Iceberg 数据湖的 CDC 数据

简介: 数据湖的架构中,CDC 数据实时读写的方案和原理

本文由李劲松、胡争分享,社区志愿者杨伟海、李培殿整理。主要介绍在数据湖的架构中,CDC 数据实时读写的方案和原理。文章主要分为 4 个部份内容:数据库

  1. 常见的 CDC 分析方案
  2. 为什么选择 Flink + Iceberg
  3. 如何实时写入读取
  4. 将来规划

1、常见的 CDC 分析方案

咱们先看一下今天的 topic 须要设计的是什么?输入是一个 CDC 或者 upsert 的数据,输出是 Database 或者是用于大数据 OLAP 分析的存储。json

咱们常见的输入主要有两种数据,第一种数据是数据库的 CDC 数据,不断的产生 changeLog;另外一种场景是流计算产生的 upsert 数据,在最新的 Flink 1.12 版本已经支持了 upsert 数据。缓存

1.1 离线 HBase 集群分析 CDC 数据架构

咱们一般想到的第一个方案,就是把 CDC upsert 的数据经过 Flink 进行一些处理以后,实时的写到 HBase 当中。HBase 是一个在线的、能提供在线点查能力的一种数据库,具备很是高的实时性,对写入操做是很是友好的,也能够支持一些小范围的查询,并且集群可扩展。并发

这种方案其实跟普通的点查实时链路是同一套,那么用 HBase 来作大数据的 OLAP 的查询分析有什么问题呢?app

首先,HBase 是一个面向点查设计的一种数据库,是一种在线服务,它的行存的索引不适合分析任务。典型的数仓设计确定是要列存的,这样压缩效率和查询效率才会高。第二,HBase 的集群维护成本比较高。最后,HBase 的数据是 HFile,不方便与大数据里数仓当中典型的 Parquet、Avro、Orc 等结合。高并发

1.2 Apache Kudu 维护 CDC 数据集布局

针对 HBase 分析能力比较弱的状况,社区前几年出现了一个新的项目,这就是 Apache Kudu 项目。Kudu 项目拥有 HBase 的点查能力的同时,采用列存,这样列存加速很是适合 OLAP 分析。性能

这种方案会有什么问题呢?测试

首先 Kudu 是比较小众的、独立的集群,维护成本也比较高,跟 HDFS、S三、OSS 比较割裂。其次因为 Kudu 在设计上保留了点查能力,因此它的批量扫描性能不如 parquet,另外 Kudu 对于 delete 的支持也比较弱,最后它也不支持增量拉取。

1.3 直接导入 CDC 到 Hive 分析

第三种方案,也是你们在数仓中比较经常使用的方案,就是把 MySQL 的数据写到 Hive,流程是:维护一个全量的分区,而后天天作一个增量的分区,最后把增量分区写好以后进行一次 Merge ,写入一个新的分区,流程上这样是走得通的。Hive 以前的全量分区是不受增量的影响的,只有当增量 Merge 成功以后,分区才可查,才是一个全新的数据。这种纯列存的 append 的数据对于分析是很是友好的。

这种方案会有什么问题呢?

增量数据和全量数据的 Merge 是有延时的,数据不是实时写入的,典型的是一天进行一次 Merge,这就是 T+1 的数据了。因此,时效性不好,不支持实时 upsert。每次 Merge 都须要把全部数据所有重读重写一遍,效率比较差、比较浪费资源。

1.4 Spark + Delta 分析 CDC 数据

针对这个问题,Spark + Delta 在分析 CDC 数据的时候提供了 MERGE INTO 的语法。这并不只仅是对 Hive 数仓的语法简化,Spark + Delta 做为新型数据湖的架构(例如 Iceberg、Hudi),它对数据的管理不是分区,而是文件,所以 Delta 优化 MERGE INTO 语法,仅扫描和重写发生变化的文件便可,所以高效不少。

咱们评估一下这个方案,他的优势是仅依赖 Spark + Delta 架构简洁、没有在线服务、列存,分析速度很是快。优化以后的 MERGE INTO 语法速度也够快。

这个方案,业务上是一个 Copy On Write 的一个方案,它只须要 copy 少许的文件,可让延迟作的相对低。理论上,在更新的数据跟现有的存量没有很大重叠的话,能够把天级别的延迟作到小时级别的延迟,性能也是能够跟得上的。

这个方案在 Hive 仓库处理 upsert 数据的路上已经前进了一小步了。但小时级别的延迟毕竟不如实时更有效,所以这个方案最大的缺点在 Copy On Write 的 Merge 有必定的开销,延迟不能作的过低。

第一部分大概现有的方案就是这么多,同时还须要再强调一下,upsert 之因此如此重要,是由于在数据湖的方案中,upsert 是实现数据库准实时、实时入湖的一个关键技术点。

2、为什么选择 Flink + Iceberg

2.1 Flink 对 CDC 数据消费的支持

第一,Flink 原生支持 CDC 数据消费。在前文 Spark + Delta 的方案中,MARGE INTO 的语法,用户须要感知 CDC 的属性概念,而后写到 merge 的语法上来。可是 Flink 是原生支持 CDC 数据的。用户只要声明一个 Debezium 或者其余 CDC 的 format,Flink 上面的 SQL 是不须要感知任何 CDC 或者 upsert 的属性的。Flink 中内置了 hidden column 来标识它 CDC 的类型数据,因此对用户而言比较简洁。

以下图示例,在 CDC 的处理当中,Flink 在只用声明一个 MySQL Binlog 的 DDL 语句,后面的 select 都不用感知 CDC 属性。

2.2 Flink 对 Change Log Stream 的支持

下图介绍的是 Flink 原生支持 Change Log Stream,Flink 在接入一个 Change Log Stream 以后,拓扑是不用关心 Change Log flag 的 SQL。拓扑彻底是按照本身业务逻辑来定义,而且一直到最后写入 Iceberg,中间不用感知 Change Log 的 flag。

2.3 Flink + Iceberg CDC 导入方案评估

最后,Flink + Iceberg 的 CDC 导入方案的优势是什么?

对比以前的方案,Copy On Write 跟 Merge On Read 都有适用的场景,侧重点不一样。Copy On Write 在更新部分文件的场景中,当只须要重写其中的一部分文件时是很高效的,产生的数据是纯 append 的全量数据集,在用于数据分析的时候也是最快的,这是 Copy On Write 的优点。

另一个是 Merge On Read,即将数据连同 CDC flag 直接 append 到 Iceberg 当中,在 merge 的时候,把这些增量的数据按照必定的组织格式、必定高效的计算方式与全量的上一次数据进行一次 merge。这样的好处是支持近实时的导入和实时数据读取;这套计算方案的 Flink SQL 原生支持 CDC 的摄入,不须要额外的业务字段设计。

Iceberg 是统一的数据湖存储,支持多样化的计算模型,也支持各类引擎(包括 Spark、Presto、hive)来进行分析;产生的 file 都是纯列存的,对于后面的分析是很是快的;Iceberg 做为数据湖基于 snapshot 的设计,支持增量读取;Iceberg 架构足够简洁,没有在线服务节点,纯 table format 的,这给了上游平台方足够的能力来定制本身的逻辑和服务化。

3、如何实时写入读取

3.1 批量更新场景和 CDC 写入场景

首先咱们来了解一下在整个数据湖里面批量更新的两个场景。

  • 第一批量更新的这种场景,在这个场景中咱们使用一个 SQL 更新了成千上万行的数据,好比欧洲的 GDPR 策略,当一个用户注销掉本身的帐户以后,后台的系统是必须将这个用户全部相关的数据所有物理删除。
  • 第二个场景是咱们须要将 date lake 中一些拥有共同特性的数据删除掉,这个场景也是属于批量更新的一个场景,在这个场景中删除的条件多是任意的条件,跟主键(Primary key)没有任何关系,同时这个待更新的数据集是很是大,这种做业是一个长耗时低频次的做业。

另外是 CDC 写入的场景,对于对 Flink 来讲,通常经常使用的有两种场景,第一种场景是上游的 Binlog 可以很快速的写到 data lake 中,而后供不一样的分析引擎作分析使用; 第二种场景是使用 Flink 作一些聚合操做,输出的流是 upsert 类型的数据流,也须要可以实时的写到数据湖或者是下游系统中去作分析。以下图示例中 CDC 写入场景中的 SQL 语句,咱们使用单条 SQL 更新一行数据,这种计算模式是一种流式增量的导入,并且属于高频的更新。

3.2 Apache Iceberg 设计 CDC 写入方案须要考虑的问题

接下来咱们看下 iceberg 对于 CDC 写入这种场景在方案设计时须要考虑哪些问题。

  • 第一是正确性,即须要保证语义及数据的正确性,如上游数据 upsert 到 iceberg 中,当上游 upsert 中止后, iceberg 中的数据须要和上游系统中的数据保持一致。
  • 第二是高效写入,因为 upsert 的写入频率很是高,咱们须要保持高吞吐、高并发的写入。
  • 第三是快速读取,当数据写入后咱们须要对数据进行分析,这其中涉及到两个问题,第一个问题是须要支持细粒度的并发,看成业使用多个 task 来读取时能够保证为各个 task 进行均衡的分配以此来加速数据的计算;第二个问题是咱们要充分发挥列式存储的优点来加速读取。
  • 第四是支持增量读,例如一些传统数仓中的 ETL,经过增量读取来进行进一步数据转换。

3.3 Apache Iceberg Basic

在介绍具体的方案细节以前,咱们先了解一下 Iceberg 在文件系统中的布局,整体来说 Iceberg 分为两部分数据,第一部分是数据文件,以下图中的 parquet 文件,每一个数据文件对应一个校验文件(.crc文件)。第二部分是表元数据文件(Metadata 文件),包含 Snapshot 文件(snap-_.avro)、Manifest 文件(_.avro)、TableMetadata 文件(*.json)等。

下图展现了在 iceberg 中 snapshot、manifest 及 partition 中的文件的对应关系。下图中包含了三个 partition,第一个 partition 中有两个文件 f一、f3,第二个 partition 有两个文件f四、f5,第三个 partition 有一个文件f2。对于每一次写入都会生成一个 manifest 文件,该文件记录本次写入的文件与 partition 的对应关系。再向上层有 snapshot 的概念,snapshot 可以帮助快速访问到整张表的全量数据,snapshot 记录多个 manifest,如第二个 snapshot 包含 manifest2 和 manifest3。

3.4 INSERT、UPDATE、DELETE 写入

在了解了基本的概念,下面介绍 iceberg 中 insert、update、delete 操做的设计。

下图示例的 SQL 中展现的表包含两个字段即 id、data,两个字段都是 int 类型。在一个 transaction 中咱们进行了图示中的数据流操做,首先插入了(1,2)一条记录,接下来将这条记录更新为(1,3),在 iceberg 中 update 操做将会拆为 delete 和 insert 两个操做。

这么作的缘由是考虑到 iceberg 做为流批统一的存储层,将 update 操做拆解为 delete 和 insert 操做能够保证流批场景作更新时读取路径的统一,如在批量删除的场景下以 Hive 为例,Hive 会将待删除的行的文件 offset 写入到 delta 文件中,而后作一次 merge on read,由于这样会比较快,在 merge 时经过 position 将原文件和 delta 进行映射,将会很快获得全部未删除的记录。

接下来又插入记录(3,5),删除了记录(1,3),插入记录(2,5),最终查询是咱们获得记录(3,5)(2,5)。

上面操做看上去很是简单,但在实现中是存在一些语义上的问题。以下图中,在一个 transaction 中首先执行插入记录(1,2)的操做,该操做会在 data file1 文件中写入 INSERT(1,2),而后执行删除记录(1,2)操做,该操做会在 equalify delete file1 中写入 DELETE(1,2),接着又执行插入记录(1,2)操做,该操做会在 data file1 文件中再写入INSERT(1,2),而后执行查询操做。

在正常状况下查询结果应该返回记录 INSERT(1,2),但在实现中,DELETE(1,2)操做没法得知删除的是 data file1 文件中的哪一行,所以两行 INSERT(1,2)记录都将被删除。

那么如何来解决这个问题呢,社区当前的方式是采用了 Mixed position-delete and equality-delete。Equality-delete 即经过指定一列或多列来进行删除操做,position-delete 是根据文件路径和行号来进行删除操做,经过将这两种方法结合起来以保证删除操做的正确性。

以下图咱们在第一个 transaction 中插入了三行记录,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),而后执行 commit 操做进行提交。接下来咱们开启一个新的 transaction 并执行插入一行数据(1,5),因为是新的 transaction,所以新建了一个 data file2 并写入 INSERT(1,5)记录,接下来执行删除记录(1,5),实际写入 delete 时是:

在 position delete file1 文件写入(file2, 0),表示删除 data file2 中第 0 行的记录,这是为了解决同一个 transaction 内同一行数据反复插入删除的语义的问题。
在 equality delete file1 文件中写入 DELETE (1,5),之因此写入这个 delete 是为了确保本次 txn 以前写入的 (1,5) 能被正确删除。

而后执行删除(1,4)操做,因为(1,4)在当前 transaction 中不曾插入过,所以该操做会使用 equality-delete 操做,即在 equality delete file1 中写入(1,4)记录。在上述流程中能够看出在当前方案中存在 data file、position delete file、equality delete file 三类文件。

在了解了写入流程后,如何来读取呢。以下图所示,对于 position delete file 中的记录(file2, 0)只需和当前 transaction 的 data file 进行 join 操做,对于 equality delete file 记录(1,4)和以前的 transaction 中的 data file 进行 join 操做。最终获得记录 INSERT(1,3)、INSERT(1,2)保证了流程的正确性。

3.5 Manifest 文件的设计

上面介绍了 insert、update 及 delete,但在设计 task 的执行计划时咱们对 manifest 进行了一些设计,目的是经过 manifest 可以快速到找到 data file,并按照数据大小进行分割,保证每一个 task 处理的数据尽量的均匀分布。

以下图示例,包含四个 transaction,前两个 transaction 是 INSERT 操做,对应 M一、M2,第三个 transaction 是 DELETE 操做,对应 M3,第四个 transaction 是 UPDATE 操做,包含两个 manifest 文件即 data manifest 和 delete manifest。

对于为何要对 manifest 文件拆分为 data manifest 和 delete manifest 呢,本质上是为了快速为每一个 data file 找到对应的 delete file 列表。能够看下图示例,当咱们在 partition-2 作读取时,须要将 deletefile-4 与datafile-二、datafile-3 作一个 join 操做,一样也须要将 deletefile-5 与 datafile-二、datafile-3 作一个 join 操做。

以 datafile-3 为例,deletefile 列表包含 deletefile-4 和 deletefile-5 两个文件,如何快速找到对应的 deletefIle 列表呢,咱们能够根据上层的 manifest 来进行查询,当咱们将 manifest 文件拆分为 data manifest 和 delete manifest 后,能够将 M2(data manifest)与 M三、M4(delete manifest)先进行一次 join 操做,这样即可以快速的获得 data file 所对应的 delete file 列表。

3.6 文件级别的并发

另外一个问题是咱们须要保证足够高的并发读取,在 iceberg 中这点作得很是出色。在 iceberg 中能够作到文件级别的并发读取,甚至文件中更细粒度的分段的并发读取,好比文件有 256MB,能够分为两个 128MB 进行并发读取。这里举例说明,假设 insert 文件跟 delete 文件在两个 Bucket 中的布局方式以下图所示。

咱们经过 manifest 对比发现,datafile-2 的 delete file 列表只有 deletefile-4,这样能够将这两个文件做为一个单独的 task(图示中Task-2)进行执行,其余的文件也是相似,这样能够保证每一个 task 数据较为均衡的进行 merge 操做。

对于这个方案咱们作了简单的总结,以下图所示。首先这个方案的优势能够知足正确性,而且能够实现高吞吐写入和并发高效的读取,另外能够实现 snapshot 级别的增量的拉取。

当前该方案仍是比较粗糙,下面也有一些能够优化的点。

  • 第一点,若是同一个 task 内的 delete file 有重复能够作缓存处理,这样能够提升 join 的效率。
  • 第二点,当 delete file 比较大须要溢写到磁盘时可使用 kv lib 来作优化,但这不依赖外部服务或其余繁重的索引。
  • 第三点,能够设计 Bloom filter(布隆过滤器)来过滤无效的 IO,由于对于 Flink 中经常使用的 upsert 操做会产生一个 delete 操做和一个 insert 操做,这会致使在 iceberg 中 data file 和 delete file 大小相差不大,这样 join 的效率不会很高。若是采用 Bloom Filter,当 upsert 数据到来时,拆分为 insert 和 delete 操做,若是经过 bloom filter 过滤掉那些以前没有 insert 过数据的 delete 操做(即若是这条数据以前没有插入过,则不须要将 delete 记录写入到 delete file 中),这将极大的提升 upsert 的效率。
  • 第四点,是须要一些后台的 compaction 策略来控制 delete file 文件大小,当 delete file 越少,分析的效率越高,固然这些策略并不会影响正常的读写。

3.7 增量文件集的 Transaction 提交

前面介绍了文件的写入,下图咱们介绍如何按照 iceberg 的语义进行写入而且供用户读取。主要分为数据和 metastore 两部分,首先会有 IcebergStreamWriter 进行数据的写入,但此时写入数据的元数据信息并无写入到 metastore,所以对外不可见。第二个算子是 IcebergFileCommitter,该算子会将数据文件进行收集, 最终经过 commit transaction 来完成写入。

在 Iceberg 中并无其余任何其余第三方服务的依赖,而 Hudi 在某些方面作了一些 service 的抽象,如将 metastore 抽象为独立的 Timeline,这可能会依赖一些独立的索引甚至是其余的外部服务来完成。

4、将来规划

下面是咱们将来的一些规划,首先是 Iceberg 内核的一些优化,包括方案中涉及到的全链路稳定性测试及性能的优化, 并提供一些 CDC 增量拉取的相关 Table API 接口。

在 Flink 集成上,会实现 CDC 数据的自动和手动合并数据文件的能力,并提供 Flink 增量拉取 CDC 数据的能力。

在其余生态集成上,咱们会对 Spark、Presto 等引擎进行集成,并借助 Alluxio 加速数据查询。

做者:阿里云实时计算Flink
原文连接本文为阿里云原创内容,未经容许不得转载