https://github.com/alibaba/canalnode
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。mysql
起源:早期,阿里巴巴B2B公司由于存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变动,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变动进行同步,由此衍生出了增量订阅&消费的业务,今后开启了一段新纪元。git
基于日志增量订阅&消费支持的业务:github
mysql主备复制实现:web
从上层来看,复制分红三步:spring
原理相对比较简单:sql
我的理解,数据增量订阅与消费应当有以下几个点:docker
能够参考下图:数据库
说明:api
instance模块:
整个parser过程大体可分为几部:
说明:
1 数据1:n业务 :
为了合理的利用数据库资源, 通常常见的业务都是按照schema进行隔离,而后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是经过cobar/tddl来解决数据源路由问题。 因此,通常一个数据库实例上,会部署多个schema,每一个schema会有由1个或者多个业务方关注。
2 数据n:1业务:
一样,当一个业务的数据规模达到必定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据须要处理时,就须要连接多个store进行处理,消费的位点就会变成多份,并且数据消费的进度没法获得尽量有序的保证。 因此,在必定业务场景下,须要将拆分后的增量数据进行归并处理,好比按照时间戳/全局id进行排序归并.
目前实现了Memory内存、本地file存储以及持久化到zookeeper以保障数据集群共享。
Memory内存的RingBuffer设计:
定义了3个cursor
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
实现说明:
instance表明了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。
抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:
1. manager方式: 和你本身的内部web console/manager系统进行对接。(alibaba内部使用方式)
2. spring方式:基于spring xml + properties进行定义,构建spring配置.
server表明了一个canal的运行实例,为了方便组件化使用,特地抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现:
具体的协议格式,可参见:CanalProtocol.proto
get/ack/rollback协议介绍:
流式api设计:
canal采用protobuff:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [发生的变动]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [
byte
数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是不是ddl变动操做,好比create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变动数据,可为多条,
1
个binlog event事件可对应多条变动,好比批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变动]
isNull [值是否为
null
]
value [具体的内容,注意为文本]
|
canal-message example:
好比数据库中的表:
1
2
3
4
5
6
7
8
9
|
mysql> select * from person;
+----+------+------+------+
| id | name | age | sex |
+----+------+------+------+
|
1
| zzh |
10
| m |
|
3
| zzh3 |
12
| f |
|
4
| zzh4 |
5
| m |
+----+------+------+------+
3
rows
in
set
(
0.00
sec)
|
更新一条数据(update person set age=15 where id=4):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
****************************************************
* Batch Id: [
2
] ,count : [
3
] , memsize : [
165
] , Time :
2016
-
09
-
07
15
:
54
:
18
* Start : [mysql-bin.
000003
:
6354
:
1473234846000
(
2016
-
09
-
07
15
:
54
:
06
)]
* End : [mysql-bin.
000003
:
6550
:
1473234846000
(
2016
-
09
-
07
15
:
54
:
06
)]
****************************************************
================> binlog[mysql-bin.
000003
:
6354
] , executeTime :
1473234846000
, delay : 12225ms
BEGIN ----> Thread id:
67
----------------> binlog[mysql-bin.
000003
:
6486
] , name[canal_test,person] , eventType : UPDATE , executeTime :
1473234846000
, delay : 12225ms
id :
4
type=
int
(
11
)
name : zzh4 type=varchar(
100
)
age :
15
type=
int
(
11
) update=
true
sex : m type=
char
(
1
)
----------------
END ----> transaction id:
308
================> binlog[mysql-bin.
000003
:
6550
] , executeTime :
1473234846000
, delay : 12240ms
|
canal的HA分为两部分,canal server和canal client分别有对应的ha实现:
整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),能够看下我以前zookeeper的相关文章。
Canal Server:
大体步骤:
HA配置架构图(举例)以下所示:
canal还有几种链接方式:
1. 单连
2. 两个client+两个instance+1个mysql
当mysql变更时,两个client都能获取到变更
3. 一个server+两个instance+两个mysql+两个client
4. instance的standby配置
从总体架构上来讲canal是这种架构的(canal中没有包含一个运维的console web来对接,但要运用于分布式环境中确定须要一个Manager来管理):
一个整体的manager system对应于n个Canal Server(物理上来讲是一台服务器), 那么一个Canal Server对应于n个Canal Instance(destinations). 大致上是三层结构,第二层也须要Manager统筹运维管理。
那么随着Docker技术的兴起,是否能够试一下下面的架构呢?
这里总结了一下Canal的一些点,仅供参考: