canal是阿里开源的中间件,主要用于同步mysql数据库变动。具体参见:https://github.com/alibaba/canal/releasesmysql
搭建环境:linux
vmware centos7 部署mysql和canalgit
windows开发canal client,自动捕获mysql数据库变动github
一、尝试用yum安装mysqlweb
wget http://repo.mysql.com/mysql57-community-release-el7-10.noarch.rpm
返回:2018-07-13 16:04:42 (63.9 KB/s) - ‘mysql57-community-release-el7-10.noarch.rpm’ saved [25548/25548]sql
sudo rpm -Uvh mysql57-community-release-el7-10.noarch.rpm
sudo yum install -y mysql-community-server
若是执行顺利,安装mysql server 成功。数据库
2.改用阿里源安装windows
但是官方的yum源在国内访问效果不佳,我下载mysql server的速度太慢了,决定改用阿里源centos
#下载wget yum install wget -y #备份当前的yum源 mv /etc/yum.repos.d /etc/yum.repos.d.backup4comex #新建空的yum源设置目录 mkdir /etc/yum.repos.d #下载阿里云的yum源配置 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo #最后重建缓存 yum clean all yum makecache
3.安装MariaDB缓存
MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL受权许可。开发这个分支的缘由之一是:甲骨文公司收购了MySQL后,有将MySQL闭源的潜在风险,所以社区采用分支的方式来避开这个风险。MariaDB的目的是彻底兼容MySQL,包括API和命令行,使之能轻松成为MySQL的代替品。
安装mariadb,大小59 M。
[root@yl-web yl]# yum install mariadb-server mariadb
其它几条经常使用的mariadb命令:
systemctl start mariadb #启动MariaDB
systemctl stop mariadb #中止MariaDB
systemctl restart mariadb #重启MariaDB
systemctl enable mariadb #设置开机启动
运行systemctl start mariadb,而后就能够正常使用mysql了
4.设置数据库密码:
set password for 'root'@'localhost' =password('root');
5.遇到的几个问题
①从windows访问centos mysql失败
解决方案:设置mysql容许远程链接
mysql -u root; //赋予任何主机访问数据的权限 mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION; //使修改生效 mysql>FLUSH PRIVILEGES;
②进行上述操做以后,发现仍然链接失败,返回错误
Can't connect to MySQL server on '10.168.12.43' (10060)
解决方案:从windows链接vmware里面的mysql失败,关闭windows防火墙后成功。
(参考:https://github.com/alibaba/canal/wiki/QuickStart)
1.下载canal server
https://github.com/alibaba/canal/releases
我下载的是canal.exaple-1.0.24.gar.gz,下载完成后解压缩:
mkdir /tmp/canal
tar zxvf canal.deployer-1.0.24.tar.gz -C /tmp/canal
2.查看binlog相关数据库命令:
是否启用了日志 mysql>show variables like 'log_bin'; 怎样知道当前的日志 mysql> show master status; 查看mysql binlog模式 show variables like 'binlog_format'; 获取binlog文件列表 show binary logs; 查看当前正在写入的binlog文件 show master status\G 查看指定binlog文件的内容 show binlog events in 'mysql-bin.000002';
3.开启binlog
若是log_bin关闭,须要在etc下面找到my.cnf,开启binlog:
server-id=1
log-bin=/var/lib/mysql/mysql-bin
而后重启mysql服务。
4.添加canal mysql数据库帐号
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
5.配置canal实例,设置本地数据库信息
vi conf/example/instance.properties
## mysql serverId canal.instance.mysql.slaveId = 1234 # position info canal.instance.master.address = 10.168.12.43:3306 canal.instance.master.journal.name =mysql-bin.000003 canal.instance.master.position = canal.instance.master.timestamp = …… canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName =testcanal canal.instance.connectionCharset = UTF-8 # table regex canal.instance.filter.regex = .*\\..*
6.启动canal
sh bin/startup.sh
7.查看日志
vi logs/canal/canal.log vi logs/example/example.log
1.引入pom依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.24</version> </dependency>
2.客户端代码
public class ClientTest { public static void main(String args[]) { // 建立连接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.168.12.43", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
3.创建数据库链接,进行insert,delete等数据库操做
1.canal创建链接失败
解决方案:用telnet命令测试创建链接仍然失败,关闭linux防火墙。
systemctl stop firewalld.service
其余centos7防火墙相关命令:
firewall-cmd --list-ports#查看已经开放的端口:
firewall-cmd --reload #重启firewall
systemctl stop firewalld.service #中止firewall
systemctl disable firewalld.service #禁止firewall开机启动
firewall-cmd --state #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)