Flink JDBC Connector:Flink 与数据库集成最佳实践

整理:陈政羽(Flink 社区志愿者)

摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。大纲以下:javascript


  1. JDBC connectorhtml

  2. JDBC Catalogjava

  3. JDBC Dialectmysql

  4. Demogit


Tips:点击下方连接可查看做者原版 PPT 及分享视频:
https://flink-learning.org.cn/developers/flink-training-course3/
github

 

JDBC-Connector 的重构web


JDBC Connector 在 Flink 1.11 版本发生了比较大的变化,咱们先从如下几个 Feature 来具体了解一下 Flink 社区在这个版本上对 JDBC 所作的改进。

  • FLINK-15782 :Rework JDBC Sinks[1] (重写 JDBC Sink)sql


这个 issue 主要为 DataStream API 新增了 JdbcSink,对于使用 DataStream 编程的用户会更加方便地把数据写入到 JDBC;而且规范了一些命名规则,之前命名使用的是 JDBC 加上链接器名称,目前命名规范为 Jdbc+ 链接器名称

  • FLINK-17537:Refactor flink-jdbc connector structure[2] (重构 flink-jdbc 链接器的结构)数据库


这个 issue 将 flink-jdbc 包名重命名为 flink-connector-jdbc,与 Flink 的其余 connector 统一,将全部接口和类从 org.apache.flink.java.io.jdbc(旧包)规范为新包路径 org.apache.flink.connector.jdbc(新包),经过这种重命名用户在对底层源代码的阅读上面会更加容易的理解和统一。

  • FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)apache


因为早期数据类型系统并非很完善,致使了比较多的 Connector 在使用上会常常报数据类型相关的异常,例如 DECIMAL 精度类型,在以往的 Flink 1.10 版本中有可能出现下图问题:


基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面作了重构,目前数据精度方面的支持已经很完善了。

  • FLIP-122:New Connector Property Keys for New Factory[4](新的链接器参数)


在 Flink 1.11 版本中,咱们对 DDL 的 WITH 参数相对于 1.10 版本作了简化,从用户视角看上就是简化和规范了参数,如表格所示:

Old Key (Flink 1.10)
New Key (Flink 1.11)
connector.type
connector.type
connector.url
url
connector.table
table-name
connector.driver
driver
connector.username
username
connector.password
password
connector.read.partition.column
scan.partition.column
connector.read.partition.num
scan.partition.num
connector.read.partition.lower-bound
scan.partition.lower-bound
connector.read.partition.upper-bound
scan.partition.upper-bound
connector.read.fetch-size
scan.fetch-size
connector.lookup.cache.max-rows
lookup.cache.max-rows
connector.lookup.cache.ttl
lookup.cache.ttl
connector.lookup.max-retries
lookup.max-retries
connector.write.flush.max-rows
sink.buffer-flush.max-rows
connector.write.flush.interval
sink.buffer-flush.interval
connector.write.max-retries
sink.max-retries

你们能够看到表格中有 3 个标红的地方,这个是相对于 1.10 有发生变化比较多的地方。此次 FLIP 但愿进一步简化链接器属性,以便使属性更加简洁和可读,并更好地与 FLIP-107 协做。若是须要了解更多的 Connector 参数能够进一步参考官方文档和 FLIP-122 中提到的改变,这样有助于从旧版本迁移到新版本并了解参数的变化。

  • FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主键约束问题)


Flink 1.10 存在某些 Query 没法推断出主键致使没法进行 Upsert 更新操做(以下图所示错误)。因此在 FLIP-87 中为 Flink SQL 引入的 Primary Key 约束。Flink 的主键约束遵循 SQL 标准,主键约束分为 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否对数据进行校验。咱们常见数据库的主键约束属于 PRIMARY KEY ENFORCED,会对数据进行校验。由于 Flink 并不持有数据,所以 Flink 支持的主键模式是 PRIMARY KEY NOT ENFORCED,  这意味着 Flink 不会校验数据,而是由用户确保主键的完整性。例如 HBase 里面对应的主键应该是 RowKey,在 MySQL 中对应的主键是在用户数据库的表中对应的主键。


JDBC Catalog


目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在关系数据库中的表,若是要在 Flink 中使用,用户须要手动写表的 DDL,一旦表的 Schema 发生改变,用户须要手动修改, 这是比较繁琐的事情。JDBC Catalog 提供了接口用于链接到各类关系型数据库,使得 Flink 可以自动检索表,不用用户手动输入和修改。目前 JDBC Catalog 内置目前实现了 Postgres Catalog。Postgres catalog 是一个 read-only (只读)的 Catalog,只支持读取 Postgres 表,支持的功能比较有限。下面代码展现了目前 Postgres catalog 支持的 6 个功能:数据库是否存在、数据库列表、获取数据库、根据数据库名获取表列表、得到表、表是否存在。


// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)

若是须要支持其余 DB (如 MySQL),须要用户根据 FLIP-93 的 JdbcCatalog 接口实现对应不一样的 JDBC Catalog。

JDBC Dialect


什么是 Dialect?

Dialect (方言)对各个数据库来讲,Dialect 体现各个数据库的特性,好比语法、数据类型等。若是须要查看详细的差别,能够点击这里[6]查看详细差别。下面经过对比 MySQL 和 Postgres 的一些常见场景举例:

Dialect
MySQL
Postgres
场景描述
Grammar(语法)
LIMIT 0,30
WITH LIMIT 30 OFFSET 0
分页
Data Type (数据类型)
BINARY
BYTEA,ARRAY
字段类型
Command (命令)
show tables
\dt
查看全部表

在数据类型上面,Flink SQL 的数据类型目前映射规则以下:

MySQL type
PostgreSQL type
Flink SQL type
TINYINT

TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT
UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT
UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT
UNSIGNED

DECIMAL(20, 0)

Flink 目前支持三种 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于测试,更多的类型映射能够点击下方连接前往官方文档查看。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping


如何保证 Dialect Upsert 的幂等性?

若是定义了主键,JDBC 写入时是可以保证 Upsert 语义的, 若是 DB 不支持 Upsert 语法,则会退化成 DELETE + INSERT 语义。Upsert query 是原子执行的,能够保证幂等性。这个在官方文档中也详细描述了更新失败或者存在故障时候如何作出的处理,下面的表格是不一样的 DB 对应不一样的 Upsert 语法:

Database
Upsert Grammar
MySQL
INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL
INSERT .. ON CONFLICT .. DO UPDATE SET ..

如何自定义 Dialect?

目前若是要实现自定义 Dialect (好比 SQL Server、Oracle 等), 须要用户本身实现对应 Dialect 修改源码并从新打包 flink-connector-jdbc。社区正在讨论提供一种插件化 dialect 的机制, 让用户能够不用修改源码打包就能实现自定义 Dialect,这个机制须要把 Dialect 接口暴露给用户。目前的 Dialect 接口不够清晰,没有考虑 DataStream API 的使用场景,也没有考虑到 一些复杂的 SQL 场景,因此这个接口目前不太稳定(后续版本会修改) 。

社区目前之因此没有把这个 API 开放给用户,是从用户使用的体验角度考虑,但愿把这种顶级 API 设计得尽可能稳定、简洁后再开放出来给用户使用,避免用户在后续 Flink 版本的迭代中屡次修改代码。目前社区已经有相应的计划去作了,你们能够留意 FLINK-16833[7] 提出的 JDBCDialect 插件化设计。

实践 Demo


你们看完上述 Flink 1.11 在 JDBC 所作的改动后,你们能够尝试下面这个关于商品表 CDC 同步和 ETL 的小案例,有助于理解 JDBC Catalog 和 CDC 的同步机制。

环境与版本:Flink 1.11.一、Docker、Kafka 1.11.一、MySQL Driver 5.1.4八、PostgreSQL Driver 42.2.14

流程以下:

  1. Flink standalone 环境准备并在提供的地址下载好对应的安装包和 connector jar。
  2. 测试数据准备,经过拉起容器运行已经打包好的镜像。其中 Kafka 中的 changelog 数据是经过 debezium connector 抓取的 MySQL orders表 的 binlog。
  3. 经过 SQL Client 编写 SQL 做业,分别建立 Flink 订单表,维表,用户表,产品表,并建立 Function UDF。从 PG Catalog 获取结果表信息以后,把做业提交至集群执行运行。
  4. 测试 CDC 数据同步和维表 join,经过新增订单、修改订单、删除订单、维表数据更新等一系列操做验证 CDC 在 Flink 上如何运行以及写入结果表。


上图为业务流程总体图,项目 Demo 地址:

https://github.com/leonardBang/flink-sql-etl


问答环节


1.Flink SQL Client 上面执行的 use default,是使用哪一个 catlog 呢?

答:Flink 内部有一个内置 Catlog,它把 meta 数据存于内存中。在 SQL Client 上没有显式指定 Hive catlog 或者 jdbc catlog 时会使用内置的 Catalog,刚刚的案例给你们演示的是 Postgres Catalog,里面有结果表。在内置 Catlog 能够看到咱们刚刚建立 Kafka 的表,MySQL 的维度表。

2.Flink MySQL DDL 链接 8 小时后就会自动断开的问题是否已经解决?

这个问题会在 1.12 版本解决此问题,目前 master 分支已经合并,具体能够参考如下地址 ,描述了相关问题的讨论和解决办法。

https://issues.apache.org/jira/browse/FLINK-16681 


3.什么是 CDC?能大概讲下目前 Flink 支持的 CDC 吗?

经过 Change Data Capture 机制(CDC)来将外部系统的动态数据(如 Mysql BinLog、Kafka Compacted Topic)导入 Flink,以及将 Flink 的 Update/Retract 流写出到外部系统中是用户一直但愿的功能。

Flink 1.11 实现了对 CDC 数据读取和写出的支持。目前 Flink 能够支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴开源同步工具) 两种 CDC 格式。Debezium 在国外用得比较多,Canal 在国内用得比较多,二者格式会有所区别,详细能够参考官方使用文档。


总结


本文从 JDBC Connector 的重构、数据精度、主键约束、命名规范等方面详细介绍,分享了社区目前实现的 Postgres Catalog 功能点;介绍了 Flink 如何实现 JDBC Dialect 的统一以及目前社区对 Dialect 作的一些计划;最后的实践 Demo 环节演示了经过 SQL Client 进行维表 JOIN 和 ETL 操做以及解答了你们在实际生产中所遇到的问题,但愿对你们进一步了解 Flink CDC 新功能有所帮助。

参考连接:


[1]https://issues.apache.org/jira/browse/FLINK-15782
[2]https://issues.apache.org/jira/browse/FLINK-17537
[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[4]https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
[6]https://www.postgresqltutorial.com/postgresql-vs-mysql/
[7]https://issues.apache.org/jira/browse/FLINK-16833



  Flink Forward Asia 2020 议题征集中  


洞察先机,智见将来,  Flink Forward Asia 2020 盛大开启!诚邀开源社区的各方力量与咱们一块儿,探讨新型数字化技术下的将来趋势,共同打造 2020 年大数据领域的这场顶级盛会!

在 Flink Forward Asia 2020,您可与全球开发者分享您的真知灼见,同各技术领域顶级专家面对面交流,探索数字化技术下的将来趋势若是您对以上任意技术方向有积累洞察,欢迎投递议题!每位嘉宾最多能够投递三个Topic, 投递日期截止至 10 月 12 日。


(点击可了解更多议题投递详情)


戳我投议题!

本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索