动态表的概念是社区很早就提出的但并无所有实现,下文中全部介绍都是基于已有规划和proposal给出的,可能与以后实现存在出入仅供参考复制代码
动态表直观上看是一个相似于数据库中的Materialized View
概念。动态表随着时间改变;相似静态的batch table同样能够用标准SQL进行查询而后一个新的动态表;能够和流无损地互相转换(对偶的)。对现有的API最大的改进关键在表的内容随着时间改变,而如今的状态只是append。当前的streaming table能够认为是一种动态表,append模式的动态表。javascript
流被转换成Table时决定选择哪一种模式是依据表的schema是否认义primary key。java
若是表的schema没有包括key的定义那转换成表时采用append模式。把流中每条新来的record当作新的row append到表中。一旦数据加到表中就不能再被更新和删除(指当前表中,不考虑转换成新表)。sql
相对应,若是定义了key,那么对于流中的每条记录若是key不在表中就insert不然就update。数据库
表到流的操做是把表的全部change以changelog stream的方式发送到下游。这一步也有两种模式。windows
traction模式中对于Dynamic Table的insert和delete的change分别产生insert或delete event。若是是update的change会产生两种change event,对于以前发送出去的一样key的record会产生delete event,对于当前的record是产生insert event。以下图所示:app
update模式依赖Dynamic Table定义了key。全部的change event是一个kv对。key对应表的key在当前record中的值;对于insert和change value对应新的record。对于delete value是空表示该能够已经被删除。以下图所示:dom
表的内容随着时间改变意味着对表的query结果也是随着时间改变的。咱们定义:优化
举个例子来理解动态表的概念:ui
因为流是无限的,相对应 Dynamic Table 也是无界的。当查询无限的表的时候咱们须要保证query的定时是良好的,有意义可行的。spa
1.在实践中Flink将查询转换成持续的流式应用,执行的query仅针对当前的逻辑时间,因此不支持对于任意时间点的查询(A[t])。
2.最直观的原则是query可能的状态和计算必须是有界的,因此能够支持可增量计算的查询:
Q(t+1) = q'(Q(t), c(T, t, t+1))
,其中Q(t)是query q的前一次查询结果,c(T, t, t_+1) 是表T从t+1到t的变化, q'是q的增量版本。Q(t+1) = q''(c(T, t-x, t+1)) ∪ Q(t)
,q''是不须要时间t时q的结果增量版本query q。c(T, t-x, t+1)是表T尾部的x+1个数据,x取决于语义。例如最后一小时的window aggregation至少须要最后一小时的数据做为状态。其余能支持的查询类型还有:单独在每一行上操做的SELECT WHERE;rowtime上的GROUP BY子句(好比基于时间的window aggregate);ORDER BY rowtime的OVER windows(row-windows);ORDER BY rowtime。如上文所说的,某些增量查询须要保留一些数据(部分输入数据或者中间结果)做为状态。为了保证query不会失败,保证查询所须要的空间是有界的不随着时间无限增加很重要。主要有两个缘由使得状态增加:
虽然第二种状况可有经过下文提到的"Last Result Offset"参数解决,可是第一种状况须要优化器检测。咱们应该拒毫不受时间限制的中间状态增加的查询。优化器应该提供如何修复查询且要求有适当的时间谓词。好比下面这个查询:
SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
GROUP BY user, page复制代码
随着用户数和页面数的增加,中间状态会数据随着时间推移而增加。对于存储空间的要求能够经过添加时间谓词来限制:
SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour
GROUP BY user, page复制代码
由于不是全部属性都是不断增加的, 所以能够告诉优化器domain的size, 就能够推断中间状态不会随着时间推移而增加,而后接受没有时间谓词的查询。
val sensorT: Table = sensors
.toTable('id, 'loc, 'stime, 'temp)
.attributeDomain('loc, Domain.constant) // domain of 'loc is not growing
env.registerTable("sensors", sensorT)
SELECT loc, AVG(temp) AS avgTemp
FROM sensors
GROUP BY loc复制代码
一些关系运算符必须等数据到达才能计算最终结果。例如:在10:30关闭的窗口至少要等到10:30才能计算出最终的结果。Flink的logical clock(即 决定什么时候才是10:30)取决于使用event time 仍是 processing time。在processing time的状况下,logical time是每一个机器的wallclock;在event time的状况下,logical clock time是由源头提供的watermark决定的。因为数据的乱序和延迟当在event time模式下时等待一段时间来减少计算结果不完整性。另外一方面某些状况下但愿获得不断改进的早期结果。所以对于结果被计算、改进或者作出最终结果时有不一样的要求、
下图描绘了不一样的配置参数如何用于控制早期结果和细化计算结果的。