Table API和SQL经过join API集成在一块儿,这个join API的核心概念是Table,Table能够做为查询的输入和输出。这篇文档展现了使用Table API和SQL查询的程序的通用结构,如何注册一个Table,如何查询一个Table以及如何将数据发给Table。html
全部批处理和流处理的Table API、SQL程序都有以下相同的模式,下面例子的代码展现了Table API和SQL程序的通用结构:react
// 对于批处理程序来讲使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 建立一个TableEnvironment // 对于批处理程序来讲使用 BatchTableEnvironment 替换 StreamTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 注册一个 Table tableEnv.registerTable("table1", ...) // 或者 tableEnv.registerTableSource("table2", ...); // 或者 tableEnv.registerExternalCatalog("extCat", ...); // 从Table API的查询中建立一个Table Table tapiResult = tableEnv.scan("table1").select(...); // 从SQL查询中建立一个Table Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... "); // 将Table API 种的结果 Table 发射到TableSink中 , SQL查询也是同样的 tapiResult.writeToSink(...); // 执行 env.execute();
注意:Table API 和 SQL查询能够轻易地进行集成并嵌入到DataStream或者DataSet程序中,请参考Integration With DataStream and DataSet API部分来了解DataStream和DataSet如何转换成Table及Table如何转换成DataStream和DataSet。sql
TableEnvironment是Table API和SQL集成的核心概念,它主要负责:
一、在内部目录中注册一个Table
二、注册一个外部目录
三、执行SQL查询
四、注册一个用户自定义函数(标量、表及聚合)
五、将DataStream或者DataSet转换成Table
六、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一个Table老是会绑定到一个指定的TableEnvironment中,相同的查询不一样的TableEnvironment是没法经过join、union合并在一块儿。数据库
TableEnvironment能够经过调用带有参数StreamExecutionEnvironment或者ExecutionEnvironment和一个可选参数TableConfig的静态方法TableEnvironment.getTableEnvironment()
来建立。TableConf能够用来配置TableEnvironment或者自定义查询优化器和翻译过程(参考查询优化器)apache
// *************** // STREAMING QUERY // *************** StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); // 为streaming查询建立一个 TableEnvironment StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv); // *********** // BATCH QUERY // *********** ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); // 为批查询建立一个 TableEnvironment BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv); // *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // 为流查询建立一个 TableEnvironment val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) // *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // 为批查询建立一个 TableEnvironment val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
TableEnvironment有一个在内部经过表名组织起来的表目录,Table API或者SQL查询能够访问注册在目录中的表,并经过名称来引用它们。
TableEnvironment容许经过各类源来注册一个表:
一、一个已存在的Table对象,一般是Table API或者SQL查询的结果
二、TableSource,能够访问外部数据如文件、数据库或者消息系统
三、DataStream或者DataSet程序中的DataStream或者DataSetapi
将DataStream或者DataSet注册为一个表将在Integration With DataStream and DataSet API中讨论。安全
一个Table能够在TableEnvironment中按照下面程序注册:app
// 获取一个 StreamTableEnvironment, BatchTableEnvironment也是一样的方法 StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Table 是简单的投影查询的结果 Table projTable = tableEnv.scan("X").project(...); // 将 Table projTable 注册为表 "projectedX" tableEnv.registerTable("projectedTable", projTable); // 获取一个TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // Table 是简单的投影查询的结果 val projTable: Table = tableEnv.scan("X").project(...) // 将 Table projTable 注册为表 "projectedX" tableEnv.registerTable("projectedTable", projTable)
注意:一个注册的Table被当作是与关系型数据库中的视图相似,即定义Table的查询不会被优化,可是当其余查询引用到已注册的Table时会被内联。若是多个查询引用同一个已注册的Table,这个Table会跟每一个查询内联并进行屡次执行,即:已注册的Table的结果不会共享。函数
TableSource能够访问保存在外部存储系统如数据库系统(MySQL、HBase...),指定编码格式的文件(CSV, Apache [Parquet, Avro, ORC],...)或者消息系统(Apache Kafka,RabbitMQ,...)中的数据。测试
Flink的目标是为通用的数据格式和存储系统提供TableSource,请参考Table Sources和Sinks页来了解Flink所支持的TableSource列表及如何自定义一个TableSource。
一个TableSource能够在TableEnvironment中按以下方式来定义:
// 获取一个StreamTableEnvironment, 一样适用于BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 建立一个 TableSource TableSource csvSource = new CsvTableSource("/path/to/file", ...); // 将TableSource注册为表 "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource); // 获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 建立一个TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // 将 TableSource 注册为表 "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)
一个外部目录提供了关于外部数据库和表的信息如:它们的名称、模式、统计及如何访问保存在外部数据库、表和文件中的数据。
一个外部目录能够经过实现ExternalCatalog接口来建立并在TableEnvironment中注册,以下:
// 获取一个 StreamTableEnvironment, 一样适用于 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 建立一个外部catalog ExternalCatalog catalog = new InMemoryExternalCatalog(); // 注册 ExternalCatalog tableEnv.registerExternalCatalog("InMemCatalog", catalog); // 获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 建立一个 catalog val catalog: ExternalCatalog = new InMemoryExternalCatalog // 注册 ExternalCatalog tableEnv.registerExternalCatalog("InMemCatalog", catalog)
一旦在TableEnvironment中注册以后,全部定义在ExternalCatalog中的表均可以经过指定全路径如:catalog.database.table
在Table API或者SQL查询来访问。
目前,Flink提供InMemoryExternalCatalog来作demo或者测试。然而,ExternalCatalog接口还能够被用来链接HCatalog或者Metastore到Table API。
Table API是一个Scala和Java的语言集成查询API,与SQL相反,查询并不指定为字符串而是根据主机语言一步一步的构建。
Table API是基于Table类来的,Table类表明了一个流或者批表,而且提供方法来使用关系型操做。这些方法返回一个新的Table对象,这个Table对象表明着输入的Table应用关系型操做后的结果。一些关系型操做是由多个方法调用组成的如:table.groupBy(...).select()
, 其中groupBy(...)
指定了table的分组,而select(...)
则是table分组的映射。
Table API文档描述了streaming和batch表所支持的全部Table API操做。
下面的例子展现了一个简单的Table API聚合查询:
// 获取一个 StreamTableEnvironment, 一样适用于 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 注册一个名叫 Orders 的表 // 扫描注册的 Orders 表 Table orders = tableEnv.scan("Orders"); // 计算全部来自法国的客户的收入 Table revenue = orders .filter("cCountry === 'FRANCE'") .groupBy("cID, cName") .select("cID, cName, revenue.sum AS revSum"); // 发射或者转换一个 Table // 执行查询 // 获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 注册一个名叫 Orders 的表 // 扫描已注册的 Orders 表 Table orders = tableEnv.scan("Orders") // 计算全部来自法国偶的客户的收入 Table revenue = orders .filter('cCountry === "FRANCE") .groupBy('cID, 'cName) .select('cID, 'cName, 'revenue.sum AS 'revSum) // 发射或者转换一个Table // 执行查询
注意:Scala Table API使用Scala的符号在引用表属性时,以'`'开始,Table API使用Scala的隐式转换,为了使用Scala的隐式转换,请确保导入org.apache.flink.api.scala._
和org.apache.flink.table.api.scala._
。
Flink的SQL集成是基于Apache Calcite的,Apache Calcite实现了标准的SQL,SQL查询被指定为常规字符串。
SQL文档描述了Flink对流和批表的SQL支持。
下面的例子展现了如何指定一个查询并返回一个Table结果;
// 获取一个 StreamTableEnvironment, 一样适用于BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 注册一个名叫Orders 的表 // 计算全部来自法国的客户的收入 Table revenue = tableEnv.sql( "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" ); // 发射或者转换一个Table // 执行查询 // 获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //注册一个名叫 Orders的表 // 计算全部来自法国的客户的收入 Table revenue = tableEnv.sql(""" |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = 'FRANCE' |GROUP BY cID, cName """.stripMargin) // 发射或者转换 Table // 执行查询
Table API和SQL查询能够很容易地合并由于它们都返回Table对象:
一、Table API查询能够基于SQL查询结果的Table来进行
二、SQL查询能够基于Table API查询的结果来定义
为了发射一个Table,能够将其写入一个TableSink中,TableSink 是支持各类文件格式(如:CSV, Apache Parquet, Apache Avro)、存储系统(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系统(如:Apache Kafka,RabbitMQ)的通用接口。
一个批Table只能写入BatchTableSink中,而流Table须要一个AppendStreamTableSink
、RetractStreamTableSink
或者UpsertStreamTableSink
请参考Table Sources & Sinks文档来了解更多可用sink的信息和如何实现一个自定义的TableSink。
// 获取一个StreamTableEnvironment, 一样适用于 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 使用Table API和/或SQL查询获取一个 Table Table result = ... // 建立一个TableSink TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); // 将结果Table写入TableSink中 result.writeToSink(sink); // 执行程序 // 获取一个TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 使用Table API和/或SQL查询获取一个 Table val result: Table = ... //建立一个 TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") // 将结果 Table写入TableSink中 result.writeToSink(sink) // 执行程序
Table API和SQL查询根据输入是流仍是批翻译成DataStream或者DataSet,查询内部表示为一个逻辑查询计划,并分两个阶段进行翻译:
一、优化逻辑计划
二、翻译成一个DataStream或者DataSet程序
Table API或者SQL查询会在下面状况下触发:
当调用Table.writeToSink()
时,Table会发射到TableSink中
Table转换DataStream或者DataSet时(参考与DataStream和DataSet API集成)
一旦翻译,Table API或者SQL查询就会像常规DataStream或DataSet处理同样,而且当StreamExecutionEnvironment.execute()
或者ExecutionEnvironment.execute()
调用时执行。
Table API和SQL查询能够很容易地进行集成并嵌入到DataStream和DataSet程序中。例如:咱们能够查询一个外部表(如:来自关系型数据库的表)、作一些预处理,如过滤、映射、聚合或者与元数据关联,而后使用DataStream或者DataSet API(及其余基于这些API的库,如CEP或Gelly)进行进一步处理。一样,Table API或者SQL查询也能够应用于DataStream或者DataSet程序的结果中。
这种交互能够经过将DataStream或者DataSet转换成一个Table及将Table转换成DataStream或者DataSet来实现。在本节,咱们将描述这些转换是如何完成的。
Scala 隐式转换
Scala Table API为DataSet、DataStream和Table类提供了隐式转换功能。这些转换能够经过导入Scala DataStream API中的org.apache.flink.table.api.scala._
和org.apache.flink.api.scala._
包来启用。
一个DataStream或者DataSet能够在TableEnvironment中注册为Table,表的结果模式根据注册的DataStream或者DataSet的数据类型来定。请参考数据类型映射到表模式来了解更详细的信息。
// 获取 StreamTableEnvironment // 注册一个DataSet 到BatchTableEnvironment也是等效的 StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // 注册DataStream 为表 "myTable" ,并有两个字段 "f0", "f1" tableEnv.registerDataStream("myTable", stream); // 注册 DataStream 为表 "myTable2" 并有两个字段 "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, "myLong, myString"); // 获取 TableEnvironment // 注册一个 DataSet 是等价的 val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // 注册 DataStream 为表 "myTable" 并有两个字段 "f0", "f1" tableEnv.registerDataStream("myTable", stream) // 注册 DataStream 为 "myTable2" 并有两个字段 "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
Table能够转换为DataStream或者DataSet,这样的话,自定义的DataStream或者DataSet程序就能够基于Table API或者SQL查询的结果来执行了。
当将一个Table转换为DataStream或者DataSet时,你须要指定生成的DataStream或者DataSet的数据类型,即须要转换表的行的数据类型,一般最方便的转换类型是Row,下面列表概述了不一样选项的功能:
一、Row:字段经过位置映射、能够是任意数量字段,支持空值,非类型安全访问
二、POJO:字段经过名称(POJO字段做为Table字段时,必须命名)映射,能够是任意数量字段,支持空值,类型安全访问
三、Case Class:字段经过位置映射,不支持空值,类型安全访问
四、Tuple:字段经过位置映射,不得多于22(Scala)或者25(Java)个字段,不支持空值,类型安全访问
五、Atomic Type:Table必须有一个字段,不支持空值,类型安全访问。
将Table转换为DataStream
流式查询的结果Table会被动态地更新,即每一个新的记录到达输入流时结果就会发生变化。所以,转换此动态查询的DataStream须要对表的更新进行编码。
有两种模式来将Table转换为DataStream:
一、Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时,即仅附加,以前发射的结果不会被更新。
二、Retract Mode:始终均可以使用此模式,它使用一个boolean标识来编码INSERT和DELETE更改。
// 获取一个 StreamTableEnvironment. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 有两个字段(String name, Integer age)的Table Table table = ... // 经过指定类将Table转换为Row的Append DataStream DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); // 经过一个TypeInformation将Table转换为Tuple2<String, Integer> 类型的Append DataStream TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); // 将Table转换为Row的react形式的DataStream // 一个reactDataStream的类型X为 DataStream<Tuple2<Boolean, X>>. // boolean字段指定了更改的类型. // True 是 INSERT, false 是 DELETE. DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class); // get TableEnvironment. // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有关动态表及其属性的详细讨论在Streaming Queries文档中给出。
将Table转换为DataSet
Table能够按照以下方式转换为DataSet:
// 获取 BatchTableEnvironment BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 有两个字段(String name, Integer age)的Table Table table = ... // 经过指定类将Table转换为Row类型的DataSet DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class); // 经过TypeInformation 将Table转换为Tuple2<String, Integer>类型的DataSet TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); // 获取 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 有两个字段(String name, Integer age)的Table val table: Table = ... // 将Table转换为Row类型的DataSet val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table) // 将Table转换为Tuple2[String, Int]类型的DataSet val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
Flink的DataStream和DataSet API支持多种数据类型,如Tuple,POJO, case class及原始数据类型。接下来咱们描述Table API如何将这些类型转换为内部行表示及展现将DataStream转换为Table的例子。
原子类型
Flink将原生类型(如:Integer, Double, String)或者通用类型(不能再被分析或者分解的类型)视为原子类型,一个原子类型的DataStream或者DataSet能够转换为只有一个属性的Table,属性的类型根据原子类型推算,而且必须得指定属性的名称。
// 获取一个 StreamTableEnvironment, 一样原理适用于 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Long> stream = ... // 将 DataStream转换为具备属性"myLong"的Table Table table = tableEnv.fromDataStream(stream, "myLong"); // 获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[Long] = ... // 将 DataStream 转换为具备属性'myLong的Table val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple(Java和Scala都支持)和Case Class(仅Scala支持)
Flink支持Scala内置的Tuple和Flink为Java提供的Tuple,DataStream和DataSet类型的Tuple均可以被转换为表。字段能够经过为全部字段(经过位置来映射)提供的名称来重命名,若是没有为字段指定名称的话,就会采用默认的字段名。
// 获取一个 StreamTableEnvironment, 一样适用于BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // 将 DataStream为具备字段名为"myLong", "myString"的Table Table table1 = tableEnv.fromDataStream(stream, "myLong, myString"); // 将 DataStream 转换为具备默认字段名 "f0", "f1"的 Table Table table2 = tableEnv.fromDataStream(stream); //获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // 将 DataStream 转换为具备字段名 'myLong, 'myString' 的Table val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString) // 将 DataStream 转换为具备默认字段名 '_1, '_2的Table val table2: Table = tableEnv.fromDataStream(stream) // 定义一个 case class case class Person(name: String, age: Int) val streamCC: DataStream[Person] = ... // 将 DataStream 转换为具备默认字段名 'name, 'age'的Table val tableCC1 = tableEnv.fromDataStream(streamCC) // 将 DataStream 转换为具备字段名 'myName, 'myAge'的Table val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
POJO(Java 和 Scala)
Flink支持使用POJO做为复合类型,决定POJO规则的文档请参考这里。
当将一个POJO类型的DataStream或者DataSet转换为Table而不指定字段名称时,Table的字段名称将采用JOPO原生的字段名称做为字段名称。重命名原始的POJO字段须要关键字AS,由于POJO没有固定的顺序,名称映射须要原始名称而且不能经过位置来完成。
//获取一个 StreamTableEnvironment, 一样原理适用于 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Person 是一个有两个字段"name" and "age" 的POJO DataStream<Person> stream = ... // 将 DataStream 转换为有字段 "name", "age" 的Table Table table1 = tableEnv.fromDataStream(stream); // 将 DataStream 转换为有字段 "myName", "myAge" 的Table Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge"); // 获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // Person 是一个有字段 "name" and "age" 的POJO val stream: DataStream[Person] = ... // 将 DataStream 转换为具备字段 'name, 'age' 的Table val table1: Table = tableEnv.fromDataStream(stream) // 将 DataStream 转换为具备字段 'myName, 'myAge' 的Table val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
Row
Row数据类型支持任意数量的字段,而且字段能够是null
值,字段名称能够经过RowTypeInformation来指定或者将一个Row DataStream或者DataSet转换为Table时(根据位置)指定。
// 获取一个 StreamTableEnvironment, 一样原理适用于 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 在`RowTypeInfo`中指定字段"name" and "age"的Row类型DataStream DataStream<Row> stream = ... // 将 DataStream 转换为具备字段 "name", "age" 的Table Table table1 = tableEnv.fromDataStream(stream); // 将 DataStream 转换为具备字段 "myName", "myAge" 的Table Table table2 = tableEnv.fromDataStream(stream, "myName, myAge"); // 获取一个 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 在`RowTypeInfo`中指定字段"name" and "age"的Row类型DataStream val stream: DataStream[Row] = ... // 将 DataStream 转换为具备字段 'name, 'age' 的Table val table1: Table = tableEnv.fromDataStream(stream) // 将 DataStream 转换为具备字段 'myName, 'myAge' 的Table val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
Apache Flink使用Apache Calcite来优化和翻译查询,当前的查询优化包括投影、过滤下推、子查询去相关及各类形式的查询重写。Flink不去优化join的顺序,可是会根据它们的顺序去执行(FROM子句中表的顺序或者WHERE子句中链接谓词的顺序)。
能够经过提供一个CalciteConfig对象来调整在不一样阶段应用的优化规则集,这个能够经过调用CalciteConfig.createBuilder())
得到的builder来建立,而且能够经过调用tableEnv.getConfig.setCalciteConfig(calciteConfig)
来提供给TableEnvironment。
解析一个Table
Table API为计算一个Table提供了一个机制来解析逻辑和优化查询计划,这个能够经过调用TableEnvironment.explain(table)
方法来完成,它会返回描述三个计划的字符串:
一、关系查询语法树,即未优化的查询计划
二、优化后的逻辑查询计划
三、物理执行计划
如下代码显示了一个示例和相应的输出:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello")); DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello")); Table table1 = tEnv.fromDataStream(stream1, "count, word"); Table table2 = tEnv.fromDataStream(stream2, "count, word"); Table table = table1 .where("LIKE(word, 'F%')") .unionAll(table2); String explanation = tEnv.explain(table); System.out.println(explanation); val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1 .where('word.like("F%")) .unionAll(table2) val explanation: String = tEnv.explain(table) println(explanation)
输出以下:
== Abstract Syntax Tree == LogicalUnion(all=[true]) LogicalFilter(condition=[LIKE($1, 'F%')]) LogicalTableScan(table=[[_DataStreamTable_0]]) LogicalTableScan(table=[[_DataStreamTable_1]]) == Optimized Logical Plan == DataStreamUnion(union=[count, word]) DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')]) DataStreamScan(table=[[_DataStreamTable_0]]) DataStreamScan(table=[[_DataStreamTable_1]]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat Stage 2 : Data Source content : collect elements with CollectionInputFormat Stage 3 : Operator content : from: (count, word) ship_strategy : REBALANCE Stage 4 : Operator content : where: (LIKE(word, 'F%')), select: (count, word) ship_strategy : FORWARD Stage 5 : Operator content : from: (count, word) ship_strategy : REBALANCE