Debezium + Kafka: syncing MySQL through the binlog

The goal is to replicate MySQL data by way of its binlog: Debezium captures binlog events, publishes them to Kafka, and a consumer applies them to a target MySQL instance.



Install Confluent

  • Download link
  • Unpack it: tar -zxvf confluent-5.1.2-2.11.tar.gz
  • cd into the confluent directory
  • Quick-start ZooKeeper, Kafka, Schema Registry, Control Center, Kafka Connect, Kafka REST Proxy and KSQL:
  • ./bin/confluent start
    If each service reports that it has started, the quick start succeeded.
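To confirm that everything is up before moving on, the bundled CLI can also report per-service status (a quick check, assuming the default local setup):

./bin/confluent status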

Install the Debezium plugin

Official site
debezium-connector-mysql/0.8.1.Final download link

  • Install the plugin: copy the unpacked Debezium connector into the share/java directory of the Confluent installation, e.g. /usr/local/hadoop/confluent-5.1.2/share/java/
  • Once it is in place, restart Confluent.
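After the restart, you can verify that Kafka Connect picked up the plugin by asking the Connect REST API which connector plugins it knows about (assuming Connect listens on the default port 8083):

curl -s http://localhost:8083/connector-plugins | grep -i MySqlConnector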

Enable the MySQL binlog

  • Edit my.cnf:
log-bin=/var/lib/mysql/mysql-bin
server-id=1

Restart the service:

systemctl restart mysqld.service
  • Log in to MySQL and verify:
show variables like '%log_bin%'
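This walkthrough connects as root, but if you prefer a dedicated account, Debezium's MySQL connector needs at least the following grants; a sketch, where the user name and password are placeholders:

CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz-password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;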

Create the Kafka Connect connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/  -d '{
  "name": "demo2",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "XXX.XX.XXX.XXX",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "XXXX",
    "database.server.id": "1",
    "database.server.name": "demo2",
    "database.whitelist": "kafka1",
    "decimal.handling.mode": "double",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.demo2"
  }
}'
  • Once the connector is created you can check it:
curl -X GET http://localhost:8083/connectors/xxx/status
  • After it starts successfully you will find topics corresponding to your tables (see the topic and event sketch after this list):
kafka-topics --list --zookeeper localhost:2181
  • Start a consumer to watch the data changes:
./kafka-avro-console-consumer --topic full.kafka_base_db.test2-smt --bootstrap-server 192.168.1.22:9092 --from-beginning
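Given database.server.name = demo2 and database.whitelist = kafka1 from the connector config above, the topic names follow Debezium's serverName.databaseName.tableName convention (the exact names depend on your config; <table> is a placeholder):

demo2                      # schema-change (DDL) events for the whole server
demo2.kafka1.<table>       # one change-event topic per whitelisted table
dbhistory.demo2            # internal database history topic, used only by the connector

Each message on a table topic carries a Debezium envelope, roughly of this shape (values are illustrative):

{
  "before": null,
  "after": { "id": 1, "name": "tom" },
  "source": { "db": "kafka1", "table": "test2", ... },
  "op": "c"
}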

Writing from Kafka into MySQL (unfinished; suspected serialization issue — this approach turned out to be wrong, feel free to skip)

  • Use kafka-connect-jdbc.
  • Edit /usr/local/hadoop/confluent-5.1.2/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties:
name=test9
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# The topics to consume from - required for sink connectors like this one
topics=full.test_kafka.test2

# Configuration specific to the JDBC sink connector.
# We want to connect to a MySQL database and auto-create tables.
connection.url=jdbc:mysql://114.67.100.229:3306/simple_stream?user=root&password=root
auto.create=true

  • Start the connector; since Connect is already running, just load it:

confluent load  elasticsearch-sink-test 
-d /usr/local/hadoop/confluent-5.1.2/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties
  • If nothing gets synced, check the connector's status after starting it:
curl -X GET http://localhost:8083/connectors/xxx/status
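If the status shows a FAILED task, the Connect REST API can restart it in place once the cause is fixed (xxx is the connector name, 0 the task id):

curl -X POST http://localhost:8083/connectors/xxx/tasks/0/restart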

Working approach (implemented)

Because the change events are Avro-serialized, a small sync application is needed to deserialize them and write the data into MySQL.

Application configuration:
spring:
  application:
    name: data-center
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://XXX.XX.XXX.XXX:3306/kafka2?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
    username: root
    password: XXX
  jpa:
    show-sql: true
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  #    time-zone: UTC
  kafka:
    bootstrap-servers: XXX.XX.XXX.XXX:9092
    consumer:
      group-id: debezium-kafka-connector
      key-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      value-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      auto-offset-reset: earliest
      properties:
        schema.registry.url: http://XXX.XX.XXX.XXX:8081
logging:
  level:
    spring:
      jpa:
        DEBUG
    io:
      debezium:
        transforms: TRACE
    org:
      hibernate: DEBUG
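For the Avro deserializer and schema.registry.url settings above to work, the consumer project also needs Confluent's serializer on the classpath next to spring-kafka; a minimal sketch of the Maven additions (artifact versions are indicative, match them to your Confluent release):

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.1.2</version>
  </dependency>
</dependencies>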
  • The sync application mainly consumes the messages and writes them into MySQL. It has two listeners, because Debezium emits DDL and DML events separately.
  • DML (row-level changes):
  1. insert: INSERT
  2. update: UPDATE
  3. delete: DELETE
  • DDL (schema changes):
    Data definition language is used to create the various objects in a database (tables, views,
    indexes, synonyms, clusters, ...), e.g.
    CREATE TABLE/VIEW/INDEX/SYN/CLUSTER
    DDL statements are committed implicitly and cannot be rolled back.
  • Code:
package com.example.cdcdemo.kakfa.avro;

import com.example.cdcdemo.kakfa.avro.sql.SqlProvider;
import com.example.cdcdemo.kakfa.avro.sql.SqlProviderFactory;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.Optional;

@Slf4j
@Component
public class KafkaAvroConsumerRunner {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private NamedParameterJdbcTemplate namedTemplate;

    //,containerFactory = "kafkaListenerContainerFactory0"
    @KafkaListener(id = "demo_ddl", topics = "demo2")
    public void listenerUser(ConsumerRecord<GenericData.Record, GenericData.Record> record) throws Exception {
        GenericData.Record key = record.key();
        GenericData.Record value = record.value();
        log.info("Received record: {}", record);
        log.info("Received record: key {}", key);
        log.info("Received record: value {}", value);

        // Guard against tombstone messages before reading fields from the value.
        if (Objects.isNull(value)) {
            return;
        }

        String databaseName = Optional.ofNullable(value.get("databaseName")).map(Object::toString).orElse(null);
        String ddl = Optional.ofNullable(value.get("ddl")).map(Object::toString).orElse(null);

        if (StringUtils.isBlank(ddl)) {
            return;
        }
        handleDDL(ddl, databaseName);
    }

    /**
     * Execute a DDL statement against the target database.
     *
     * @param ddl the DDL statement taken from the schema-change event
     * @param db  the source database name, which is stripped from the statement before execution
     */
    private void handleDDL(String ddl, String db) {
        log.info("DDL statement: {}", ddl);
        try {
            if (StringUtils.isNotBlank(db)) {
                ddl = ddl.replace(db + ".", "");
                ddl = ddl.replace("`" + db + "`.", "");
            }

            jdbcTemplate.execute(ddl);
        } catch (Exception e) {
            log.error("数据库操做DDL语句失败,", e);
        }
    }

    //,containerFactory = "kafkaListenerContainerFactory1"
    @KafkaListener(id = "demo_dml", topicPattern = "demo2.kafka1.*")
    public void listenerAvro(ConsumerRecord<GenericData.Record, GenericData.Record> record) throws Exception {
        GenericData.Record key = record.key();
        GenericData.Record value = record.value();
        log.info("Received record: {}", record);
        log.info("Received record: key {}", key);
        log.info("Received record: value {}", value);

        if (Objects.isNull(value)) {
            return;
        }

        GenericData.Record source = (GenericData.Record) value.get("source");
        String table = source.get("table").toString();
        Envelope.Operation operation = Envelope.Operation.forCode(value.get("op").toString());

        String db = source.get("db").toString();
        handleDML(key, value, table, operation);
    }

    private void handleDML(GenericData.Record key, GenericData.Record value,
                           String table, Envelope.Operation operation) {
        SqlProvider provider = SqlProviderFactory.getProvider(operation);
        if (Objects.isNull(provider)) {
            log.error("没有找到sql处理器提供者.");
            return;
        }

        String sql = provider.getSql(key, value, table);
        if (StringUtils.isBlank(sql)) {
            log.error("找不到sql.");
            return;
        }

        try {
            log.info("dml语句 : {}", sql);
            namedTemplate.update(sql, provider.getSqlParameterMap());
        } catch (Exception e) {
            log.error("数据库DML操做失败,", e);
        }
    }

}
  • There are too many SqlProvider implementations and table-field parsers to list them all here; contact me if you need them. A minimal, hypothetical sketch of one provider is shown below.
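As an illustration only (not the real project's code), an insert provider matching the two calls made by the listener above could look roughly like this, assuming the SqlProvider interface declares exactly getSql and getSqlParameterMap and that SqlProviderFactory hands out a fresh instance per event:

package com.example.cdcdemo.kakfa.avro.sql;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: builds an INSERT statement from the "after" part of a Debezium change event.
public class InsertSqlProvider implements SqlProvider {

    private final Map<String, Object> params = new HashMap<>();

    @Override
    public String getSql(GenericData.Record key, GenericData.Record value, String table) {
        GenericData.Record after = (GenericData.Record) value.get("after");
        if (after == null) {
            return null;
        }
        StringBuilder cols = new StringBuilder();
        StringBuilder vals = new StringBuilder();
        for (Schema.Field field : after.getSchema().getFields()) {
            String name = field.name();
            if (cols.length() > 0) {
                cols.append(", ");
                vals.append(", ");
            }
            cols.append("`").append(name).append("`");
            vals.append(":").append(name);
            Object v = after.get(name);
            // Avro strings arrive as Utf8; convert them so JDBC binds a java.lang.String.
            params.put(name, v instanceof CharSequence ? v.toString() : v);
        }
        return "INSERT INTO `" + table + "` (" + cols + ") VALUES (" + vals + ")";
    }

    @Override
    public Map<String, Object> getSqlParameterMap() {
        return params;
    }
}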

Pitfalls

  • Pitfall 1:
Caused by: org.apache.kafka.connect.errors.ConnectException: 
java.sql.SQLException: No suitable driver found for 
jdbc:mysql://XXX.XXX.XXX.XXX:3306/simple_stream?
user=root&password=root\n\

The MySQL Connector/J driver jar was missing.
Add the MySQL driver jar under /usr/local/hadoop/confluent-5.1.2/share/java/kafka-connect-jdbc/.

  • Pitfall 2
    From reading the logs, the Avro serialization was suspected of preventing writes into MySQL:
    rows could be written, but the generated SQL raised errors (solved via the sync application above).

  • Pitfall 3
    After the pipeline was connected, the sync program would not read historical data for tables that had already been created. This was an offset issue: testing always reused the same consumer group, so its committed offsets persisted (the connector likewise keeps its own state in a separate Kafka topic, the database history topic). See the offset-reset command after this list.

  • Pitfall 4
    Initially the per-table topics could be consumed, but the corresponding server-level topic (DDL events) could not, so the target tables were never created and writes failed once table changes arrived (same root cause as above).

  • Pitfall 5
    Why is there only one partition by default once the connector is running?
    The database history topic is meant only for the connector itself, although the connector can also emit schema change events on a separate Kafka topic intended for consumer applications. Global ordering of events in the schema history is essential, so the database history topic must not be partitioned: it has to be created with exactly one partition. If you rely on Kafka's automatic topic creation, make sure the default num.partitions setting is 1.
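For pitfall 3, one way to re-read a topic from the beginning during testing is to reset the consumer group's offsets with the Kafka CLI while the sync application is stopped (group and topic names here are examples; the @KafkaListener ids demo_ddl / demo_dml above act as group ids by default):

kafka-consumer-groups --bootstrap-server localhost:9092 --group demo_dml \
  --topic demo2.kafka1.test2 --reset-offsets --to-earliest --execute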

References

https://damonchow.github.io/2019/08/07/cdc-debezium-two/#more

https://debezium.io/blog/2016/09/19/Serializing-Debezium-events-with-Avro/

https://cloud.tencent.com/developer/article/1406858

https://blog.csdn.net/laoyang360/article/details/87897886
https://docs.confluent.io/current/schema-registry/installation/config.html#avro-compatibility-level

http://www.luyixian.cn/news_show_17625.aspx

https://blog.csdn.net/qq_42612645/article/details/82891815