做者:黄良辉sql
本文讲述 Flink 在 Shopee 新加坡数据组(Shopee Singapore Data Team)的应用实践,主要内容包括:数据库
- 实时数仓建设背景
- Flink 在实时数据数仓建设中结合 Druid、Hive 的应用场景
- 实时任务监控
- Streaming SQL 平台化
- Streaming Job 管理
- 将来规划优化方向
Shopee 是东南亚与台湾领航电商平台,覆盖新加坡、马来西亚、菲律宾、台湾、印度尼西亚、泰国及越南七大市场,同时在中国深圳、上海和香港设立跨境业务办公室。缓存
其中包括订单商品、物流,支付,数字产品等各方面的业务。为了支持这些互联网化产品,应对越来的越多的业务挑战,因而咱们进行了数据仓库的设计和架构建设。架构
当前随着业务发展,数据规模的膨胀和商务智能团队对实时需求的不断增加,业务挑战愈来愈大:app
业务维度
而言,业务需求愈来愈复杂,有须要明细数据查询,又有实时各类维度聚合报表,实时标签培训和查询需求。同时大量业务共享了一些业务逻辑,形成大量业务耦合度高,重复开发。平台架构
而言,当前任务愈来愈多,管理调度,资源管理,数据质量异常监控等也愈来愈重要。实时化也越来急迫,目前大量业务仍是离线任务形式,致使凌晨服务负载压力巨大,同时基于 T+1(天、小时级)架构业务没法知足精细化、实时化运营须要。技术实现
而言,如今实时业务大量采用 Spark Structured Streaming 实现,严重依赖 HBase 作 Stateful 需求,开发复杂;在异常故障事故,Task 失败,缺少 Exactly Once 特性支持,数据易丢失、重复。为了解决上述问题,因而开始了 Flink 实时数仓的探索。异步
为了支持这些互联网化产品不断增加的的数据和复杂的业务,Shopee 构建以下图数据仓库架构,从下到上层来看:ide
数据收集层
,这一层负责实时数据,包括 Binlog、Service Log, Tracking Service Log,通过 Real-time Ingestion 团队数据将会被收集到 Kafka 、Hbase 中。Auto-Ingestion 团队负责数据库数离线平常收集到 HDFS。存储层
,这层主要是 Kafka 保存实时消息,加上 HDFS 保存 Hive 数据存储等,HBase 保存维度数据。存储层
上面是 Spark, Flink 计算引擎, Presto SQL 查询引擎
。调度管理层
,各类资源管理,任务管理,任务调度,管理各类 Spark,Flink 任务。资源管理层
上一层是 OLAP 数据存储层
,Druid 用于存储时间序列数据,Phoenix(HBase)存储聚合报表数据、维度表数据、标签数据,Elastic Search 存储须要多维度字段索引的数据如广告数据、用户画像等。应用层
,数据报表,数据业务服务,用户画像等。目前在 Shopee Data Team 主要从数据分库 Binlog 以及 Tracking Service 同步到 Kafka 集群中,经过 Flink/Spark 计算,包含实时订单商品销量优惠活动分析,订单物流分析、产品用户标新、用户印象行为分析,电商活动游戏运营分析等。最后的结果存到 Druid、 HBase、 HDFS 等,后面接入一些数据应用产品。目前已经有很多核心做业从 Spark Structured Streaming 迁移到 Flink Streaming 实现。函数
在实时订单销量分析产品中,经过 Flink 处理订单流,将处理后的明细数据实时注入Druid,达到公司实时运营活动分析等用途。
咱们使用 T-1(天)的 Lambda 架构来实时和历史订单数据产品分析,Flink 只处理实时今天的订单数据,每日会定时将昨日的数据经过离线任务索引到 Druid 中覆盖修正实时数据的微小偏差。总体的 Flink 实时处理流程以下图,从上中下看共三条流水线:工具
第一条流水线,经过 Kafka 接入 订单 Binlog 事件。性能
KeyBy
进入ProcessWindowFunction
,由于上游数据是 Binlog 会有重复订单事件,因此会经过 ValueState 来对订单进行去重。Side Output
进入另外一个 Slow Kafka Topic,以便处理异常订单。第二条流水线比较复杂,经过多个实时任务将各分表 Slave Binlog
同步到 Hbase Phoenix 表,以便作成实时订单流的维度表。目前遇到比较多问题仍是常常 Binlog 延迟等问题,以及数据热点问题。
第三条流基本与第一条相似,相似消息队列中的 dead message 异常处理。由于大量维度表依赖,不能保证 Phoenix 都在订单被处理前就被同步到 Phoenix 表,好比新订单商品,新用户,新店铺,新分类,新商品等。因此咱们引入一条实时 backfill 处理流将会对第一条主流,处理失败的订单重复处理,直到全部字段都关联成功才会进入下游 Druid。
另外为了不一些过时消息进入死循环,一样有个事件过滤窗口,保证只保留今日的订单事件在流水线中被处理。不一样的是,由于须要区分付款订单和未付款订单事件类型(可能一个订单有两个状态事件,当用户下单时,会有一个下单事件,当用户完成支付会有一个支付完成事件),因此将订单是否被处理状态放在enrichment以后标记重复成功。
由于上游数据源是 Binlog,因此随着订单状态的更新,会有大量的订单重复事件。
经过使用 Flink State 功能保存在内存中(FsSateBackend),以 ValueState 来标记订单是否被处理,经过设置 TTL,保证订单状态保存24小时过时,如今活动高峰期大概2G State,平均每一个TaskManager大约100M State。Checkpoint interval 设置为10秒一次,HDFS 负载并不高。同时由于流使用了窗口和自定义 Trigger,致使 State 须要缓冲少许窗口数据。窗口的使用将会在 Enrihcment 流程
优化部分详细说明。
在 Enrichment 步骤, 业务逻辑复杂,存在大量 IO,咱们作了大量改进优化。
Local RLU Memeory Cache
层,减小 Hbase 的访问量,加速关联;对 HBase Row Key Salt Bucket 避免订单商品表访问热点问题。Interval Join
来作,可是因为一个订单有多条订单商品信息,加上上游是 Binlog 事件,以及其余维度表数据延迟问题,致使业务逻辑复杂,并且计算产出数据保存在 Druid 只能支持增量更新。因此选择了使用 HBase 存储来关联订单商品信息,附加慢消息处理流来解决数据延迟问题。目前将 Checkpoint 设置为 exactly once
模式,并开启了Kafka exactly once
生产者模式,经过 Two Phase Commit
功能保证数据的一致性,避免 task 失败,job 重启时致使数据丢失。
监控方面,经过监控 Upstream Kafka Topic,以及 HBase 表写入更新状态,结合下游 Druid 数据延迟监控,作到 end-to-end 的 lag 指标监控。经过 Flink Metric Report 汇报 Hbase 访问性能指标,缓存大小,延迟订单数量等来对 Flink job 具体步骤性能分析。
在订单物流实时分析业务,接入 Binlog event 实现支持点更新的物流分析,使用 Flink Retract Stream
功能来支持每当订单和物流有最新状态变化事件就触发下游数据更新。
经过 Interval Join
订单流和物流流,并使用 Rocksdb State 与 Incremental Checkpoint 来维护最近七天的状态数据,从 Hbase 来增长用户维度信息等,维度字段 enrihcment 经过 Local LRU Memory Cache 层来优化查询,最后定时从 Hbase 导出到 HDFS。
如今将 Flink 任务产生的订单物流事件保存 HBase 来支持记录级别的点更新,每小时从 HBase 导出到 HDFS 结果,经过 Presto 接入来作实时分析。HBase 导出到HDFS,经过对 Hbase Row Key Salt Bucket 避免热点问题,优化减少 Region Size(默认10G)来减小导出时间。可是数据如今延迟仍是比较严重,在一个半小时左右,并且链路繁琐。未来考虑加入 Apache Hudi
组建接入 Presto,将延迟降到半小时内。
目前 Shopee 有大量的实时需求经过 SQL 实现,应用场景主要是应用层实时汇总数据报表、维度表更新等。业务经过 SDK 和一站式网站管理两种方式实现。一是以 SDK 形式提供支持,用户能够经过引入 JAR 依赖进行二次项目开发。二是制做了相关网站,经过以任务形式,用户建立任务编辑保存 SQL 来实现业务需求,目前支持以下:
下面是部分任务组织UI化形式:
当前平台只支持 Spark SQL 实现 Stream SQL,使用 Hive 存储元数据,经过关联维度表 JOIN Apache Phoenix 等外部表和外部服务实现 enrichment 等功能。经过对比 Flink SQL 与 Spark SQL,发现 Spark SQL 很多缺点:
Spark SQL 支持仍是有不少局限性,目前正在作 Flink SQL 需求导入评估阶段,并计划在 Stream SQL Platform 接入 Flink SQL 的支持。来知足公司愈来愈复杂用户画像标签标注和简单实时业务 SQL 化,减小业务开发成本。同时须要引入更好的 UDF 管理方式,集成元数据服务简化开发。
Shopee Data Team 拥有大量的实时任务是经过 Jar 包发布的,目前在 Job 管理上经过网站页面化,来减小 Job 维护成本。目前支持环境管理,任务管理,任务应用配置管理,和任务监控报警。
目前能够配置 Flink / Spark Bin 路径来支持不一样的 Flink/Spark 版本,来支持 Flink 升级带来的多版本问题,并支持一些颜色高亮来区分不一样环境。
如今支持实时任务的环境检索,状态检索,名字检索等。支持重启,禁用,配置任务参数等。任务支持从 checkpoint/savepoint 恢复,中止任务自动保存 savepoint,从 kafka timestamp 启动。
同时实时任务也支持配置内存,CPU 等 Flink Job 运行参数、JAR 依赖配置等。目前支持预览,编辑更新等,经过 Jekins CICD 集成与人工干预结果,来完成 Job 的部署升级。
任务应用配置是使用 HOCON 配置格式支持,目前支持共享配置集成,并经过配置名约定将 Checkpoint 路径自动绑定到配置中。网站支持预览模式,编辑模式,配置高亮等,未来会集成配置版本回滚等功能。
对于任务监控方面,如今支持任务异常处理报警。异常处理支持自动挂起失败的任务,并从上次最新 checkpoint 恢复;经过 Flink REST API 检测 Flink Job 状态,来避免 Flink Job 异常形成的假活状态。出现任务重启,异常状况会经过邮件等方式给任务负责人发报警,将来打算在网站集成 Grafana/Promethus 等监控工具来完成任务监控自动化等。
整体而言,Flink 在 Shopee 从 2019 年末开始调研,到项目落地不到半年时间,已经完成业务大量需求导入评估,对 Exactly Once,Kafka Exactly Once Semantics,Two Phase Commit,Interval Join,Rocksdb/FS State 一系列的功能进行了验证。在将来规划上:
做者简介:
黄良辉,2019 年加入 Shopee,在 Shoppe Data Team 负责实时数据业务和数据产品开发。