摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。大纲以下:javascript
JDBC connectorhtml
JDBC Catalogjava
JDBC Dialectmysql
Demogit
Tips:点击下方连接可查看做者原版 PPT 及分享视频:
https://flink-learning.org.cn/developers/flink-training-course3/github
JDBC-Connector 的重构web
JDBC-Connector 的重构web
FLINK-15782 :Rework JDBC Sinks[1] (重写 JDBC Sink)sql
FLINK-17537:Refactor flink-jdbc connector structure[2] (重构 flink-jdbc 链接器的结构)数据库
FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)apache
FLIP-122:New Connector Property Keys for New Factory[4](新的链接器参数)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主键约束问题)
JDBC Catalog
JDBC Catalog
// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)
JDBC Dialect
JDBC Dialect
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping
|
|
|
|
|
|
实践 Demo
实践 Demo
-
Flink standalone 环境准备并在提供的地址下载好对应的安装包和 connector jar。 -
测试数据准备,经过拉起容器运行已经打包好的镜像。其中 Kafka 中的 changelog 数据是经过 debezium connector 抓取的 MySQL orders表 的 binlog。 -
经过 SQL Client 编写 SQL 做业,分别建立 Flink 订单表,维表,用户表,产品表,并建立 Function UDF。从 PG Catalog 获取结果表信息以后,把做业提交至集群执行运行。 -
测试 CDC 数据同步和维表 join,经过新增订单、修改订单、删除订单、维表数据更新等一系列操做验证 CDC 在 Flink 上如何运行以及写入结果表。
https://github.com/leonardBang/flink-sql-etl
问答环节
问答环节
https://issues.apache.org/jira/browse/FLINK-16681
总结
总结
参考连接:
Flink Forward Asia 2020 议题征集中
(点击可了解更多议题投递详情)

本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。