前面的文章里,咱们了解到 canal 能够从 MySQL 中感知数据的变化。这是由于它模拟 MySQL slave 的交互协议,假装本身为 MySQL slave ,从而实现了主从复制。mysql
正是了解到这一点,笔者有两个问题便一直萦绕于心:sql
今天,笔者准备就着这两个问题,扒拉扒拉 canal 的代码,一探究竟。数据库
在谈 canal 以前,咱们有必要再重温下 MySQL 主从复制的原理。数组
总结上图的流程以下:bash
上图就很形象的描述了 canal 的角色。它的原理也很简单:服务器
看完了 MySQL 主从复制和 canal 原理以后,为了方便 debug ,笔者已经在 GitHub Fork 了源码,并导入本地。网络
能够找到 com.alibaba.otter.canal.deployer.CanalLauncher
类,它就是 canal 独立版本启动的入口类。数据结构
在这里,直接运行 main 方法便可运行 canal ,和在 /canal/bin/startup.sh
中效果同样。架构
事实上,canal 的代码比较多,在架构上又分了不少模块设计,好比事件解析器、事件消费、内存存储、服务实例、元数据、高可用等。并发
本文不打算面面俱到介绍每个的实现,那就得正儿八经写一个 canal 系列才行。主要仍是为了开头咱们提出的那两个问题。
上面咱们已经说到,CanalLauncher
是canal 启动的入口类。
运行 main 方法以后, canal 会先作不少准备工做。好比加载配置文件、初始化消息队列、启动 canal Admin、加载Spring配置、注册钩子程序等。
canal 模拟 slave 协议,是在EventParser
模块中开始进行的。
在 canal 代码中,整个流程简化以下:
// 开始执行replication
// 1. 构造Erosa链接
ErosaConnection erosaConnection = buildErosaConnection();
// 2. 启动一个心跳线程
startHeartBeat(erosaConnection);
// 3. 执行dump前的准备工做
preDump(erosaConnection);
erosaConnection.connect();// 连接
// 查询master serverId
long queryServerId = erosaConnection.queryServerId();
if (queryServerId != 0) {
serverId = queryServerId;
}
// 4. 获取binlog最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
// 加载元数据
processTableMeta(startPosition);
// 从新连接,由于在找position过程当中可能有状态,须要断开后重建
erosaConnection.reconnect();
// 4. 开始dump数据
erosaConnection.dump(startPosition.getJournalName(),startPosition.getPosition(),sinkHandler);
复制代码
在开始以前,canal 必须先要和 MySQL 服务器创建链接,并完成客户端身份验证。
在 MySQL 中,链接过程协议以下:
在代码中,咱们看一下它的链接方法:
其中,negotiate
方法是握手协议和客户端验证的具体实现。就是按照 MySQL 的协议规范,经过上面建立的Socket channel
来读写网络数据。
正确链接到 MySQL 后,在开始执行 dump 指令以前,还要初始化一些配置信息。
思路就是经过 MySQL 执行器,执行 SQL 语句,获取信息。
代码就不粘了,不过它们执行的语句以下:
show variables like 'binlog_format' #获取binlog format格式
show variables like 'binlog_row_image' #获取binlog image格式
show variables like 'server_id' #获取matser serverId
show master status #获取binlog名称和position
复制代码
如今开始调用 erosaConnection.dump(binlogfilename,binlogPosition,func)
方法,来注册slave和发送dump命令。
在使用COM_BINLOG_DUMP
请求binlog事件以前发送,在主服务器上注册一个从服务器,它的指令是COM_REGISTER_SLAVE
。
注册完以后,就是发送dump请求,它的指令是COM_BINLOG_DUMP
。
在执行完这段代码后,咱们经过show processlist;
查看进程,就能够看到这个dump线程的状态。
id | user | host | db | command | time | state |
---|---|---|---|---|---|---|
139 | canal | localhost:62901 | null | Binlog Dump | 3 | Master has sent all binlog to slave; waiting for more updates |
在上面章节中,咱们已经看到,MySQL主服务器已经接受了 canal 这个从服务器,那么当canal拿到binlog内容后, 又是怎么解析它的呢?
首先,还记得在配置MySQL服务器的时候,咱们将binlog-format
设置为ROW模式,它是基于行的复制。
binlog中每个数据变动能够叫作事件,在ROW模式下,有几个主要的事件类型:
事件 | SQL命令 | rows 内容 |
---|---|---|
TABLE_MAP_EVENT | null | 定义将要更改的表。 |
WRITE_ROWS_EVENT | 插入 | 要插入的行数据 |
DELETE_ROWS_EVENT | 删除 | 被删除的数据 |
UPDATE_ROWS_EVENT | 更新 | 原数据+要更改的数据 |
每一次数据的变动,都会触发2个事件,先把要更改的表信息告诉你,而后再告诉你更改的row内容。
好比TABLE_MAP_EVENT + WRITE_ROWS_EVENT
。
canal在接收到binlog数据后,并不会立刻把它解析成咱们熟悉的JSON数据,而是在发送的时候才开始。
好比咱们选择使用RocketMQ
,那么在发送以前才开始将binlog里面的byte数组转化为对象。
// 并发构造
EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
// 串行分区
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
复制代码
在这两个方法里,就完成了byte数组到对象的转化。转化成的FlatMessage
对象,就成了咱们在消息队列中消费到的数据结构。
public class FlatMessage implements Serializable {
private long id;
private String database;
private String table;
private List<String> pkNames;
private Boolean isDdl;
private String type;
// binlog executeTime
private Long es;
// dml build timeStamp
private Long ts;
private String sql;
private Map<String, Integer> sqlType;
private Map<String, String> mysqlType;
private List<Map<String, String>> data;
private List<Map<String, String>> old;
}
复制代码
正如本文开头所言,笔者在刚了解到canal机制的时候,确实以为很难以想象。
咦,它是怎么模拟MySQL slave的呢 ? 总以为是否是有啥黑科技在里面。。。
事实上,这是源于笔者对MySQL的无知。
MySQL早就制定好了各类接口协议,怎么链接、验证、注册和dump都明明白白的写在那儿啦。
正是应了那句话:花开正好,只待君来~