安装Zookeeper:https://my.oschina.net/jerval/blog/3057528mysql
安装Kafka:https://my.oschina.net/jerval/blog/3057502sql
下载MySQL Connector plugin archive:https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/0.9.5.Final/debezium-connector-mysql-0.9.5.Final-plugin.tar.gz数据库
参考文档:https://debezium.io/docs/connectors/mysql/bootstrap
>>建立Kafka插件路径:vim
# mkdir -p /usr/local/share/kafka/plugins
>>解压debeziummaven
tar -zxvf /00temp/debezium-connector-mysql-0.9.5.Final-plugin.tar.gz -C /usr/local/share/kafka/plugins
>>修改MySQL配置测试
# vim /etc/my.cnf [mysqld] log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction须要定义,不能和canal的slaveId重复
>>添加binlog用户账号并受权.net
CREATE USER binlog IDENTIFIED BY 'Binlog-123'; GRANT SELECT, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog'@'%'; -- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog'@'%'; -- GRANT SELECT, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'binlog'@'%' ; FLUSH PRIVILEGES;
>>/usr/local/kafka/mysql.properties插件
//mysql.properties name=inventory-connector connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=10.17.81.212 database.port=3306 database.user=binlog database.password=Binlog-123 database.server.id=184054 database.server.name=fullfillment database.whitelist=inventory database.history.kafka.bootstrap.servers=10.17.81.211:9092 database.history.kafka.topic=dbhistory.fullfillment include.schema.changes=true
>>启动Zookeeper:code
# zkServer.sh start
>>启动Kafka:
# cd /usr/local/kafka # bin/kafka-server-start.sh config/server.properties
>>以独立模式启动kafka connect,此时debezium会对数据库中的每个表建立一个topic,消费相应的topic,便可获取binlog解析信息。
修改config/connect-standalone.properties
bootstrap.servers=10.17.81.211:9092 plugin.path=/usr/local/share/kafka/plugins
经常使用命令:
//启动kafka connect bin/connect-standalone.sh config/connect-standalone.properties mysql.properties //查看topic列表 bin/kafka-topics.sh --list --zookeeper 10.17.81.211:2181 //消费测试主题 bin/kafka-console-consumer.sh --bootstrap-server 10.17.81.211:9092 --topic test --from-beginning //消费dbhistory.fullfillment主题 bin/kafka-console-consumer.sh --bootstrap-server 10.17.81.211:9092 --topic dbhistory.fullfillment --from-beginning //消费fullfillment主题 bin/kafka-console-consumer.sh --bootstrap-server 10.17.81.211:9092 --topic fullfillment --from-beginning //消费fullfillment.inventory.TableName1主题(serverName.databaseName.tableName) bin/kafka-console-consumer.sh --bootstrap-server 10.17.81.211:9092 --topic fullfillment.inventory.TableName1 --from-beginning