【翻译】Flink Table Api & SQL —Streaming 概念 —— 时态表

本文翻译自官网: Temporal Tables https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.htmlhtml

Flink Table Api & SQL 翻译目录sql

时态表(注:Temporal Table , 我翻译为时态表,能够访问表在不一样时间的内容)表示一直在修改的表上的(参数化)视图的概念,该视图返回表在特定时间点的内容。数据库

更改表能够是跟踪表的修改历史(例如,数据库更改日志),也能够是维表的具体修改(例如,数据库表)。apache

对于表的历史修改,Flink能够跟踪修改,并容许在查询中访问表的特定时间点的内容。 在Flink中,这种表由Temporal Table Function表示app

对于变化的维表,Flink容许在查询中的处理时访问表的内容。在Flink中,这种表由Temporal Table 表示ide

设计初衷

与表的修改历史相关

假设咱们有下表 RatesHistory函数

SELECT * FROM RatesHistory; rowtime currency rate ======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

RatesHistory表示一个不断增加的关于日元的货币汇率的附加表(汇率为1)。例如,汇率期间从 09:0010:45的欧元日元的汇率为 114从 10:45 到 11:15 是 116this

假设咱们要在10:58的时间输出全部当前汇率,则须要如下SQL查询来计算结果表:spa

SELECT * FROM RatesHistory AS r WHERE r.rowtime = ( SELECT MAX(rowtime) FROM RatesHistory AS r2 WHERE r2.currency = r.currency AND r2.rowtime <= TIME '10:58');

子查询肯定对应货币的最大时间小于或等于所需时间。外部查询列出具备最大时间戳的汇率。 翻译

下表显示了这种计算的结果。 在咱们的示例中,考虑了10:45 时欧元的更新,可是 10:58 时表的版本中未考虑 11:15 时欧元的更新和新的英镑输入。

rowtime currency rate ======= ======== ======
09:00   US Dollar   102
09:00   Yen           1
10:45   Euro        116

时态表的概念旨在简化此类查询,加快其执行速度,并减小Flink的状态使用率。时态表是 append-only 表上的参数化视图,该视图将 append-only 表的行解释为表的变动日志,并在特定时间点提供该表的版本将 append-only 表解释为变动日志须要指定主键属性和时间戳属性。主键肯定覆盖哪些行,时间戳肯定行有效的时间。

在上面的示例中,currency RatesHistory的主键,而且rowtime是timestamp属性。

在Flink中,这由时态表函数表示

与维表变化相关

另外一方面,某些用例须要链接变化的维表,该表是外部数据库表。

假设 LatestRates 是一个以最新汇率实现的表(例如,存储在其中)。LatestRates 是物化的 RatesHistory 历史那么时间 10:58 的 LatestRates 表的内容将是

10:58> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 116

12:00 时 LatestRates的内容将是: 

12:00> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 119 Pounds 108

在Flink中,这由Temporal Table表示

时态表函数

为了访问时态表中的数据,必须传递一个时间属性,该属性肯定将要返回的表的版本。Flink使用表函数的SQL语法提供一种表达它的方法。

定义后,时态表函数将使用单个时间参数timeAttribute并返回一组行。该集合包含相对于给定时间属性的全部现有主键的行的最新版本。

假设咱们Rates(timeAttribute)基于RatesHistory定义了一个时态表函数,咱们能够经过如下方式查询该函数: 

SELECT * FROM Rates('10:15'); rowtime currency rate ======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1 SELECT * FROM Rates('11:00'); rowtime currency rate ======= ======== ======
09:00   US Dollar   102
10:45   Euro        116
09:00   Yen           1

对Rates(timeAttribute)的每一个查询都将返回给定timeAttribute的Rates状态。

注意:当前 Flink 不支持使用常量时间属性参数直接查询时态表函数。目前,时态表函数只能在 join 中使用。上面的示例用于提供有关函数 Rates(timeAttribute)返回内容的直观信息 

另请参阅有关用于持续查询的 join 的页面,以获取有关如何与时态表 join 的更多信息。 

定义时态表函数

如下代码段说明了如何从 append-only 表中建立时态表函数。

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) // Provide a static data set of the rates history table.
val ratesHistoryData = new mutable.MutableList[(String, Long)] ratesHistoryData.+=(("US Dollar", 102L)) ratesHistoryData.+=(("Euro", 114L)) ratesHistoryData.+=(("Yen", 1L)) ratesHistoryData.+=(("Euro", 116L)) ratesHistoryData.+=(("Euro", 119L)) // Create and register an example table using above data set. // In the real setup, you should replace this with your own table.
val ratesHistory = env .fromCollection(ratesHistoryData) .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
 tEnv.registerTable("RatesHistory", ratesHistory) // Create and register TemporalTableFunction. // Define "r_proctime" as the time attribute and "r_currency" as the primary key.
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1)
tEnv.registerFunction("Rates", rates)                                          // <==== (2)

Line (1)建立了一个 时态表函数 rates ,使咱们能够在 Table API 中使用 rates 函数  

Line (2) 在表环境中以Rates名称注册此函数,这使咱们能够在SQL中使用Rates函数

时态表

注意:仅 Blink planner 支持此功能。

为了访问时态表中的数据,当前必须使用LookupableTableSource定义一个TableSource。 Flink 使用FOR SYSTEM_TIME AS OF 的SQL语法查询时态表,这在SQL:2011中提出。

假设咱们定义了一个时态表 LatestRates,咱们能够经过如下方式查询此类表: 

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15'; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00'; currency rate ======== ====== US Dollar 102 Euro 116 Yen 1

注意:当前,Flink不支持以固定时间直接查询时态表。目前,时态表只能在 join 中使用。上面的示例用于提供有关时态表LatestRates返回内容的直觉

另请参阅有关用于持续查询的 join 的页面,获取有关如何与时态表 join 更多信息。

定义时态表

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) // Create an HBaseTableSource as a temporal table which implements LookableTableSource // In the real setup, you should replace this with your own table.
val rates = new HBaseTableSource(conf, "Rates") rates.setRowKey("currency", String.class)   // currency as the primary key
rates.addColumn("fam1", "rate", Double.class) // register the temporal table into environment, then we can query it in sql
tEnv.registerTableSource("Rates", rates)

另请参阅有关如何定义LookupableTableSource的页面 

 

欢迎关注Flink菜鸟公众号,会不按期更新Flink(开发技术)相关的推文

相关文章
相关标签/搜索