在现实业务中,Kafka常常会遇到的一个集成场景就是,从数据库获取数据,由于关系数据库是一个很是丰富的事件源。数据库中的现有数据以及对该数据的任何更改均可以流式传输到Kafka主题中,在这里这些事件可用于驱动应用,也能够流式传输到其它数据存储(好比搜索引擎或者缓存)用于分析等。html
实现这个需求有不少种作法,可是在本文中,会聚焦其中的一个解决方案,即Kafka链接器中的JDBC链接器,讲述如何进行配置,以及一些问题排查的技巧,至于更多的细节,请参见Kafka的文档。java
Kafka链接器中的JDBC链接器包含在Confluent Platform中,也能够与Confluent Hub分开安装。它能够做为源端从数据库提取数据到Kafka,也能够做为接收端从一个Kafka主题中将数据推送到数据库。几乎全部关系数据库都提供JDBC驱动,包括Oracle、Microsoft SQL Server、DB二、MySQL和Postgres。mysql
下面将从最简单的Kafka链接器配置开始,而后进行构建。本文中的示例是从MySQL数据库中提取数据,该数据库有两个模式,每一个模式都有几张表:git
mysql> SELECT table_schema, table_name FROM INFORMATION_SCHEMA.tables WHERE TABLE_SCHEMA != 'information_schema'; +--------------+--------------+ | TABLE_SCHEMA | TABLE_NAME | +--------------+--------------+ | demo | accounts | | demo | customers | | demo | transactions | | security | firewall | | security | log_events | +--------------+--------------+
在进行配置以前,要确保Kafka链接器能够实际链接到数据库,即确保JDBC驱动可用。若是使用的是SQLite或Postgres,那么驱动已经包含在内,就能够跳过此步骤。对于全部其它数据库,须要将相关的JDBC驱动JAR文件放在和kafka-connect-jdbc
JAR相同的文件夹中。此文件夹的标准位置为:github
share/java/kafka-connect-jdbc/
;/usr/share/java/kafka-connect-jdbc/
,关于如何将JDBC驱动添加到Kafka链接器的Docker容器,请参阅此处;kafka-connect-jdbc
JAR位于其它位置,则可使用plugin.path
指向包含它的文件夹,并确保JDBC驱动位于同一文件夹中。还能够在启动Kafka链接器时指定CLASSPATH
,设置为能够找到JDBC驱动的位置。必定要将其设置为JAR自己,而不只仅是包含它的文件夹,例如:正则表达式
CLASSPATH=/u01/jdbc-drivers/mysql-connector-java-8.0.13.JAR ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
两个事情要注意一下:sql
kafka-connect-jdbc
JAR位于其它位置,则Kafka链接器的plugin.path
选项将没法直接指向JDBC驱动JAR文件 。根据文档,每一个JDBC驱动JAR必须与kafka-connect-jdbc
JAR位于同一目录;找不到合适的驱动docker
与JDBC链接器有关的常见错误是No suitable driver found
,好比:数据库
{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
这可能有2个缘由:apache
确认是否已加载JDBC驱动
Kafka链接器会加载与kafka-connect-jdbc
JAR文件在同一文件夹中的全部JDBC驱动,还有在CLASSPATH
上找到的任何JDBC驱动。若是要验证一下,能够将链接器工做节点的日志级别调整为DEBUG
,而后会看到以下信息:
1.DEBUG Loading plugin urls
:包含kafka-connect-jdbc-5.1.0.jar
(或者对应当前正在运行的版本号)的一组JAR文件:
DEBUG Loading plugin urls: [file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/audience-annotations-0.5.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/common-utils-5.1.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/jline-0.9.94.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/jtds-1.3.1.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-5.1.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.13.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/netty-3.10.6.Final.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/postgresql-9.4-1206-jdbc41.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/slf4j-api-1.7.25.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/sqlite-jdbc-3.25.2.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/zkclient-0.10.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/zookeeper-3.4.13.jar] (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
在这个JAR列表中,应该有JDBC驱动JAR。在上面的输出中,能够看到MySQL、Postgres和SQLite的JAR。若是指望的JDBC驱动JAR不在,能够将驱动放入kafka-connect-jdbc
JAR所在的文件夹中。
2.INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector'
:在此以后,在记录任何其它插件以前,能够看到JDBC驱动已注册:
INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: jTDS 1.3.1 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: com.mysql.cj.jdbc.Driver@7bbbb6a8 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: org.postgresql.Driver@ea9e141 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: org.sqlite.JDBC@236134a1 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
确认JDBC驱动包含在已注册的列表中。若是没有,那么就是安装不正确。
注意,虽然可能会在日志的其它地方看到驱动的Registered java.sql.Driver
信息,但若是要确认其对于JDBC链接器可用,那么它必须直接
出如今INFO Added plugin 'io.confluent.connect.jdbc
消息的后面。
JDBC URL
对于源数据库来讲JDBC URL必须是正确的,若是搞错了,那么Kafka链接器即便驱动正确,也是不行。如下是一些常见的JDBC URL格式:
数据库 | 下载地址 | JDBC URL |
---|---|---|
IBM DB2 | 下载 | jdbc:db2://<host>:<port50000>/<database> |
IBM Informix | jdbc:informix-sqli://:/:informixserver=<debservername> |
|
MS SQL | 下载 | jdbc:sqlserver://<host>[:<port1433>];databaseName=<database> |
MySQL | 下载 | jdbc:mysql://<host>:<port3306>/<database> |
Oracle | 下载 | jdbc:oracle:thin://<host>:<port>/<service> or jdbc:oracle:thin:<host>:<port>:<SID> |
Postgres | Kafka链接器自带 | jdbc:postgresql://<host>:<port5432>/<database> |
Amazon Redshift | 下载 | jdbc:redshift://<server>:<port5439>/<database> |
Snowflake | jdbc:snowflake://<account_name>.snowflakecomputing.com/?<connection_params> |
注意,虽然JDBC URL一般容许嵌入身份验证信息,但这些内容将以明文形式记录在Kafka链接器日志中。所以应该使用单独的connection.user
和connection.password
配置项,这样在记录时会被合理地处理。
JDBC驱动安装完成以后,就能够配置Kafka链接器从数据库中提取数据了。下面是最小的配置,不过它不必定是最有用的,由于它是数据的批量导入,在本文后面会讨论如何进行增量加载。
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-01-", "mode":"bulk" } }'
使用此配置,每一个表(用户有权访问)将彻底复制到Kafka,经过使用KSQL列出Kafka集群上的主题,咱们能够看到:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- mysql-01-accounts | false | 1 | 1 | 0 | 0 mysql-01-customers | false | 1 | 1 | 0 | 0 mysql-01-firewall | false | 1 | 1 | 0 | 0 mysql-01-log_events | false | 1 | 1 | 0 | 0 mysql-01-transactions | false | 1 | 1 | 0 | 0
注意mysql-01
前缀,表格内容的完整副本将每五秒刷新一次,能够经过修改poll.interval.ms
进行调整,例如每小时刷新一次:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_02", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-02-", "mode":"bulk", "poll.interval.ms" : 3600000 } }'
找个主题确认一下,显示完整的数据,看看是否是本身想要的:
ksql> PRINT 'mysql-02-accounts' FROM BEGINNING; Format:AVRO 12/20/18 3:18:44 PM UTC, null, {"id": 1, "first_name": "Hamel", "last_name": "Bly", "username": "Hamel Bly", "company": "Erdman-Halvorson", "created_date": 17759} 12/20/18 3:18:44 PM UTC, null, {"id": 2, "first_name": "Scottie", "last_name": "Geerdts", "username": "Scottie Geerdts", "company": "Mante Group", "created_date": 17692} 12/20/18 3:18:44 PM UTC, null, {"id": 3, "first_name": "Giana", "last_name": "Bryce", "username": "Giana Bryce", "company": "Wiza Inc", "created_date": 17627} 12/20/18 3:18:44 PM UTC, null, {"id": 4, "first_name": "Allen", "last_name": "Rengger", "username": "Allen Rengger", "company": "Terry, Jacobson and Daugherty", "created_date": 17746} 12/20/18 3:18:44 PM UTC, null, {"id": 5, "first_name": "Reagen", "last_name": "Volkes", "username": "Reagen Volkes", "company": "Feeney and Sons", "created_date": 17798} …
目前会展现全部可用的表,这可能不是实际的需求,可能只但愿包含特定模式的表,这个可使用catalog.pattern/schema.pattern
(具体哪个取决于数据库)配置项进行控制:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_03", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-03-", "mode":"bulk", "poll.interval.ms" : 3600000, "catalog.pattern" : "demo" } }'
这样就只会从demo
模式中取得3张表:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- […] mysql-03-accounts | false | 1 | 1 | 0 | 0 mysql-03-customers | false | 1 | 1 | 0 | 0 mysql-03-transactions | false | 1 | 1 | 0 | 0 […]
也可使用table.whitelist
(白名单)或table.blacklist
(黑名单)来控制链接器提取的表,下面的示例显式地列出了但愿拉取到Kafka中的表清单:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_04", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-04-", "mode":"bulk", "poll.interval.ms" : 3600000, "catalog.pattern" : "demo", "table.whitelist" : "accounts" } }'
这时就只有一个表从数据库流式传输到Kafka:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- mysql-04-accounts | false | 1 | 1 | 0 | 0
由于只有一个表,下面的配置:
"catalog.pattern" : "demo", "table.whitelist" : "accounts",
等同于:
"table.whitelist" : "demo.accounts",
也能够在一个模式中指定多个表,好比:
"catalog.pattern" : "demo", "table.whitelist" : "accounts, customers",
或者也能够跨越多个模式:
"table.whitelist" : "demo.accounts, security.firewall",
还可使用其它的表过滤选项,好比table.types
能够选择表以外的对象,例如视图。
过滤表时要注意,由于若是最终没有对象匹配该模式(或者链接到数据库的已认证用户没有权限访问),那么链接器将报错:
INFO After filtering the tables are: (io.confluent.connect.jdbc.source.TableMonitorThread) … ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder) java.lang.IllegalArgumentException: Number of groups must be positive
在经过table.whitelist/table.blacklist
进行过滤以前,能够将日志级别调整为DEBUG
,查看用户能够访问的表清单:
DEBUG Got the following tables: ["demo"."accounts", "demo"."customers"] (io.confluent.connect.jdbc.source.TableMonitorThread)
而后,链接器会根据提供的白名单/黑名单过滤此列表,所以要确认指定的列表位于链接器可用的列表中,还要注意链接用户要有权限访问这些表,所以还要检查数据库端的GRANT
语句。
到目前为止,已经按计划将整张表都拉取到Kafka,这虽然对于转存数据很是有用,不过都是批量而且并不老是适合将源数据库集成到Kafka流系统中。
JDBC链接器还有一个流式传输到Kafka的选项,它只会传输上次拉取后的数据变动,具体能够基于自增列(例如自增主键)和/或时间戳(例如最后更新时间戳)来执行此操做。在模式设计中的常见作法是使用这些中的一个或两个,例如,事务表ORDERS
可能有:
ORDER_ID
:一个惟一键(多是主键),每一个新订单递增;UPDATE_TS
:每次数据变动时更新的时间戳列。可使用mode
参数配置该选项,好比使用timestamp
:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_08", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-08-", "mode":"timestamp", "table.whitelist" : "demo.accounts", "timestamp.column.name": "UPDATE_TS", "validate.non.null": false } }'
下面会获取表的所有数据,外加源数据后续的更新和插入:
注意:
时间戳+自增列
选项为识别新行和更新行提供了最大的覆盖范围;CREATE TABLE foo ( … UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );
CREATE TABLE foo ( … UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Courtesy of https://techblog.covermymeds.com/databases/on-update-timestamps-mysql-vs-postgres/ CREATE FUNCTION update_updated_at_column() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN NEW.update_ts = NOW(); RETURN NEW; END; $$; CREATE TRIGGER t1_updated_at_modtime BEFORE UPDATE ON foo FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
CREATE TABLE foo ( … CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP , ); CREATE OR REPLACE TRIGGER TRG_foo_UPD BEFORE INSERT OR UPDATE ON foo REFERENCING NEW AS NEW_ROW FOR EACH ROW BEGIN SELECT SYSDATE INTO :NEW_ROW.UPDATE_TS FROM DUAL; END; /
有时可能想从RDBMS中提取数据,但但愿有比整个表更灵活的方式,缘由可能包括:
这可使用JDBC链接器的query
模式。在了解如何实现以前,须要注意如下几点:
query
模式可能不那么灵活,所以从源中简单地删除列的另外一种方法(不管是简单地减小数量,仍是由于敏感信息)都是在链接器自己中使用ReplaceField
单消息转换;后处理
的绝佳方式,使管道尽量简单。下面将展现如何将transactions
表,再加上customers
表中的数据流式传输到Kafka:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_09", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-09", "mode":"bulk", "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;", "poll.interval.ms" : 3600000 } }'
可能注意到已切换回bulk
模式,可使用主键或者时间戳其中一个增量选项,但要确保在SELECT子句中包含相应的主键/时间戳列(例如txn_id
):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_10", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-10", "mode":"incrementing", "query":"SELECT txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id", "incrementing.column.name": "txn_id", "validate.non.null": false } }'
若是不包括该列(即便它存在于源表中),那么链接器会报错并显示org.apache.kafka.connect.errors.DataException
异常(#561)或java.lang.NullPointerException
异常(#560),这是由于链接器须要在返回的数据中获取值,以即可以存储相应偏移量的最新值。
若是使用query
选项,除非使用mode: bulk
(#566),不然没法指定本身的WHERE子句,也就是说,在查询中使用本身的谓词和使用Kafka进行增量提取之间是互斥的。
若是须要不一样的参数设定,能够建立新的链接器,例如,可能但愿有不一样的参数:
简单来讲,若是全部表参数都同样,则可使用单个链接器。
建立链接器以后,可能在目标Kafka主题中看不到任何数据。下面会一步步进行诊断:
1.查询/connectors
端点,可确认链接器是否建立成功:
$ curl -s“http:// localhost:8083 / connectors” [ “jdbc_source_mysql_10”]
应该看到链接器列表,若是没有,则须要按照以前的步骤进行建立,而后关注Kafka链接器返回的任何错误。
2.检查链接器及其任务的状态:
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/status"|jq '.' { "name": "jdbc_source_mysql_10", "connector": { "state": "RUNNING", "worker_id": "kafka-connect:8083" }, "tasks": [ { "state": "RUNNING", "id": 0, "worker_id": "kafka-connect:8083" } ], "type": "source" }
正常应该看到全部的链接器和任务的state
都是RUNNING
,不过RUNNING
不老是意味着正常。
3.若是链接器或任务的状态是FAILED
,或者即便状态是RUNNING
可是没有按照预期行为运行,那么能够转到Kafka链接器工做节点的输出(这里有相关的说明),这里会显示是否存在任何实际的问题。以上面的链接器为例,其状态为RUNNING
,可是链接器工做节点日志中实际上全是重复的错误:
ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;', topicPrefix='mysql-10', incrementingColumn='t.id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask) java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `t.id` > -1 ORDER BY `t.id` ASC' at line 1 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
4.在这里,问题是什么并不明确,须要调出链接器的配置来检查指定的查询是否正确:
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/config"|jq '.' { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "t.id", "topic.prefix": "mysql-10", "connection.password": "asgard", "validate.non.null": "false", "connection.user": "connect_user", "query": "SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;", "name": "jdbc_source_mysql_10", "connection.url": "jdbc:mysql://mysql:3306/demo" }
5.在MySQL中运行此查询发现能正常执行:
mysql> SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; +------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------+ | id | customer_id | amount | currency | txn_timestamp | first_name | last_name | email | gender | comments | +------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------+ | 1 | 5 | -72.97 | RUB | 2018-12-12T13:58:37Z | Modestia | Coltart | mcoltart4@scribd.com | Female | Reverse-engineered non-volatile success
6.因此确定是Kafka链接器在执行时作了什么。鉴于错误消息引用t.id
,这是在incrementing.column.name
参数中指定的,可能问题与此有关。经过将Kafka链接器的日志级别调整为DEBUG
,能够看到执行的完整SQL语句:
DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;', topicPrefix='mysql-10', incrementingColumn='t.id', timestampColumns=[]} prepared SQL query: SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; WHERE `t.id` > ? ORDER BY `t.id` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
7.看一下该prepared SQL query
部分,可能会发现:
[…] FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; WHERE `t.id` > ? ORDER BY `t.id` ASC
8.注意在JOIN
子句的c.id
后面有语句终止符(;),后面有WHERE子句。该WHERE
子句由Kafka链接器附加,用于实现所要求的incrementing
模式,但建立了一个无效的SQL语句; 9.而后在GitHub中查找与看到的错误相关的问题,由于有时它其实是一个已知的问题,例如这个问题; 10.若是链接器存在而且是RUNNING
,而且Kafka链接器工做节点日志中也没有错误,还应该检查:
- 链接器的提取间隔是多少?也许它彻底按照配置运行,而且源表中的数据已经更改,但就是没有拉取到新数据。要检查这一点,能够在Kafka链接器工做节点的输出中查找`JdbcSourceTaskConfig`的值和`poll.interval.ms`的值; - 若是正在使用的是增量摄取,Kafka链接器关于偏移量是如何存储的?若是删除并重建相同名称的链接器,则将保留前一个实例的偏移量。考虑这样的场景,建立完链接器以后,成功地将全部数据提取到源表中的给定主键或时间戳值,而后删除并从新建立了它,新版本的链接器将得到以前版本的偏移量,所以仅提取比先前处理的数据更新的数据,具体能够经过查看保存在其中的`offset.storage.topic`值和相关表来验证这一点。
当Kafka链接器以分布式模式运行时,它会在Kafka主题(经过offset.storage.topic
配置)中存储有关它在源系统中读取的位置(称为偏移量)的信息,当链接器任务重启时,它能够从以前的位置继续进行处理,具体能够在链接器工做节点日志中看到:
INFO Found offset {{protocol=1, table=demo.accounts}={timestamp_nanos=0, timestamp=1547030056000}, {table=accounts}=null} for partition {protocol=1, table=demo.accounts} (io.confluent.connect.jdbc.source.JdbcSourceTask)
每次链接器轮询时,都会使用这个偏移量,它会使用预编译的SQL语句,而且使用Kafka链接器任务传递的值替换?
占位符:
DEBUG TimestampIncrementingTableQuerier{table="demo"."accounts", query='null', topicPrefix='mysql-08-', incrementingColumn='', timestampColumns=[UPDATE_TS]} prepared SQL query: SELECT * FROM `demo`.`accounts` WHERE `demo`.`accounts`.`UPDATE_TS` > ? AND `demo`.`accounts`.`UPDATE_TS` < ? ORDER BY `demo`.`accounts`.`UPDATE_TS` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier) DEBUG Executing prepared statement with timestamp value = 2019-01-09 10:34:16.000 end time = 2019-01-09 13:23:40.000 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)
这里,第一个时间戳值就是存储的偏移量,第二个时间戳值是当前时间戳。
虽然没有文档记载,但能够手动更改链接器使用的偏移量,由于是在JDBC源链接器的上下文中,因此能够跨多个源链接器类型,这意味着更改时间戳或主键,链接器会将后续记录视为未处理的状态。
首先要作的是确保Kafka链接器已经刷新了周期性的偏移量,能够在工做节点日志中看到什么时候执行此操做:
INFO WorkerSourceTask{id=jdbc_source_mysql_08-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
看下Kafka的主题,能够看到Kafka链接器建立的内部主题,而且负责偏移量的主题也是其中之一,名字可能有所不一样:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- docker-connect-configs | false | 1 | 1 | 0 | 0 docker-connect-offsets | false | 1 | 1 | 0 | 0 docker-connect-status | false | 5 | 1 | 0 | 0 ksql> PRINT 'docker-connect-offsets' FROM BEGINNING; Format:JSON {"ROWTIME":1547038346644,"ROWKEY":"[\"jdbc_source_mysql_08\",{\"protocol\":\"1\",\"table\":\"demo.customers\"}]","timestamp_nanos":0,"timestamp":1547030057000}
当Kafka链接器任务启动时,它会读取此主题并使用适当主键的最新值。要更改偏移量,只需插入一个新值便可。最简单的方法是转存当前主题内容,修改内容并从新执行,由于一致性和简单,能够考虑使用kafkacat:
$ kafkacat -b kafka:29092 -t docker-connect-offsets -C -K# -o-1 % Reached end of topic docker-connect-offsets [0] at offset 0 ["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547030056000} );
若是是多个链接器,可能复杂些,可是这里只有一个,因此使用了-o-1
标志,它定义了返回的偏移量。
mode=timestamp
来监测表中的变化。时间戳值是1547030056000
,使用相关的时间戳转换之类的工具,能够很容易地转换和操做,好比将其提早一小时(1547026456000
)。接下来,使用更新后的timestamp
值准备新消息:["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547026456000}
echo '["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547026456000}' | \ kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#
NULL
消息值:echo'[“jdbc_source_mysql_08”,{“protocol”:“1”,“table”:“demo.accounts”}]#'| \ kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#
curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" http://localhost:8083/connectors/jdbc_source_mysql_08/tasks/0/restart
从指定的时间戳或者主键处开启表的捕获
当使用时间戳或自增主键模式建立JDBC源链接器时,它会从主键为-1
和/或时间戳为1970-01-01 00:00:00.00
开始,这意味着会得到表的所有内容,而后在后续的轮询中获取任何插入/更新的数据。
可是若是不想要表的完整副本,只是但愿链接器从如今开始,该怎么办呢?这在目前的Kafka链接器中还不支持,但可使用前述的方法。不须要获取现有的偏移量消息并对其进行定制,而是本身建立。消息的格式依赖于正在使用的链接器和表的名称,一种作法是先建立链接器,肯定格式,而后删除链接器,另外一种作法是使用具备相同源表名和结构的环境,除非在该环境中没有可供链接器提取的数据,不然一样也能获得所需的消息格式。
在建立链接器以前,使用适当的值配置偏移量主题。在这里,但愿从demo.transactions
表中提取自增主键大于42的全部行:
echo '["jdbc_source_mysql_20",{"protocol":"1","table":"demo.transactions"}]#{"incrementing":42}' | \ kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#
下面建立链接器:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_20", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-20-", "mode":"incrementing", "table.whitelist" : "demo.transactions", "incrementing.column.name": "txn_id", "validate.non.null": false } }'
在生成的Kafka链接器工做日志中,能够看到:
INFO Found offset {{protocol=1, table=demo.transactions}={incrementing=42}, {table=transactions}=null} for partition {protocol=1, table=demo.transactions} (io.confluent.connect.jdbc.source.JdbcSourceTask) … DEBUG Executing prepared statement with incrementing value = 42 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)
和预期同样,Kafka主题中只注入了txn_id
大于42的行:
ksql> PRINT 'mysql-20x-transactions' FROM BEGINNING; Format:AVRO 1/9/19 1:44:07 PM UTC, null, {"txn_id": 43, "customer_id": 3, "amount": {"bytes": "ús"}, "currency": "CNY", "txn_timestamp": "2018-12-15T08:23:24Z"} 1/9/19 1:44:07 PM UTC, null, {"txn_id": 44, "customer_id": 5, "amount": {"bytes": "\f!"}, "currency": "CZK", "txn_timestamp": "2018-10-04T13:10:17Z"} 1/9/19 1:44:07 PM UTC, null, {"txn_id": 45, "customer_id": 3, "amount": {"bytes": "çò"}, "currency": "USD", "txn_timestamp": "2018-04-03T03:40:49Z"}
Kafka消息是键/值对,其中值是有效内容
。在JDBC链接器的上下文中,值是要被提取的表行的内容。Kafka消息中的键对于分区和下游处理很是重要,其中任何关联(好比KSQL)都将在数据中完成。
JDBC链接器默认不设置消息键,可是使用Kafka链接器的单消息转换(SMT)机制能够轻松实现。假设想要提取accounts
表并将其ID
列用做消息键。只需简单地将其添加到下面的配置中便可:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_06", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-06-", "poll.interval.ms" : 3600000, "table.whitelist" : "demo.accounts", "mode":"bulk", "transforms":"createKey,extractInt", "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey", "transforms.createKey.fields":"id", "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractInt.field":"id" } }'
这时若是使用诸如kafka-avro-console-consumer
之类的工具检查数据,就会看到键(JSON内容以前的最左列)与id
值匹配:
kafka-avro-console-consumer \ --bootstrap-server kafka:29092 \ --property schema.registry.url=http://schema-registry:8081 \ --topic mysql-06-accounts --from-beginning --property print.key=true 1 {"id":{"int":1},"first_name":{"string":"Hamel"},"last_name":{"string":"Bly"},"username":{"string":"Hamel Bly"},"company":{"string":"Erdman-Halvorson"},"created_date":{"int":17759}} 2 {"id":{"int":2},"first_name":{"string":"Scottie"},"last_name":{"string":"Geerdts"},"username":{"string":"Scottie Geerdts"},"company":{"string":"Mante Group"},"created_date":{"int":17692}}
若是要在数据中设置键以便与KSQL一块儿使用,则须要将其建立为字符串类型,由于KSQL目前不支持其它键类型,具体能够在链接器配置中添加以下内容:
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
而后就能够在KSQL中使用了:
ksql> CREATE STREAM ACCOUNTS WITH (KAFKA_TOPIC='mysql-06X-accounts', VALUE_FORMAT='AVRO'); ksql> SELECT ROWKEY, ID, FIRST_NAME + ' ' + LAST_NAME FROM ACCOUNTS; 1 | 1 | Hamel Bly 2 | 2 | Scottie Geerdts 3 | 3 | Giana Bryce
JDBC链接器要求指定topic.prefix
,但若是不想要,或者想将主题名更改成其它模式,SMT能够实现。
假设要删除mysql-07-
前缀,那么须要一点正则表达式的技巧:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_07", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-07-", "poll.interval.ms" : 3600000, "catalog.pattern" : "demo", "table.whitelist" : "accounts", "mode":"bulk", "transforms":"dropTopicPrefix", "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.dropTopicPrefix.regex":"mysql-07-(.*)", "transforms.dropTopicPrefix.replacement":"$1" } }'
这样主题名就和表名一致了:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- accounts | false | 1 | 1 | 0 | 0
这是话题比较深刻。
numeric.mapping
: best_fit
若是源中包含NUMERIC/NUMBER
类型的数据,则可能须要这个配置项;query
选项,用于对源表中的数据进行转换;DECIMAL
类型暴露,则numeric.mapping
没法处理:
DECIMAL
;DECIMAL
和NUMERIC
原生存储,所以必须将DECIMAL
字段转换为NUMERIC
;NUMBER
字段中指定长度和标度,例如NUMBER(5,0)
,不能是NUMBER
;NUMERIC
和DECIMAL
都被视为NUMBER,INT
也是;完成以后,下面会作一个解释:
Kafka链接器是一个能够将数据注入Kafka、与特定源技术无关的框架。不管是来自SQL Server、DB二、MQTT、文本文件、REST仍是Kafka链接器支持的任何其它数十种来源,它发送给Kafka的数据格式都为Avro
或JSON
,这一般是一个透明的过程,只是在处理数值数据类型时有些特别,好比DECIMAL
,NUMBER
等等,如下面的MySQL查询为例:
mysql> SELECT * FROM transactions LIMIT 1; +--------+-------------+--------+----------+----------------------+ | txn_id | customer_id | amount | currency | txn_timestamp | +--------+-------------+--------+----------+----------------------+ | 1 | 5 | -72.97 | RUB | 2018-12-12T13:58:37Z | +--------+-------------+--------+----------+----------------------+
挺正常是吧?其实,amount
列是DECIMAL(5,2)
:
mysql> describe transactions; +---------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +---------------+--------------+------+-----+---------+-------+ | txn_id | int(11) | YES | | NULL | | | customer_id | int(11) | YES | | NULL | | | amount | decimal(5,2) | YES | | NULL | | | currency | varchar(50) | YES | | NULL | | | txn_timestamp | varchar(50) | YES | | NULL | | +---------------+--------------+------+-----+---------+-------+ 5 rows in set (0.00 sec)
可是当使用JDBC链接器的默认设置提取到Kafka中时,最终会是这样:
ksql> PRINT 'mysql-02-transactions' FROM BEGINNING; Format:AVRO 1/4/19 5:38:45 PM UTC, null, {"txn_id": 1, "customer_id": 5, "amount": {"bytes": "ã\u007F"}, "currency": "RUB", "txn_timestamp": "2018-12-12T13:58:37Z"}
DECIMAL
变成了一个看似乱码的bytes
值,链接器默认会使用本身的DECIMAL
逻辑类型,该类型在Avro中被序列化为字节,能够经过查看Confluent Schema Registry中的相关条目来看到这一点:
$ curl -s "http://localhost:8081/subjects/mysql-02-transactions-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name == "amount")' { "name": "amount", "type": [ "null", { "type": "bytes", "scale": 2, "precision": 64, "connect.version": 1, "connect.parameters": { "scale": "2" }, "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal" } ], "default": null }
当链接器使用AvroConverter
消费时,这会正常处理并保存为DECIMAL
(而且在Java中也能够反序列化为BigDecimal
),但对于反序列化Avro的其它消费者,它们只会获得字节。在使用启用了模式的JSON时,也会看到这一点,amount
值会是Base64编码的字节字符串:
{ "schema": { "type": "struct", "fields": [ { "type": "bytes", "optional": true, "name": "org.apache.kafka.connect.data.Decimal", "version": 1, "parameters": { "scale": "2" }, "field": "amount" }, }, "payload": { "txn_id": 1000, "customer_id": 5, "amount": "Cv8=" } }
所以,无论使用的是JSON仍是Avro,这都是numeric.mapping配置项的来源。它默认设置为none
(即便用链接器的DECIMAL
类型),但一般但愿链接器将类型实际转换为更兼容的类型,以适合数字的精度,更具体的说明,能够参见相关的文档。
此选项目前不支持DECIMAL
类型,所以这里是在Postgres中具备NUMERIC
类型的相同原理的示例:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_postgres_12", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:postgresql://postgres:5432/postgres", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "postgres-12-", "numeric.mapping": "best_fit", "table.whitelist" : "demo.transactions", "mode":"bulk", "poll.interval.ms" : 3600000 } }'
结果以下所示:
ksql> PRINT 'postgres-12-transactions' FROM BEGINNING; Format:AVRO 1/7/19 6:27:16 PM UTC, null, {"txn_id": 1, "customer_id": 5, "amount": -72.97, "currency": "RUB", "txn_timestamp": "2018-12-12T13:58:37Z"}
能够在这里看到有关此内容的更多详细信息,以及Postgres、Oracle和MS SQL Server中的示例。
若是须要从多个表中提取数据,则能够经过并行处理来减小总提取时间,这在Kafka的JDBC链接器有两种方法:
前者具备更高的管理开销,但确实提供了每一个表自定义设置的灵活性。若是可使用相同的链接器配置提取全部表,则增长单个链接器中的任务数是一种好方法。
当增长从数据库中提取数据的并发性时,要从总体上考虑。由于运行一百个并发任务虽然可能会更快,但数百个与数据库的链接可能会对数据库产生负面影响。
如下是同一链接器的两个示例。二者都将从数据库中提取全部表,总共6个。在第一个链接器中,未指定最大任务数,所以为默认值1。在第二个中,指定了最多运行三个任务("tasks.max":3
):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-01-", "mode":"bulk" } }'
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_11", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-11-", "mode":"bulk", "tasks.max":3 } }'
当查询链接器的Kafka链接器RESTAPI时,能够看到每一个链接器正在运行的任务数以及它们已分配的表。第一个链接器有一个任务负责全部6张表:
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_01/tasks"|jq '.' [ { "id": { "connector": "jdbc_source_mysql_01", "task": 0 }, "config": { "tables": "`demo`.`NUM_TEST`,`demo`.`accounts`,`demo`.`customers`,`demo`.`transactions`,`security`.`firewall`,`security`.`log_events`", … } } ]
第二个链接器有3个任务,每一个任务分配2张表:
$ curl -s“http:// localhost:8083 / connectors / jdbc_source_mysql_11 / tasks”| jq'。' [ { “ID”: { “connector”:“jdbc_source_mysql_11”,“任务”:0 }, “config”:{ “tables”:“`demo` .NUM_TEST`,`demo` .accounts`”, ... } }, { “ID”: { “connector”:“jdbc_source_mysql_11”,“任务”:1 }, “config”:{ “tables”:“`demo``customers`,`demo` .transactions`”, ... } }, { “ID”: { “connector”:“jdbc_source_mysql_11”,“任务”:2 }, “config”:{ “tables”:“`security``firewall`,`security``log_events`”, ... } } ]