debezium+kafka实现mysql数据库的双向数据增量同步

一.部署前需要准备的东西

    jdk:1.8.0_181
    confluent:5.4.1
    debezium:1.1.0.Final

    修改数据库配置文件my.cnf 或 my.ini
    server-id=11
    log-bin=mysql-bin #添加这一行就ok
    binlog-format=ROW #选择row模式
    binlog_row_image=FULL
    expire_logs_days=10
    gtid_mode=ON
    enforce_gtid_consistency=ON
    wait_timeout=31536000
    interactive_timeout=31536000
    binlog_rows_query_log_events=ON
    default-time_zone='+8:00'
    character_set_server=utf8 #待同步的两个数据库要保持编码一致,否则会出现中文乱码

二.开始部署

1.解压 conflunet,将confluent添加到环境变量

vim /etc/profile

export CONFLUENT_HOME=/confluent-5.4.1
export PATH=$PATH:$CONFLUENT_HOME/bin

使环境变量生效 source /etc/profile

2.将debezium解压到 /confluent-5.4.1/share/java

3.修改相关配置文件
      路径   /confluent-5.4.1/etc/kafka

a.zookeeper.properties

测试环境无需修改
b.修改 server.properties
在文件最后添加以下下内容

max.poll.interval.ms=15000
max.poll.records=10

c.修改connect-standalone.properties

plugin.path=/confluent-5.4.1/share/java/

d.添加针对数据库A的 debezium connect ,将源数据库A中的数据同步到kafka

vim connector1.properties

name=htcloud_topics
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname= //debezium要监控的数据库地址
database.port=3306
database.user=admin
database.password=admin
database.server.id=1 //要与数据库配置文件my.cnf 中的server-id保持一致
database.server.name=htcloud0
database.whitelist=htcloud //等同于数据库名称
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=account_topic
include.schema.changes=true

e.添加针对数据库B的 kafka connect  jdbc sink ,将kafka中的数据同步到目标数据库B

vim connector2.properties

name=htcloud-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称  格式 database.server.name + database.whitelis + 表名称 ,前两个属性在connector1.properties,要与之对应
topics=htcloud0.htcloud.htdq_report_month
# 配置JDBC链接
connection.url=jdbc:mysql://172.16.16.xx:3306/htcloud_slave?user=root&password=root
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
#如果表不存在自动创建表
auto.create=true
delete.enabled=true
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode=record_key
pk.fields=id
auto.evolve=true
table.name.format=htdq_report_month

到此为止  数据库表   htdq_report_month 单向增量同步(A-->B)已经完成,如果需要同步多张表,添加多个 kafka connect jdbc sink  即可。 启动测试  ./connect-standalone connect-standalone.properties connector1.properties connector2.properties &

继续增加B-->A的增量同步

f.与d步骤雷同,添加针对于数据库B的debezium connect

vim slaveSource.properties

name=htcloud_follower
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=172.16.16.24
database.port=3306
database.user=root
database.password=root
database.server.id=11
database.server.name=htcloud_follower
#table.whitlelist=htdq_report_month
database.whitelist=htcloud_slave
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=history_htcloud_follower
include.schema.changes=true

g.与e步骤雷同,添加针对于数据库A的 kafka connect jdbc sink

vim slaveSink.properties

name=htcloud-master-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
#topics=htcloud0.htcloud.htdq_report_month
topics=htcloud_follower.htcloud_slave.htdq_report_month
# 配置JDBC链接
connection.url=jdbc:mysql://172.16.16.14:3306/htcloud?user=admin&password=admin
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
auto.create=true
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode=record_value
pk.fields=id
auto.evolve=true
table.name.format=htdq_report_month

到此为止  数据库表  htdq_report_month 双向向增量同步(A-->B  B->A)已经完成

启动测试 ./connect-standalone connect-standalone.properties connector1.properties connector2.properties slaveSource.properties  slaveSink.properties &