一.部署前需要准备的东西
jdk:1.8.0_181
confluent:5.4.1
debezium:1.1.0.Final
修改数据库配置文件my.cnf 或 my.ini
server-id=11
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
binlog_row_image=FULL
expire_logs_days=10
gtid_mode=ON
enforce_gtid_consistency=ON
wait_timeout=31536000
interactive_timeout=31536000
binlog_rows_query_log_events=ON
default-time_zone='+8:00'
character_set_server=utf8 #待同步的两个数据库要保持编码一致,否则会出现中文乱码
二.开始部署
1.解压 conflunet,将confluent添加到环境变量
vim /etc/profile
export CONFLUENT_HOME=/confluent-5.4.1
export PATH=$PATH:$CONFLUENT_HOME/bin
使环境变量生效 source /etc/profile
2.将debezium解压到 /confluent-5.4.1/share/java
3.修改相关配置文件
路径 /confluent-5.4.1/etc/kafka
a.zookeeper.properties
测试环境无需修改
b.修改 server.properties
在文件最后添加以下下内容
max.poll.interval.ms=15000
max.poll.records=10
c.修改connect-standalone.properties
plugin.path=/confluent-5.4.1/share/java/
d.添加针对数据库A的 debezium connect ,将源数据库A中的数据同步到kafka
vim connector1.properties
name=htcloud_topics
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname= //debezium要监控的数据库地址
database.port=3306
database.user=admin
database.password=admin
database.server.id=1 //要与数据库配置文件my.cnf 中的server-id保持一致
database.server.name=htcloud0
database.whitelist=htcloud //等同于数据库名称
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=account_topic
include.schema.changes=true
e.添加针对数据库B的 kafka connect jdbc sink ,将kafka中的数据同步到目标数据库B
vim connector2.properties
name=htcloud-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称 格式 database.server.name + database.whitelis + 表名称 ,前两个属性在connector1.properties,要与之对应
topics=htcloud0.htcloud.htdq_report_month
# 配置JDBC链接
connection.url=jdbc:mysql://172.16.16.xx:3306/htcloud_slave?user=root&password=root
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
#如果表不存在自动创建表
auto.create=true
delete.enabled=true
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode=record_key
pk.fields=id
auto.evolve=true
table.name.format=htdq_report_month
到此为止 数据库表 htdq_report_month 单向增量同步(A-->B)已经完成,如果需要同步多张表,添加多个 kafka connect jdbc sink 即可。 启动测试 ./connect-standalone connect-standalone.properties connector1.properties connector2.properties &
继续增加B-->A的增量同步
f.与d步骤雷同,添加针对于数据库B的debezium connect
vim slaveSource.properties
name=htcloud_follower
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=172.16.16.24
database.port=3306
database.user=root
database.password=root
database.server.id=11
database.server.name=htcloud_follower
#table.whitlelist=htdq_report_month
database.whitelist=htcloud_slave
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=history_htcloud_follower
include.schema.changes=true
g.与e步骤雷同,添加针对于数据库A的 kafka connect jdbc sink
vim slaveSink.properties
name=htcloud-master-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
#topics=htcloud0.htcloud.htdq_report_month
topics=htcloud_follower.htcloud_slave.htdq_report_month
# 配置JDBC链接
connection.url=jdbc:mysql://172.16.16.14:3306/htcloud?user=admin&password=admin
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
auto.create=true
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode=record_value
pk.fields=id
auto.evolve=true
table.name.format=htdq_report_month
到此为止 数据库表 htdq_report_month 双向向增量同步(A-->B B->A)已经完成
启动测试 ./connect-standalone connect-standalone.properties connector1.properties connector2.properties slaveSource.properties slaveSink.properties &