WX搜索【程序员内点事】,回复【666】妙趣横生。java
canal
是阿里开发的一款基于数据库增量日志解析,提供增量数据订阅与消费的框架,整个框架纯JAVA
开发,目前仅支持Mysql
和MariaDB
(和mysql相似)。mysql
那什么是数据库增量日志?git
MySQL的日志种类是比较多的,主要包含:错误日志、查询日志、慢查询日志、事务日志、二进制日志。而MySQL
数据库所发生的数据变动(DML
(data manipulation language)数据操纵语言,也就是咱们熟悉的增删改),都会以二进制日志(binary log
)形式存储。程序员
在介绍canal
原理以前,咱们先来回顾一下MySQL
主从同步的原理,这或许会让你更好的理解canal
的工做机制。github
一、MySQL主从同步原理:web
MySQL主从同步也叫读写分离,能够提高数据库的负载和容错能力,实现数据库的高可用sql
先来分析一张MySQL主从同步原理图: 数据库
以上图片源自网络,若有侵权联系删除vim
master节点操做过程:服务器
当master
节点数据发生更改后(delete、update、insert,仍是建立函数、存储过程等操做),向binary log
中写入记录日志,这些记录又叫作二进制日志事件
(binary log events)。
show binlog events
复制代码
这些事件会按照顺序写入bin log中。当slave节点启动链接到master节点的时候,master节点会为slave节点开启binlog dump线程(负责传输binlog数据)。
一旦master节点的bin log发生变化时,bin logdump线程会通知slave节点有能够传输的binlog,并将相应的bin log内容发送给slave节点。
slave节点操做过程:
slave节点上会建立两个线程:一个I/O线程,一个SQL线程。I/O线程链接到master节点,master节点上的binlog dump
线程会将binlog的内容发送给该I\O线程。
该I/O线程接收到binlog内容后,再将内容写入到本地的relay log。而sql线程读取到I/O线程写入的ralay log,将relay log中的内容写入slave数据库。
二、canal原理
懂了上边MySQL的主从同步原理,canal的工做机制就很好理解了。其实canal是模拟了MySQL数据库中,slave节点与master节点的交互协议,假装本身为MySQL slave节点,向MySQL master节点发送dump协议
,MySQL master节点收到dump请求,开始推送binary log给slave节点(也就是canal
)。
以上图片源自网络,若有侵权联系删除
光说不练假把式,开干!
在写代码前咱们先对MySQL进行一下改造,安装MySQL就再也不细说了,基本操做。
一、查看一下MySQL是否开启了binary log功能
show binary logs
复制代码
若是没有开启是图中的状态,通常用户是没有这个命令权限的,不过我有,啧啧啧! 若是没有须要手动开启,而且在
my.cnf
文件中配置binlog-format
为Row
模式
log-bin=mysq-bin binlog-format=Row 复制代码
log-bin
是binlog
文件存放位置 binlog-format
设置MySQL复制log-bin的方式
MySQL的三种复制方式:
基于SQL语句的复制(statement-based replication, SBR)
基于行的复制(row-based replication, RBR)
混合模式复制(mixed-based replication, MBR)
二、为canal 建立一个有权限操做MySQL的用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES; 复制代码
三、安装canal
下载地址:https://github.com/alibaba/canal/releases
下载后选择版本例如:canal.deployer-xxx.tar.gz
四、配置canal
修改instance.properties文件,须要添加监听数据库和表的规则,canal能够全量监听数据库,也能够针对某个表进行监听,比较灵活。
vim conf/example/instance.properties
复制代码
#################################################
## mysql serverId canal.instance.mysql.slaveId = 2020 # position info 修改本身的数据库(canal要监听的数据库 地址 ) canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password 修改为本身 数据库信息的帐号 (单独开一个 准备阶段建立的帐号) canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 # table regex 表的监听规则 # canal.instance.filter.regex = blogs\.blog_info canal.instance.filter.regex = .\*\\\\..\* # table black regex canal.instance.filter.black.regex = 复制代码
启动canal
sh bin/startup.sh
复制代码
看一下server日志,确认一下canal是否正常启动
vi logs/canal/canal.log
复制代码
显示canal server is running now即为成功
2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111] 2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ...... 复制代码
五、编写Java客户端代码,实现canal监听
引入依赖包
<dependency>
<groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> 复制代码
这里只是简单实现
public class MainApp {
public static void main(String... args) throws Exception { /** * 建立与 */ CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); /** * 监控数据库中全部表 */ connector.subscribe(".*\\..*"); /** * 指定要监控的表,库名.表名 */ //connector.subscribe("xin-master.jk_order"); connector.rollback(); //120次心跳事后未检测到,跳出 int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } /** * 提交确认 */ connector.ack(batchId); /** * 处理失败, 回滚数据 */ connector.rollback(batchId); } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); /** * 手动开启事务回滚 */ //TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); } } private static void printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry .EntryType .TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } } 复制代码
代码到这就编写完成了,咱们启动服务看下是什么效果,因为并无操做数据库,因此监听的结果都是空的。 接下来咱们在数据库执行一条
update
语句试试
update jk_orderset order_no = '1111' where id = 40
复制代码
控制台检测到了数据库的修改,并生成binlog 日志文件mysql-bin.000009:3830
那么生成的binlog 文件该怎么用,如何解析成SQl语句呢?
<!-- mysql binlog解析 -->
<dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.13.0</version> </dependency> 复制代码
将刚才的binlog文件下载本地测试一下
public static void main(String[] args) throws IOException {
String filePath = "C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000009:3830"; File binlogFile = new File(filePath); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setChecksumType(ChecksumType.CRC32); BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer); try { for (Event event; (event = reader.readEvent()) != null; ) { System.out.println(event.toString()); } } finally { reader.close(); } } 复制代码
查看一下执行结果,发现数据库最近的一次操做是加了一个idx_index索引
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0}, data=QueryEventData{threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order` DROP INDEX `idx_index` , ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'}} Event{header=EventHeaderV4{timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0}, data=null} 复制代码
至此咱们就已经实现了监控MySQL,
canal
应用场景大体有如下:
重点分析一下canal是如何解决MySQL主从同步延迟的问题
生产环境下MySQL
的主从同步模式(maser-slave
)很常见,但对于跨机房部署的集群,会出现同步延时的状况。举个栗子:
一条订单状态是未付款,master
节点修改为已付款,可因为某些缘由出现延迟数据未能及时同步到slave
,这时用户当即查看订单状态(查询走slave
)显示仍是未付款,哪一个用户看到这种状况不得慌啊。
为何会出现主从同步延迟呢?
当主库master
的TPS
并发较高时,master
节点并发产生的修改操做,而slave
节点的sql线程
是单线程处理同步数据,延时天然而言就产生了。
咱们用canal
实时监听maser
节点的数据更新(能够针对某个表监听),canal
捕捉到更改的SQL后当即在slave
节点执行,以此来解决主从延迟问题。
不过形成主从同步的缘由不止这些,因为主从服务器存在跨机器而且跨机房,除了网络带宽缘由以外,网络的稳定性以及机器之间的同步,都是主从同步应该考虑的主要缘由。
本文只是简单实现canal监听数据库的功能,旨在给你们提供一种解决问题的思路,仍是反复絮叨的那句话,解决问题的技术方法不少,具体如何应用还需结合具体业务。
整理了几百本各种技术电子书和视频资料 ,
嘘~
,免费
送,公号内回复【666
】自行领取。和小伙伴们建了一个技术交流群
,一块儿探讨技术、分享技术资料,旨在共同窗习进步,感兴趣就入咱们吧!