Flink提供了Table形式和DataStream两种形式,能够根据实际状况本身选择用哪些方式来实现,但实际开发过程当中可能会有需求两种形式互相转换,这里介绍下操做方法java
表能够转换为DataStream或DataSet,这样自定义流处理或批处理程序就能够继续在Table API或SQL查询的结果上运行了
将表转换为DataStream或DataSet时,须要指定生成的数据类型,即要将表的每一行转换成的数据类型
表做为流式查询的结果,是动态更新的
转换有两种转换模式: 追加(Appende)模式和撤回(Retract)模式sql
查看执行计划数据库
Table API提供了一种机制来解释计算表的逻辑和优化查询计划apache
查看执行计划,能够经过TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成,返回一个字符串,描述三个计划windows
优化的逻辑查询计划
优化后的逻辑查询计划
实际执行计划api
val explaination: String = tableEnv.explain(resultTable)
println(explaination)并发
动态表(Dynamic Tables)
动态表是Flink对流数据的Table API和SQL支持的核心概念
与批处理数据的静态表不一样,动态表是随时间变化的maven
持续查询(Continuous Query)
动态表能够像静态的批处理表 同样进行查询,查询一个动态表会产生持续查询(Continuous Query)
连续查询永远不会终止,并会生成另外一个动态表
查询会不断更新其动态结果表,以反映其动态输入表上的改动ide
动态表和持续查询的转换过程函数
1) 首先输入的流会被转换为动态表,这个动态表只会一直追加
2)对动态表计算连续查询,生成新的动态表
针对以前查询的结果加一个状态,这样子就不用每次从头开始查询,提高效率
3)生成的新的动态表被转换成流而后输出
为了处理带有关系查询的流,必须先将其转换为表
从概念上讲,流的每一个数据记录,都被解释为对结果表的插入修改操做
持续查询会在动态表上作计算处理,并做为结果生成新的动态表
与常规的数据库表同样,动态表能够经过插入(insert)、更新(update)和删除(delete)更改,进行持续的修改
将动态表转换为流或将其写入外部系统时,须要对这些更改进行编码
1,仅追加流(Append-only)
Retract操做,每新增一个是insert+操做,撤回一个是delete - 操做,
这里当Mary第一次来的时候,会有一个insert,当Mary第二次来的时候会触发两个操做,Insert和delete,增长一个mary2, 删掉mary1
时间特性(Time Attributes)
基于时间的操做(好比Table API和SQL中的窗口操做),须要定义相关的时间语义和事件数据来源的信息
Table能够提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳
时间属性,能够是每一个schema的一部分。一旦定义了时间属性,它就能够做为一个字段引用,而且能够在基于时间的操做中使用
时间属性的行为相似于常规时间戳,能够访问,而且进行计算
处理时间语义下,容许表处理程序根据机器的本地时间生成结果。他是时间最简单的概念,既不须要提取时间戳,也不须要生成watermark
几种定义的方法:
一、由DataStream转换成表时指定(最简单的一种)
在定义schema期间,能够使用.proctime,指定字段名定义处理时间字段
这个proctime属性只能经过附加逻辑字段,来拓展物理schema,所以只能在schema定义的末尾定义它
val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'temperature,'timestamp, 'pt.proctime)
增长pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.10.1</version> </dependency>
package com.mafei.apitest.tabletest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row object TimeAndWindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //设置1个并发 //设置处理时间为流处理的时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") //先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }) //设置环境信息(能够不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的时候默认是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 设置flink table运行环境 val tableEnv = StreamTableEnvironment.create(env, settings) //流转换成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp,'temperature,'pt.proctime) sensorTables.printSchema() sensorTables.toAppendStream[Row].print() env.execute() } }
代码结构及运行效果:
第二种,定义处理时间(Processing Time)
val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) //由于txt里头是以,分割的跟csv同样,因此能够用oldCsv .withSchema(new Schema() //这个表结构要跟你txt中的内容对的上 .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("tem", DataTypes.DOUBLE()) .field("pt", DataTypes.TIMESTAMP(3)) .proctime() //须要注意,只有输出的sink目标里面实现了DefineRowTimeAttributes才能用,不然会报错,文件中不能,但kafka中是能够用的 ).createTemporaryTable("inputTable")
第三种,定义处理时间(Processing Time)另外一种实现,必须使用blink引擎
val sinkDDlL: String = """ |create table dataTable( | id varchar(20) not null | ts bigint, | temperature double, | pt AS PROCTIME() |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = '/sensor.txt', | 'format.type' = 'csv' |) |""".stripMargin tableEnv.sqlUpdate(sinkDDlL)
这种就不是flink从本地取处理时间了,而是取事件中的时间来处理
事件时间语义,容许表处理程序根据每一个记录中包含的时间生成结果。这样子即便在乱序事件或者延迟事件时,也能够得到正确的结果。
为了处理无序事件,并区分流中的准时和迟到事件;Flink须要从事件数据中,提取时间戳,并用来推动事件时间的进展
定义事件时间有3种方式
第一种,由DataStream转换成表时指定
在DataStream转换成Table,使用rowtime能够定义事件时间属性
//先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReadingTest5](Time.seconds(1000L)) { override def extractTimestamp(t: SensorReadingTest5): Long =t.timestamp * 1000L }) //指定watermark //流转换成表,指定处理时间-上面的实现方式 // val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp,'temperature,'pt.proctime) //将DataStream转换为Table,并指定事件时间字段 val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.proctime,'temperature) //将DataStream转换为Table,并指定事件时间字段-直接追加字段 val sensorTables = tableEnv.fromDataStream(dataStream,"id","temperature","timestamp","rt".rowtime)
第二种,定义Table Schema时指定
val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) //由于txt里头是以,分割的跟csv同样,因此能够用oldCsv .withSchema(new Schema() //这个表结构要跟你txt中的内容对的上 .field("id", DataTypes.STRING()) .field("tem", DataTypes.DOUBLE()) .rowtime( new Rowtime() .timestampsFromField("timestamp") //从数据字段中提取时间戳 .watermarksPeriodicBounded(2000) //watermark延迟2秒 ) ).createTemporaryTable("inputTable")
在建立表的DDL中定义
//在建立表的DDL中定义 val sinkDDlL: String = """ |create table dataTable( | id varchar(20) not null | ts bigint, | temperature double, | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts)), | watermark for rt as rt - interval '1' second //基于ts减去1秒生成watermark,也就是watermark的窗口时1秒 |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = '/sensor.txt', | 'format.type' = 'csv' |) |""".stripMargin tableEnv.sqlUpdate(sinkDDlL)
时间语义须要配合窗口操做才能发挥真正的做用,
在Table ApI和SQL中,主要有两种窗口
先定义组长什么样子,在根据key进行groupby,最后一步执行聚合函数
根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每一个组的数据执行一次聚合函数
Group Windows是使用window(w:GroupWindow)子句定义的,而且必须由as子句制定一个别名.
为了按照窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段同样引用
val table = input
.window([w;: GroupWindow] as 'w) //定义窗口,别名为w
.groupBy('w,'a) //按照字段a和窗口w分组
.select('a,'b.sum) //聚合操做
Table API提供了一组具备特定语义的预约义window类,这些类会被转换为底层DataStream或DataSet的窗口操做
滚动窗口要用Tumble类来定义
//定义事件时间的滚动窗口(Tumbling Event-time window
.window(Tumble over 10.minutes on 'rowtime as 'w)
//定义处理时间的滚动窗口(Tumbling Processing-time window)
.window(Tumble over 10.minutes on 'proctime as 'w)
//定义数据数量的滚动窗口(Tumbling Row-count window)
.window(Tumble over 10.rows on 'proctime as 'w)
10分钟一个滑动窗口,每5分钟滑动一次
//Sliding Event-time window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
//Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
//Sliding Row-count window
.window(Slide over 10.minutes every 5.rows on 'proctime as 'w)
会话窗口要用Session类来定义
//Sesion Evnet-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
//Session Processing-time Window
.window(Session withGap 10.minutes on 'procetime as 'w)
Group Windows定义在SQL查询的Group By子句中
TUMBLE(time_attr, interval)
定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度
HOP(time_attr,interval,interval)
定义的一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度
SESSION(time_attr, interval)
定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔
Over Windows
针对每一个输入行,计算相邻行范围内的聚合