解决问题 1474 个,Flink 1.11 究竟有哪些易用性上的改善?

7月7日,Flink 1.11.0 正式发布了,做为这个版本的 release manager 之一,我想跟你们分享一下其中的经历感觉以及一些表明性 feature 的解读。在进入深度解读前,咱们先简单了解下社区发布的通常流程,帮助你们更好的理解和参与 Flink 社区的工做。数据库

  • 首先在每一个版本的规划初期,会从志愿者中选出 1-2 名做为 Release Manager。1.11.0 版本我做为中国这边的 Release Manager,同时还有一名来自 Ververica 的 Piotr Nowojski 做为德国方的 Release Manager,这在某种程度上也说明中国的开发者和贡献度在整个社区的占比很重要。
  • 接下来会进行这个版本的 Feature Kickoff。在一些大的方向上,社区的规划周期可能比较久,会分阶段、分步骤跨越多个版本完成,确保质量。每一个版本的侧重点也会有所不一样,好比前两个版本侧重于批处理的增强,而这个版本更侧重于流处理易用性的提高。社区规划的 Feature 列表会在邮件列表中发起讨论,以收集更多的用户/开发者意见和反馈。
  • 通常的开发周期为 2-3 个月时间,提早会明确规划出大概的 Feature Freeze 时间,以后进行 Release Candidate 的发布和测试、以及 Bug Fix。通常通过几轮的迭代周期后会正式投票经过一个相对稳定的 Candidate 版本,而后基于这个版本正式发布。

Flink 1.11.0 从 3 月初的功能规划到 7 月初的正式发布,历经了差很少 4 个月的时间,对 Flink 的生态、易用性、生产可用性、稳定性等方面都进行了加强和改善,下面将一一跟你们分享。json

一  综述

Flink 1.11.0 从 Feature 冻结后发布了 4 次 Candidate 才最终经过。经统计,一共有 236 个贡献者参与了此次版本开发,解决了 1474 个 Jira 问题,涉及 30 多个 FLIP,提交了 2325 个 Commit。

缓存

纵观近五次版本发布,能够看出从 1.9.0 开始 Flink 进入了一个快速发展阶段,各个维度指标相比以前都有了几乎翻倍的提升。也是从 1.9.0 开始阿里巴巴内部的 Blink 项目开始被开源 Flink 整合,到 1.10.0 通过两个大版本已经所有整合完毕,对 Flink 从生态建设、功能性、性能和生产稳定性上都有了大幅的加强。 网络

Flink 1.11.0 版本的最初定位是重点解决易用性问题,提高用户业务的生产使用体验,总体上不作大的架构调整和功能开发,倾向于快速迭代的小版本开发。可是从上面统计的各个指标来看,所谓的“小版本”在各个维度的数据也丝绝不逊色于前两个大版本,解决问题的数量和参与的贡献者人数也在持续增长,其中来自中国的贡献者比例达到 62%。架构

下面咱们会深度剖析 Flink 1.11.0 带来了哪些让你们期待已久的特性,从用户直接使用的 API 层一直到执行引擎层,咱们都会选择一些有表明性的 Feature 从不一样维度解读,更完整的 Feature 列表请你们关注发布的 Release Blog。并发

二  生态完善和易用性提高

这两个维度在某种程度上是相辅相成的,很难严格区分开,生态兼容上的缺失经常形成使用上的不便,提高易用性的过程每每也是不断完善相关生态的过程。在这方面用户感知最明显的应该就是 Table & SQL API 层面的使用。app

1  Table & SQL 支持 Change Data Capture(CDC)

CDC 被普遍使用在复制数据、更新缓存、微服务间同步数据、审计日志等场景,不少公司都在使用开源的 CDC 工具,如 MySQL CDC。经过 Flink 支持在 Table & SQL 中接入和解析 CDC 是一个强需求,在过往的不少讨论中都被说起过,能够帮助用户以实时的方式处理 Changelog 流,进一步扩展 Flink 的应用场景,例如把 MySQL 中的数据同步到 PG 或 ElasticSearch 中,低延时的 Temporal Join 一个 Changelog 等。框架

除了考虑到上面的真实需求,Flink 中定义的“Dynamic Table”概念在流上有两种模型:Append 模式和 Update 模式。经过 Append 模式把流转化为“Dynamic Table”在以前的版本中已经支持,所以在 1.11.0 中进一步支持 Update 模式也从概念层面完整的实现了“Dynamic Table”。
机器学习

为了支持解析和输出 Changelog,如何在外部系统和 Flink 系统之间编解码这些更新操做是首要解决的问题。考虑到 Source 和 Sink 是衔接外部系统的一个桥梁,所以 FLIP-95 在定义全新的 Table Source 和 Table Sink 接口时解决了这个问题。异步

在公开的 CDC 调研报告中,Debezium 和 Canal 是用户中最流行使用的 CDC 工具,这两种工具用来同步 Changelog 到其它的系统中,如消息队列。据此,FLIP-105 首先支持了 Debezium 和 Canal 这两种格式,并且 Kafka Source 也已经能够支持解析上述格式并输出更新事件,在后续的版本中会进一步支持 Avro(Debezium) 和 Protobuf(Canal)。

CREATE TABLE my_table (  
...) WITH (  
'connector'='...', -- e.g. 'kafka'  
'format'='debezium-json',  
'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)  
'debezium-json.ignore-parse-errors'='true' -- default: false
);

2  Table & SQL 支持 JDBC Catalog

1.11.0 以前,用户若是依赖 Flink 的 Source/Sink 读写关系型数据库或读取 Changelog 时,必需要手动建立对应的 Schema。并且当数据库中的 Schema 发生变化时,也须要手动更新对应的 Flink 做业以保持一致和类型匹配,任何不匹配都会形成运行时报错使做业失败。用户常常抱怨这个看似冗余且繁琐的流程,体验极差。

实际上对于任何和 Flink 链接的外部系统均可能有相似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。FLIP-93 提供了 JDBC catalog 的基础接口以及 Postgres catalog 的实现,这样方便后续实现与其它类型的关系型数据库的对接。

1.11.0 版本后,用户使用 Flink SQL 时能够自动获取表的 Schema 而再也不须要输入 DDL。除此以外,任何 Schema 不匹配的错误都会在编译阶段提早进行检查报错,避免了以前运行时报错形成的做业失败。这是提高易用性和用户体验的一个典型例子。

3  Hive 实时数仓

从 1.9.0 版本开始 Flink 从生态角度致力于集成 Hive,目标打造批流一体的 Hive 数仓。通过前两个版本的迭代,已经达到了 Batch 兼容且生产可用,在 TPC-DS 10T Benchmark 下性能达到 Hive 3.0 的 7 倍以上。

1.11.0 在 Hive 生态中重点实现了实时数仓方案,改善了端到端流式 ETL 的用户体验,达到了批流一体 Hive 数仓的目标。同时在兼容性、性能、易用性方面也进一步进行了增强。

在实时数仓的解决方案中,凭借 Flink 的流式处理优点作到实时读写 Hive:

  • Hive 写入:FLIP-115 完善扩展了 FileSystem Connector 的基础能力和实现,Table/SQL 层的 sink 能够支持各类格式(CSV、Json、Avro、Parquet、ORC),并且支持 Hive Table 的全部格式。
  • Partition 支持:数据导入 Hive 引入 Partition 提交机制来控制可见性,经过sink.partition-commit.trigger 控制 Partition 提交的时机,经过 sink.partition-commit.policy.kind 选择提交策略,支持 SUCCESS 文件和 Metastore 提交。
  • Hive 读取:实时化的流式读取 Hive,经过监控 Partition 生成增量读取新 Partition,或者监控文件夹内新文件生成来增量读取新文件。

在 Hive 可用性方面的提高:

  • FLIP-123 经过 Hive Dialect 为用户提供语法兼容,这样用户无需在 Flink 和 Hive 的 CLI 之间切换,能够直接迁移 Hive 脚本到 Flink 中执行。
  • 提供 Hive 相关依赖的内置支持,避免用户本身下载所需的相关依赖。如今只须要单独下载一个包,配置 HADOOP_CLASSPATH 就能够运行。

在 Hive 性能方面,1.10.0 中已经支持了 ORC(Hive 2+)的向量化读取,1.11.0 中咱们补全了全部版本的 Parquet 和 ORC 向量化支持来提高性能。

4  全新 Source API

前面也提到过,Source 和 Sink 是 Flink 对接外部系统的一个桥梁,对于完善生态、可用性及端到端的用户体验是很重要的环节。社区早在一年前就已经规划了 Source 端的完全重构,从 FLIP-27 的 ID 就能够看出是很早的一个 Feature。可是因为涉及到不少复杂的内部机制和考虑到各类 Source Connector 的实现,设计上须要考虑的很全面。从 1.10.0 就开始作 POC 的实现,最终遇上了 1.11.0 版本的发布。

先简要回顾下 Source 以前的主要问题:

  • 对用户而言,在 Flink 中改造已有的 Source 或者从新实现一个生产级的 Source Connector 不是一件容易的事情,具体体如今没有公共的代码能够复用,并且须要理解不少 Flink 内部细节以及实现具体的 Event Time 分配、Watermark 产出、Idleness 监测、线程模型等。
  • 批和流的场景须要实现不一样的 Source。
  • Partitions/Splits/Shards 概念在接口中没有显式表达,好比 Split 的发现逻辑和数据消费都耦合在 Source Sunction 的实现中,这样在实现 Kafka 或 Kinesis 类型的 Source 时增长了复杂性。
  • 在 Runtime 执行层,Checkpoint 锁被 Source Function 抢占会带来一系列问题,框架很难进行优化。

FLIP-27 在设计时充分考虑了上述的痛点:

  • 首先在 Job Manager 和 Task Manager 中分别引入两种不一样的组件 Split Enumerator 和 Source Reader,解耦 Split 发现和对应的消费处理,同时方便随意组合不一样的策略。好比现有的 Kafka Connector 中有多种不一样的 Partition 发现策略和实现耦合在一块儿,在新的架构下,咱们只须要实现一种 Source Reader,就能够适配多种 Split Enumerator 的实现来对应不一样的 Partition 发现策略。
  • 在新架构下实现的 Source Connector 能够作到批流统一,惟一的小区别是对批场景的有限输入,Split Enumerator 会产出固定数量的 Split 集合而且每一个 Split 都是有限数据集;对于流场景的无限输入,Split Enumerator 要么产出无限多的 Split 或者 Split 自身是无限数据集。
  • 复杂的 Timestamp Assigner 以及 Watermark Generator 透明的内置在 Source Reader 模块内运行,对用户来讲是无感知的。这样用户若是想实现新的 Source Connector,通常再也不须要重复实现这部分功能。

目前 Flink 已有的 Source Connector 会在后续的版本中基于新架构来从新实现,Legacy Source 也会继续维护几个版本保持兼容性,用户也能够按照 Release 文档中的说明来尝试体验新 Source 的开发。

5  PyFlink 生态

众所周知,Python 语言在机器学习和数据分析领域有着普遍的使用。Flink 从 1.9.0 版本开始发力兼容 Python 生态,Python 和 Flink 协力为 PyFlink,把 Flink 的实时分布式处理能力输出给 Python 用户。前两个版本 PyFlink 已经支持了 Python Table API 和 UDF,在 1.11.0 中扩大对 Python 生态库 Pandas 的支持以及和 SQL DDL/Client 的集成,同时 Python UDF 性能有了极大的提高。

具体来讲,以前普通的 Python UDF 每次调用只能处理一条数据,并且在 Java 端和 Python 端都须要序列化/反序列化,开销很大。1.11.0 中 Flink 支持在 Table & SQL 做业中自定义和使用向量化 Python UDF,用户只须要在 UDF 修饰中额外增长一个参数 udf_type=“pandas” 便可。这样带来的好处是:

  • 每次调用能够处理 N 条数据。
  • 数据格式基于 Apache Arrow,大大下降了 Java、Python 进程之间的序列化/反序列化开销。
  • 方便 Python 用户基于 Numpy 和 Pandas 等数据分析领域经常使用的 Python 库,开发高性能的 Python UDF。

除此以外,1.11.0 中 PyFlink 还支持:

  • PyFlink table 和 Pandas DataFrame 之间无缝切换(FLIP-120),加强 Pandas 生态的易用性和兼容性。
  • Table & SQL 中能够定义和使用 Python UDTF(FLINK-14500),再也不必需 Java/Scala UDTF。
  • Cython 优化 Python UDF 的性能(FLIP-121),对比 1.10.0 能够提高 30 倍。
  • Python UDF 中用户自定义 Metric(FLIP-112),方便监控和调试 UDF 的执行。

上述解读的都是侧重 API 层面,用户开发做业能够直接感知到的易用性的提高。下面咱们看看执行引擎层在 1.11.0 中都有哪些值得关注的变化。

三  生产可用性和稳定性提高

1  支持 Application 模式和 Kubernetes 加强

1.11.0 版本前,Flink 主要支持以下两种模式运行:

  • Session 模式:提早启动一个集群,全部做业都共享这个集群的资源运行。优点是避免每一个做业单独启动集群带来的额外开销,缺点是隔离性稍差。若是一个做业把某个 Task Manager(TM)容器搞挂,会致使这个容器内的全部做业都跟着重启。虽然每一个做业有本身独立的 Job Manager(JM)来管理,可是这些 JM 都运行在一个进程中,容易带来负载上的瓶颈。
  • Per-job 模式:为了解决 Session 模式隔离性差的问题,每一个做业根据资源需求启动独立的集群,每一个做业的 JM 也是运行在独立的进程中,负载相对小不少。

以上两种模式的共同问题是须要在客户端执行用户代码,编译生成对应的 Job Graph 提交到集群运行。在这个过程须要下载相关 Jar 包并上传到集群,客户端和网络负载压力容易成为瓶颈,尤为当一个客户端被多个用户共享使用。

1.11.0 中引入了 Application 模式(FLIP-85)来解决上述问题,按照 Application 粒度来启动一个集群,属于这个 Application 的全部 Job 在这个集群中运行。核心是 Job Graph 的生成以及做业的提交不在客户端执行,而是转移到 JM 端执行,这样网络下载上传的负载也会分散到集群中,再也不有上述 Client 单点上的瓶颈。

用户能够经过 bin/flink run-application 来使用 Application 模式,目前 Yarn 和  Kubernetes(K8s)都已经支持这种模式。Yarn application 会在客户端将运行做业须要的依赖都经过 Yarn Local Resource 传递到 JM。K8s Application 容许用户构建包含用户 Jar 与依赖的镜像,同时会根据做业自动建立 TM,并在结束后销毁整个集群,相比 Session 模式具备更好的隔离性。K8s 再也不有严格意义上的 Per-Job 模式,Application 模式至关于 Per-Job 在集群进行提交做业的实现。

除了支持 Application 模式,Flink 原生 K8s 在 1.11.0 中还完善了不少基础的功能特性(FLINK-14460),以达到生产可用性的标准。例如 Node Selector、Label、Annotation、Toleration 等。为了更方便的与 Hadoop 集成,也支持根据环境变量自动挂载 Hadoop 配置的功能。

2  Checkpoint & Savepoint 优化

Checkpoint 和 Savepoint 机制一直是 Flink 保持先进性的核心竞争力之一,社区在这个领域的改动很谨慎,最近的几个大版本中几乎没有大的功能和架构上的调整。在用户邮件列表中,咱们常常能看到用户反馈和抱怨的相关问题:好比 Checkpoint 长时间作不出来失败,Savepoint 在做业重启后不可用等等。1.11.0 有选择的解决了一些这方面的常见问题,提升生产可用性和稳定性。

1.11.0 以前, Savepoint 中 Meta 数据和 State 数据分别保存在两个不一样的目录中,这样若是想迁移 State 目录很难识别这种映射关系,也可能致使目录被误删除,对于目录清理也一样有麻烦。1.11.0 把两部分数据整合到一个目录下,这样方便总体转移和复用。另外,以前 Meta 引用 State 采用的是绝对路径,这样 State 目录迁移后路径发生变化也不可用,1.11.0 把 State 引用改为了相对路径解决了这个问题(FLINK-5763),这样 Savepoint 的管理维护、复用更加灵活方便。

实际生产环境中,用户常常遭遇 Checkpoint 超时失败、长时间不能完成带来的困扰。一旦做业 failover 会形成回放大量的历史数据,做业长时间没有进度,端到端的延迟增长。1.11.0 从不一样维度对 Checkpoint 的优化和提速作了改进,目标实现分钟甚至秒级的轻量型 Checkpoint。

首先,增长了 Checkpoint Coordinator 通知 Task 取消 Checkpoint 的机制(FLINK-8871),这样避免 Task 端还在执行已经取消的 Checkpoint 而对系统带来没必要要的压力。同时 Task 端放弃已经取消的 Checkpoint,能够更快的参与执行 Coordinator 新触发的 Checkpoint,某种程度上也能够避免新 Checkpoint 再次执行超时而失败。这个优化也对后面默认开启 Local Recovery 提供了便利,Task 端能够及时清理失效 Checkpoint 的资源。

其次,在反压场景下,整个数据链路堆积了大量 Buffer,致使 Checkpoint Barrier 排在数据 Buffer 后面,不能被 Task 及时处理对齐,也就致使了 Checkpoint 长时间不能执行。1.11.0 中从两个维度对这个问题进行解决:

1)尝试减小数据链路中的 Buffer 总量(FLINK-16428),这样 Checkpoint Barrier 能够尽快被处理对齐。

  • 上游输出端控制单个 Sub Partition 堆积 Buffer 的最大阈值(Backlog),避免负载不均场景下单个链路上堆积大量 Buffer。
  • 在不影响网络吞吐性能的状况下合理修改上下游默认的 Buffer 配置。
  • 上下游数据传输的基础协议进行了调整,容许单个数据链路能够配置 0 个独占 Buffer 而不死锁,这样总的 Buffer 数量和做业并发规模解耦。根据实际需求在吞吐性能和 Checkpoint 速度二者之间权衡,自定义 Buffer 配比。

这个优化有一部分工做已经在 1.11.0 中完成,剩余部分会在下个版本继续推动完成。

2)实现了全新的 Unaligned Checkpoint 机制(FLIP-76)从根本上解决了反压场景下 Checkpoint Barrier 对齐的问题。实际上这个想法早在 1.10.0 版本以前就开始酝酿设计,因为涉及到不少模块的大改动,实现机制和线程模型也很复杂。咱们实现了两种不一样方案的原型 POC 进行了测试、性能对比,肯定了最终的方案,所以直到 1.11.0 才完成了 MVP 版本,这也是 1.11.0 中执行引擎层惟一的一个重量级 Feature。其基本思想能够归纳为:

  • Checkpoint Barrier 跨数据 Buffer 传输,不在输入输出队列排队等待处理,这样就和算子的计算能力解耦,Barrier 在节点之间的传输只有网络延时,能够忽略不计。
  • 每一个算子多个输入链路之间不须要等待 Barrier 对齐来执行 Checkpoint,第一个到的 Barrier 就能够提早触发 Checkpoint,这样能够进一步提速 Checkpoint,不会由于个别链路的延迟而影响总体。
  • 为了和以前 Aligned Checkpoint 的语义保持一致,全部未被处理的输入输出数据 Buffer 都将做为 Channel State 在 Checkpoint 执行时进行快照持久化,在 Failover 时连同 Operator State 一同进行恢复。换句话说,Aligned 机制保证的是 Barrier 前面全部数据必须被处理完,状态实时体现到 Operator State 中;而 Unaligned 机制把 Barrier 前面的未处理数据所反映的 Operator State 延后到 Failover Restart 时经过 Channel State 回放进行体现,从状态恢复的角度来讲最终都是一致的。注意这里虽然引入了额外的 In-Flight Buffer 的持久化,可是这个过程实际是在 Checkpoint 的异步阶段完成的,同步阶段只是进行了轻量级的 Buffer 引用,因此不会过多占用算子的计算时间而影响吞吐性能。

 Unaligned Checkpoint 在反压严重的场景下能够明显加速 Checkpoint 的完成时间,由于它再也不依赖于总体的计算吞吐能力,而和系统的存储性能更加相关,至关于计算和存储的解耦。可是它的使用也有必定的局限性,它会增长总体 State 的大小,对存储 IO 带来额外的开销,所以在 IO 已是瓶颈的场景下就不太适合使用 Unaligned Checkpoint 机制。

1.11.0 中 Unaligned Checkpoint 尚未做为默认模式,须要用户手动配置来开启,而且只在 Exactly-Once 模式下生效。但目前还不支持 Savepoint 模式,由于 Savepoint 涉及到做业的 Rescale 场景,Channel State 目前还不支持 State 拆分,在后面的版本会进一步支持,因此 Savepoint 目前仍是会使用以前的 Aligned 模式,在反压场景下有可能须要很长时间才能完成。

四  总结

Flink 1.11.0 版本的开发过程当中,咱们看到愈来愈多来自中国的贡献者参与到核心功能的开发中,见证了 Flink 在中国的生态发展愈来愈繁荣,好比来自腾讯公司的贡献者参与了 K8s、Checkpoint 等功能开发,来自字节跳动公司的贡献者参与了 Table & SQL 层以及引擎网络层的一些开发。但愿更多的公司可以参与到 Flink 开源社区中,分享在不一样领域的经验,使 Flink 开源技术一直保持先进性,可以普惠到更多的受众。

通过 1.11.0 “小版本”的短暂调整,Flink 正在酝酿下一个大版本的 Feature,相信必定会有不少重量级的特性登场,让咱们拭目以待!