Official site
Download link for debezium-connector-mysql/0.8.1.Final
Enable the binlog in the MySQL configuration (my.cnf):

log-bin=/var/lib/mysql/mysql-bin
server-id=1
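Debezium's MySQL connector reads row-level change events, so the binlog format matters as well. A minimal sketch of the relevant my.cnf section; the two binlog_* lines are an addition beyond what the post shows, reflecting Debezium's documented requirements:

[mysqld]
log-bin=/var/lib/mysql/mysql-bin
server-id=1
binlog_format=ROW        # Debezium requires row-based binlog events
binlog_row_image=FULL    # keep complete before/after row images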
Restart the MySQL service so the binlog settings take effect:

systemctl restart mysqld.service
Check that the binlog is now enabled:

show variables like '%log_bin%';
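When the binlog is active, the query should report something like the following (paths follow the log-bin setting above; output illustrative):

Variable_name                     Value
log_bin                           ON
log_bin_basename                  /var/lib/mysql/mysql-bin
log_bin_index                     /var/lib/mysql/mysql-bin.index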
Register the Debezium MySQL connector through the Kafka Connect REST API:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "demo2",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "XXX.XX.XXX.XXX",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "XXXX",
    "database.server.id": "1",
    "database.server.name": "demo2",
    "database.whitelist": "kafka1",
    "decimal.handling.mode": "double",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.demo2"
  }
}'
Check the connector's status (replace xxx with the connector name, e.g. demo2):

curl -X GET http://localhost:8083/connectors/xxx/status
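If the connector is healthy, the response is shaped roughly like this (host and port illustrative):

{
  "name": "demo2",
  "connector": { "state": "RUNNING", "worker_id": "127.0.0.1:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083" } ],
  "type": "source"
}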
List the topics that Debezium created:

kafka-topics --list --zookeeper localhost:2181

Consume one of the change-event topics with the Avro console consumer:

./kafka-avro-console-consumer --topic full.kafka_base_db.test2-smt --bootstrap-server 192.168.1.22:9092 --from-beginning
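Each message the console consumer prints is a Debezium change-event envelope. For an insert, the decoded value looks roughly like this (source fields abbreviated, values illustrative):

{
  "before": null,
  "after": { "id": 1, "name": "hello" },
  "source": { "version": "0.8.1.Final", "name": "full", "db": "kafka_base_db", "table": "test2" },
  "op": "c",
  "ts_ms": 1565000000000
}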
name=test9
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=full.test_kafka.test2
# Configuration specific to the JDBC sink connector.
# Connect to the MySQL database simple_stream and auto-create tables.
connection.url=jdbc:mysql://114.67.100.229:3306/simple_stream?user=root&password=root
auto.create=true
Start Connect; since it is already running, the sink connector can simply be loaded:
confluent load elasticsearch-sink-test -d /usr/local/hadoop/confluent-5.1.2/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties
curl -X GET http://localhost:8083/connectors/xxx/status
Because the events are Avro-serialized, a sync program is needed to deserialize them and write the data into MySQL.
spring:
  application:
    name: data-center
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://XXX.XX.XXX.XXX:3306/kafka2?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
    username: root
    password: XXX
  jpa:
    show-sql: true
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    # time-zone: UTC
  kafka:
    bootstrap-servers: XXX.XX.XXX.XXX:9092
    consumer:
      group-id: debezium-kafka-connector
      key-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      value-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      auto-offset-reset: earliest
      properties:
        schema.registry.url: http://XXX.XX.XXX.XXX:8081
logging:
  level:
    spring:
      jpa: DEBUG
    io:
      debezium:
        transforms: TRACE
    org:
      hibernate: DEBUG
package com.example.cdcdemo.kakfa.avro;

import com.example.cdcdemo.kakfa.avro.sql.SqlProvider;
import com.example.cdcdemo.kakfa.avro.sql.SqlProviderFactory;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.Optional;

@Slf4j
@Component
public class KafkaAvroConsumerRunner {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private NamedParameterJdbcTemplate namedTemplate;

    // ,containerFactory = "kafkaListenerContainerFactory0"
    @KafkaListener(id = "demo_ddl", topics = "demo2")
    public void listenerUser(ConsumerRecord<GenericData.Record, GenericData.Record> record) throws Exception {
        GenericData.Record key = record.key();
        GenericData.Record value = record.value();
        log.info("Received record: {}", record);
        log.info("Received record: key {}", key);
        log.info("Received record: value {}", value);

        String databaseName = Optional.ofNullable(value.get("databaseName"))
                .map(Object::toString).orElse(null);
        String ddl = Optional.ofNullable(value.get("ddl"))
                .map(Object::toString).orElse(null);
        if (StringUtils.isBlank(ddl)) {
            return;
        }
        handleDDL(ddl, databaseName);
    }

    /**
     * Execute a DDL statement against the target database.
     *
     * @param ddl the DDL statement carried by the schema-change event
     * @param db  the source database name, stripped from the statement
     */
    private void handleDDL(String ddl, String db) {
        log.info("DDL statement: {}", ddl);
        try {
            if (StringUtils.isNotBlank(db)) {
                ddl = ddl.replace(db + ".", "");
                ddl = ddl.replace("`" + db + "`.", "");
            }
            jdbcTemplate.execute(ddl);
        } catch (Exception e) {
            log.error("Failed to execute DDL statement,", e);
        }
    }

    // ,containerFactory = "kafkaListenerContainerFactory1"
    @KafkaListener(id = "demo_dml", topicPattern = "demo2.kafka1.*")
    public void listenerAvro(ConsumerRecord<GenericData.Record, GenericData.Record> record) throws Exception {
        GenericData.Record key = record.key();
        GenericData.Record value = record.value();
        log.info("Received record: {}", record);
        log.info("Received record: key {}", key);
        log.info("Received record: value {}", value);
        if (Objects.isNull(value)) {
            return;
        }

        GenericData.Record source = (GenericData.Record) value.get("source");
        String table = source.get("table").toString();
        Envelope.Operation operation = Envelope.Operation.forCode(value.get("op").toString());
        String db = source.get("db").toString();
        handleDML(key, value, table, operation);
    }

    private void handleDML(GenericData.Record key, GenericData.Record value,
                           String table, Envelope.Operation operation) {
        SqlProvider provider = SqlProviderFactory.getProvider(operation);
        if (Objects.isNull(provider)) {
            log.error("No SQL provider found for this operation.");
            return;
        }

        String sql = provider.getSql(key, value, table);
        if (StringUtils.isBlank(sql)) {
            log.error("No SQL statement was generated.");
            return;
        }

        try {
            log.info("DML statement: {}", sql);
            namedTemplate.update(sql, provider.getSqlParameterMap());
        } catch (Exception e) {
            log.error("Failed to execute DML statement,", e);
        }
    }
}
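The SqlProvider and SqlProviderFactory used above are not included in the original post. A minimal sketch of what they might look like, assuming one provider per Debezium operation that renders a named-parameter statement from the event's "after" image; all class bodies here are hypothetical reconstructions (each type would live in its own file under com/example/cdcdemo/kakfa/avro/sql):

package com.example.cdcdemo.kakfa.avro.sql;

import io.debezium.data.Envelope;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;

import java.util.HashMap;
import java.util.Map;

/** Renders one SQL statement (with named parameters) per Debezium change event. */
public interface SqlProvider {
    String getSql(GenericData.Record key, GenericData.Record value, String table);
    Map<String, Object> getSqlParameterMap();
}

/** Picks a provider by operation; only CREATE is sketched here. */
public final class SqlProviderFactory {
    public static SqlProvider getProvider(Envelope.Operation operation) {
        switch (operation) {
            case CREATE:
                return new InsertSqlProvider();
            default:
                return null; // UPDATE/DELETE providers omitted from this sketch
        }
    }
}

/** Builds an INSERT from the event's "after" record. */
public final class InsertSqlProvider implements SqlProvider {
    private final Map<String, Object> params = new HashMap<>();

    @Override
    public String getSql(GenericData.Record key, GenericData.Record value, String table) {
        GenericData.Record after = (GenericData.Record) value.get("after");
        if (after == null) {
            return null;
        }
        StringBuilder cols = new StringBuilder();
        StringBuilder vals = new StringBuilder();
        for (Schema.Field field : after.getSchema().getFields()) {
            if (cols.length() > 0) {
                cols.append(", ");
                vals.append(", ");
            }
            cols.append(field.name());
            vals.append(":").append(field.name());
            Object v = after.get(field.name());
            // Avro strings arrive as Utf8; convert so JDBC binds a java.lang.String
            params.put(field.name(), v instanceof Utf8 ? v.toString() : v);
        }
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + vals + ")";
    }

    @Override
    public Map<String, Object> getSqlParameterMap() {
        return params;
    }
}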
Pitfall 1

Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://XXX.XXX.XXX.XXX:3306/simple_stream?user=root&password=root

The MySQL Connector/J driver JAR is missing.
Fix: place the MySQL driver JAR under /usr/local/hadoop/confluent-5.1.2/share/java/kafka-connect-jdbc/.
Pitfall 2

From the logs, the initial suspicion was an Avro serialization problem preventing writes to MySQL.
Writes do go through, but the generated SQL fails with SQL errors (resolved by the sync program described above).
Pitfall 3

After connecting, the tables were created, but starting the sync program did not read historical data. This turned out to be an offset problem: the tests kept using the same consumer group, so a committed offset value was always present. (The offsets are recorded in the connector's topic; the connector also keeps schema history in a separate Kafka topic, the database history topic.)
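One way to confirm and clear such a stale offset is the kafka-consumer-groups tool (the group name below matches the group-id in the application.yml above; the reset only works while the sync program is stopped):

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group debezium-kafka-connector
kafka-consumer-groups --bootstrap-server localhost:9092 --group debezium-kafka-connector --reset-offsets --to-earliest --all-topics --execute

Alternatively, since auto-offset-reset only applies when no committed offset exists, switching to a fresh group-id also forces a re-read from the beginning.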
Pitfall 4

At first the per-table topics could be consumed, but the corresponding server topic (carrying the DDL events) could not, so tables were never created; writes then failed once table changes were picked up (same root cause as above).
Pitfall 5

Why is there only one partition by default once the connector is up?

The database history topic is intended only for the connector's own use, although the connector can optionally emit schema change events to a separate Kafka topic for consumer applications. Guaranteeing the global order of events in the database schema history is essential, so the database history topic must not be partitioned: it has to be created with exactly one partition. When relying on Kafka's automatic topic creation, make sure the default partition count num.partitions is set to 1.
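To avoid depending on broker defaults, the history topic can also be created explicitly with one partition before registering the connector (topic name matches database.history.kafka.topic in the register call above):

kafka-topics --create --zookeeper localhost:2181 --topic dbhistory.demo2 --partitions 1 --replication-factor 1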