关系型SQL
与stream processing
对好比下。html
SQL | Stream Processing |
---|---|
有限元组 | 无限元组 |
完整数据集上的查询 | 没法基于全部数据查询 |
查询会结束 | 查询不会结束 |
Materialized View被定义为一条SQL查询,其会缓存查询结果。但当所查询的表(基表)被修改时,缓存的结果将过时。
Eager View Maintenance会更新Materialized View,当基表被更新时,会马上更新Materialized View中缓存的结果。java
Eager View Maintenance和SQL Query在streams
上的关系以下。数据库
INSERT、UPDATE、DELETE
等DML
语句流的结果,被流称为changelog stream。View
,查询须要不断处理changelog stream。streaming SQL
查询结果。动态表是Flink流上Table Api & SQL
的核心概念,其随时间动态变化;apache
连续查询的结果等同在输入表的快照上以批处理模式执行相同查询的结果。缓存
流、动态表、连续查询的关系以下图所示。session
stream
会被转化为动态表。stream
。动态表是一个逻辑概念。 在查询执行期间动态表不必定(彻底)
materialized
。app
为理解动态表和连续查询的概念,假设点击事件流有以下模式。ide
[
user: VARCHAR, // the name of the user
cTime: TIMESTAMP, // the time when the URL was accessed
url: VARCHAR // the URL that was accessed by the user
]this
为在流上使用关系查询,流须要被转化为表。流的每一个记录被解释为结果表(动态表)上的INSERT
修改,咱们从一个只有INSERT
的changelog
流中构建表。以下图所示,点击事件流被转化为表,表会随着点击事件记录的插入而不断增加。编码
连续查询做用于动态表并又会产生动态表;连续查询不会终止并会根据其输入表(动态表)上的更新来更新其结果表(动态表)。
下面显示在点击事件流上定义的clicks
表上显示两个查询示例。
首先是GROUP-BY COUNT
聚合查询示例。
当查询开始时,clicks
表为空;当第一行插入到clicks
表中时,查询开始计算结果表(动态表),如[Mary, ./home]插入后,结果表包含一行结果[Mary, 1];当插入第二行[Bob, ./cart]时,查询会更新结果表并插入新记录[Bob, 1]。第三行[Mary, ./prod=id=1]插入时,查询会更新结果表中的[Mary, 1]记录,将其更新为[Mary, 2]。最后一行[Liz, 1]插入clicks
表后,也会更新到结果表(插入新记录)。
第二个查询与第一个查询相似,除了用户属性以外,还在小时滚动窗口上对clicks
表进行分组,而后对URL进行计数(基于时间的计算,如窗口基于特殊的时间属性)。
每一个小时查询会计算结果并更新结果表。在cTime
在12:00:00 - 12:59:59
之间,clicks
表存在四条记录,对应的查询计算出两条结果;下个时间窗口(13:00:00 - 13:59:59),clicks
表中存在三条记录,对应的查询计算出两条结果添加值结果表中;当记录插入至clicks
表中后,结果表也会被动态更新。
上述两个查询虽然有些相似(均计算统计聚合分组),但二者也有显著不一样:第一个查询会更新结果表的结果,如定义在结果表上的changelog
流包含INSERT
和UPDATE
;第二个查询仅仅往结果表中添加记录,如定义在结果表上的changelog
流只包含INSERT
。一个查询是否生成仅插入表(INSERT
)或更新表(UPDATE
)有一些含义:生成更新表的查询必需要维护更多状态,将仅插入表转化为流与将更新表转化为流不一样。
不少查询能够等同在流上的连续查询,一些查询因为需维护状态的大小或计算更新代价大致使查询计算代价太大。
user
的url
的count
以即可以增长count
,使得当输入表(左侧表)接收一行新数据时会产生新的结果(右侧表)。若只跟踪注册用户,那么维护cnt
大小代价不会太大(注册用户量不太大)。但若非注册用户也分配惟一的用户名,则随着时间的增长,维护cnt
大小代价将增大,最终致使查询失败。SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
RANK
。一旦clicks
表收到新行,用户的lastAction
被更新而且应该计算新的RANK
。然而因为不存在两行相同RANK
,因此全部较低RANK
的行也须要被更新。SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);
动态表可像传统表同样被INSERT、UPDATE、DELETE
修改。可能只有一行的表被持续更新;或者是没有UPDATE、DELETE
更改的只插入表。当将动态表转化为流或将其写入外部系统,这些更改(修改)须要被编码,Flink
的Table API & SQL
支持三种方式编码动态表上的更改(修改)。
INSERT更改
进行修改的动态表可经过发出插入的行来转化为流。Retract流
包含两种类型消息(add消息和retract消息
),经过将动态表的INSERT更改
做为add消息
、将DELETE更改
做为retract消息
、将UPDATE更改
分解为旧记录的retract消息
和新记录的add消息
。下图展现了从动态表转化为retract流
。Upsert流
包含两种类型消息(upset消息和delete消息
),动态表转化为upsert流
须要有主键(可复合),具备主键的动态表经过将INSERT、UPDATE更改
编码为upset消息
,将DELETE更改
编码为delete消息
。upset流
与retract流
主要区别是UPDATE更改
使用单一消息(主键)进行编码,所以效率更高。下图展现了将动态表
转化为upset流
。Event time
类型)。上述时间能够在代码中指明时间特性。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default // alternatively: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Table API & SQL
中基于时间的操做(如窗口)须要设置时间概念和及其来源信息。所以,tables
能够提供逻辑时间属性
来指示时间并在table
程序中访问相应时间戳。时间属性能够是表模式
的一部分(从DataStream
中建立表时被定义),或在使用TableSource
时被预约义,一旦时间属性被定义,那么其能够做为一个字段被引用或进行基于时间的操做。只要时间属性没有被修改,只是从查询的一部分转发到另外一部分,那么它仍然是一个有效的时间属性。时间属性与常规时间戳相同,可被访问并计算。若是在计算中使用时间属性,那么其将被具象化为常规时间戳,常规时间戳不兼容Flink
的时间和水位系统,所以不能再用于基于时间的操做。
processing time
容许表程序基于本地机器的时间输出结果,它不须要提取时间戳和生成水位,有多种方式定义processing time
属性。
processing time
属性在模式定义时使用.proctime
属性定义,时间属性只能经过额外的逻辑字段扩展物理模式,所以,其可被定义在模式定义的末尾,具体以下。
DataStream<Tuple2<String, String>> stream = ...; // declare an additional logical field as a processing time attribute Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime"); WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
processing time
属性可经过实现DefinedProctimeAttribute
接口定义,逻辑时间属性被附加到由TableSource
的返回类型定义的物理模式上。
// define a table source with a processing attribute public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute { @Override public TypeInformation<Row> getReturnType() { String[] names = new String[] {"Username" , "Data"}; TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()}; return Types.ROW(names, types); } @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { // create stream DataStream<Row> stream = ...; return stream; } @Override public String getProctimeAttribute() { // field with this name will be appended as a third field return "UserActionTime"; } } // register table source tEnv.registerTableSource("UserActions", new UserActionSource()); WindowedTable windowedTable = tEnv .scan("UserActions") .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
Event time
容许表程序根据每条记录中包含的时间输出结果,这样即便在无序事件或晚到事件状况下保持一致结果,当从持久化存储中读取记录时还保证可重放结果。此外,event time
容许批和流环境中的表程序使用统一的语法,流环境中的时间属性能够是批环境中的记录的字段。为处理乱序事件,并区分流中准时和晚到事件,Flink
须要从事件中提取时间戳信息,并在时间戳上进行处理(水位)。event time
属性可被定义在流到表的转化中或者使用TableSource。Table API & SQL
假设在上述两种状况下,都在DataStream API
中生成时间戳和水位。
event time
属性在模式定义时经过.rowtime
属性定义;时间戳和水位必须在转换的DataStream中已被分配;将DataStream
转化为Table
时有以下两种定义时间属性的方式。
// Option 1: // extract timestamp and assign watermarks based on knowledge of the stream DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...); // declare an additional logical field as an event time attribute Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime"); // Option 2: // extract timestamp from first field, and assign watermarks based on knowledge of the stream DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...); // the first field has been used for timestamp extraction, and is no longer necessary // replace first field with a logical event time attribute Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data"); // Usage: WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
event time
属性可经过实现DefinedRowtimeAttribute
接口定义,逻辑时间属性被附加到由TableSource
的返回类型定义的物理模式上。时间戳和水位必定要在getDataStream
方法返回的流中被分配。
// define a table source with a rowtime attribute public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute { @Override public TypeInformation<Row> getReturnType() { String[] names = new String[] {"Username" , "Data"}; TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()}; return Types.ROW(names, types); } @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { // create stream // ... // extract timestamp and assign watermarks based on knowledge of the stream DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...); return stream; } @Override public String getRowtimeAttribute() { // field with this name will be appended as a third field return "UserActionTime"; } } // register the table source tEnv.registerTableSource("UserActions", new UserActionSource()); WindowedTable windowedTable = tEnv .scan("UserActions") .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
无论输入是有界批量输入仍是无界流输入,Table API & SQL
查询都有相同的语义。在不少状况下,流上的连续查询与离线计算具备相同准确的结果。然而,在实际状况下连续查询必需要限制其所维护状态的大小以免使用完存储空间,并可以在长时间处理无限流数据。所以,连续查询可能只能根据输入数据的特征和查询自己提供近似准确的结果。
Flink Table API & SQL
接口提供参数调整连续查询的准确性和资源消耗。参数经过QueryConfig
对象定义,QueryConfig
对象可经过TableEnvironment
获取并在翻译表时被传回。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // obtain query configuration from TableEnvironment StreamQueryConfig qConfig = tableEnv.queryConfig(); // set query parameters qConfig.withIdleStateRetentionTime(Time.hours(12)); // define query Table result = ... // create TableSink TableSink<Row> sink = ... // emit result Table via a TableSink result.writeToSink(sink, qConfig); // convert result Table into a DataStream<Row> DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
下面描述了QueryConfig
的参数如何影响查询的准确性和资源消耗的。
不少查询在一个或多个关键属性上聚合或链接记录(如典型的聚合查询),当在流上执行该查询时,连续查询须要维护记录或保持每一个键的部分结果。若涉及到流的关键域(活动键值随时间会变化),随着不一样键被观察,连续查询会积累愈来愈多的状态。然而,在一段时间后键将变得不活动时,它们的对应状态将变得过时和无效。以下查询示例中计算每一个session
的clicks
数量。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
sessionId
被做为分组键,连续查询会为每一个sessionId
维护clicks
数量。sessionId
属性随着时间推移而变化,sessionId
值仅在session
结束前处于活动状态(保持一段时间)。然而,因为不清楚sessionId
属性,连续查询指望每一个sessionId
值在任什么时候间都有效,即会维护全部sessionId
的值。这样会致使随着时间的推移,所维护的sessionId
愈来愈多。
空闲状态保留时间参数定义键的状态不被更新,在删除以前保留多长时间。在上述查询中,sessionId
的计数在指定的配置时间内未被更新时将被移除。当键会移除后再次被添加,那么键将会被当成新的键(如上述示例中又会开始计0)。有两个参数配置空闲状态保留时间,最小空闲状态保留时间和最大空闲状态保留时间。
StreamQueryConfig qConfig = ... // set idle state retention time: min = 12 hour, max = 16 hours qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16)); // set idle state retention time. min = max = 12 hours qConfig.withIdleStateRetentionTime(Time.hours(12);
配置不一样的最小和最大空闲状态保留时间的效率更高,由于它减小了查询内部簿记什么时候删除状态的次数。
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html