浅析 Flink Table/SQL API

从何而来

关系型API有不少好处:是声明式的,用户只须要告诉须要什么,系统决定如何计算;用户没必要特意实现;更方便优化,能够执行得更高效。自己Flink就是一个统一批和流的分布式计算平台,因此社区设计关系型API的目的之一是可让关系型API做为统一的一层,两种查询拥有一样的语义和语法。大多数流处理框架的API都是比较low-level的API,学习成本高并且不少逻辑须要写到UDF中,因此Apache Flink 添加了SQL-like的API处理关系型数据--Table API。这套API中最重要的概念是Table(能够在上面进行关系型操做的结构化的DataSet或DataStream)。Table API 与 DataSetDataStream API 结合紧密,DataSet 和 DataStream均可以很容易地转换成 Table,一样转换回来也很方便:javascript

val execEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// obtain a DataSet from somewhere
val tempData: DataSet[(String, Long, Double)] =

// convert the DataSet to a Table
val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF)
// compute your result
val avgTempCTable: Table = tempTable
 .where('location.like("room%"))
 .select(
   ('time / (3600 * 24)) as 'day, 
   'Location as 'room, 
   (('tempF - 32) * 0.556) as 'tempC
  )
 .groupBy('day, 'room)
 .select('day, 'room, 'tempC.avg as 'avgTempC)
// convert result Table back into a DataSet and print it
avgTempCTable.toDataSet[Row].print()复制代码

example使用的是Scala的API,Java版API也有一样的功能。html

下图展现了 Table API 的架构:java

从 DataSet 或 DataStream 建立一个 Table,而后在上面进行关系型操做好比 fliterjoinselect。对Table的操做将会转换成逻辑运算符树。Table 转换回 DataSet 和 DataStream 的时候将会转换成DataSet 和 DataStream的算子。有些相似 'location.like("room%") 的表达式将会经过 code generation 编译成Flink的函数。sql

然而,最初传统的Table API 有必定的限制。首先,它不能独立使用。Table API 的 query 必须嵌入到 DataSet 或 DataStream的程序中。对批处理表的查询不支持outer joinsorting和不少SQL中常见的标量函数。对于流处理的查询只支持filtetr unionprojection,不支持aggregationjoin。并且,转换过程当中没有利用太多查询优化技术,除了适用于全部DataSet程序的优化。数据库

Table API 和 SQL 紧密结合

随着流处理的日益普及和Flink在该领域的增加,Flink社区认为须要一个更简单的API使更多的用户可以分析流数据。一年前Flink社区决定将Table API提高到一个新的层级,扩展Table API中流处理的能力以及支持SQL。社区不想重复造轮子,因而决定在 Apache Calcite (一个比较流行的SQL解析和优化框架)的基础上构建新的 Table API。Apache Calcite 被用在不少项目中,包括 Apache Hive,Apache Drill,Cascading等等。除此以外,Calcite社区将 SQL on Stream 写入它的roadmap,因此Flink的SQL很适合和它结合。apache

以Calcite为核心的新架构图:编程

新架构提供两种API进行关系型查询,Table API 和 SQL。这两种API的查询都会用包含注册过的Table的catalog进行验证,而后转换成统一Calcite的logical plan。在这种表示中,stream和batch的查询看起来彻底同样。下一步,利用 Calcite的 cost-based 优化器优化转换规则和logical plan。根据数据源的性质(流式和静态)使用不一样的规则进行优化。最终优化后的plan转传成常规的Flink DataSet 或 DataStream 程序。这步还涉及code generation(将关系表达式转换成Flink函数)。api

下面咱们举一个例子来理解新的架构。表达式转换成Logical Plan以下图所示:架构

调用Table API 其实是建立了不少 Table API 的 LogicalNode,建立的过程当中对会对整个query进行validate。好比table是CalalogNode,window groupBy以后在select时会建立WindowAggregateProject,where对应Filter。而后用RelBuilder翻译成Calcite LogicalPlan。若是是SQL API 将直接用Calcite的Parser进行解释而后validate生成Calcite LogicalPlan。框架

利用Calcite内置的一些rule来优化LogicalPlan,也能够本身添加或者覆盖这些rule。转换成Optimized Calcite Plan后,仍然是Calcite的内部表示方式,如今须要transform成DataStream Plan,对应上图第三列的类,里面封装了如何translate成普通的DataStream或DataSet程序。随后调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各类算子。如今就至关于咱们直接利用Flink的DataSet或DataStream API开发的程序。

Table API的新架构除了维持最初的原理还改进了不少。为流式数据和静态数据的关系查询保留统一的接口,并且利用了Calcite的查询优化框架和SQL parser。该设计是基于Flink已构建好的API构建的,DataStream API 提供低延时高吞吐的流处理能力并且就有exactly-once语义并且能够基于event-time进行处理。并且DataSet拥有稳定高效的内存算子和流水线式的数据交换。Flink的core API和引擎的全部改进都会自动应用到Table API和SQL上。

新的SQL接口集成到了Table API中。DataSteam, DataSet和外部数据源能够在TableEnvironment中注册成表,为了是他们能够经过SQL进行查询。TableEnvironment.sql()方法用来声明SQL和将结果做为Table返回。下面的是一个完整的样例,从一个JSON编码的Kafka topic中读取流表,而后用SQL处理并写到另外一个Kafka topic。

// get environments
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// configure Kafka connection
val kafkaProps = ...
// define a JSON encoded Kafka topic as external table
val sensorSource = new KafkaJsonSource[(String, Long, Double)](
    "sensorTopic",
    kafkaProps,
    ("location", "time", "tempF"))

// register external table
tableEnv.registerTableSource("sensorData", sensorSource)

// define query in external table
val roomSensors: Table = tableEnv.sql(
    "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " +
    "FROM sensorData " +
    "WHERE location LIKE 'room%'"
  )

// define a JSON encoded Kafka topic as external sink
val roomSensorSink = new KafkaJsonSink(...)

// define sink for room sensor data and execute query
roomSensors.toSink(roomSensorSink)
execEnv.execute()复制代码

这个样例中忽略了流处理中最有趣的部分:window aggregate 和 join。这些操做如何用SQL表达呢?Apache Calcite社区提出了一个proposal来讨论SQL on streams的语法和语义。社区将Calcite的stream SQL描述为标准SQL的扩展而不是另外的 SQL-like语言。这有不少好处,首先,熟悉SQL标准的人可以在不学习新语法的状况下分析流数据。静态表和流表的查询几乎相同,能够轻松地移植。此外,能够同时在静态表和流表上进行查询,这和flink的愿景是同样的,将批处理看作特殊的流处理(批看做是有限的流)。最后,使用标准SQL进行流处理意味着有不少成熟的工具支持。

下面的example展现了如何用SQL和Table API进行滑动窗口查询:

SQL

SELECT STREAM
  TUMBLE_END(time, INTERVAL '1' DAY) AS day,
  location AS room,
  AVG((tempF - 32) * 0.556) AS avgTempC
FROM sensorData
WHERE location LIKE 'room%'
GROUP BY TUMBLE(time, INTERVAL '1' DAY), location复制代码

Table API

val avgRoomTemp: Table = tableEnv.ingest("sensorData")
  .where('location.like("room%"))
  .partitionBy('location)
  .window(Tumbling every Days(1) on 'time as 'w)
  .select('w.end, 'location, , (('tempF - 32) * 0.556).avg as 'avgTempCs)复制代码

Table API的现状

Batch SQL & Table API 支持:

  • Selection, Projection, Sort, Inner & Outer Joins, Set operations
  • Windows for Slide, Tumble, Session

Streaming Table API 支持:

  • Selection, Projection, Union
  • Windows for Slide, Tumble, Session

Streaming SQL:

  • Selection, Projection, Union, Tumble

Streaming SQL案例

持续的ETL和数据导入

获取流式数据,而后转换这些数据(归一化,聚合...),将其写入其余系统(File,Kafka,DBMS)。这些query的结果一般会存储到log-style的系统。

实时的Dashboards 和 报表

获取流式数据,而后对数据进行聚合来支持在线系统(dashboard,推荐)或者数据分析系统(Tableau)。一般结果被写到k-v存储中(Cassandra,Hbase,可查询的Flink状态),创建索引(Elasticsearch)或者DBMS(MySQL,PostgreSQL...)。这些查询一般能够被更新,改进。

即席分析

针对流数据的即席查询,以实时的方式进行分析和浏览数据。查询结果直接显示在notebook(Apache Zeppelin)中。

Flink社区还提出来和数据库中Materialized View很类似的Dynamic table 动态表概念,将在之后的版本中支持,具体细节将另开文章解释。

原文链接 http://mtunique.com/flink_sql/

相关文章
相关标签/搜索