Canal
是阿里的一款开源项目,纯 Java
开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了 MySQL
(也支持 mariaDB
)。web
Canal
除了支持 binlog
实时 增量同步 数据库以外也支持 全量同步 ,本文主要分享使用Canal来实现从MySQL到Elasticsearch的全量同步;sql
可经过使用 adapter
的 REST
接口手动触发 ETL
任务,实现全量同步。数据库
在执行全量同步的时候,同一个
destination
的增量同步任务会被
阻塞,待全量同步完成被阻塞的增量同步会被
从新唤醒
PS:关于Canal的部署与 实时同步 请看文章《Canal高可用架构部署》json
adapter
的 ETL
接口为:/etl/{type}/{task}
bash
8081
例子:服务器
curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml
执行成功输出:多线程
{"succeeded":true,"resultMessage":"导入ES 数据:17 条"}
当同步的数据量比较大时,执行一段时间后会出现下图的错误架构
查看 canal
源码得知当同步的数据量大于1w时,会分批进行同步,每批1w条记录,并使用多线程来并行执行任务,而 adapter
默认的链接池为3,当线程获取数据库链接等待超过1分钟就会抛出该异常。app
线程数为当前服务器cpu的可用线程数
修改 adapter
的 conf/application.yml
文件中的 srcDataSources
配置项,增长 maxActive
配置数据库的最大链接数为当前服务器cpu的可用线程数curl
cpu线程数能够下命令查看
grep 'processor' /proc/cpuinfo | sort -u | wc -l
当同步的表字段比较多时,概率出现如下报错
因为 adapter
的表映射配置文件中的 commitBatch
提交批大小设置过大致使(6000)
修改 adapter
的 conf/es7/xxx.yml
映射文件中的 commitBatch
配置项为3000
三千万的数据量用时3.5小时左右
因为当数据量大于1w时 canal
会对数据进行分批同步,每批1w条经过分页查询实现;因此当数据量较大时会出现深分页的状况致使查询很是慢。
预先使用ID、时间或者业务字段等进行数据分批后再进行同步,减小每次同步的数据量。
使用ID进行数据分批,适合增加类型的ID,如自增ID、雪花ID等;
计算过程:
(1) 计算同步的次数
总数据量 / 每次同步量 = 10
(2) 计算每批ID的增量值
(最大ID - 最小ID) / 次数 = 847405488993555.9
(3) 计算每批ID的值
最小ID + 增量值 = ID2 ID2 + 增量值 = ID3 ... ID9 + 增量值 = 最大ID
(4) 使用分批的ID值进行同步
修改sql映射配置,的 etlCondition
参数:
etlCondition: "where id >= {} and id < {}"
调用etl接口,并增长 params
参数,多个参数之间使用 ;
分割
curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml?params=最小ID;ID2 curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml?params=ID2;ID3 ...
扫码关注有惊喜!