数栈是云原生—站式数据中台PaaS,咱们在github和gitee上有一个有趣的开源项目:FlinkX,FlinkX是一个基于Flink的批流统一的数据同步工具,既能够采集静态的数据,也能够采集实时变化的数据,是全域、异构、批流一体的数据同步引擎。你们喜欢的话请给咱们点个star!star!star!mysql
github开源项目:https://github.com/DTStack/flinkxgit
gitee开源项目:https://gitee.com/dtstack_dev_0/flinkxgithub
首先,本文所述均基于flink 1.5.4。正则表达式
1、咱们为何扩展Flink-SQL?
因为Flink 自己SQL语法并不提供在对接输入源和输出目的的SQL语法。数据开发在使用的过程当中须要根据其提供的Api接口编写Source和 Sink, 异常繁琐,不只须要了解FLink 各种Operator的API,还须要对各个组件的相关调用方式有了解(好比kafka,redis,mongo,hbase等),而且在须要关联到外部数据源的时候没有提供SQL相关的实现方式,所以数据开发直接使用Flink编写SQL做为实时的数据分析时须要较大的额外工做量。redis
咱们的目的是在使用Flink-SQL的时候只须要关心作什么,而不须要关心怎么作。不须要过多的关心程序的实现,专一于业务逻辑。sql
接下来,咱们一块儿来看下Flink-SQL的扩展实现吧!缓存
2、扩展了哪些flink相关sql
一、建立源表语句网络
二、建立输出表语句异步
三、建立自定义函数函数
四、维表关联
3、各个模块是如何翻译到flink的实现
一、如何将建立源表的sql语句转换为flink的operator
Flink中表的都会映射到Table这个类。而后调用注册方法将Table注册到environment。
StreamTableEnvironment.registerTable(tableName, table);
当前咱们只支持kafka数据源。Flink自己有读取kafka 的实现类, FlinkKafkaConsumer09,因此只须要根据指定参数实例化出该对象。并调用注册方法注册便可。
另外须要注意在flink sql常常会须要用到rowtime, proctime, 因此咱们在注册表结构的时候额外添加rowtime,proctime。
当须要用到rowtime的使用须要额外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定义watermark主要作两个事情:1:如何从Row中获取时间字段。 2:设定最大延迟时间。
二、 如何将建立的输出表sql语句转换为flink的operator
Flink输出Operator的基类是OutputFormat, 咱们这里继承的是RichOutputFormat, 该抽象类继承OutputFormat,额外实现了获取运行环境的方法getRuntimeContext(), 方便于咱们以后自定义metric等操做。
咱们以输出到mysql插件mysql-sink为例,分两部分:
- 将create table 解析出表名称,字段信息,mysql链接信息。
该部分使用正则表达式的方式将create table 语句转换为内部的一个实现类。该类存储了表名称,字段信息,插件类型,插件链接信息。
- 继承RichOutputFormat将数据写到对应的外部数据源。
主要是实现writeRecord方法,在mysql插件中其实就是调用jdbc 实现插入或者更新方法。
三、如何将自定义函数语句转换为flink的operator;
Flink对udf提供两种类型的实现方式:
1)继承ScalarFunction
2)继承TableFunction
须要作的将用户提供的jar添加到URLClassLoader, 并加载指定的class (实现上述接口的类路径),而后调用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注册。以后便可使用改定义的udf;
四、维表功能是如何实现的?
流计算中一个常见的需求就是为数据流补齐字段。由于数据采集端采集到的数据每每比较有限,在作数据分析以前,就要先将所需的维度信息补全,可是当前flink并未提供join外部数据源的SQL功能。
实现该功能须要注意的几个问题:
1)维表的数据是不断变化的
在实现的时候须要支持定时更新内存中的缓存的外部数据源,好比使用LRU等策略。
2)IO吞吐问题
若是每接收到一条数据就串行到外部数据源去获取对应的关联记录的话,网络延迟将会是系统最大的瓶颈。这里咱们选择阿里贡献给flink社区的算子RichAsyncFunction。该算子使用异步的方式从外部数据源获取数据,大大减小了花费在网络请求上的时间。
3)如何将sql 中包含的维表解析到flink operator
为了从sql中解析出指定的维表和过滤条件, 使用正则明显不是一个合适的办法。须要匹配各类可能性。将是一个无穷无尽的过程。查看flink自己对sql的解析。它使用了calcite作为sql解析的工做。将sql解析出一个语法树,经过迭代的方式,搜索到对应的维表;而后将维表和非维表结构分开。
经过上述步骤能够经过SQL完成经常使用的从kafka源表,join外部数据源,写入到指定的外部目的结构中。