做者:程鹤群(军长)java
文章概述:本文主要包含三部分:第一部分,主要介绍什么是 Table API,从概念角度进行分析,让你们有一个感性的认识;第二部分,从代码的层面介绍怎么使用 Table API;第三部分,介绍 Table API 近期的动态。文章结构以下:python
什么是 Table APIgit
Table API 编程github
Table API 操做sql
为了更好地了解 Table API,咱们先看下 Flink 都提供了哪些 API 供用户使用。apache
如图,Flink 根据使用的便捷性和表达能力的强弱提供了 3 层 API,由上到下,表达能力逐渐加强,好比 processFunction,是最底层的 API,表达能力最强,咱们能够用他来操做 state 和 timer 等复杂功能。Datastream API 相对于 processFunction 来讲,又进行了进一步封装,提供了不少标准的语义算子给你们使用,好比咱们经常使用的 window 算子(包括 Tumble, slide,session 等)。那么最上面的 SQL 和 Table API 使用最为便捷,具备自身的不少特色,重点概括以下:编程
第一,Table API & SQL 是一种声明式的 API。用户只需关心作什么,不用关心怎么作,好比图中的 WordCount 例子,只须要关心按什么维度聚合,作哪一种类型的聚合,不须要关心底层的实现。api
第二,高性能。Table API & SQL 底层会有优化器对 query 进行优化。举个例子,假如 WordCount 的例子里写了两个 count 操做,优化器会识别并避免重复的计算,计算的时候只保留一个 count 操做,输出的时候再把相同的值输出两遍便可,以达到更好的性能。缓存
第三,流批统一。上图例子能够发现,API 并无区分流和批,同一套 query 能够流批复用,对业务开发来讲,避免开发两套代码。性能优化
第四,标准稳定。Table API & SQL 遵循 SQL 标准,不易变更。API 比较稳定的好处是不用考虑 API 兼容性问题。
第五,易理解。语义明确,所见即所得。
上一小节介绍了 Table API 和 SQL 一些共有的特性,这个小节重点介绍下 Table API 自身的特性。主要能够概括为如下两点:
第一,Table API 使得多声明的数据处理写起来比较容易。
怎么理解?好比咱们有一个 Table(tab),而且须要执行一些过滤操做而后输出到结果表,对应的实现是:tab.where(“a < 10”).inertInto(“resultTable1”);此外,咱们还须要作另一些筛选,而后也对结果输出,即 tab.where(“a > 100”).insertInto(“resultTable2”)。你会发现,用 Table API 写起来会很是简洁方便,两行代码就把功能实现了。
第二,Table API 是 Flink 自身的一套 API,这使得咱们更容易地去扩展标准的 SQL。固然,在扩展 SQL 的时候并非随意的去扩展,须要考虑 API 的语义、原子性和正交性,而且当且仅当须要的时候才去添加。
对比 SQL,咱们能够认为 Table API 是 SQL 的超集。SQL 有的操做,Table API 能够有,然而咱们又能够从易用性和功能性地角度对 SQL 进行扩展和提高。
第一章介绍了 Table API 相关的概念。这一章咱们来看下如何用 Table API 来编程。本章会先从一个 WordCount 的例子出发,让你们对 Table API 编程先有一个大概的认识,而后再具体介绍一下 Table API 的操做,好比,如何获取一个 Table,如何输出一个 Table,以及如何对 Table 执行查询操做。
这是一个完整的,用 java 编写的 batch 版本的 WordCount 例子,此外,还有 scala 和 streaming 版本的 WordCount,都统一上传到了 github 上(https://github.com/hequn8128/TableApiDemo),你们能够下载下来尝试运行或者修改。
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.OldCsv; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; public class JavaBatchWordCount { // line:10 public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath(); tEnv.connect(new FileSystem().path(path)) .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n")) .withSchema(new Schema().field("word", Types.STRING)) .registerTableSource("fileSource"); // line:20 Table result = tEnv.scan("fileSource") .groupBy("word") .select("word, count(1) as count"); tEnv.toDataSet(result, Row.class).print(); } }
咱们具体看下这个 WordCount 的例子。首先,第1三、14行,是对 environment 的一些初始化,先经过 ExecutionEnvironment 的 getExecutionEnvironment 方法拿到执行环境,而后再经过 BatchTableEnvironment 的 create 拿到对应的 Table 环境,拿到环境后,咱们能够注册 TableSource、TableSink 或执行一些其余操做。
这里须要注意的是,ExecutionEnvironment 跟 BatchTableEnvironment 都是对应 Java 的版本,对于 scala 程序,这里须要是一个对应 scala 版本的 environment。这也是初学者一开始可能会遇到的问题,由于 environent 有不少且容易混淆。为了让你们更好区分这些 environment,下面对 environment 进行了一些概括。
这里从 batch/stream,还有 Java/scala,对 environment 进行了分类,对于这些 environment 使用时须要特别注意,不要 import 错了。environment 的问题,社区已经进行了一些讨论,如上图下方的连接,这里再也不具体展开。
咱们再回到刚刚的 WordCount 的例子,拿到 environment 后,须要作的第二件事情是注册对应的TableSource。
tEnv.connect(new FileSystem().path(path)) .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n")) .withSchema(new Schema().field("word", Types.STRING)) .registerTableSource("fileSource");
使用起来也很是方便,首先,由于咱们要读一个文件,须要指定读取文件的路径,指定了以后,咱们须要再描述文件内容的格式,好比他是 csv 的文件而且行分割符是什么。还有就是指定这个文件对应的 Schema 是什么,好比只有一列单词,而且类型是 String。最后,咱们须要把 TableSource 注册到 environment 里去。
Table result = tEnv.scan("fileSource") .groupBy("word") .select("word, count(1) as count"); tEnv.toDataSet(result, Row.class).print();
经过 scan 刚才注册好的 TableSource,咱们能够拿到一个 Table 对象,并执行相应的一些操做,好比 GroupBy,count。最后,能够把 Table 按 DataSet 的方式进行输出。
以上即是一个 Table API 的 WordCount 完整例子。涉及 Table 的获取,Table 的操做,以及 Table 的输出。接下来会具体介绍如何获取 Table、输出 Table 和执行 Table 操做。
获取 Table 大致能够分为两步,第一步,注册对应的 TableSource;第二步,调用 Table environement 的 scan 方法获取 Table 对象。注册 Table Source 又有3种方法:经过 Table descriptor 来注册,经过自定义 source 来注册,或者经过 DataStream 来注册。具体的注册方式以下图所示:
对应输出 Table,咱们也有相似的3种方法:Table descriptor, 自定义 Table sink 以及输出成一个 DataStream。以下图所示:
第二、3节介绍了如何获取和输出一个 Table,本节主要介绍如何对 Table 进行操做。Table 上有不少操做,好比一些 projection 操做 select、filter、where;聚合操做,如 groupBy、flatAggrgate;还有join操做,等等。咱们以一个具体的例子来介绍下 Table 上各操做的转换流程。
如上图,当咱们拿到一个 Table 后,调用 groupBy 会返回一个 GroupedTable。GroupedTable 里只有 select 方法,对 GroupedTable 调用 select 方法会返回一个 Table。拿到这个 Table 后,咱们能够再调用 Table 上的方法。图中其余 Table,如 OverWindowedTable 也是相似的流程。值得注意的是,引入各个类型的 Table 是为了保证 API 的合法性和便利性,好比 groupBy 以后只有 select 操做是有意义的,在编辑器上能够直接点出来。
前面咱们提到,能够将 Table API 当作是 SQL 的超集,所以咱们也能够对 Table 里的操做按此进行分类,大体分为三类,以下图所示:
第一类,是跟 SQL 对齐的一些操做,好比 select, filter, join 等。第二类,是一些提高 Table API 易用性的操做。第三类,是加强 Table API 功能的一些操做。第一类操做因为和 SQL 相似,比较容易理解,其次,也能够查看官方的文档,了解具体的方法,因此这里再也不展开介绍。下面的章节会重点介绍后两类操做,这些操做也是 Table API 独有的。
介绍易用性以前,咱们先来看一个问题。假设咱们有一张很大的表,里面有一百列,此时须要去掉一列,那么SQL怎么写?咱们须要 select 剩下的 99 列!显然这会给用户带来不小的代价。为了解决这个问题,咱们在Table上引入了一个 dropColumns 方法。利用 dropColumns 方法,咱们即可以只写去掉的列。与此对应,还引入了 addColumns, addOrReplaceColumns 和 renameColumns 方法,以下图所示:
解决了刚才的问题后,咱们再看下面另外一个问题:假设仍是一张100列的表,咱们须要选第20到第80列,那么咱们如何操做呢?为了解决这个问题,咱们又引入了 withColumns 和 withoutColumns 方法。对于刚才的问题,咱们能够简单地写成 table.select(“withColumns(20 to 80)”)。
该小节会介绍下 TableAggregateFunction 的功能和用法。在引入 TableAggregateFunction 以前,Flink 里有三种自定义函数:ScalarFunction,TableFunction 和 AggregateFunction。咱们能够从输入和输出的维度对这些自定义函数进行分类。以下图所示,ScalarFunction 是输入一行,输出一行;TableFunction 是输入一行,输出多行;AggregateFunction 是输入多行输出一行。为了让语义更加完整,Table API 新加了 TableAggregateFunction,它能够接收和输出多行。TableAggregateFunction 添加后,Table API 的功能能够获得很大的扩展,某种程度上能够用它来实现自定义 operator。好比,咱们能够用 TableAggregateFunction 来实现 TopN。
TableAggregateFunction 使用也很简单,方法签名和用法以下图所示:
用法上,咱们只须要调用 table.flatAggregate(),而后传入一个 TableAggregateFunction 实例便可。用户能够继承 TableAggregateFunction 来实现自定义的函数。继承的时候,须要先定义一个 Accumulator,用来存取状态,此外自定义的 TableAggregateFunction 须要实现 accumulate 和 emitValue 方法。accumulate 方法用来处理输入的数据,而 emitValue 方法负责根据 accumulator 里的状态输出结果。
最后介绍下 Table API 近期的动态:
1.Flip-29
主要是 Table API 功能和易用性的加强。好比刚刚介绍的 columns 相关操做,还有 TableAggregateFunction。
社区对应的 jira 是:https://issues.apache.org/jira/browse/FLINK-10972
2.Python Table API
但愿在 Table API 上增长 python 语言的支持。这个应该是 Python 用户的福音。
社区对应的 jira 是:https://issues.apache.org/jira/browse/FLINK-12308
3.Interactive Programming(交互式编程)
即 Table 上会提供一个 cache 算子,执行 cache 操做能够缓存 table 的结果,并在这个结果上作其余操做。社区对应 jira 是:https://issues.apache.org/jira/browse/FLINK-11199
4.Iterative Processing(迭代计算)
Table 上会支持一个 iterator 的算子,该算子能够用来执行迭代计算。好比迭代 100 次,或者指定一个收敛的条件,在机器学习领域使用比较普遍。社区对应 jira 是:https://issues.apache.org/jira/browse/FLINK-11199
▼ Apache Flink 社区推荐 ▼
Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:
https://developer.aliyun.com/...
首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击: