canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据 订阅 和 消费。应该是阿里云DTS(Data Transfer Service)的开源版本。mysql
Canal与DTS提供的功能基本类似:git
1)基于Mysql的Slave协议实时dump binlog流,解析为事件发送给订阅方。github
2)单Canal instance,单DTS数据订阅通道均只支持订阅一个RDS,提供给一个消费者。sql
3)可使用canal-client客户端进行消息消费。docker
4)也能够经过简单配置,也能够不须要自行使用canal-client消费,能够选择直接投递到kafka或者RocketMQ集群,用户只须要使用消息队列的consumer消费便可。数据库
5)成功消费消息后须要进行Ack,以确保一致性,服务端则会维护客户端目前的消费位点。安全
MySQL的主从复制分红三步:ruby
canal 就是模拟了这个过程。架构
canal 1.1.4开始支持admin管理,经过canal-admin为canal提供总体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操做界面,方便更多用户快速和安全的操做,替代了过去繁琐的配置文件管理。运维
总体部署架构以下。
说明:
Instance模块
EventParser模块的类图设计以下
每一个EventParser都会关联两个内部组件:CanalLogPositionManager , CanalHAController
EventParser根据HAController获知连到哪里,经过LogPositionManager获知从哪一个位点开始解析,以后便经过Mysql Slave协议拉取binlog进行解析,推入EventSink
目前只提供了一个带有实际做用的实现:GroupEventSink
GroupEventSink用于将多个instance上的数据进行归并,经常使用于分库后的多数据源归并。
EventStore的类图以下
官方提供的实现类是
MemoryEventStoreWIthBuffer,内部采用的是一个RingBuffer:
这些位点信息经过MetaManager进行管理。这也解释了为何一个canal instance只能支撑一个消费者:EventStore的RingBuffer只为一个消费者维护信息。
数据格式已经在前文给出,Canal和DTS客户端均采起:
拉取事件 -> 消费 -> 消费成功后ACK
这样的消费模式,并支持消费不成功时进行rollback,从新消费该数据。
下面是一段简单的客户端调用实例(略去异常处理):
// 建立CanalConnector, 链接到localhost:11111
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
connector.connect(); // 链接
connector.subscribe(); // 开始订阅binlog
// 开始循环拉取
while (running) {
Message message = connector.getWithoutAck(1024); // 获取指定数量的数据
long batchId = message.getId();
for (Entry entry : message.getEntries()){
// 对每条消息进行处理
}
connector.ack(batchId); // ack
}
5.1 优势
1)性能优异、功能全面
2)运维方便
3)多语言支持
5.2 缺点
好了,花了10分钟应该对canal有大体了解了,下一期,阿丸计划手把手教你搭建canal集群和admin管理平台,记得关注哦。
都看到最后了,原创不易,点个关注,点个赞吧~
知识碎片从新梳理,构建Java知识图谱: github.com/saigu/JavaK…(历史文章查阅很是方便)