需求是要可以实时监听数据库表的变更,动态获取。采用的方法是canal+kafka动态队列创建websocket。以前安装的canal是在apache集群上的,此次须要安装在CDH集群上面,遇到很多问题。这里把过程分享给你们,但愿可以少走弯路。java
1、canal安装及配置mysql
先说canal的安装。canal有不少版本,我选择的是1.1.4版本。这个版本有webUI界面,算是极大的优化,配置起来不用那么麻烦。canal的下载地址在这里,能够本身选择合适的版本。git
canal版本选择下载github
选择1.1.4以后,有简单的配置说明和原理说明。上面虽然说mysql支持的版本为5.5及如下,不过个人mysql是5.7版本,也仍是能够获取到数据的,这个没必要担忧。首先开启mysql的binlog功能,简单的配置如下就好,不须要太过复杂。web
1. mysql部分spring
假如mysql是默认安装位置的话,修改C:\ProgramData\MySQL\MySQL Server 5.7下的my.ini文件。添加如下内容:sql
# Binary Logging. log-bin=mysql-bin #binlog日志格式 binlog-format=ROW expire_logs_days = 30
binlog日志格式要选择row,通常状况下 statement 模式也不会有问题,可以正常获取到数据。可是一旦mysql表的变更比较复杂,statement 生成的日志是极可能有bug,不能正确复制的。选择row模式日志的数据量会大一些,可是问题也会少一些。数据库
下面两个可配可不配,官方建议是强烈建议配置,假如只是尝试canal的话那就无所谓了,正式使用环境仍是配置比较好。apache
binlog-do-db = epg #配置须要同步的库 binlog-ignore-db = mysql #配置不须要同步的库
配置完以后建立一个专门用来读取binlog日志的mysql用户,并赋予相关的权限。bootstrap
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@localhost ; FLUSH PRIVILEGES;
在建立用户并赋予权限的时候可能会遇到各类各样的问题,好比密码太简单、大写不识别之类的。大写不识别就换成小写,密码太简单就修改密码级别。
先查看mysql密码策略:
SHOW VARIABLES LIKE 'validate_password%';
再修改密码等级:
set global validate_password.policy=LOW;
这个列名有的是validate_password_policy有的是validate_password.policy,修改的时候请对应本身mysql数据库中的字段名。
以上mysql的准备工做就作完了,如今开始配置canal。
2.canal部分
1)canal想要web界面的话要下载两个压缩包,一个是admin一个是deployer。一个是集群管理包一个是单机部署包,假如不须要集群的话那么下载deployer就能够了。在本身喜欢的位置使用wget下载。
wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
下载完以后建立文件夹准备将解压的文件放入其中,我喜欢直接选择压缩包名字做为解压文件夹。
mkdir canal.admin-1.1.4 mkdir canal.deployer-1.1.4 tar -zxvf canal.deployer-1.1.4.tar.gz -C canal.deployer-1.1.4 tar -zxvf canal.admin-1.1.4.tar.gz -C canal.admin-1.1.4
解压指定文件夹的时候-C要大写。
2)先配置canal.admin。解压以后有4个文件夹,进入conf文件夹
修改conf文件夹中的application.yml
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: #canal_manager数据库所在的mysql数据库ip地址 address: 192.168.49.104:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin #密码默认为admin,实际在web界面时要求密码大于等于6位 adminPasswd: 123456
要改的地方很少配置一下数据库和canalweb的用户密码就能够了。密码要大于等于6位,不然没法登陆。
在conf文件夹下面还有canal_manager的建立语句。 cat canal_manager.sql,再把sql文件的内容复制下来,在mysql中执行,建立canal_manager数据库。
3)到这里canal_admin的配置接就完成了,执行bin目录下的start.sh文件,启动canal_admin。
sh ../bin/startup.sh
在上面的配置文件中端口配置为8089,因此页面的访问就是你所配置canal-admin主机的ip:8089,假如没有问题的话就能看到canal-admin的登陆界面。
输入用户名密码,没有问题的话就可以进入管理界面。简单说说我在上面两个步骤中遇到的比较有价值的问题,遇到了问题能够从log文件夹下面的日志中读取。
1.JDBC链接MySQL报错Unknown system variable 'query_cache_size'。这是由于mysql的链接驱动包和个人mysql版本对应不上。将mysql-connecter-java换成高版本的就能够看到webUi的界面了。mysql-connecter-java能够到mavenrepository中下载。下载地址
成功登陆以后进入管理界面,点击集群管理->新建集群,输入集群名称和集群的zookeeper地址,多个地址用,分隔。
建立集群的点击肯定的时候可能会报错modified_time不能为空,这应该是webUI的bug了...因为项目源码已经打包,因此不如直接去改数据库,把canal_manager中的modified_time都修改成可为空,避免相似的错误出现。
创建集群以后就开始配置集群信息,点击右边的操做->主配置,开始配置集群信息(即conf/canal-template.properties文件)。
点击载入模板,而后配置本身的信息,配置完后点击保存。
################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd #canal_admin登陆的用户名和密码 canal.user = canal canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config # canal.admin的管理地址 canal.admin.manager = 192.168.49.104:8089 canal.admin.port = 11110 # canal登陆的用户名和密码,密码在canal_manager的canal_user表中 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # 配置zookpeer地址 canal.zkServers = 192.168.49.104:2181,192.168.49.105:2181,192.168.49.106:2181 # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, RocketMQ # 配置由tcp接收仍是kafka接收或者RocketMQ接收消息 canal.serverMode = kafka # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; #配置canal访问mysql的用户名和密码 canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = ################################################# ######### destinations ############# ################################################# #定义canal_deployer的instance列表,不一样instance用,隔开 canal.destinations = nyhx_student # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance #自动扫描canal_deployer的conf目录下面instance,因此上面的canal.destinations配不配、配置对不对都没关系,可#以空在那里 canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = manager canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## #配置kafka地址 canal.mq.servers = 192.168.49.104:9092,192.168.49.105:9092,192.168.49.106:9092 canal.mq.retries = 0 canal.mq.batchSize = 16384 #canal发送信息的最大大小,要修改成268435456(8M) canal.mq.maxRequestSize = 268435456 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local # aliyun mq namespace #canal.mq.namespace = ################################################## ######### Kafka Kerberos Info ############# ################################################## canal.mq.kafka.kerberos.enable = false canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
这里要说明一点,因为canal1.1.4和以前的版本相比,一样一条数据库改动,发送的信息要多不少,因此默认的1M是不够用的,须要canal.mq.maxRequestSize修改成本身想要的大小。除此以外,kafka也有队列消息体最大值的限定,须要将 message.max.bytes 配置增大, fetch.max.bytes 也增大。
kafka的配置我是经过CDH Cloudera-Manager的webUI界面修改的,由于我没找到它把kafka安装在哪了...
吐槽一下,bin目录下面stop.sh并不能中止canal_admin,在没有启动的状况下也能登陆8089端口访问web页面。
4)以上canal_admin就安装完了,下面是canal_deployer的安装。回到canal_deployer1.1.4.tar.gz解压后的位置,一样有四个文件夹。
进入conf文件夹,将canal_local.properties的内容复制到canal.properties文件中,配置以下:
# register ip canal.register.ip = 192.168.49.106 # canal admin config canal.admin.manager = 192.168.49.104:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = canal_cluster
修改完后将canal.deployer-1.1.4发送给其它从机,发送以后要修改canal.properties中的canal.register.ip为各自主机ip。
cd .. scp canal.deployer-1.1.4 root@192.168.105:/usr/software
修改完ip以后,分别启动canal.deployer,启动方法和canal.admin相似。
sh bin/startup.sh
都修改完后,刷新web页面,在server管理页面中可以看到所配置的实例。
5)配置instance
在web界面进入instance管理,点击新建instance。输入instance名称,选择集群,载入模板,开始修改instance。
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 #由于canal是假装成mysql的一个slave,因此slaveId不要和mysql 的slaveId以及其它instance的slaveId重复 canal.instance.mysql.slaveId=654319 # enable gtid use true/false canal.instance.gtidon=false # position info #监听的mysql地址 canal.instance.master.address=192.168.49.214:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password #查看mysql binlog日志所用的用户密码,因为我建立的canal用户所在的数据库和监听的数据库不是同一个,这里求方便用#的root用户 canal.instance.dbUsername=root canal.instance.dbPassword=123456 canal.instance.connectionCharset = UTF-8 #查看的数据库 canal.instance.defaultDatabaseName=nyhx # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex #canal.instance.filter.regex=.*\\..* #配置监听nyhx数据库下的全部表 canal.instance.filter.regex=nyhx\..* # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config #默认topic,在canal.mq.dynamicTopic没有匹配到的表变更都会到这个topic下面来 canal.mq.topic=other # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* #配置动态topic,指定哪张表进入哪一个分区 canal.mq.dynamicTopic=student:nyhx.student;magicStone:nyhx.magicstone #canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #配置主键id canal.mq.partitionHash=nyhx.student:uid,nyhx.magicstone:magic_id #################################################
在开始监听以前,在kafka中须要有对应的topic才行。kafka相关指令:
#查看全部topic kafka-topics --list --zookeeper 192.168.49.104:2181 #建立topic kafka-topics --create --zookeeper 192.168.49.104:2181 --topic other --replication-factor 3 --partitions 1 #监听topic消息 kafka-console-consumer --bootstrap-server 192.168.49.104:9092 --topic other
当全部topic都建立好以后,在web页面将instance设置为启动,集群会自动为它分配一台服务器。以后监听topic,修改监听表,看看是否有数据产生。像这样:
假若有的话,那么同步实时数据的任务就完成啦!没有的话,能够看看instance的日志。有web界面也不用一台虚拟机一台虚拟机去翻了。
通常来讲都是一些配置问题,看着日志判断问题在哪里产生,再修改过来。这里要注意canal发送的数据量大小和kafka接受的数据量大小,我就在这里卡了很久,监听kafka topic虽然不报错,可是也没有数据。回过头来看整个配置过程并不困难,只要理解配置内容的格式和做用很快就能配好。
最后再附上看过的参考文档:
canal-admin1.1.14界面化安装配置canal集群详解