canal 1.1.4集群安装并监听mysql表的变更

        需求是要可以实时监听数据库表的变更,动态获取。采用的方法是canal+kafka动态队列创建websocket。以前安装的canal是在apache集群上的,此次须要安装在CDH集群上面,遇到很多问题。这里把过程分享给你们,但愿可以少走弯路。java

1、canal安装及配置mysql

         先说canal的安装。canal有不少版本,我选择的是1.1.4版本。这个版本有webUI界面,算是极大的优化,配置起来不用那么麻烦。canal的下载地址在这里,能够本身选择合适的版本。git

         canal版本选择下载github

        选择1.1.4以后,有简单的配置说明和原理说明。上面虽然说mysql支持的版本为5.5及如下,不过个人mysql是5.7版本,也仍是能够获取到数据的,这个没必要担忧。首先开启mysql的binlog功能,简单的配置如下就好,不须要太过复杂。web

       1. mysql部分spring

        假如mysql是默认安装位置的话,修改C:\ProgramData\MySQL\MySQL Server 5.7下的my.ini文件。添加如下内容:sql

# Binary Logging.
log-bin=mysql-bin
#binlog日志格式
binlog-format=ROW
expire_logs_days = 30

    binlog日志格式要选择row,通常状况下 statement  模式也不会有问题,可以正常获取到数据。可是一旦mysql表的变更比较复杂,statement  生成的日志是极可能有bug,不能正确复制的。选择row模式日志的数据量会大一些,可是问题也会少一些。数据库

    下面两个可配可不配,官方建议是强烈建议配置,假如只是尝试canal的话那就无所谓了,正式使用环境仍是配置比较好。apache

binlog-do-db = epg #配置须要同步的库

binlog-ignore-db = mysql #配置不须要同步的库

    配置完以后建立一个专门用来读取binlog日志的mysql用户,并赋予相关的权限。bootstrap

CREATE USER canal IDENTIFIED BY 'canal';      

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@localhost ;    

FLUSH PRIVILEGES;

在建立用户并赋予权限的时候可能会遇到各类各样的问题,好比密码太简单、大写不识别之类的。大写不识别就换成小写,密码太简单就修改密码级别。

    先查看mysql密码策略:

SHOW VARIABLES LIKE 'validate_password%';

    再修改密码等级:

set global validate_password.policy=LOW;

    这个列名有的是validate_password_policy有的是validate_password.policy,修改的时候请对应本身mysql数据库中的字段名。

    以上mysql的准备工做就作完了,如今开始配置canal。

    2.canal部分

    1)canal想要web界面的话要下载两个压缩包,一个是admin一个是deployer。一个是集群管理包一个是单机部署包,假如不须要集群的话那么下载deployer就能够了。在本身喜欢的位置使用wget下载。

wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    下载完以后建立文件夹准备将解压的文件放入其中,我喜欢直接选择压缩包名字做为解压文件夹。

mkdir canal.admin-1.1.4
mkdir canal.deployer-1.1.4
tar -zxvf canal.deployer-1.1.4.tar.gz -C canal.deployer-1.1.4
tar -zxvf canal.admin-1.1.4.tar.gz -C canal.admin-1.1.4

    解压指定文件夹的时候-C要大写。

    2)先配置canal.admin。解压以后有4个文件夹,进入conf文件夹

    修改conf文件夹中的application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  #canal_manager数据库所在的mysql数据库ip地址
  address: 192.168.49.104:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  #密码默认为admin,实际在web界面时要求密码大于等于6位
  adminPasswd: 123456

    要改的地方很少配置一下数据库和canalweb的用户密码就能够了。密码要大于等于6位,不然没法登陆。

    在conf文件夹下面还有canal_manager的建立语句。 cat canal_manager.sql,再把sql文件的内容复制下来,在mysql中执行,建立canal_manager数据库。

    3)到这里canal_admin的配置接就完成了,执行bin目录下的start.sh文件,启动canal_admin。

sh ../bin/startup.sh

在上面的配置文件中端口配置为8089,因此页面的访问就是你所配置canal-admin主机的ip:8089,假如没有问题的话就能看到canal-admin的登陆界面。

输入用户名密码,没有问题的话就可以进入管理界面。简单说说我在上面两个步骤中遇到的比较有价值的问题,遇到了问题能够从log文件夹下面的日志中读取。

1.JDBC链接MySQL报错Unknown system variable 'query_cache_size'。这是由于mysql的链接驱动包和个人mysql版本对应不上。将mysql-connecter-java换成高版本的就能够看到webUi的界面了。mysql-connecter-java能够到mavenrepository中下载。下载地址

成功登陆以后进入管理界面,点击集群管理->新建集群,输入集群名称和集群的zookeeper地址,多个地址用,分隔。

建立集群的点击肯定的时候可能会报错modified_time不能为空,这应该是webUI的bug了...因为项目源码已经打包,因此不如直接去改数据库,把canal_manager中的modified_time都修改成可为空,避免相似的错误出现。

创建集群以后就开始配置集群信息,点击右边的操做->主配置,开始配置集群信息(即conf/canal-template.properties文件)。

点击载入模板,而后配置本身的信息,配置完后点击保存。

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
#canal_admin登陆的用户名和密码
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
# canal.admin的管理地址
canal.admin.manager = 192.168.49.104:8089
canal.admin.port = 11110
# canal登陆的用户名和密码,密码在canal_manager的canal_user表中
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

# 配置zookpeer地址
canal.zkServers = 192.168.49.104:2181,192.168.49.105:2181,192.168.49.106:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
# 配置由tcp接收仍是kafka接收或者RocketMQ接收消息
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#配置canal访问mysql的用户名和密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		#############
#################################################
#定义canal_deployer的instance列表,不一样instance用,隔开
canal.destinations = nyhx_student
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
#自动扫描canal_deployer的conf目录下面instance,因此上面的canal.destinations配不配、配置对不对都没关系,可#以空在那里
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
#配置kafka地址
canal.mq.servers = 192.168.49.104:9092,192.168.49.105:9092,192.168.49.106:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
#canal发送信息的最大大小,要修改成268435456(8M)
canal.mq.maxRequestSize = 268435456
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

这里要说明一点,因为canal1.1.4和以前的版本相比,一样一条数据库改动,发送的信息要多不少,因此默认的1M是不够用的,须要canal.mq.maxRequestSize修改成本身想要的大小。除此以外,kafka也有队列消息体最大值的限定,须要将 message.max.bytes 配置增大, fetch.max.bytes 也增大。

kafka的配置我是经过CDH Cloudera-Manager的webUI界面修改的,由于我没找到它把kafka安装在哪了...

吐槽一下,bin目录下面stop.sh并不能中止canal_admin,在没有启动的状况下也能登陆8089端口访问web页面。

4)以上canal_admin就安装完了,下面是canal_deployer的安装。回到canal_deployer1.1.4.tar.gz解压后的位置,一样有四个文件夹。

进入conf文件夹,将canal_local.properties的内容复制到canal.properties文件中,配置以下:

# register ip
canal.register.ip = 192.168.49.106

# canal admin config
canal.admin.manager = 192.168.49.104:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal_cluster

修改完后将canal.deployer-1.1.4发送给其它从机,发送以后要修改canal.properties中的canal.register.ip为各自主机ip。

cd ..
scp canal.deployer-1.1.4 root@192.168.105:/usr/software

修改完ip以后,分别启动canal.deployer,启动方法和canal.admin相似。

sh bin/startup.sh

都修改完后,刷新web页面,在server管理页面中可以看到所配置的实例。

5)配置instance

在web界面进入instance管理,点击新建instance。输入instance名称,选择集群,载入模板,开始修改instance。

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
#由于canal是假装成mysql的一个slave,因此slaveId不要和mysql 的slaveId以及其它instance的slaveId重复
canal.instance.mysql.slaveId=654319

# enable gtid use true/false
canal.instance.gtidon=false

# position info
#监听的mysql地址
canal.instance.master.address=192.168.49.214:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
#查看mysql binlog日志所用的用户密码,因为我建立的canal用户所在的数据库和监听的数据库不是同一个,这里求方便用#的root用户
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
#查看的数据库
canal.instance.defaultDatabaseName=nyhx
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*
#配置监听nyhx数据库下的全部表
canal.instance.filter.regex=nyhx\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
#默认topic,在canal.mq.dynamicTopic没有匹配到的表变更都会到这个topic下面来
canal.mq.topic=other
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#配置动态topic,指定哪张表进入哪一个分区
canal.mq.dynamicTopic=student:nyhx.student;magicStone:nyhx.magicstone
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#配置主键id
canal.mq.partitionHash=nyhx.student:uid,nyhx.magicstone:magic_id
#################################################

在开始监听以前,在kafka中须要有对应的topic才行。kafka相关指令:

#查看全部topic
kafka-topics --list --zookeeper 192.168.49.104:2181
#建立topic
kafka-topics --create --zookeeper 192.168.49.104:2181 --topic other --replication-factor 3 --partitions 1
#监听topic消息
kafka-console-consumer --bootstrap-server 192.168.49.104:9092 --topic other

当全部topic都建立好以后,在web页面将instance设置为启动,集群会自动为它分配一台服务器。以后监听topic,修改监听表,看看是否有数据产生。像这样:

假若有的话,那么同步实时数据的任务就完成啦!没有的话,能够看看instance的日志。有web界面也不用一台虚拟机一台虚拟机去翻了。

通常来讲都是一些配置问题,看着日志判断问题在哪里产生,再修改过来。这里要注意canal发送的数据量大小和kafka接受的数据量大小,我就在这里卡了很久,监听kafka topic虽然不报错,可是也没有数据。回过头来看整个配置过程并不困难,只要理解配置内容的格式和做用很快就能配好。

最后再附上看过的参考文档:

canal-admin1.1.14界面化安装配置canal集群详解

如何使用Canal同步MySQL的Binlog到Kafka

CDC系列(一)、Canal 集群部署及使用(带WebUI)

Canal简介及配置说明

canal系列—配置文件介绍

Kafka消息体大小设置的一些细节

记录一次canal发送大请求至kafka失败

相关文章
相关标签/搜索