数据同步利器 - canal

前言

大约两年之前,笔者在一个项目中遇到了数据同步的难题。mysql

当时,系统部署了几十个实例,分为1个中心平台和N个分中心平台,而每个系统都对应一个单独的数据库实例。git

在数据库层面,有这样一个需求:github

  • 中心平台数据库要包含全部系统平台的数据。
  • 分中心数据库只包含本系统平台的数据。
  • 在中心平台能够新增或修改 中心平台的数据,但要讲数据实时同步到对应的分中心平台数据库。

这几十个数据库实例之间,没有明确的主从关系,是否同步还要看数据的来源,因此并不能用MySQL的主从同步来作。sql

当时,笔者实验了几种方式,最后采用的方式是基于Mybatis拦截器机制 + 消息队列的方式来作的。数据库

大概原理是经过Mybatis拦截器,拦截到事务操做,好比新增、修改和删除,根据自定义的数据主键(标识数据来源和去向),封装成对象,投递到消息队列对应的topic中去。而后,每一个系统监听不一样的topic,消费数据并同步到数据库。缓存

在此后的一段时间里,知道了canal这个开源组件。发现它更直接,它能够从MySQL的binlog中解析数据,投递到消息队列或其它地方。bash

1、canal简介

提及canal,也是阿里巴巴存在数据同步的业务需求。因此从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变动进行同步,由此衍生出了增量订阅&消费的业务。服务器

基于日志增量订阅&消费支持的业务:测试

  • 数据库镜像
  • 数据库实时备份
  • 多级索引 (卖家和买家各自分库索引)
  • search build
  • 业务cache刷新
  • 价格变化等重要业务消息

咱们正能够基于canal的机制,来完成一系列如数据同步、缓存刷新等业务。ui

2、启动canal

一、修改MySQL配置

对于自建的MySQL服务, 须要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置以下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 重复
复制代码

而后建立一个帐户,用来连接MySQL,做为 MySQL slave 的权限。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
复制代码

二、下载

下载canal很是简单,访问 releases页面选择须要的包下载,而后将下载的包解压到指定的目录便可。

tar -zxvf canal.deployer-1.1.4.tar.gz -C /canal

解压完成后,咱们能够看到这样一个目录:

三、修改配置

在启动以前,还须要修改一些配置信息。

首先,定位到canal/conf/example ,编辑instance.properties配置文件,重点有几项:

canal.instance.mysql.slaveId=1234               # canal模拟slaveid
canal.instance.master.address=127.0.0.1:3306    # MySQL数据库地址
canal.instance.dbUsername=canal                 # 做为slave角色的帐户
canal.instance.dbPassword=canal                 # 做为slave角色的帐户密码
canal.instance.connectionCharset = UTF-8        # 数据库编码方式对应Java中的编码类型
canal.instance.filter.regex=.*\\..*             # 表过滤的表达式
canal.mq.topic=example                          # MQ 主题名称
复制代码

咱们但愿canal监听到的数据,要发送到消息队列中,还须要修改canal.properties文件,在这里主要是MQ的配置。在这里笔者使用的是阿里云版RocketMQ,参数以下:

# 配置ak/sk
canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX
# 配置topic
canal.mq.accessChannel = cloud
canal.mq.servers = 内网接入点
canal.mq.producerGroup = GID_**group(在后台建立)
canal.mq.namespace = rocketmq实例id
canal.mq.topic=(在后台建立)
复制代码

四、启动

直接运行启动脚本便可运行:./canal/bin/startup.sh 。 而后打开logs/canal/canal.log文件,能够看到启动效果。

2020-02-26 21:12:36.715 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-02-26 21:12:36.746 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.44.128(192.168.44.128):11111]
2020-02-26 21:12:37.406 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
复制代码

3、启动MQ监听

咱们把canal监听到的数据,投送到了消息队列中,那么接下来就是写个监听程序来消费其中的数据。

为了方便,笔者直接使用的是阿里云版RocketMQ,测试代码以下:

public static void main(String[] args) {
	Properties properties = new Properties();
	// 您在控制台建立的 Group ID
	properties.put(PropertyKeyConst.GROUP_ID, "GID_CANAL");
	// AccessKey 阿里云身份验证,在阿里云服务器管理控制台建立
	properties.put(PropertyKeyConst.AccessKey, "accessKey");
	// SecretKey 阿里云身份验证,在阿里云服务器管理控制台建立
	properties.put(PropertyKeyConst.SecretKey, "secretKey");
	// 设置 TCP 接入域名,到控制台的实例基本信息中查看
	properties.put(PropertyKeyConst.NAMESRV_ADDR,"http://MQ_INST_xxx.mq-internet.aliyuncs.com:80");
	// 集群订阅方式(默认)
	// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
	Consumer consumer = ONSFactory.createConsumer(properties);
	consumer.subscribe("example","*",new CanalListener());
	consumer.start();
	logger.info("Consumer Started");
}
复制代码

4、测试

把环境都部署好以后,咱们进入测试阶段来看一看实际效果。

咱们以一张t_account表为例,这里面记录着帐户id和帐户余额。

首先,咱们新增一条记录,insert into t_account (id,user_id,amount) values (4,4,200);

此时,MQ消费到数据以下:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "200.0"
	}],
	"database": "seata",
	"es": 1582723607000,
	"id": 2,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": null,
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582723607656,
	"type": "INSERT"
}
复制代码

经过数据能够看到,这里面详细记录了数据库的名称、表的名称、表的字段和新增数据的内容等。

而后,咱们还能够把这条数据修改一下:update t_account set amount = 150 where id = 4;

此时,MQ消费到数据以下:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "150.0"
	}],
	"database": "seata",
	"es": 1582724016000,
	"id": 3,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": [{
		"amount": "200.0"
	}],
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582724016353,
	"type": "UPDATE"
}
复制代码

能够看到,除了修改后的内容,canal还用old字段记录了修改前字段的值。

最后,咱们删除这条数据:delete from t_account where id = 4;

相应的,MQ消费到数据以下:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "150.0"
	}],
	"database": "seata",
	"es": 1582724155000,
	"id": 4,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": null,
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582724155370,
	"type": "DELETE"
}
复制代码

监听到数据库表的变化以后,就能够根据本身的业务场景,对这些数据进行业务上的处理啦。

5、总结

能够看到,利用canal组件能够很方便的完成对数据变化的监听。若是利用消息队列来作数据同步的话,只有一点须要格外注意,即消息顺序性的问题。

binlog自己是有序的,但写入到mq以后如何保障顺序是值得关注的问题。

mq顺序性问题这里,能够看到canal的消费顺序性相关解答。

相关文章
相关标签/搜索