简介:惟品会 Flink 的容器化实践应用,Flink SQL 平台化建设,以及在实时数仓和实验平台上的应用案例。
转自dbaplus社群公众号
做者:王康,惟品会数据平台高级开发工程师git
GitHub 地址
https://github.com/apache/flink
欢迎你们给 Flink 点赞送 star~github
自 2017 年起,为保障内部业务在平时和大促期间的平稳运行,惟品会就开始基于 Kubernetes 深刻打造高性能、稳定、可靠、易用的实时计算平台,如今的平台支持 Flink、Spark、Storm 等主流框架。数据库
本文将分为五个方面,分享惟品会 Flink 的容器化实践应用以及产品化经验:apache
在集群规模方面,咱们有 2000+ 的物理机,主要部署 Kubernetes 异地双活的集群,利用 Kubernetes 的 namespaces,labels 和 taints 等实现业务隔离以及初步的计算负载隔离。json
Flink 任务数、Flink SQL 任务数、Storm 任务数、Spark 任务数,这些线上实时应用加起来有 1000 多个。目前咱们主要支持 Flink SQL 这一块,由于 SQL 化是一个趋势,因此咱们要支持 SQL 任务的上线平台。api
咱们从下往上进行解析实时计算平台的总体架构:缓存
其实是用 deployment 的模式运行 Kubernetes 上,平台虽然支持 yarn 调度,可是 yarn 调度与批任务共享资源,因此主流任务仍是运行在 Kubernetes 上的。而且,yarn 调度这一层主要是离线部署的一套 yarn 集群。在 2017 年的时候,咱们自研了 Flink on Kubernetes 的一套方案,由于底层调度分了两层,因此在大促资源紧张的时候,实时跟离线就能够作一个资源的借调。架构
主要用来支持公司内部基于 Kafka 的实时数据 vms,基于 binlog 的 vdp 数据和原生 Kafka 做为消息总线,状态存储在 HDFS 上,数据主要存入 Redis、MySQL、HBase、Kudu、HDFS、ClickHouse 等。并发
主要是 Flink、Storm、Spark,目前主推的是 Flink,每一个框架会都会支持几个版本的镜像以知足不一样的业务需求。app
主要提供做业配置、调度、版本管理、容器监控、job 监控、告警、日志等功能,提供多租户的资源管理(quota,label 管理)以及 Kafka 监控。资源配置也分为大促日和日常日,大促的资源和日常的资源是不同的,资源的权限管控也是不同的。在 Flink 1.11 版本以前,平台自建元数据管理系统为 Flink SQL 管理 schema;从 1.11 版本开始,则是经过 Hive metastore 与公司元数据管理系统融合。
主要是支持实时大屏、推荐、实验平台、实时监控和实时数据清洗的一些场景。
上面是实时平台 Flink 容器化的架构图。Flink 容器化实际上是基于 Standalone 模式部署的。
咱们的部署模式共有 Client、Job Manager、Task Manager 三个角色,每个角色都会有一个 Deployment 来控制。
用户经过平台上传任务 jar 包、配置等,存储于 HDFS 上。同时由平台维护的配置、依赖等也存储在 HDFS 上,当 pod 启动时,就会进行拉取等初始化操做。
Client 中主进程是一个由 go 开发的 agent,当 Client 启动时,会首先检查集群状态,当集群准备好后,从 HDFS 上拉取 jar 包,再向这个集群提交任务。Client 的主要任务是作容错,它主要功能还有监控任务状态,作 savepoint 等操做。
经过部署在每台物理机上的 smart-agent 采集容器的指标写入 m3,以及经过 Flink 暴漏的接口将 metrics 写入 prometheus,结合 grafana 展现。一样经过部署在每台物理机上的 vfilebeat 采集挂载出来的相关日志写入 es,在 dragonfly 能够实现日志检索。
1)Flink 平台化
在实践过程当中,必定要结合具体场景和易用性,再去考虑作平台化工做。
2)Flink 稳定性
在咱们应用部署以及运行过程当中,异常是不可避免的,这时候平台就须要作一些保证任务在出现异常情况后,依旧保持稳定性的一些策略。
pod 的健康和可用:
由 livenessProbe 和 readinessProbe 检测,同时指定 pod 的重启策略,Kubernetes 自己能够作一个 pod 的拉起。
Flink 任务产生异常时:
在 Client 中会定时监控 Flink 状态,同时将最新的 checkpoint 地址更新到本身的缓存中,并汇报到平台,而后固化到 MySQL 中。当 Flink 没法再重启时,由 Client 从新从最新的成功 checkpoint 提交任务。这是它的第二层保障。
这一层将 checkpoint 固化到 MySQL 中后,就再也不使用 Flink HA 机制了,少了 zk 的组件依赖。
机房容灾:
Kafka 监控是任务监控里很是重要的一个环节,总体的流程以下:
平台提供监控 Kafka 堆积,用户在界面上,能够配置本身的 Kafka 监控,告知在怎样的集群,以及用户消费 message 等配置信息。能够从 MySQL 中将用户 Kafka 监控配置提取后,再经过 jmx 监控 Kafka,这样的信息采集以后,写入下游 Kafka,再经过另外一个 Flink 任务实时监控告警,同时将这些数据同步写入 ck 里面,从而反馈给咱们的用户(这里也能够不用 ck,用 Prometheus 去作监控也是能够的,但 ck 会更加适合),最后再用 Grafana 组件去展现给用户。
有了前面 Flink 的容器化方案以后,就要开始 Flink SQL 平台化建设了。你们都知道,这样流式的 api 开发起来,仍是有必定的成本的。 Flink 确定是比 Storm 快的,也相对比较稳定、容易一些,可是对于一些用户,特别是 Java 开发的一些同窗来讲,作这个是有必定门槛的。
Kubernetes 的 Flink 容器化实现之后,方便了 Flink api 应用的发布,可是对于 Flink SQL 的任务仍然不够便利。因而平台提供了更加方便的在线编辑发布、SQL 管理等一栈式开发平台。
平台的 Flink SQL 方案如上图所示,任务发布系统与元数据管理系统是彻底解耦的。
1)Flink SQL 任务发布平台化
在实践过程当中,须要考虑易用性,作平台化工做,主操做界面以下图所示:
下图是一个用户界面配置的例子:
下图是一个集群配置的范例:
2)元数据管理
平台在 1.11 以前经过构建本身的元数据管理系统 UDM,MySQL 存储 Kafka,Redis 等 schema,经过自定义 catalog 打通 Flink 与 UDM,从而实现元数据管理。
在 1.11 以后,Flink 集成 Hive 逐渐完善,平台重构了 Flink SQL 框架,并经过部署一个 SQL-gateway service 服务,中间调用本身维护的 SQL-Client jar 包,从而与离线元数据打通,实现了实时离线元数据的统一,为以后的流批一体打好了基础。
在元数据管理系统建立的 Flink 表操做界面以下图所示:建立 Flink 表的元数据,持久化到 Hive 里,Flink SQL 启动时从 Hive 里读取对应表的 table schema 信息。
平台对于官方原生支持或者不支持的 connector 进行整合和开发,镜像和 connector,format 等相关依赖进行解耦,能够快捷的进行更新与迭代。
1)Flink SQL 相关实践
Flink SQL 主要分为如下三层:
connector 层
runtime 层
平台层
2)拓扑图执行计划修改
针对现阶段 SQL 生成的 stream graph 并行度没法修改等问题,平台提供可修改的拓扑预览修改相关参数。平台会将解析后的 FlinkSQL 的 excution plan json 提供给用户,利用 uid 保证算子的惟一性,修改每一个算子的并行度,chain 策略等,也为用户解决反压问题提供方法。例如针对 ClickHouse sink 小并发大批次的场景,咱们支持修改 ClickHouse sink 并行度,source 并行度 = 72,sink 并行度 = 24,提升 ClickHouse sink tps。
3)维表关联 keyBy 优化 cache
针对维表关联的状况,为了下降 IO 请求次数,下降维表数据库读压力,从而下降延迟,提升吞吐,有如下三种措施:
下面是维表关联 KeyBy 优化 cache 的图:
在优化以前的时候,维表关联 LookupJoin 算子和正常算子 chain 在一块儿,优化之间维表关联 Lookup Join 算子和正常算子不 chain 在一块儿,将join key 做为 hash 策略的 key。
采用这种方式优化后,例如原来的 3000W 数据量维表,10 个 TM 节点,每一个节点都要缓存 3000W 的数据,总共须要缓存 3 亿的量。而通过 keyBy 优化以后,每一个 TM 节点只须要缓存 3000W/10 = 300W 的数据量,总共缓存的数据量只有 3000W,这很是大程度减小了缓存数据量。
4)维表关联延迟 join
维表关联中,有不少业务场景,在维表数据新增数据以前,主流数据已经发生 join 操做,会出现关联不上的状况。所以,为了保证数据的正确,将关联不上的数据进行缓存,进行延迟 join。
最简单的作法是,在维表关联的 function 里设置重试次数和重试间隔,这个方法会增大整个流的延迟,但主流 qps 不高的状况下,能够解决问题。
增长延迟 join 的算子,当 join 维表未关联时,先缓存起来,根据设置重试次数和重试间隔从而进行延迟的 join。
1)实时数据入仓
实时数仓主要分为三个过程:
使用 Flink SQL 作流式数据入仓是很是方便的,并且 1.12 版本已经支持了小文件的自动合并,解决了大数据层一个很是广泛的痛点。
咱们自定义分区提交策略,当前分区 ready 时候会调一下实时平台的分区提交 api,在离线调度定时调度经过这个 api 检查分区是否 ready。
采用 Flink SQL 统一入仓方案之后,咱们可得到如下成果:
2)实时指标计算
以往指标计算一般采用 Storm 方式,这个方式须要经过 api 定制化开发,采用这样 Flink 方案之后,咱们能够得到了如下成果:
3)实时离线一体化ETL数据集成
具体的流程以下图所示:
Flink SQL 在最近的版本中持续强化了维表 join 的能力,不只能够实时关联数据库中的维表数据,还能关联 Hive 和 Kafka 中的维表数据,能灵活知足不一样工做负载和时效性的需求。
基于 Flink 强大的流式 ETL 的能力,咱们能够统一在实时层作数据接入和数据转换,而后将明细层的数据回流到离线数仓中。
咱们经过将 presto 内部使用的 HyperLogLog(后面简称 HLL)实现引入到 Spark UDAF 函数里,打通 HLL 对象在 Spark SQL 与 presto 引擎之间的互通。如 Spark SQL 经过 prepare 函数生成的 HLL 对象,不只能够在 Spark SQL 里 merge 查询并且能够在 presto 里进行 merge 查询。
具体流程以下:
UV 近似计算示例:
惟品会实验平台是经过配置多维度分析和下钻分析,提供海量数据的 A/B-test 实验效果分析的一体化平台。一个实验是由一股流量(好比用户请求)和在这股流量上进行的相对对比实验的修改组成。实验平台对于海量数据查询有着低延迟、低响应、超大规模数据(百亿级)的需求。
总体数据架构以下:
业务数据流以下:
咱们的实验平台有一个很重要的 ES 场景,咱们上线一个应用场景后,若是我想看效果如何,包括上线产生的曝光、点击、加购、收藏是怎样的。咱们须要把每个数据的明细,好比说分流的一些数据,根据场景分区,写到 ck 里面去。
咱们经过 Flink SQL Redis connector,支持 Redis 的 sink 、source 维表关联等操做,能够很方便地读写 Redis,实现维表关联,维表关联内可配置 cache ,极大提升应用的 TPS。经过 Flink SQL 实现实时数据流的 pipeline,最终将大宽表 sink 到 CK 里,并按照某个字段粒度作 murmurHash3\_64 存储,保证相同用户的数据都存在同一 shard 节点组内,从而使得 ck 大表之间的 join 变成 local 本地表之间的 join,减小数据 shuffle 操做,提高 join 查询效率。
Flink SQL 对于 Hive 用户来讲,使用起来仍是有一点不同的地方。无论是 Hive,仍是 Spark SQL,都是批量处理的一个场景。
因此当前咱们的 Flink SQL 调试起来仍有不少不方便的地方,对于作离线 Hive 的用户来讲还有必定的使用门槛,例如手动配置 Kafka 监控、任务的压测调优。因此如何能让用户的使用门槛降至最低,让用户只须要懂 SQL 或者懂业务,把 Flink SQL 里面的概念对用户屏蔽掉,简化用户的使用流程,是一个比较大的挑战。
未来咱们考虑作一些智能监控,告诉用户当前任务存在的问题,不须要用户去学习太多的东西,尽量自动化并给用户一些优化建议。
一方面,咱们作数据湖主要是为了解决咱们 binlog 实时更新的场景,目前咱们的 VDP binlog 消息流,经过 Flink SQL 写入到 Hive ods 层,以加速 ods 层数据源的准备时间,可是会产生大量重复消息去重合并。咱们会考虑 Flink + 数据湖的 cdc 入仓方案来作增量入仓。
另外一方面咱们但愿经过数据湖,来替代咱们 Kudu,咱们这边一部分重要的业务在用 Kudu。虽然 Kudu 没有大量的使用,但鉴于 Kudu 的运维比通常的数据库运维复杂得多、比较小众,而且像订单打宽以后的 Kafka 消息流、以及聚合结果都须要很是强的实时 upsert 能力,因此咱们就开始调研 CDC+数据湖这种解决方案,用这种方案的增量 upsert 能力来替换 kudu 增量 upsert 场景。
Q1:vdp connector 是 MySQL binlog 读取吗?和 canal是一种工具吗?
A1 :vdp 是公司 binlog 同步的一个组件,将 binlog 解析以后发送到 Kafka。是基于 canal 二次开发的。咱们定义了一个 cdc format 能够对接公司的 vdp Kafka 数据源,与 Canal CDC format 有点相似。目前没有开源,使咱们公司用的 binlog 的一个同步方案。
Q2 : uv 数据输出到 HBase,销售数据输出到 kudu,输出到了不一样的数据源,主要是由于什么采起的这种策略?
A2 :kudu 的应用场景没有 HBase 这么普遍。uv 实时写入的 TPS 比较高,HBase 比较适合单条查询的场景,写入 HBase 高吞吐 + 低延迟,小范围查询延迟低;kudu 的话具有一些 OLAP 的特性,能够存订单类明细,列存加速,结合 Spark、presto 等作 OLAP 分析。
Q3 : 请问一下,大家怎么解决的 ClickHouse 的数据更新问题?好比数据指标更新。
A3 : ck 的更新是异步 merge,只能在同一 shard 同一节点同一分区内异步 merge,是弱一致性。对于指标更新场景不太建议使用 ck。若是在 ck 里有更新强需求的场景,能够尝试 AggregatingMergeTree 解决方案,用 insert 替换 update,作字段级的 merge。
Q4:binlog 写入怎么保证数据的去重和一致性?
A4 : binlog 目前尚未写入 ck 的场景,这个方案看起来不太成熟。不建议这么作,能够用采用 CDC + 数据湖的解决方案。
Q5 : 若是 ck 各个节点写入不均衡,怎么去监控,怎么解决?怎么样看数据倾斜呢?
A5 :能够经过 ck 的 system.parts 本地表监控每台机器每一个表每一个分区的写入数据量以及 size,来查看数据分区,从而定位到某个表某台机器某个分区。
Q6 : 大家在实时平台是如何作任务监控或者健康检查的?又是如何在出错后自动恢复的?如今用的是 yarn-application 模式吗?存在一个 yarn application 对应多个 Flink job 的状况吗?
A6 : 对于 Flink 1.12+ 版本,支持了 PrometheusReporter 方式暴露一些 Flink metrics 指标,好比算子的 watermark、checkpoint 相关的指标如 size、耗时、失败次数等关键指标,而后采集、存储起来作任务监控告警。
Flink 原生的 restart 策略和 failover 机制,做为第一层的保证。
在 Client 中会定时监控 Flink 状态,同时将最新的 checkpoint 地址更新到本身的缓存中,并汇报到平台,固化到 MySQL 中。当 Flink 没法再重启时,由 Client 从新从最新的成功 checkpoint 提交任务。做为第二层保证。这一层将 checkpoint 固化到 MySQL 中后,就再也不使用 Flink HA 机制了,少了 zk 的组件依赖。
当前两层没法重启时或集群出现异常时,由平台自动从固化到 MySQL 中的最新 chekcpoint 从新拉起一个集群,提交任务,做为第三层保证。
咱们支持 yarn-per-job 模式,主要基于 Flink on Kubernetes 模式部署 standalone 集群。
Q7 : 目前大家大数据平台上全部的组件都是容器化的仍是混合的?
A7 :目前咱们实时这一块的组件 Flink、Spark 、Storm、Presto 等计算框架实现了容器化,详情可看上文 1.2 平台架构。
Q8 :kudu 不是在 Kubernetes 上跑的吧?
A8 :kudu 不是在 Kubernetes 上运行,这个目前尚未特别成熟的方案。而且 kudu 是基于 cloudera manager 运维的,没有上 Kubernetes 的必要。
Q9 : Flink 实时数仓维度表存到 ck 中,再去查询 ck,这样的方案能够吗?
A9:这是能够的,是能够值得尝试的。事实表与维度表数据均可以存,能够按照某个字段作哈希(好比 user\_id),从而实现 local join 的效果。
本文内容由阿里云实名注册用户自发贡献,版权归原做者全部,阿里云开发者社区不拥有其著做权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。若是您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将马上删除涉嫌侵权内容。