不管在 OLAP 仍是 OLTP 领域,Join 都是业务常会涉及到且优化规则比较复杂的 SQL 语句。对于离线计算而言,通过数据库领域多年的积累,Join 语义以及实现已经十分红熟,然而对于近年来刚兴起的 Streaming SQL 来讲 Join 却处于刚起步的状态。数据库
其中最为关键的问题在于 Join 的实现依赖于缓存整个数据集,而 Streaming SQL Join 的对象倒是无限的数据流,内存压力和计算效率在长期运行来讲都是不可避免的问题。下文将结合 SQL 的发展解析 Flink SQL 是如何解决这些问题并实现两个数据流的 Join。编程
传统的离线 Batch SQL (面向有界数据集的 SQL)有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。缓存
相对于离线的 Join,实时 Streaming SQL(面向无界数据集的 SQL)没法缓存全部数据,所以 Sort-Merge Join 要求的对数据集进行排序基本是没法作到的,而 Nested-loop Join 和 Hash Join 通过必定的改良则能够知足实时 SQL 的要求。
咱们经过例子来看基本的 Nested Join 在实时 Streaming SQL 的基础实现(案例及图来自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。安全
Table A 有 一、42 两个元素,Table B 有 42 一个元素,因此此时的 Join 结果会输出 42。app
接着 Table B 依次接受到三个新的元素,分别是 七、三、1。由于 1 匹配到 Table A 的元素,所以结果表再输出一个元素 1。运维
随后 Table A 出现新的输入 二、三、6,3 匹配到 Table B 的元素,所以再输出 3 到结果表。ide
能够看到在 Nested-Loop Join 中咱们须要保存两个输入表的内容,而随着时间的增加 Table A 和 Table B 须要保存的历史数据无止境地增加,致使很不合理的内存磁盘资源占用,并且单个元素的匹配效率也会愈来愈低。相似的问题也存在于 Hash Join 中。oop
那么有没有可能设置一个缓存剔除策略,将没必要要的历史数据及时清理呢?答案是确定的,关键在于缓存剔除策略如何实现,这也是 Flink SQL 提供的三种 Join 的主要区别。优化
Regular Join 是最为基础的没有缓存剔除策略的 Join。Regular Join 中两个表的输入和更新都会对全局可见,影响以后全部的 Join 结果。举例,在一个以下的 Join 查询里,Orders 表的新纪录会和 Product 表全部历史纪录以及将来的纪录进行匹配。ui
由于历史数据不会被清理,因此 Regular Join 容许对输入表进行任意种类的更新操做(insert、update、delete)。然而由于资源问题 Regular Join 一般是不可持续的,通常只用作有界数据流的 Join。
Time-Windowed Join 利用窗口给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并能够被清理掉。值得注意的是,这里涉及到的一个问题是时间的语义,时间能够指计算发生的系统时间(即 Processing Time),也能够指从数据自己的时间字段提取的 Event Time。若是是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;若是是 Event Time,Flink 分配 Event Time 窗口并依据 Watermark 来清理数据。
以更经常使用的 Event Time Windowed Join 为例,一个将 Orders 订单表和 Shipments 运输单表依据订单时间和运输时间 Join 的查询以下:
这个查询会为 Orders 表设置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的时间下界(图4)。
并为 Shipmenets 表设置了 s.shiptime >= o.ordertime 的时间下界(图5)。
所以两个输入表都只须要缓存在时间下界以上的数据,将空间占用维持在合理的范围。
不过虽然底层实现上没有问题,但如何经过 SQL 语法定义时间还是难点。尽管在实时计算领域 Event Time、Processing Time、Watermark 这些概念已经成为业界共识,但在 SQL 领域对时间数据类型的支持仍比较弱[4]。所以,定义 Watermark 和时间语义都须要经过编程 API 的方式完成,好比从 DataStream 转换至 Table ,不能单纯靠 SQL 完成。这方面的支持 Flink 社区计划经过拓展 SQL 方言来完成,感兴趣的读者能够经过 FLIP-66[7] 来追踪进度。
虽然 Timed-Windowed Join 解决了资源问题,但也限制了使用场景: Join 两个输入流都必须有时间下界,超过以后则不可访问。这对于不少 Join 维表的业务来讲是不适用的,由于不少状况下维表并无时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来知足用户需求。
Temporal Table Join 相似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者通常是纬度表的 changelog,后者通常是业务数据流,典型状况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,因此又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(一般就是 Event Time 时间字段),以反映记录在不一样时间的内容。
好比典型的一个例子是对商业订单金额进行汇率转换。假设有一个 Orders 流记录订单金额,须要和 RatesHistory 汇率流进行 Join。RatesHistory 表明不一样货币转为日元的汇率,每当汇率有变化时就会有一条更新记录。两个表在某一时间节点内容以下:
![图6. Temporal Table Join Example]](https://upload-images.jianshu...
咱们将 RatesHistory 注册为一个名为 Rates 的 Temporal Table,设定主键为 currency,版本字段为 time。
![图7. Temporal Table Registration]](https://upload-images.jianshu...
此后给 Rates 指定时间版本,Rates 则会基于 RatesHistory 来计算符合时间版本的汇率转换内容。
![图8. Temporal Table Content]](https://upload-images.jianshu...
在 Rates 的帮助下,咱们能够将业务逻辑用如下的查询来表达:
值得注意的是,不一样于在 Regular Join 和 Time-Windowed Join 中两个表是平等的,任意一个表的新记录均可以与另外一表的历史记录进行匹配,在 Temporal Table Join 中,Temoparal Table 的更新对另外一表在该时间节点之前的记录是不可见的。这意味着咱们只须要保存 Build Side 的记录直到 Watermark 超过记录的版本字段。由于 Probe Side 的输入理论上不会再有早于 Watermark 的记录,这些版本的数据能够安全地被清理掉。
实时领域 Streaming SQL 中的 Join 与离线 Batch SQL 中的 Join 最大不一样点在于没法缓存完整数据集,而是要给缓存设定基于时间的清理条件以限制 Join 涉及的数据范围。根据清理策略的不一样,Flink SQL 分别提供了 Regular Join、Time-Windowed Join 和 Temporal Table Join 来应对不一样业务场景。
另外,尽管在实时计算领域 Join 能够灵活地用底层编程 API 来实现,但在 Streaming SQL 中 Join 的发展仍处于比较初级的阶段,其中关键点在于如何将时间属性合适地融入 SQL 中,这点 ISO SQL 委员会制定的 SQL 标准并无给出完整的答案。或者从另一个角度来说,做为 Streaming SQL 最先的开拓者之一,Flink 社区很适合探索出一套合理的 SQL 语法反过来贡献给 ISO。
做者介绍:林小铂,网易游戏高级开发工程师,负责游戏数据中心实时平台的开发及运维工做,目前专一于 Apache Flink 的开发及应用。探究问题原本就是一种乐趣。
本文做者:林小铂
本文为阿里云内容,未经容许不得转载。