原文地址:stream-sqlhtml
个人Flink系列实战文章地址:Github Repogit
近年来,开源的分布式流计算系统层出不穷,引发了普遍的关注与讨论。其中的先行者,譬如 Apache Storm提供了低延迟的流式处理功能,可是受限于at-least-once的投递保证,背压等不太良好的处理以及相对而言的开放API的底层化。不过Storm也起到了抛砖引玉的做用,自此以后,不少新的流计算系统在不一样的维度上大放光彩。今日,Apache Flink或者 Apache Beam的使用者可以使用流式的Scala或者Java APIs来进行流处理任务,同时保证了exactly-once的投递机制以及高吞吐状况下的的低延迟响应。与此同时,流处理也在产界获得了应用,从Apache Kafka与Apache Flink在流处理的基础设施领域的大规模部署也折射除了流处理在产界的快速发展。与日俱增的实时数据流催生了开发人员或者分析人员对流数据进行分析以及实时展示的需求。不过,流数据分析也须要一些必备的技能与知识储备,譬如无限流的基本特性、窗口、时间以及状态等等,这些概念都会在利用Java或者Scala API来完成一个流分析的任务时候起到很大的做用。github
大概六个月以前,Apache Flink社区着手为流数据分析系统引入一个SQL交互功能。众所周知,SQL是访问与处理数据的标准语言,基本上每一个用过数据库的或者进行或数据分析的都对SQL不陌生。鉴于此,为流处理系统添加一个SQL交互接口可以有效地扩大该技术的受用面,让更多的人能够熟悉而且使用。除此以外,引入SQL的支持还能知足于一些须要实时交互地用户场景,大大简化一些须要进行流操做或者转化的应用代码。这篇文章中,咱们会从现有的状态、架构的设计以及将来Apache Flink社区准备添加SQL支持的计划这几个方面进行讨论。sql
在 0.9.0-milestone1 发布以后,Apache Flink添加了所谓的Table API来提供相似于SQL的表达式用于对关系型数据进行处理。这系列API的操做对象就是抽象而言的可以进行关系型操做的结构化数据或者流。Table API通常与DataSet或者DataStream紧密关联,能够从DataSet或者DataStream来方便地建立一个Table对象,也能够用以下的操做将一个Table转化回一个DataSet或者DataStream对象:数据库
val execEnv = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(execEnv) // obtain a DataSet from somewhere val tempData: DataSet[(String, Long, Double)] = // convert the DataSet to a Table val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF) // compute your result val avgTempCTable: Table = tempTable .where('location.like("room%")) .select( ('time / (3600 * 24)) as 'day, 'Location as 'room, (('tempF - 32) * 0.556) as 'tempC ) .groupBy('day, 'room) .select('day, 'room, 'tempC.avg as 'avgTempC) // convert result Table back into a DataSet and print it avgTempCTable.toDataSet[Row].print()
上面这坨代码是Scala的,不过你能够简单地用Java版本的Table API进行实现,下面这张图就展现了Table API的原始的结构:apache
在咱们从DataSet或者DataStream建立了Table以后,能够利用相似于filter
, join
, 或者 select
关系型转化操做来转化为一个新的Table对象。而从内部实现上来讲,全部应用于Table的转化操做都会变成一棵逻辑表操做树,在Table对象被转化回DataSet或者DataStream以后,专门的转化器会将这棵逻辑操做符树转化为对等的DataSet或者DataStream操做符。譬如'location.like("room%")
这样的表达式会经由代码生成编译为Flink中的函数。api
不过,老版本的Table API仍是有不少的限制的。首先,Table API并不能单独使用,而必须嵌入到DataSet或者DataStream的程序中,对于批处理表的查询并不支持外链接、排序以及其余不少在SQL中常用的扩展操做。而流处理表中只支持譬如filters、union以及projections,不能支持aggregations以及joins。而且,这个转化处理过程并不能有查询优化,你要优化的话仍是要去参考那些对于DataSet操做的优化。架构
关因而否须要添加SQL支持的讨论以前就在Flink社区中发生过几回,Flink 0.9发布以后,Table API、关系表达式的代码生成工具以及运行时的操做符等都预示着添加SQL支持的不少基础已经具有,能够考虑进行添加了。不过另外一方面,在整个Hadoop生态链里已经存在了大量的所谓“SQL-on-Hadoop”的解决方案,譬如Apache Hive, Apache Drill, Apache Impala, Apache Tajo,在已经有了这么多的可选方案的状况下,咱们以为应该优先提高Flink其余方面的特性,因而就暂时搁置了SQL-on-Hadoop的开发。框架
不过,随着流处理系统的日渐火热以及Flink受到的愈来愈普遍地应用,咱们发现有必要为用户提供一个更简单的能够用于数据分析的接口。大概半年前,咱们决定要改造下Table API,扩展其对于流处理的能力而且最终完成在流处理上的SQL支持。不过咱们也不打算重复造轮子,所以打算基于Apache Calcite这个很是流行的SQL解析器进行重构操做。Calcite在其余不少开源项目里也都应用到了,譬如Apache Hive, Apache Drill, Cascading, and many more。此外,Calcite社区自己也有将SQL on streams列入到它们的路线图中,所以咱们一拍即合。Calcite 在新的架构设计中的地位大概以下所示:分布式
新的架构主要是将Table API与SQL集成起来,用这两货的API构建的查询最终都会转化到Calcite的所谓的logicl plans表述。转化以后的流查询与批查询基本上差很少,而后Calcite的优化器会基于转化和优化规则来优化这些logical plans,针对数据源(流仍是静态数据)的不一样咱们会应用不一样的规则。最后,通过优化的logical plan会转化为一个普通的Flink DataStream或者DataSet对象,即仍是利用代码生成来将关系型表达式编译为Flink的函数。
新的架构继续提供了Table API而且在此基础上进行了很大的提高,它为流数据与关系型数据提供了统一的查询接口。另外,咱们利用了Calcite的查询优化框架与SQL解释器来进行了查询优化。不过,由于这些设计都仍是基于Flink的已有的API,譬如DataStream API提供的低延迟、高吞吐以及exactly-once投递的功能,以及DataSet API经过的健壮与高效的内存级别的操做器与管道式的数据交换,任何对于Flink核心API的提高都可以自动地提高Table API或者SQL查询的效率。
在这些工做以后,Flink就已经具有了同时对于流数据与静态数据的SQL支持。不过,咱们并不想把这个当成一个高效的SQL-on-Hadoop的解决方案,就像Impala, Drill, 以及 Hive那样的角色,咱们更愿意把它当成为流处理提供便捷的交互接口的方案。另外,这套机制还能促进同时用了Flink API与SQL的应用的性能。
咱们讨论了为啥要重构Flink的流SQL接口的缘由以及大概怎么去完成这个任务,如今咱们讨论下最终的API或者使用方式会是啥样的。新的SQL接口会集成到Table API中。DataStreams、DataSet以及额外的数据源都会先在TableEnvironment中注册成一个Table而后再进行SQL操做。TableEnvironment.sql()
方法会容许你输入SQL查询语句而后执行返回一个新的Table,下面这个例子就展现了一个完整的从JSON编码的Kafka主题中读取数据而后利用SQL查询进行处理最终写入另外一个Kafka主题的模型。注意,这下面提到的KafkaJsonSource与KafkaJsonSink都还未发布,将来的话TableSource与TableSinks都会固定提供,这样能够减小不少的模板代码。
// get environments val execEnv = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(execEnv) // configure Kafka connection val kafkaProps = ... // define a JSON encoded Kafka topic as external table val sensorSource = new KafkaJsonSource[(String, Long, Double)]( "sensorTopic", kafkaProps, ("location", "time", "tempF")) // register external table tableEnv.registerTableSource("sensorData", sensorSource) // define query in external table val roomSensors: Table = tableEnv.sql( "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " + "FROM sensorData " + "WHERE location LIKE 'room%'" ) // define a JSON encoded Kafka topic as external sink val roomSensorSink = new KafkaJsonSink(...) // define sink for room sensor data and execute query roomSensors.toSink(roomSensorSink) execEnv.execute()
你可能会发现上面这个例子中没有体现流处理中两个重要的方面:基于窗口的聚合与关联。下面我就会解释下怎么在SQL中表达关于窗口的操做。 Apache Calcite社区关于这方面已经有所讨论:SQL on streams。Calcite的流SQL被认为是一个标准SQL的扩展,而不是另外一个相似于SQL的语言。这会有几个方面的好处,首先,已经熟悉了标准SQL语法的同窗就不必花时间再学一波新的语法了,皆大欢喜。如今对于静态表与流数据的查询已经基本一致了,能够很方便地进行转换。Flink一直主张的是批处理只是流处理的一个特殊状况,所以用户也能够同时在静态表与流上进行查询,譬如处理有限的流。最后,将来也会有不少工具支持进行标准的SQL进行数据分析。
尽管咱们尚未彻底地定义好在Flink SQL表达式与Table API中如何进行窗口等设置,下面这个简单的例子会指明如何在SQL与Table API中进行滚动窗口式查询:
SELECT STREAM TUMBLE_END(time, INTERVAL '1' DAY) AS day, location AS room, AVG((tempF - 32) * 0.556) AS avgTempC FROM sensorData WHERE location LIKE 'room%' GROUP BY TUMBLE(time, INTERVAL '1' DAY), location
val avgRoomTemp: Table = tableEnv.ingest("sensorData") .where('location.like("room%")) .partitionBy('location) .window(Tumbling every Days(1) on 'time as 'w) .select('w.end, 'location, , (('tempF - 32) * 0.556).avg as 'avgTempCs)