简介:《实时数仓入门训练营》由阿里云研究员王峰、阿里云高级产品专家刘一鸣等实时计算 Flink 版和 Hologres 的多名技术/产品一线专家齐上阵,协力搭建这次训练营的课程体系,精心打磨课程内容,直击当下同窗们所遇到的痛点问题。由浅入深全方位解析实时数仓的架构、场景、以及实操应用,7 门精品课程帮助你 5 天时间从小白成长为大牛!
本文整理自直播《实时计算 Flink 版 SQL 实践-李麟(海豹)》
视频连接:https://developer.aliyun.com/learning/course/807/detail/13887数据库
内容简要:
1、实时计算Flink版SQL简介
2、实时计算Flink版SQL上手示例
3、开发常见问题和解法
实时计算Flink版选择了SQL这种声明式语言做为顶层API,比较稳定,也方便用户使用。Flink SQL具有流批统一的特性,给用户统一的开发体验,而且语义一致。另外,Flink SQL可以自动优化,包括屏蔽流计算里面State的复杂性,也提供了自动优化的Plan,而且还集成了AutoPilot自动调优的功能。Flink SQL的应用场景也比较普遍,包括数据集成、实时报表、实时风控,还有在线机器学习等场景。segmentfault
在基本操做上,能够看到SQL的语法和标准SQL很是相似。示例中包括了基本的SELECT、FILTER操做。,可使用内置函数,如日期的格式化,也可使用自定义函数,好比示例中的汇率转换就是一个用户自定义函数,在平台上注册后就能够直接使用。架构
在实际的数据处理过程当中,维表的Lookup Join也是一个比较常见的例子。 并发
这里展现的是一个维表INNER JOIN示例。运维
例子中显示的SOURCE表是一个实时变化的订单信息表,它经过INNER JOIN去关联维表信息,这里标黄高亮的就是维表JOIN的语法,能够看到它和传统的批处理有一个写法上的差别,多了FOR SYSTEM\_TIME AS OF这个子句来标明它是一个维表JOIN的操做。SOURCE表每来一条订单消息,它都会触发维表算子,去作一次对维表信息的查询,因此把它叫作一个Lookup Join。dom
Window Aggregation(窗口聚合)操做也是常见的操做,Flink SQL中内置支持了几种经常使用的Window类型,好比Tumble Window,Session Window,Hop Window,还有新引入的Cumulate Window。机器学习
Tumbleide
Tumble Window能够理解成固定大小的时间窗口,也叫滚窗,好比说5分钟、10分钟或者1个小时的固定间隔的窗口,窗口之间没有重叠。函数
Session工具
Session Window(会话窗口) 定义了一个连续事件的范围,窗口定义中的一个参数叫作Session Gap,表示两条数据的间隔若是超过定义的时长,那么前一个Window就结束了,同时生成了一个新的窗口。
Hop
Hop Window不一样于滚动窗口的窗口不重叠,滑动窗口的窗口之间能够重叠。滑动窗口有两个参数:size 和 slide。size 为窗口的大小,slide 为每次滑动的步长。若是slide < size,则窗口会重叠,同一条数据可能会被分配到多个窗口;若是 slide = size,则等同于 Tumble Window。若是 slide > size,窗口之间没有重叠且有间隙。
Cumulate
Cumulate Window(累积窗口),是Flink社区1.13版本里新引入的,能够对比 Hop Window来理解,区别是从Window Start开始不断去累积。示例中Window 一、Window 二、Window 3是在不断地增加的。它有一个最大的窗口长度,好比咱们定义Window Size是一天,而后Step步长是1个小时,那么它会在一天中的每一个小时产生累积到当前小时的聚合结果。
看一个具体的Window聚合处理示例。
如上图所示,好比说须要进行每5分钟单个用户的点击数统计。
源数据是用户的点击日志,咱们指望算出每5分钟单个用户的点击总数, SQL 中使用的是社区最新的 WindowTVF语法,先对源表开窗,再 GROUP BY 窗口对应的属性 window\_start和window\_end, COUNT(*)就是点击数统计。
能够看到,当处理12:00到12:04的数据,有2个用户产生了4次点击,分别能统计出来用户Mary是3次,Bob是1次。在接下来一批数据里面,又来了3条数据,对应地更新到下一个窗口中,分别是1次和2次。
相对于Window Aggregation来讲,Group Aggregation直接触发计算,并不须要等到窗口结束,适用的一个场景是计算累积值。
上图的例子是单个用户累积到当前的点击数统计。从Query上看,写法相对简单一点,直接 GROUP BY user 去计算COUNT(*),就是累积计数。
能够看到,在结果上和Window的输出是有差别的,在与Window相同的前4条输入数据,Group Aggregation输出的结果是Mary的点击数已更新到3次,具体的计算过程多是从1变成2再变成3,Bob是1次,随着后面3条数据的输入,Bob对应的点击数又会更新成2次,对结果是持续更新的过程,这和Window的计算场景是有一些区别的。
以前Window窗口里面输出的数据,在窗口结束后结果就不会再改变,而在Group Aggregation里,同一个Group Key的结果是会产生持续更新的。
更全面地对比一下Window和Group Aggregation的一些区别。
Window Aggregation在输出模式上是按时输出,是在定义的数据到期以后它才会输出。好比定义5分钟的窗口,结果是延迟输出的,好比00:00~00:05这个时间段,它会等整个窗口数据都到齐以后,才完整输出出来,而且结果只输出一次,不会再改变。
Group Aggregation是数据触发,好比第一条数据来它就会输出结果,同一个Key 的第二条数据来结果会更新,因此在输出流的性质上二者也是不同的。Window Aggregation通常状况下输出的是Append Stream,而在Group Aggregation输出的是Update Stream。
在状态State处理上二者的差别也比较大。Window Aggregation会自动清理过时数据,用户就不须要额外再去关注 State的膨胀状况。Group Aggregation是基于无限的状态去作累积,因此须要用户根据本身的计算场景来定义State的TTL,就是State保存多久。
好比统计一天内累计的PV和UV,不考虑数据延迟的状况,也至少要保证State的TTL要大于等于一天,这样才能保证计算的精确性。若是State的TTL定义成半天,统计值就可能不许确了。
对输出的存储要求也是由输出流的性质来决定的。在Window的输出上,由于它是Append流,全部的类型都是能够对接输出的。而Group Aggregatio输出了更新流,因此要求目标存储支持更新,能够用Hologres、MySQL或者HBase这些支持更新的存储。
下面经过具体的例子来看每一种SQL操做在真实的业务场景中会怎么使用,好比SQL基本的语法操做,包括一些常见的Aggregation的使用。
这里的例子是电商交易数据场景,模拟了实时数仓里分层数据处理的状况。
在数据接入层,咱们模拟了电商的交易订单数据,它包括了订单ID,商品ID,用户ID,交易金额,商品的叶子类目,交易时间等基本信息,这是一个简化的表。
示例1会从接入层到数据明细层,完成一个数据清洗工做,此外还会作类目信息的关联,而后数据的汇总层咱们会演示怎么完成分钟级的成交统计、小时级口径怎么作实时成交统计,最后会介绍下在天级累积的成交场景上,怎么去作准实时统计。
- 示例环境:内测版
演示环境是目前内测版的实时计算Flink产品,在这个平台能够直接作一站式的做业开发,包括调试,还有线上的运维工做。
- 接入层数据
使用 SQL DataGen Connector 生成模拟电商交易数据。
接入层数据:为了方便演示,简化了链路,用内置的SQL DataGen Connector来模拟电商数据的产生。
这里面order\_id是设计了一个自增序列,Connector的参数没有完整贴出来。 DataGen Connector支持几种生成模式,好比能够用Sequence产生自增序列,Random模式能够模拟随机值,这里根据不一样的字段业务含义,选择了不一样的生成策略。
好比order\_id是自增的,商品ID是随机选取了1~10万,用户ID是1~1000万,交易金额用分作单位, cate\_id是叶子类目ID,这里共模拟100个叶子类目,直接经过计算列对商品ID取余来生成,订单建立时间使用当前时间模拟,这样就能够在开发平台上调试,而不须要去建立Kafka或者DataHub作接入层的模拟。
- 电商交易数据-订单过滤
这是一个数据清洗的场景,好比须要完成业务上的订单过滤,业务方可能会对交易金额有最大最小的异常过滤,好比要大于1元,小于1万才保留为有效数据。
交易的建立时间是选取某个时刻以后的,经过WHERE条件组合过滤,就能够完成这个逻辑。
真实的业务场景可能会复杂不少,下面来看下SQL如何运行。
这是使用调试模式,在平台上点击运行按钮进行本地调试,能够看到金额这一列被过滤,订单建立时间也都是大于要求的时间值。
从这个简单的清洗场景能够看到,实时和传统的批处理相比,在写法上包括输出结果差别并不大,流做业主要的差别是运行起来以后是长周期保持运行的,而不像传统批处理,处理完数据以后就结束了。
接下来看一下怎么作维表关联。
根据刚才接入层的订单数据,由于原始数据里面是叶子类目信息,在业务上须要关联类目的维度表,维度表里面记录了叶子类目到一级类目的关联关系,ID和名称,清洗过程须要完成的目标是用原始表里面叶子类目ID去关联维表,补齐一级类目的ID和Name。这里经过INNER JOIN维表的写法,关联以后把维表对应的字段选出来。
和批处理的写法差别仅仅在于维表的特殊语法FOR SYSTEM\_TIME AS OF。
如上所示,平台上能够上传本身的数据用于调试,好比这里使用了1个CSV的测试数据,把100个叶子类目映射到10个一级类目上。
对应叶子类目ID的个位数就是它一级类目的ID,会关联到对应的一级类目信息,返回它的名称。本地调试运行优势是速度比较快,能够即时看到结果。在本地调试模式中,终端收到1000条数据以后,会自动暂停,防止结果过大而影响使用。
接下来咱们来看一下基于Window的统计。
第一个场景是分钟级成交统计,这是在汇总层比较经常使用的计算逻辑。
分钟级统计很容易想到Tumble Window,每一分钟都是各算各的,须要计算几个指标,包括总订单数、总金额、成交商品数、成交用户数等。成交的商品数和用户数要作去重,因此在写法上作了一个Distinct处理。
窗口是刚刚介绍过的Tumble Window,按照订单建立时间去划一分钟的窗口,而后按一级类目的维度统计每一分钟的成交状况。
- 运行模式
上图和刚才的调试模式有点区别,上线以后就真正提交到集群里去运行一个做业,它的输出采用了调试输出,直接Print到Log里。展开做业拓扑,能够看到自动开启了Local-Global的两阶段优化。
- 运行日志 - 查看调试输出结果
在运行一段时间以后,经过Task里面的日志能够看到最终的输出结果。
用的是Print Sink,会直接打到Log里面。在真实场景的输出上,好比写到Hologres/MySQL,那就须要去对应存储的数据库上查看。
能够看到,输出的数据相对于数据的原始时间是存在必定滞后的。
在19:46:05的时候,输出了19:45:00这一个窗口的数据,延迟了5秒钟左右输出前1分钟的聚合结果。
这5秒钟实际上和定义源表时WATERMARK的设定是有关系的,在声明WATERMARK时是相对gmt\_create字段加了5秒的offset。这样起到的效果是,当到达的最先数据是 19:46:00 时,咱们认为水位线是到了19:45:55,这就是5秒的延迟效果,来实现对乱序数据的宽容处理。
第二个例子是作小时级实时成交统计。
如上图所示,当要求实时统计,直接把Tumble Window开成1小时Size的Tumble Window,这样能知足实时性吗?按照刚才展现的输出结果,具备必定的延迟效果。所以开一个小时的窗口,必须等到这一个小时的数据都收到以后,在下一个小时的开始,才能输出上一个小时的结果,延迟在小时级别的,知足不了实时性的要求。回顾以前介绍的 Group Aggregation 是能够知足实时要求的。
具体来看,好比须要完成小时+类目以及只算小时的两个口径统计,两个统计一块儿作,在传统批处理中经常使用的GROUPING SETS功能,在实时Flink上也是支持的。
咱们能够直接GROUP BY GROUPING SETS,第一个是小时全口径,第二个是类目+小时的统计口径,而后计算它的订单数,包括总金额,去重的商品数和用户数。
这种写法对结果加了空值转换处理便于查看数据,就是对小时全口径的统计,输出的一级类目是空的,须要对它作一个空值转换处理。
上方为调试模式的运行过程,能够看到Datagen生成的数据实时更新到一级类目和它对应的小时上。
这里能够看到,两个不一样GROUP BY的结果在一块儿输出,中间有一列ALL是经过空值转换来的,这就是全口径的统计值。本地调试相对来讲比较直观和方便,有兴趣的话也能够到阿里云官网申请或购买进行体验。
第三个示例是天级累计成交统计,业务要求是准实时,好比说可以接受分钟级的更新延迟。
按照刚才Group Aggregation小时的实时统计,容易联想到直接把Query改为天维度,就能够实现这个需求,并且实时性比较高,数据触发以后能够达到秒级的更新。
回顾下以前提到的Window和Group Aggregation对于内置状态处理上的区别,Window Aggregation能够实现State的自动清理,Group Aggregation须要用户本身去调整 TTL。因为业务上是准实时的要求,在这里能够有一个替代的方案,好比用新引入的Cumulate Window作累积的Window计算,天级的累积而后使用分钟级的步长,能够实现每分钟更新的准实时要求。
回顾一下Cumulate Window,如上所示。天级累积的话,Window的最大Size是到天,它的Window Step就是一分钟,这样就能够表达天级的累积统计。
具体的Query如上,这里使用新的TVF语法,经过一个TABLE关键字把Windows的定义包含在中间,而后 Cumulate Window引用输入表,接着定义它的时间属性,步长和size 参数。GROUP BY就是普通写法,由于它有提早输出,因此咱们把窗口的开始时间和结束时间一块儿打印出来。
这个例子也经过线上运行的方式去看Log输出。
- 运行模式
能够看到,它和以前Tumble Window运行的结构相似,也是预聚合加上全局聚合,它和Tumble Window的区别就是并不须要等到这一天数据都到齐了才输出结果。
- 运行日志 – 观察调试结果
从上方示例能够看到,在20:47:00的时候,已经有00:00:00到20:47:00的结果累积,还有对应的4列统计值。下一个输出就是接下来的累计窗口,能够看到20:47:00到20:48:00就是一个累计的步长,这样既知足了天级别的累计统计需求,也可以知足准实时的要求。
而后咱们来总体总结一下以上的示例。
在接入层到明细层的清洗处理特色是相对简单,也比较明确,好比业务逻辑上须要作固定的过滤条件,包括维度的扩展,这都是很是明确和直接的。
从明细层到汇总层,例子中的分钟级统计,咱们是用了Tumble Window,而小时级由于实时性的要求,换成了Group Aggregation,而后到天级累积分别展现Group Aggregation和新引入的Cumulate Window。
从汇总层的计算特色来讲,咱们须要去关注业务上的实时性要求和数据准确性要求,而后根据实际状况选择Window聚合或者Group 聚合。
这里为何要提到数据准确性?
在一开始比较Window Aggregation和Group Aggregation的时候,提到Group Aggregation的实时性很是好,可是它的数据准确性是依赖于State的TTL,当统计的周期大于TTL,那么TTL的数据可能会失真。
相反,在Window Aggregation上,对乱序的容忍度有一个上限,好比最多接受等一分钟,但在实际的业务数据中,可能99%的数据能知足这样的要求,还有1%的数据可能须要一个小时后才来。基于WATERMARK的处理,默认它就是一个丢弃策略,超过了最大的offset的这些数据就会被丢弃,不归入统计,此时数据也会失去它的准确性,因此这是一个相对的指标,须要根据具体的业务场景作选择。
上方是实时计算真实业务接触过程当中比较高频的问题。
首先是实时计算不知道该如何下手,怎么开始作实时计算,好比有些同窗有批处理的背景,而后刚开始接触Flink SQL,不知道从哪开始。
另一类问题是SQL写完了,也清楚输入处理的数据量大概是什么级别,可是不知道实时做业运行起来以后须要设定多大的资源
还有一类是SQL写得比较复杂,这个时候要去作调试,好比要查为何计算出的数据不符合预期等相似问题,许多同窗反映无从下手。
做业跑起来以后如何调优,这也是一个很是高频的问题。
1.实时计算如何下手?
对于上手的问题,社区有不少官方的文档,也提供了一些示例,你们能够从简单的例子上手,慢慢了解SQL里面不一样的算子,在流式计算的时候会有一些什么样的特性。
此外,还能够关注开发者社区实时计算 Flink 版、 ververica.cn网站、 B 站的Apache Flink 公众号等分享内容。
逐渐熟悉了SQL以后,若是想应用到生产环境中去解决真实的业务问题,阿里云的行业解决方案里也提供了一些典型的架构设计,能够做为参考。
2.复杂做业如何调试?
若是遇到千行级别的复杂SQL,即便对于Flink的开发同窗来也不能一目了然地把问题定位出来,其实仍是须要遵循由简到繁的过程,可能须要借助一些调试的工具,好比前面演示的平台调试功能,而后作分段的验证,把小段SQL局部的结果正确性调试完以后,再一步一步组装起来,最终让这个复杂做业能达到正确性的要求。
另外,能够利用SQL语法上的特性,把SQL组织得更加清晰一点。实时计算Flink产品上有一个代码结构功能,能够比较方便地定位长SQL里具体的语句,这都是一些辅助工具。
3.做业初始资源设置,如何调优?
咱们有一个经验是根据输入的数据,初始作小并发测试一下,看它的性能如何,而后再去估算。在大并发压测的时候,按照需求的吞吐量,逐步逼近,而后拿到预期的性能配置,这个是比较直接但也比较可靠的方式。
调优这一块主要是借助于做业的运行是状况,咱们会去关注一些重点指标,好比说有没有产生数据的倾斜,维表的Lookup Join须要访问外部存储,有没有产生IO的瓶颈,这都是影响做业性能的常见瓶颈点,须要加以关注。
在实时计算Flink产品上集成了一个叫AutoPilot的功能,能够理解为相似于自动驾驶,在这种功能下,初始资源设多少就不是一个麻烦问题了。
在产品上,设定做业最大的资源限制后,根据实际的数据处理量,该用多少资源能够由引擎自动帮咱们去调到最优状态,根据负载状况来作伸缩。
本文内容由阿里云实名注册用户自发贡献,版权归原做者全部,阿里云开发者社区不拥有其著做权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。若是您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将马上删除涉嫌侵权内容。