Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现

TopN 是统计报表和大屏很是常见的功能,主要用来实时计算排行榜。流式的 TopN 不一样于批处理的 TopN,它的特色是持续的在内存中按照某个统计指标(如出现次数)计算 TopN 排行榜,而后当排行榜发生变化时,发出更新后的排行榜。本文主要讲解 Flink SQL 是如何从语法和实现上设计 TopN 的。算法

TopN 语法

全局 TopN

用户最关心的是如何用 SQL 写出 TopN 的查询。你们最熟悉的 TopN 的写法通常是这样的:sql

SELECT column_name(s) FROM table_name WHERE condition ORDER BY order_field [DESC|ASC] LIMIT number

如上语法是 MySQL 的 TopN 语法,使用 ORDER BY 指定排序键和排序方向,使用 LIMIT 来指定选出前几名。不一样的数据库的 TopN 语法不尽相同,好比 MS SQL Server 使用 TOP 的关键字,Oracle 使用 ROWNUM 的隐藏字段。不过几家数据库提供的 TopN 语法都是全局 TopN,也就是数据是全局进行排序的,查询的结果只有一组排行榜。好比但愿对全网商家按销售额排序,计算出销售额排名前十的商家。这就是全局 TopN,范例以下:数据库

SELECT * FROM shop_sales ORDER BY sales DESC LIMIT 10

分组 TopN

上文讲述了全局 TopN 的语法,可是不少时候用户但愿根据不一样的分组进行排序,计算出每一个分组的一个排行榜。例如对全网商家根据行业按销售额排序,计算出每一个行业销售额前十名的商家。这时候,传统的 TopN 语法就没法表达这种需求了。有些 Stream SQL 系统为了解决这个问题,会 hack 一种新的 TopN 语法容许用户指定分组字段。可是 Flink SQL 是基于 ANSI SQL 标准语法的,不能加入任何非标准的语法。因而咱们尝试从批处理的角度去思考这个问题,在传统批处理中经常使用 ROW_NUMBER 的开窗聚合函数来解决分组 TopN 的问题。语法以下所示:数据结构

SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum <= N [AND conditions]

参数说明:并发

  • ROW_NUMBER(): 是一个计算行号的OVER窗口函数,行号计算从1开始。
  • PARTITION BY col1[, col2..] : 指定分区的列,能够不指定。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: 指定排序的列,能够多列不一样排序方向。

如上语法所示,TopN 须要两层 query,子查询中使用ROW_NUMBER()开窗函数来为每条数据标上排名,排名的计算根据PARTITION BYORDER BY来指定分区列和排序列,也就是说每一条数据会计算其在所属分区中,根据排序列排序获得的排名。在外层查询中,对排名进行过滤,只取出排名小于 N 的,如 N=10,那么就是取 Top 10 的数据。若是没有指定PARTITION BY那么就是一个全局 TopN 的计算,因此 ROW_NUMBER 在使用上更为灵活。函数

《实时计算 Flink SQL 核心功能解密》中所述,Flink SQL 是一个流与批统一的 API,也就是说理论上一段 SQL 要既能跑在批处理模式下,也能跑在流处理模式下,且输出的结果是一致的。那么在流处理模式下理所固然地应该支持 ROW_NUMBER 形式的 TopN 语法。例如上文说的对全网商家根据行业按销售额排序,计算出每一个行业销售额前十名的商家,SQL 范例以下。性能

SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum FROM shop_sales) WHERE rownum <= 10

TopN 实现和优化

ROW_NUMBER 方式的 TopN 语法很是灵活,能知足全局 TopN 和分组 TopN 的需求。可是在流计算上的物理执行是一个挑战。如上文所述的每一个行业销售额前十商家排行榜,通过 SQL 编译后获得的抽象语法树(AST)以下所示。优化

TB17SgxdS_I8KJjy0FoXXaFnVXa

LogicalWindow 会对全部数据进行排名,也就是说每当到达一个数据,就要对历史数据进行重排序,并输出历史数据的新的排名,而后 LogicalCalc 节点会根据排名进行过滤。这在性能上是很是糟糕的,由于这无限放大了流量。而咱们知道,最优的流式 TopN 的计算只须要维护一个 N 元素大小的小根堆,每当有数据到达时,只须要与堆顶元素比较,若是比堆顶元素还小,则直接丢弃;若是比堆顶元素大,则更新小根堆,并输出更新后的排行榜。也就是说咱们不须要分为两个节点进行计算,不须要将全部数据进行排序,只须要在一个节点中就能够高效地完成计算。因此咱们在查询优化器中加入了一条规则,在使用 TopN 语法时,将 LogicalWindow 和 LogicalCalc 合并成了 LogicalRank 节点。LogicalRank 在翻译成物理执行计划时,会使用一个通过特殊设计的 TopN 算子。阿里云

TopN 算子的实现上主要有两个数据结构,一个是 TreeMap,另外一个是 MapState。TreeMap 的做用相似于上文的小根堆,有序地存放了排名前 N 的元素。可是 TreeMap 是个内存数据结构,在 failover 后会丢失,没法保证数据的一致性。所以咱们还有一个 MapState 结构,MapState 是 Flink 提供的状态接口,用来存储 TopN 的数据(保证数据不丢)。当有 failover 发生后,MapState 能保证状态的恢复,而 TreeMap 会从 MapState 中从新构造出来。咱们并有没有把顺序也存到状态中去,由于顺序是能够在恢复时重构的。由于每一次状态的读写操做都会涉及到序列化/反序列化,每每是性能的瓶颈,因此 TreeMap 的主要做用是下降了对 MapState 状态的读写操做。对大部分数据来讲都是与 TreeMap 进行交互,不须要对 MapState 进行读写的,全是内存操做,因此 TopN 的性能是很是高的。spa

TopN 算子的主要处理流程是,每当有数据到达时,会与 TreeMap 的最小的元素比较,若是比它小,那么该数据就不多是 TopN 的一员,直接丢弃便可。若是比它大,那么就会先更新 TreeMap,同时更新 MapState 中的存的数据。最后输出更新后的排行榜。为了减小冗余数据的输出,咱们只会输出排名发生变化的数据。例如原先的第7名上升到了第六名,那么只须要输出新的第六名和第七名便可。

嵌套 TopN 解决热点问题

TopN 的计算与 GroupBy 的计算相似,若是数据存在倾斜,则会有计算热点的现象。好比全局 TopN,那么全部的数据只能聚集到一个节点进行 TopN 的计算,那么计算能力就会受限于单台机器,没法作到水平扩展。解决思路与 GroupBy 是相似的,就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如,计算全网排名前十的商铺,会致使单点的数据热点,那么能够先加一层分组 TopN,组的划分规则是根据店铺 ID 哈希取模后分红128组(并发的倍数)。第二层 TopN 与原先的写法同样,没有 PARTITION BY。第一层会计算出每一组的 TopN,然后在第二层中进行合并汇总,获得最终的全网前十。第二层虽然还是单点,可是大量的计算量由第一层分担了,而第一层是能够水平扩展的。使用嵌套 TopN 的优化写法以下所示:

CREATE VIEW tmp_topn AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY HASH_CODE(shop_id)%128 ORDER BY sales DESC) AS rownum FROM shop_sales) WHERE rownum <= 10 SELECT * FROM ( SELECT shop_id, shop_name, sales, ROW_NUMBER() OVER (ORDER BY sales DESC) AS rownum FROM tmp_topn) WHERE rownum <= 10

总结

流式 TopN 不只在语法以及算法上会遇到不少挑战,在不一样场景下的优化方案也是个很是有意思的话题。目前 Flink SQL 的 TopN 功能已经大量应用于彩票业务、阿里云的CDN项目、WAF项目等等。将来,咱们除了会针对更多的场景对 TopN 进行优化,还会提供除了 ROW_NUMBER 外的 RANKRANK_DENSE 排名函数,使得 TopN 更加灵活

相关文章
相关标签/搜索