本文由同城艺龙大数据开发工程师张军分享,主要介绍同城艺龙 Flink 集成 Iiceberg 的生产实践。内容包括:git
- 背景及痛点
- Flink + Iceberg 的落地
- Iceberg 优化实践
- 后续工做
- 收益及总结
同程艺龙是一个提供机票、住宿、交通等服务的在线旅游服务平台,目前我所在的部门属于公司的研发部门,主要职责是为公司内其余业务部门提供一些基础服务,咱们的大数据系统主要承接的业务是部门内的一些大数据相关的数据统计、分析工做等。数据来源有网关日志数据、服务器监控数据、K8s 容器的相关日志数据,App 的打点日志, MySQL 的 binlog 日志等。咱们主要的大数据任务是基于上述日志构建实时报表,提供基于 Presto 的报表展现和即时查询服务,同时也会基于 Flink 开发一些实时、批处理任务,为业务方提供准确及时的数据支撑。github
因为咱们全部的原始数据都是存储在 Kafka 的,因此原来的技术架构就是首先是 Flink 任务消费 Kafka 的数据,通过 Flink SQL 或者 Flink jar 的各类处理以后实时写入 Hive,其中绝大部分任务都是 Flink SQL 任务,由于我认为 SQL 开发相对代码要简单的多,而且维护方便、好理解,因此能用 SQL 写的都尽可能用 SQL 来写。
提交 Flink 的平台使用的是 Zeppelin,其中提交 Flink SQL 任务是 Zeppelin 自带的功能,提交 jar 包任务是我本身基于 Application 模式开发的 Zeppelin 插件。
对于落地到 Hive 的数据,使用开源的报表系统 metabase (底层使用 Presto) 提供实时报表展现、定时发送邮件报表,以及自定义 SQL 查询服务。因为业务对数据的实时性要求比较高,但愿数据能尽快的展现出来,因此咱们不少的 Flink 流式任务的 checkpoint 设置为 1 分钟,数据格式采用的是 orc 格式。sql
因为采用的是列式存储格式 ORC,没法像行式存储格式那样进行追加操做,因此不可避免的产生了一个大数据领域很是常见且很是棘手的问题,即 HDFS 小文件问题。apache
开始的时候咱们的小文件解决方案是本身写的一个小文件压缩工具,按期去合并,咱们的 Hive 分区通常都是天级别的,因此这个工具的原理就是天天凌晨启动一个定时任务去压缩昨天的数据,首先把昨天的数据写入一个临时文件夹,压缩完,和原来的数据进行记录数的比对检验,数据条数一致以后,用压缩后的数据覆盖原来的数据,可是因为没法保证事务,因此出现了不少问题:服务器
因此基于以上的 HDFS 小文件、查询慢等问题,结合咱们的现状,我调研了目前市面上的数据湖技术:Delta、Apache Iceberg 和 Apache Hudi,考虑了目前数据湖框架支持的功能和之后的社区规划,最终咱们是选择了 Iceberg,其中考虑的缘由有如下几方面:架构
前面讲到,咱们的绝大部分任务都是 Flink 任务,包括批处理任务和流处理任务,目前这三个数据湖框架,Iceberg 是集成 Flink 作的最完善的,若是采用 Iceberg 替代 Hive 以后,迁移的成本很是小,对用户几乎是无感知的,
好比咱们原来的 SQL 是这样的:并发
INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table
迁移到 Iceberg 之后,只须要修改 catalog 就行。框架
INSERT INTO iceberg_catalog.db.iIcebergceberg_table SELECT * FROM kafka_table
Presto 查询也是和这个相似,只须要修改 catalog 就好了。运维
在 Iceberg 的设计架构中,manifest 文件存储了分区相关信息、data files 的相关统计信息(max/min)等,去查询一些大的分区的数据,就能够直接定位到所要的数据,而不是像 Hive 同样去 list 整个 HDFS 文件夹,时间复杂度从 O(n) 降到了 O(1),使得一些大的查询速度有了明显的提高,在 Iceberg PMC Chair Ryan Blue 的演讲中,咱们看到命中 filter 的任务执行时间从 61.5 小时降到了 22 分钟。jvm
Flink CDC 提供了直接读取 MySQL binlog 的方式,相对之前须要使用 canal 读取 binlog 写入 Iceberg,而后再去消费 Iceberg 数据。少了两个组件的维护,链路减小了,节省了维护的成本和出错的几率。而且能够实现导入全量数据和增量数据的完美对接,因此使用 Flink SQL 将 MySQL binlog 数据导入 Iceberg 来作 MySQL->Iceberg 的导入将会是一件很是有意义的事情。
此外对于咱们最初的压缩小文件的需求,虽然 Iceberg 目前还没法实现自动压缩,可是它提供了一个批处理任务,已经能知足咱们的需求。
目前咱们的全部数据都是存储在 Hive 表的,在验证完 Iceberg 以后,咱们决定将 Hive 的数据迁移到 Iceberg,因此我写了一个工具,可使用 Hive 的数据,而后新建一个 Iceberg 表,为其创建相应的元数据,可是测试的时候发现,若是采用这种方式,须要把写入 Hive 的程序中止,由于若是 Iceberg 和 Hive 使用同一个数据文件,而压缩程序会不断地压缩 Iceberg 表的小文件,压缩完以后,不会立刻删除旧数据,因此 Hive 表就会查到双份的数据,故咱们采用双写的策略,原来写入 Hive 的程序不动,新启动一套程序写入 Iceberg,这样能对 Iceberg 表观察一段时间。还能和原来 Hive 中的数据进行比对,来验证程序的正确性。
通过一段时间观察,天天将近几十亿条数据、压缩后几个 T 大小的 Hive 表和 Iceberg 表,一条数据也不差。因此在最终对比数据没有问题以后,把 Hive 表中止写入,使用新的 Iceberg 表。
我将这个 Hive 表迁移 Iceberg 表的工具作成了一个基于 Flink batch job 的 Iceberg Action,提交了社区,不过目前还没合并:https://github.com/apache/ice...。这个功能的思路是使用 Hive 原始的数据不动,而后新建一个 Iceberg table,再为这个新的 Iceberg table 生成对应的元数据,你们有须要的话能够先看看。
此外,Iceberg 社区,还有一个把现有的数据迁移到已存在的 Iceberg table 的工具,相似 Hive 的 LOAD DATA INPATH ... INTO TABLE ,是用 Spark 的存储过程作的,你们也能够关注下:https://github.com/apache/ice...
目前压缩小文件是采用的一个额外批任务来进行的,Iceberg 提供了一个 Spark 版本的 action,我在作功能测试的时候发现了一些问题,此外我对 Spark 也不是很是熟悉,担忧出了问题很差排查,因此参照 Spark 版本的本身实现了一个 Flink 版本,并修复了一些 bug,进行了一些功能的优化。
因为咱们的 Iceberg 的元数据都是存储在 Hive 中的,也就是咱们使用了 HiveCatalog,因此压缩程序的逻辑是把 Hive 中全部的 Iceberg 表所有都查出来,依次压缩。压缩没有过滤条件,无论是分区表仍是非分区表,都进行全表的压缩,这样作是为了处理某些使用 eventtime 的 Flink 任务。若是有延迟的数据的到来,就会把数据写入之前的分区,若是不是全表压缩只压缩当天分区的话,新写入的其余天的数据就不会被压缩。
之因此没有开启定时任务来压缩,是由于好比定时五分钟压缩一个表,若是五分钟以内这个压缩任务没完成,没有提交新的 snapshot,下一个定时任务又开启了,就会把上一个没有完成的压缩任务中的数据从新压缩一次,因此每一个表依次压缩的策略能够保证某一时刻一个表只有一个任务在压缩。
代码示例参考:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles() //.maxParallelism(parallelism) //.filter(Expressions.equal("day", day)) //.targetSizeInBytes(targetSizeInBytes) .execute();
目前系统运行稳定,已经完成了几万次任务的压缩。
注意:
不过目前对于新发布的 Iceberg 0.11 来讲,还有一个已知的 bug,即当压缩前的文件大小大于要压缩的大小(targetSizeInBytes)时,会形成数据丢失,其实这个问题我在最开始测试小文件压缩的时候就发现了,而且提了一个 pr,个人策略是大于目标文件的数据文件不参与压缩,不过这个 pr 没有合并到 0.11 版本中,后来社区另一个兄弟也发现了相同的问题,提交了一个 pr( https://github.com/apache/ice... ) ,策略是将这个大文件拆分到目标文件大小,目前已经合并到 master,会在下一个 bug fix 版本 0.11.1 中发布。
目前对于定时调度中的批处理任务,Flink 的 SQL 客户端还没 Hive 那样作的很完善,好比执行 hive-f 来执行一个文件。并且不一样的任务须要不一样的资源,并行度等。
因此我本身封装了一个 Flink 程序,经过调用这个程序来进行处理,读取一个指定文件里面的 SQL,来提交批任务。在命令行控制任务的资源和并行度等。
/home/flink/bin/fFlinklinklink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql
批任务的查询这块,我作了一些优化工做,好比 limit 下推,filter 下推,查询并行度推断等,能够大大提升查询的速度,这些优化都已经推回给社区,而且在 Iceberg 0.11 版本中发布。
在使用 Iceberg 的过程当中,有时候会有这样的状况,我提交了一个 Flink 任务,因为各类缘由,把它停了,这个时候 Iceberg 还没提交相应的快照。此外因为一些异常致使程序失败,会产生一些不在 Iceberg 元数据里面的孤立的数据文件,这些文件对 Iceberg 来讲是不可达的,也是没用的。因此咱们须要像 jvm 的垃圾回收同样来清理这些文件。
目前 Iceberg 提供了一个 Spark 版本的 action 来处理这些没用的文件,咱们采起的策略和压缩小文件同样,获取 Hive 中的全部的 Iceberg 表。每隔一个小时执行一次定时任务来删除这些没用的文件。
SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
咱们在程序运行过程当中出现了正常的数据文件被删除的问题,通过调研,因为快照保留设置是一小时,这个清理程序清理时间也是设置一个小时,经过日志发现是这个清理程序删除了正常的数据。查了查代码,应该是设置了同样的时间,在清理孤立文件的时候,有其余程序正在读取要 expired 的 snapshot,致使删除了正常的数据。最后把这个清理程序的清理时间改为默认的三天,没有再出现删除数据文件的问题。
固然,为了保险起见,咱们能够覆盖原来的删除文件的方法,改为将文件到一个备份文件夹,检查没有问题以后,手工删除。
咱们的快照过时策略,是和压缩小文件的批处理任务写在一块儿的,压缩完小文件以后,进行表的快照过时处理,目前保留的时间是一个小时。这是由于对于有一些比较大的表,分区比较多,并且 checkpoint 比较短,若是保留的快照过长的话,仍是会保留过多小文件,咱们暂时没有查询历史快照的需求,因此我将快照的保留时间设置了一个小时。
long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireSnapshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).commit();
写入了数据以后,当想查看相应的快照有多少数据文件时,直接查询 Spark 没法知道哪一个是有用的,哪一个是没用的。因此须要有对应的管理工具。目前 Flink 这块还不太成熟,咱们可使用 Spark3 提供的工具来查看。
目前 create table 这些操做咱们是经过 Flink SQL Client 来作的。其余相关的 DDL 的操做可使用 Spark 来作:https://iceberg.apache.org/sp...
一些相关的数据的操做,好比删除数据等能够经过 MySQL 来实现,Presto 目前只支持分区级别的删除功能。
在咱们操做 Hive 的时候,有一些很经常使用的操做,好比 show partitions、 show create table 等,这些目前 Flink 尚未支持,因此在操做 Iceberg 的时候就很不方便,咱们本身基于 Flink 1.12 作 了修改,不过目前尚未彻底提交到社区,后续有时间会提交到 Flink 和 Iceberg 社区。
目前在咱们内部的版本中,我已经测试经过可使用 Flink SQL 将 CDC 数据(好比 MySQL binlog)写入 Iceberg,社区的版本中实现该功能还须要作一些工做,我也提交了一些相关的 PR 来推动这个工做。
对于 copy-on-write 表,咱们可使用 Spark SQL 来进行行级的删除和更新。具体的支持的语法能够参考源码中的测试类:
org.apache.iceberg.spark.extensions.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,这些功能我在测试环境测试是能够的,可是尚未来得及更新到生产。
在工做中会有一些这样的场景,因为数据比较大,Iceberg 的数据只存了较短的时间,若是很不幸由于程序写错了等缘由,想从更早的时间来消费就无能为力了。
当引入了 Iceberg 的 streaming read 以后,这些问题就能够解决了,由于 Iceberg 存储了全部的数据,固然这里有一个前提就是对于数据没有要求特别精确,好比达到秒级别,由于目前 Flink 写入 Iceberg 的事务提交是基于 Flink Checkpoint 间隔的。
通过对 Iceberg 大概一个季度的调研,测试,优化和 bug 修复,咱们将现有的 Hive 表都迁移到了 Iceberg,完美解决了原来的全部的痛点问题,目前系统稳定运行,并且相对 Hive 获得了不少的收益:
举一个例子,默认配置下,原来一个 flink 读取 kafka 写入 hive 的任务,须要60个并行度才不会让 Kafka 产生积压。改为写入 iceberg 以后,只须要20个并行度就够了。
前面咱们讲到 Iceberg 查询的时候不会像 Hive 同样去 list 整个文件夹来获取分区数据,而是先从 manifest 文件中获取相关数据,查询的性能获得了显著的提高,一些大的报表的查询速度从 50 秒提升到 30 秒。
因为 Iceberg 的事务支持,咱们能够实现对一个表进行并发读写,Flink 流式数据实时入湖,压缩程序同时压缩小文件,清理过时文件和快照的程序同时清理无用的文件,这样就能更及时的提供数据,作到分钟级的延迟,查询最新分区数据的速度大大加快了,而且因为 Iceberg 的 ACID 特性能够保证数据的准确性。
能够回溯查询之前某一时刻的数据。
总结一下,咱们目前能够实现使用 Flink SQL 对 Iceberg 进行批、流的读写,并能够对小文件进行实时的压缩,使用 Spark SQL 作一些 delete 和 update 工做以及一些 DDL 操做,后续可使用 Flink SQL 将 CDC 的数据写入 Iceberg。目前对 Iceberg 的全部的优化和 bug fix,我已经贡献给社区。因为笔者水平有限,有时候也不免有错误,还请你们不吝赐教。
做者介绍:张军,同程艺龙大数据开发工程师