作积极的人,越努力越幸运!java
源码分析 Canal 系列开始了,一个全新的系列,即能探讨 canal 自己的实现原理,也是笔者源码阅读技巧的展现。mysql
提到 Canal,你们应该都能想到这是一个用于解析 MySQL binlog 日志的工具,并将 MySQL 数据库中数据同步到其余存储介质中,例如 Elasticsearch。sql
即 Canal 一个很是经常使用的使用场景:数据异构,一种更高级别的数据读写分离架构设计方法。数据库
随着业务不断的发展,企业发展到必定阶段,发现单体的关系型数据库已没法支撑业务高速发展带来数据不断累积的压力,从而会诞生出一种设计架构:分库分表。分库分表对缓解单库数据库压力确实是一种很是好的解决方案,但又衍生出另一种困境,关联查询不友好,甚至跨库JOIN就更加如此。架构
举例说明以下:例如一个订单系统,一般有两类用户须要去查询订单,一类是顾客,一类是商家,在对数据库进行分库分表时,若是以顾客(buy_id)进行分库的话,同一个商家的订单数据会分布在不一样的库中,若是以商家(shop_id)进行分库的话,同一个用户购买的全部订单数据将会分布在不一样的库中,这样进行关联查询,就必然须要跨库进行join,其成本都会偏高。并且上面的场景只能知足一方的需求,那如何是好呢?jvm
Canal 这个时候就闪亮登场了,在电商设计中,其实商家、顾客会被拆分红两个不一样的服务,咱们能够为两个不一样的服务搭建不一样的数据库集群,咱们能够用户订单库、商家订单库进行分库,以用户订单库为主库,当用户在订单系统下单后,数据进入到用户订单库中,而后能够经过 canal 监听数据库的binlog日志,而后将数据再同步到商家订单库,而用户订单库以用户ID为维度进行分库,商家订单库以商家ID作分库,完美解决问题。ide
在了解到 Canal 的基本使用场景后,咱们经过 canal 官方文档,去探究一下其核心架构设计理念,以此打开进入 Canal 的神秘世界中。工具
首先咱们简单看一下 MySQL 的主从同步原理:
在这里插入图片描述源码分析
从上面的图中能够当作主从复制主要分红三个步骤:单元测试
master将改变记录到二进制日志(binary log ) 中( 这些记录叫作二进制日志事件,binary log events,能够经过show binlog events进行查看)
slave将master的binary log events拷贝到它的中继日志(relay log)
基于 MySQL 这种数据同步机制,那 Canal 的设计目标主要就是实现数据的同步,即数据的复制,从上面的图天然而然的想到了以下的设计:
原理相对比较简单:
canal 模拟 mysql slave 的交互协议,假装本身为 mysql slave,向 mysql master 发送 dump 协议
mysql master 收到 dump 请求,开始推送 binary log 给 slave (canal)
接下来咱们来看一下 Canale 的总体组成部分:
说明:
server表明一个canal运行实例,对应于一个jvm
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store连接器,进行数据过滤,加工,分发的工做)
eventStore (数据存储)
这些组件我暂时不打算深刻去研究,由于在目前这个阶段我本身也不清楚,但这个是我后续须要学习研究的重点。
在 Linux 环境中安装 canal 比较简单,你们能够按照官方手册一步一步操做便可,在这里我就不重复介绍,本节主要的目的是但愿在开发工具中运行 Canal 的 Demo,以便后续在研究源码的过程当中遇到难题时能够进行 Debug。
舒适提示:你们在学习过程当中,能够根据官方文档先安装一遍 canal,对理解 Canal 的核心组件有着很是重要的帮助。
首先先从 canal 源码中寻找官方提供的 Demo,其示例代码在 example 包中,以下图所示:
可是另外稍微遗憾的是 canal 提供提供的示例代码中只包含了 client 端相关的代码,并无包含服务端(server),故咱们将目光放到其单元测试中,以下图所示:
接下来我根据官方的一些提示,结合本身的理解,编写出以下测试代码,在 IDEA 开发工具中实现运行 Canal 相关的 Demo。下面的代码已经过测试,可直接使用。
一、Canal Server Demo
package com.alibaba.otter.canal.server; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager; import com.alibaba.otter.canal.instance.manager.model.Canal; import com.alibaba.otter.canal.instance.manager.model.CanalParameter; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; import com.alibaba.otter.canal.server.netty.CanalServerWithNetty; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; public class CanalServerTestMain { protected static final String ZK_CLUSTER_ADDRESS = "127.0.0.1:2181"; protected static final String DESTINATION = "example"; protected static final String DETECTING_SQL = "select 1"; protected static final String MYSQL_ADDRESS = "127.0.0.1"; protected static final String USERNAME = "canal"; protected static final String PASSWORD = "canal"; protected static final String FILTER = ".\\*\\\\\\\\..\\*"; /** 默认 500s 后关闭 */ protected static final long RUN_TIME = 120 * 1000; private final ByteBuffer header = ByteBuffer.allocate(4); private CanalServerWithNetty nettyServer; public static void main(String[] args) { CanalServerTestMain test = new CanalServerTestMain(); try { test.setUp(); System.out.println("start"); } catch (Throwable e) { e.printStackTrace(); } finally { System.out.println("sleep"); try { Thread.sleep(RUN_TIME); } catch (Throwable ee) { } test.tearDown(); System.out.println("end"); } } public void setUp() { CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded(); embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() { public CanalInstance generate(String destination) { Canal canal = buildCanal(); return new CanalInstanceWithManager(canal, FILTER); } }); nettyServer = CanalServerWithNetty.instance(); nettyServer.setEmbeddedServer(embeddedServer); nettyServer.setPort(11111); nettyServer.start(); // 启动 instance embeddedServer.start("example"); } public void tearDown() { nettyServer.stop(); } private Canal buildCanal() { Canal canal = new Canal(); canal.setId(1L); canal.setName(DESTINATION); canal.setDesc("test"); CanalParameter parameter = new CanalParameter(); //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS)); parameter.setMetaMode(CanalParameter.MetaMode.MEMORY); parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT); parameter.setIndexMode(CanalParameter.IndexMode.MEMORY); parameter.setStorageMode(CanalParameter.StorageMode.MEMORY); parameter.setMemoryStorageBufferSize(32 * 1024); parameter.setSourcingType(CanalParameter.SourcingType.MYSQL); parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306), new InetSocketAddress(MYSQL_ADDRESS, 3306))); parameter.setDbUsername(USERNAME); parameter.setDbPassword(PASSWORD); parameter.setSlaveId(1234L); parameter.setDefaultConnectionTimeoutInSeconds(30); parameter.setConnectionCharset("UTF-8"); parameter.setConnectionCharsetNumber((byte) 33); parameter.setReceiveBufferSize(8 * 1024); parameter.setSendBufferSize(8 * 1024); parameter.setDetectingEnable(false); parameter.setDetectingIntervalInSeconds(10); parameter.setDetectingRetryTimes(3); parameter.setDetectingSQL(DETECTING_SQL); canal.setCanalParameter(parameter); return canal; } }
二、Canal Client Demo
package com.alibaba.otter.canal.example; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; public class SimpleCanalClientExample { public static void main(String[] args) { // 建立连接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 3000; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
运行 client 的效果以下图所示:
在数据库中变动一条数据,以便产生新的binlog日志,其输出结果以下:
能在 IDEA 中搭建并运行 Demo,是咱们踏入 canal 的第一步,后续将根据官方文档中的内容为提纲,尝试逐步解开 canal 的实现原理,以便更好的指导实践。
原创不易,若是对你有所帮助请你为本文点个【在看】吧,这将是我写做更多优质文章的最强动力。
欢迎加入个人知识星球,一块儿交流源码,探讨架构,揭秘亿级订单的架构设计与实践经验,打造高质量的技术交流圈,为广大星友提供高质量问答服务,长按以下二维码加入。
丁威素质三连是对我最大的鼓励