滴滴 Flink-1.10 升级之路

导读:滴滴实时计算引擎从 Flink-1.4 无缝升级到 Flink-1.10 版本,作到了彻底对用户透明。而且在新版本的指标、调度、SQL 引擎等进行了一些优化,在性能和易用性上相较旧版本都有很大提高。sql

这篇文章介绍了咱们升级过程当中遇到的困难和思考,但愿能给你们带来启发。json

1、 背景

在本次升级以前,咱们使用的主要版本为 Flink-1.4.2,而且在社区版本上进行了一些加强,提供了 StreamSQL 和低阶 API 两种服务形式。现有集群规模达到了 1500 台物理机,运行任务数超过 12000 ,日均处理数据 3 万亿条左右。数组

不过随着社区的发展,尤为是 Blink 合入 master 后有不少功能和架构上的升级,咱们但愿能经过版本升级提供更好的流计算服务。今年 2 月份,里程碑版本 Flink-1.10 发布,咱们开始在新版上上进行开发工做,踏上了充满挑战的升级之路。session

2、 Flink-1.10 新特性

做为 Flink 社区至今为止的最大的一次版本升级,加入的新特性解决了以前遇到不少的痛点。架构

1. 原生 DDL 语法与 Catalog 支持并发

Flink SQL 原生支持了 DDL 语法,好比 CREATE TABLE/CREATE FUNCTION,可使用 SQL 进行元数据的注册,而不须要使用代码的方式。函数

也提供了 Catalog 的支持,默认使用 InMemoryCatalog 将信息临时保存在内存中,同时也提供了 HiveCatalog 能够与 HiveMetastore 进行集成。也能够经过本身拓展 Catalog 接口实现自定义的元数据管理。性能

2.Flink SQL 的加强单元测试

  • 基于 ROW_NUMBER 实现的 TopN 和去重语法,拓展了 StreamSQL 的使用场景。
  • 实现了 BinaryRow 类型做为内部数据交互,将数据直接以二进制的方式构建而不是对象数组,好比使用一条数据中的某个字段时,能够只反序列其中部分数据,减小了没必要要的序列化开销。
  • 新增了大量内置函数,例如字符串处理、FIRST/LAST_VALUE 等等,因为不须要转换为外部类型,相较于自定义函数效率更高。
  • 增长了 MiniBatch 优化,经过微批的处理方式提高任务的吞吐

3.内存配置优化学习

以前对 Flink 内存的管理一直是一个比较头疼的问题,尤为是在使用 RocksDB 时,由于一个 TaskManager 中可能存在多个 RocksDB 实例,很差估算内存使用量,就致使常常发生内存超过限制被杀。

在新版上增长了一些内存配置,例如 state.backend.rocksdb.memory.fixed-per-slot 能够轻松限制每一个 slot的RocksDB 内存的使用上限,避免了 OOM 的风险。

3、挑战与应对

本次升级最大的挑战是,如何保证 StreamSQL 的兼容性。StreamSQL 的目的就是为了对用户屏蔽底层细节,可以更加专一业务逻辑,而咱们能够经过版本升级甚至更换引擎来提供更好的服务。保证任务的平滑升级是最基本的要求。

1. 内部 patch 如何兼容

因为跨越多个版本架构差距巨大,内部 patch 基本没法直接合入,须要在新版本上从新实现。咱们首先整理了全部的历史 commit,筛选出那些必要的修改而且在新版上进行从新实现,目的是能覆盖已有的全部功能,确保新版本能支持现有的全部任务需求。

例如:

  • 新增或修改 Connectors 以支持公司内部须要,例如 DDMQ(滴滴开源消息队列产品),权限认证功能等。
  • 新增 Formats 实现,例如 binlog,内部日志采集格式的解析等。
  • 增长 ADD JAR 语法,能够在 SQL 任务中引用外部依赖,好比 UDF JAR,自定义 Source/Sink。
  • 增长 SET 语法,能够在 SQL 中设置 TableConfig,指导执行计划的生成

2. StreamSQL 语法兼容

社区在 1.4 版本时,FlinkSQL还处于比较初始的阶段,也没有原生的 DDL 语法支持,咱们使用 Antlr 实现了一套自定义的 DDL 语法。可是在 Flink1.10 版本上,社区已经提供了原生的 DDL 支持,并且与咱们内部的语法差异较大。如今摆在咱们面前有几条路能够选择:

  • 放弃内部语法的支持,修改所有任务至新语法。(违背了平滑迁移的初衷,并且对已有用户学习成本高)
  • 修改 Flink 内语法解析的模块(sql-parser),支持对内部语法的解析。(实现较为复杂,且不利于后续的版本升级)
  • 在 sql-parser 之上封装一层语法转换层,将本来的 SQL 解析提取有效信息后,经过字符串拼接的方式组织成社区语法再运行。

最终咱们选用了第三种方案,这样能够最大限度的减小和引擎的耦合,做为插件运行,将来再有引擎升级彻底能够复用现有的逻辑,可以下降不少的开发成本。

例如:咱们在旧版本上使用 "json-path" 的库实现了 json 解析,经过在建表语句里定义相似 $.status 的表达式表示如何提取此字段。

新版本上原生的 json 类型解析可使用 ROW 类型来表示嵌套结构,在转换为新语法的过程当中,将本来的表达是解析为树并构建出新的字段类型,再使用计算列的方式提取出原始表中的字段,确保表结构与以前一致。类型名称、配置属性也经过映射转换为社区语法。

3. 兼容性测试

最后是测试阶段,须要进行完善的测试确保全部任务都能作到平滑升级。咱们本来的计划是准备进行回归测试,对已有的全部任务替换配置后进行回放,可是在实际操做中有不少问题:

  • 测试流程过长,一次运行可能须要数个小时。
  • 出现问题时很差定位,可能发生在任务的整个生命周期的任何阶段。
  • 没法验证计算结果,即新旧版本语义是否一致

因此咱们按任务的提交流程分红多个阶段进行测试,只有在当前阶段可以所有测试经过后后进入下一个阶段测试,提早发现问题,将问题定位范围缩小到当前阶段,提升测试效率。

  • 转换测试:对全部任务进行转换,测试结果符合预期,抽象典型场景为单元测试。
  • 编译测试:确保全部任务能够经过 TablePlanner 生成执行计划,再编译成 JobGraph,真正提交运行前结束。
  • 回归测试:在测试环境对任务替换配置后进行回放,确认任务能够提交运行
  • 对照测试:对采样数据以文件的形式提交至新旧两个版本中运行,对比结果是否彻底一致(由于部分任务结果不具备肯定性,因此使用旧版本连续运行 2 次,筛选出肯定性任务,做为测试用例)

4、引擎加强

除了对旧版本的兼容,咱们也结合了新版本的特性,对引擎进行了加强。

1. Task-Load 指标

咱们一直但愿能精确衡量任务的负载情况,使用反压指标指标只能粗略的判断任务的资源够或者不够。

结合新版的 Mailbox 线程模型,全部互斥操做所有运行在 TaskThread 中,只需统计出线程的占用时间,就能够精确计算任务负载的百分比。

将来可使用指标进行任务的资源推荐,让任务负载维持在一个比较健康的水平。

2. SubTask 均衡调度

在 FLIP-6 后,Flink 修改了资源调度模型,移除了--container 参数,slot 按需申请确保不会有闲置资源。可是这也致使了一个问题,Source 的并发数经常是小于最大并发数的,而 SubTask 调度是按 DAG 的拓扑顺序调度,这样 SourceTask 就会集中在某些 TaskManager 中致使热点。

咱们加入了"最小 slot 数"的配置,保证在 Flink session 启动后当即申请相应数量的 slot,且闲置时也不主动退出,搭配 cluster.evenly-spread-out-slots 参数能够保证在 slot 数充足的状况下,SubTask 会均匀分布在全部的 TaskManager 上。

3. 窗口函数加强

以滚动窗口为例 TUMBLE(time_attr, INTERVAL '1' DAY),窗口为一天时开始和结束时间固定为天天 0 点 -24 点,没法作到生产天天 12 点-第二天 12 点的窗口。

对于代码能够经过指定偏移量实现,可是 SQL 目前还未实现,经过增长参数 TUMBLE(time_attr, INTERVAL '1' DAY, TIME '12:00:00') 表示偏移时间为 12 小时。

还有另一种场景,好比统计一天的 UV,同时但愿展现当前时刻的计算结果,例如每分钟触发窗口计算。对于代码开发的方式能够经过自定义 Trigger 的方式决定窗口的触发逻辑,并且 Flink 也内置了一些 Tigger 实现,好比 ContinuousTimeTrigger 就很适合这种场景。因此咱们又在窗口函数里增长了一种可选参数,表明窗口的触发周期,TUMBLE(time_attr, INTERVAL '1' DAY, INTERVAL '1' MINUTES) 。

经过增长 offset 和 tiggger 周期参数(TUMBLE(time_attr, size[,offset_time][,trigger_interval])),拓展了 SQL 中窗口的使用场景,相似上面的场景能够直接使用 SQL 开发而不须要使用代码的方式。

4. RexCall 结果复用

在不少 SQL 的使用场景里,会屡次使用上一个计算结果,好比将 JSON 解析成 Map 并提取多个字段 。

虽然经过子查询,看起来 json 解析只调用一次,可是通过引擎的优化后,经过结果表的投影 (Projection) 生成函数调用链 (RexCall),结果相似:

这样会致使 json 解析的计算重复运行了3次,即便使用视图分割成两步操做,通过 Planner 的优化同样会变成上边的样子。

对于肯定性 (isDeterministic=true) 的函数来讲,相同的输入必定表明相同的结果,重复执行 3 次 json 解析实际上是没有意义的,如何优化才能实现对函数结果的复用呢?

在代码生成时,将 RexCall 生成的惟一标识(Digest)和变量符号的映射保存在 CodeGenContext 中,若是遇到 Digest 相同的函数调用,则能够复用已经存在的结果变量,这样解析 JSON 只须要执行第一次,以后就能够复用第一次的结果。

5、总结

经过几个月的努力,新版本已经上线运行,而且做为 StreamSQL 的默认引擎,任务重启后直接使用新版本运行。兼容性测试的经过率达到 99.9%,能够基本作到对用户的透明升级。对于新接触 StreamSQL 用户可使用社区 SQL 语法进行开发,已有任务也能够修改 DML 部分语句来使用新特性。如今新版本已经支持了公司内许多业务场景,例如公司实时数据仓库团队依托于新版本更强的表达能力和性能,承接了多种多样的数据需求作到稳定运行且与离线口径保持一致。

版本升级不是咱们的终点,随着实时计算的发展,公司内也有愈来愈多的团队须要使用 Flink 引擎, 也向咱们提出了更多的挑战,例如与 Hive 的整合作到将结果直接写入 Hive 或直接使用 Flink 做为批处理引擎,这些也是咱们探索和发展的方向,经过不断的迭代向用户提供更加简单好用的流计算服务。

做者|Alan

原文连接

本文为阿里云原创内容,未经容许不得转载。

相关文章
相关标签/搜索