Apache Flink 在同程艺龙实时计算平台的研发与应用实践

本文主要介绍 Apache Flink 在同程艺龙的应用实践,从当前同程艺龙实时计算平台现状、建设过程、易用性提高、稳定性优化四方面分享了同城艺龙实时计算平台的建设经验,供你们参考。html

1.背景介绍

在 2015 年初,为了可以采集到用户在 PC,APP 等平台上的行为轨迹,咱们开始开发实时应用。那时可选的技术架构仍是比较少的,实时计算框架这块,当时比较主流的有 Storm 和 Spark-streaming。综合考虑实时性,接入难度,咱们最终选择使用基于 Storm 构建了第一个版本的用户行为轨迹采集框架。后续随着实时业务的增多,咱们发现 Storm 已经远远不能知足咱们对数据端到端处理准确一次(Exactly-Once)语义的需求,而且对于流量高峰来临时也不能平滑的背压(BackPressure),在大规模集群的支持上 Storm 也存在问题。通过充分的调研后,咱们在 2018 年初选择基于 Flink 开发同程艺龙新一代实时计算平台。git

目前实时计算平台已支撑近千个实时任务运行,服务公司的市场、机票、火车票、酒店、金服、国旅、研发等各个业务条线。下面主要结合实时计算平台来分享下咱们在 Flink 落地过程当中的一些实践经验及思考。github

2.平台建设

在开发实时计算平台前,咱们有过大量实时应用业务的经验,咱们发现使用实时计算的业务方主要有两类:算法

  • 一类的大数据业务是基于 Lambda 架构开发的,这部分业务是须要有一个实时计算的组件来帮他们把之前离线的一套数据同步清洗(如:sqoop、hive)转换成实时任务。有时在这个过程当中也须要组件来支持实时的过滤聚合。这部分业务方大可能是数仓&分析,他们对 SQL 比较熟悉,更倾向于用 SQL 解决一切问题;
  • 另外一部分业务方主要是数据开发&挖掘,他们的业务场景更复杂,业务需求变化及应用迭代很频繁,更关注实时应用的性能,他们喜欢用编程语言如:Java,scala 来开发实时应用。

为了更好的为两类用户提供支持,实时计算平台同时支持两种类型的任务:FlinkSQL 和 FlinkStream。平台总体架构如图所示: apache

1.jpg

2.1 FlinkSQL

2.1.1 概述

上图的后端 RTC-FlinkSQL 模块便是用来执行提交 FlinkSQL 任务的服务,SQL 属于声明式语言,通过 30、40 年的发展,具备很高的易用性、灵活性和表达性。虽然 Flink 提供了 Table & SQL API,可是咱们当时基于的 Flink 1.4 及 1.6 版本自己语法也不支持像 Create Table 这样的 DDL 语法,而且在须要关联到外部数据源的时候 Flink 也没有提供 SQL 相关的实现方式。编程

此外根据其提供的 API 接口编写 TableSource 和 TableSink 异常繁琐,不只要了解 Flink 各类 Operator 的 API,还要对各个组件的相关接入和调用方式有必定了解(好比 Kafka、RocketMQ、Elasticsearch、HBase、HDFS 等),所以对于只熟悉 SQL 进行数据分析的人员直接编写 FlinkSQL 任务须要较大的学习成本。后端

鉴于以上缘由,咱们构建了实时计算平台的 RTC-FlinkSQL 开发模块并对 FlinkSQL 进行扩展,让这部分用户在使用 FlinkSQL 的时候只须要关心作什么,而不须要关心怎么作。不须要过多的关心程序的实现,而是专一于业务逻辑。api

2.1.2 四步实现 FlinkSQL 提交模块

  • 构建于 Apache Calcite、Apache Flink 之上
  • 将 SQL 映射成 Flink JobGraph
    parser:经过 Calcite api 实现解析,最终获得 SqlNode 集合
    validator:从 SqlNode 中提取执行的 SQL 和 Source、Sink、维表对应的配置信息
    executor:利用 validator 获取的信息借助 - Flink 的 API 获得对应的JobGraph

经过 Yarn Client 提交构建好的 Flink 任务,提交成功返回 ApplicationID性能优化

  • 利用 YARN 返回的 ApplicationID 获取 JobId 以后经过 Flink RESTful API 监控程序的运行情况

2.1.3 在原有 FlinkSQL 的基础上作了不少扩展

  • 支持建立源表语句

这里主要是根据上述 validator 阶段获取的 Source 配置信息,根据指定参数实例化出该对象,而后调用 registerTableSource 方法将 TableSource 注册到 environment,从而完成了源表的注册。 网络

2.jpg

  • 支持建立输出表语句

Flink Table 输出 Operator 基类是 TableSink,咱们这里继承的是 AppendStreamTableSink,根据上述 validator 阶段获取的 Sink 配置信息,根据指定参数实例化出该对象,而后调用 registerTableSink 方法将 TableSink 注册到 environment。

3.jpg

  • 支持建立自定义函数

继承 ScalarFunction 或者继承 TableFunction,须要从用户提交的 SQL 中获取要使用的自定义函数类名, 以后经过反射获取实例,判断自定义 Function 属于上述哪一种类型,而后调用 TableEnvironment.registerFunction 便可完成了 UDF 的注册,最后用户就能够在 SQL中使用自定义的 UDF。

4.jpg

  • 支持维表关联

使用 Calcite 对上述 validator 阶段获取的可执行 SQL 进行解析,将 SQL 解析出一个语法树,经过迭代的方式,搜索到对应的维表,并结合上述 validator 阶段获取的维表信息实例化对应的 SideOperator 对象,以后经过 RichAsyncFunction 算子生成新的 DataStream,最后从新注册表并执行其余 SQL,咱们同时支持帐号密码直连和公司研发提供的 DAL 方式。

5.jpg

以下图所示,能够方便地在实时计算平台上 FlinkSQL 编辑器内完成 FlinkSQL 任务的开发,目前线上运行有 500+ 的 FlinkSQL 任务在运行。

6.jpg

2.2 FlinkStream

除了 FlinkSQL 外,平台上还有一半的实时任务是一些业务场景更复杂,经过代码来编写开发的任务。对此咱们提供了 RTC-FlinkStream 模块来让用户上传本身本地打包后的 FAT-JAR,经过资源管理平台来让用户对 JAR 作版本管理控制,方便用户选择运行指定的任务版本,FlinkStream 任务开发界面如图所示。

7.jpg

这部分任务有些对资源使用需求比较大,咱们提供了任务容器配置的参数来让用户灵活的配置其 Task 并发,而且提供了自定义时间周期触发保存点(savepoint)的功能。

3.易用性提高

平台开发难度相对低,难的是如何提高平台的易用性,由于开源组件如 Apache Flink 核心关注数据的处理流程,对于易用性这部分稍显不足,因此在实时平台功能开发过程当中要修改 Flink 组件的源码来提高其易用性。

3.1 指标(Metrics)监控

以 Flink 任务运行的指标(Metrics)监控来讲,当 Flink 程序提交至集群以后,咱们须要的是收集任务的实时运行 Metrics 数据,经过这些数据能够实时监控任务的运行情况,例如,算子的 CPU 耗时、JVM 内存、线程数等。这些实时 Metrics 指标对任务的运维、调优等有着相当重要的做用,方便及时发现报警,进行调整。

经过对比现有的指标采集系统,包括 InfluxDB、StatsD、Datadog 等系统再结合公司的指标收集系统,咱们最终决定采用 Prometheus 做为指标系统。可是在开发过程当中咱们发现 Flink 只支持 Prometheus 的拉模式收集数据,此模式须要提早知道集群的运行主机以及端口等信息,适合于单集群模式。

而做为企业用户,更多的是将 Flink 任务部署在 YARN 等集群上,此时,Flink 的 JobManager、TaskManager 的运行是由 YARN 统一调度,主机以及是端口都是动态的,而 Flink 只支持的拉模式难以知足咱们需求。因此咱们经过增长 Prometheus 的 Pushgateway 来进行指标的收集,此模式属于推模式,架构如图所示。同时,咱们也积极的向社区贡献了这个新特性[4] ,目前 PR 已经被合并,详情见 FLINK-9187。

8.jpg

3.2 配置监控页面

在完成 Flink Pushgateway 的相关工做后,为了方便用户查看本身 Flink 任务的吞吐量,处理延迟等重要监控信息,咱们为用户配置了监控页面,方便用户在实时计算平台上快速定位出任务性能问题,如经过咱们实时平台监控页面提供的图表,具体指标为 flink_taskmanager_job_task_buffers_outPoolUsage 来快速判断实时任务的 Operator 是否存在反压状况[2]。

在使用过程当中咱们也发现了 Flink Metrics 中衡量端到端的 Opertor Latency 的指标存在漂移,致使监控不许确问题。咱们也修复了该问题[5]并反馈给了社区,详情见FLINK-11887。

9.jpg

3.3 日志

提高平台易用性还有一个重要的地方就是日志,日志分为操做日志,启动日志,业务日志,运行历史等日志信息。其中比较难处理的就是用户代码中打印的业务日志。由于 Flink 任务是分布式执行的,不一样的 TaskManager 的处理节点都会有一份日志,业务看日志要分别打开多个 TaskManager 的日志页面。

而且Flink任务是属于长运行的任务,用户代码中打印的日志是打印在 Flink WebUI 上。此时会面临一个问题,当任务运行的时间越长,日志量会愈来愈多,原生自带的日志页面将没法打开。为了方便用户查看日志,解决用户没法获取到实时任务的日志信息,同时也为了方便用户根据关键词进行历史日志的检索,咱们在实时计算平台为用户提供了一套实时日志系统功能,开发人员能够实时地搜索任务的日志。

而且系统采用无侵入式架构,架构图见下图,在用户程序无感知的状况下,实时采集日志,并同步到 Elasticsearch 中,当业务须要检索日志时,可经过 Elasticsearch 语法进行检索。

10.jpg

3.4 计算组件

计算组件每每处于大数据的中间位置,上游承接 MQ 等实时数据源,下游对接 HDFS、HBase 等大数据存储,经过 Flink 这些实时组件将数据源和数据目标串联在一块儿。为了不混乱,这个过程每每须要经过数据血缘来作管理。然而常见的数据血缘管理的开源项目如 Apache Atlas 等并未提供对 Flink 的支持,而 Flink 自身也没有提供相应的 Hook 来抽取用户代码的中的数据源等信息。

为了解决这个问题,咱们修改了 Flink Client 提交过程,在 CliFrontend 中增长一个 notify 环节,经过 ContextClassLoader 和反射在 Flink 任务提交阶段将 Flink 生成的 StreamGraph 内的各个 StreamNode 抽取出来,这样就能够在提交时候获取出用户编写的 Flink 任务代码中关键数据源等配置信息,从而为后续的 Flink 数据血缘管理提供支持。其关键代码以下:

11.jpg

Flink 采用了 Chandy-Lamport 的快照算法来保证一致性和容错性,在实时任务的运行期间是经过 Checkpoint [1]机制来保障的。若是升级程序,重启程序,任务的运行周期结束,window 内的状态或使用 mapstate 的带状态算子(Operator)所保存的数据就会丢失了,为了解决这个问题,给用户提供平滑升级程序方案从而保障数据准确处理,咱们实时计算平台提供了从外部触发 Savepoint 功能,在用户手动重启任务的时候,能够选择最近一段时间内执行成功的保存点来恢复本身的程序。平台从保存点恢复任务操做如图所示。

12.jpg

虽然咱们提供了通用的实时计算平台,可是有些用户想使用 Flink,除此以外还须要在平台上增长些更符合其业务特色的功能,对此咱们也开放了咱们实时计算平台的 API 接口给到业务方,让业务根据其自身场景特色来加速实时应用的变现和落地。

4.稳定性优化

前面介绍了咱们在实时计算平台易用性方面如:SQL,监控,日志,血缘,保存点等功能点上作的开发工做,其实除了平台功能开发以外还有更多的工做内容是用户没有感知到的。如保障实时应用运行稳定性,在这方面咱们积累了不少实践经验,与此同时咱们也在 Github 上创建了 Tongcheng-Elong 组织,并将修复后的源代码贡献到 Apache 社区。其中有十几个 patch 已经被社区接收合并。接下来分享一些咱们遇到的稳定性问题和提供的解决方案。

4.1 Flink 的“ 空跑”问题

咱们在集群运维过程当中发现,在偶发的状况下,Flink 任务会在 YARN 集群上空跑。此时,在 YARN 层面的现象是任务处于 RUNNING 状态,可是进入到 Flink WebUI,会发现此时全部的 TaskManager 所有退出,并无任务在运行。这个状况下,会形成的 YARN 资源的浪费,同时也给运维人员带来困扰,为何 TaskManager 都退出了,JobManager 不退出呢?甚至给平台监控任务运行状态带来误判,认为任务还在运行,但实际任务早挂了。

这个问题比较难定位,首先发生这种状况很少,可是一旦出现影响很大。其次,没有异常堆栈信息,没法定位到具体的根本缘由。咱们的解决方法是经过修改源码,在多个可能的地方增长日志埋点,以观察并了解任务退出时 JobManager 所执行的处理逻辑。最终咱们定位到当任务失败时,在默认的重试策略以后,会将信息归档到 HDFS 上。因为是串行执行,因此若是在归档过程当中发生异常,则会中断正常处理逻辑从而致使通知 JobManager 的过程不能成功执行。具体的执行逻辑见下图。

13.jpg

梳理清楚逻辑以后,咱们发现社区也没有修复这个问题。一样,咱们也积极向社区进行提交PR修复6[8]。修复这个问题,须要经过 3 个 PR,逐步进行完善,详情见 FLINK-1224六、FLINK-1221九、FLINK-12247。

咱们的存储组件比较多,在使用 Flink-Connector 来读写相关存储组件的如:RocketMQ、HDFS、Kudu、Elasticsearch 也发现过这些 Connector 的 Source/Sink 存在问题,咱们在修复以后也提交了 PR 反馈到社区:

  • RocketMQSource 的 connector 从 savepoint 恢复异常问题,及 RocketMQSource 不能严格保证数据不丢失问题。咱们修复这些问题后为业务用户提供基于咱们本身版本稳定的 connector SDK;
  • BucketingSink 写入 HDFS 时出现的 client 无限续租致使的文件卡在 openforwrite 状态问题[3],咱们也维护了本身的 filesystem connector SDK 提供给业务用户,在异常发生时主动释放租约;
  • 对 Flink 写 Kudu 时提供的 KuduSink 性能太低问题,咱们也提出经过异步刷新模式来提升 Sink 的写入性能[9];
  • Flink - Elasticsearch6-connector 写入线程死锁问题。ES 是实时分析这边重要的存储组件,而在咱们实际的实践过程当中会发现原本运行正常的 Flink 程序会偶尔出现程序 hang 住,全部的数据处理都中止,消费 MQ 数据速度降为 0,除非重启任务不然没法恢复。这个问题一旦出现,严重影响线上实时应用的稳定性。Flink 和 ElasticSearch 社区也有多个 issue 讨论相似的问题。在通过分析后,咱们发现问题主要缘由是 Elasticsearch 6 的 core 模块在线程池重构后 Bulk Interval FlushTask 和 RetryHandler 共用相同线程池致使的,具体的执行逻辑见图。

14.jpg

对于该问题的临时解决方案是在使用 Elasticsearch 6.x 的 RestHighLevelClient 的时候暂时中止使用 setBulkFlushInterval 配置, 而是经过 Flink 自身的 checkpoint 机制来触发数据定时 Flush 到 ElasticSearch Server 端。真正完全解决办法是构建单独的线程池提供给 ReryHandler 来使用。随后咱们也向 Elasticsearch 社区提交了 issue 及 PR 来修复这个问题 [10]。在这个过程当中发现也顺便修复了 Flink 在任务重试时候 transport client 线程泄露[11]等问题详情见 FLINK-11235。

4.2 Flink 与 ZK 网络问题

咱们也遇到了 Flink 与 ZK 网络问题,当 Jobmanager 与 ZK 的链接中断以后,会将正在运行的任务当即中止。当集群中任务不少时,可能因为网络抖动等缘由瞬断时,会致使任务的重启。而在咱们集群上有上千的 Flink 应用,一旦出现网络抖动,会使得大量 Flink 任务重启,这个问题对集群和任务的稳定性影响比较大。

根本缘由是 Flink 底层采用 Curator 的 LeaderLatch 作分布式锁服务,在 Curator-2.x 的版本中对于网络瞬断没有容忍性,当由于网络抖动、机器繁忙、zk集群短暂无响应都会致使 curator 将状态置为 suspended,正是这个 suspended 状态致使了全部任务的重启。

咱们的解决办法是先升级 Curator 版本到 4.x[12],而后在提高版本后再用 CuratorFrameworkFactory 来构造 CuratorFramework 时,经过使用 ConnectionStateErrorPolicy 将 StandardConnectionStateErrorPolicy 替换为 SessionConnectionStateErrorPolicy,前者将 suspended 和 lost 都做为 error,后者只是将 lost 做为 error,而只有发生 error 的时候才会取消 leadership,因此在通过修改以后,在进入 suspended 状态时,再也不发生 leadership 的取消和从新选举。咱们把这个问题和咱们的解决办法也反馈给了社区,详情见 FLINK-10052。

5.总结

本文大体介绍了 Flink 在同程艺龙实时计算平台实践过程当中的一些工做和踩过的坑。对于大数据基础设施来讲平台是基础,除此以外还须要投入不少精力来提升 Flink 集群的易用性和稳定性,这个过程当中要紧跟开源社区,由于随着同程艺龙在大数据这块应用场景愈来愈多,会遇到不少其它公司没有遇到甚至没有发现的问题,这个时候基础设施团队要有能力主动解决这些影响稳定性的风险点,而不是被动的等待社区来提供 patch。

因为在 Flink 在 1.8 版本以前社区方向主要集中在 Flink Stream 处理这块,咱们也主要应用 Flink 的流计算来替换 storm 及 spark streaming。可是随着近期 Flink 1.9 的发布,Blink 分支合并进入 Flink 主分支,咱们也打算在 Flink Batch 这块尝试一些应用来落地。

做者:同城艺龙数据中心 Flink 小分队(谢磊、周生乾、李苏兴)
Reference:
[1]https://www.ververica.com/blo...
[2]https://www.cnblogs.com/Alone...
[3]https://www.cnblogs.com/Alone...
[4]https://issues.apache.org/jir...
[5]https://issues.apache.org/jir...
[6]https://issues.apache.org/jir...
[7]https://issues.apache.org/jir...
[8]https://issues.apache.org/jir...
[9]https://issues.apache.org/jir...
[10]https://github.com/elastic/el...
[11]https://issues.apache.org/jir...
[12]https://issues.apache.org/jir...


▼ Apache Flink 社区推荐 ▼

Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:

https://developer.aliyun.com/...

首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击:

https://tianchi.aliyun.com/ma...

相关文章
相关标签/搜索