老刘是一名即将找工做的研二学生,写博客一方面是总结大数据开发的知识点,一方面是但愿可以帮助伙伴让自学今后不求人。因为老刘是自学大数据开发,博客中确定会存在一些不足,还但愿你们可以批评指正,让咱们一块儿进步!java
大数据领域数据源有业务库的数据,也有移动端埋点数据、服务器端产生的日志数据。咱们在对数据进行采集时根据下游对数据的要求不一样,咱们可使用不一样的采集工具来进行。今天老刘给你们讲的是同步mysql增量数据的工具Canal,本篇文章的大纲以下:mysql
老刘争取用这一篇文章让你们直接上手 Canal 这个工具,再也不花别的时间来学习。web
因为 Canal 是用来同步 mysql 中增量数据的,因此老刘先讲 mysql 的主备复制原理,以后再讲 Canal 的核心知识点。sql
根据这张图,老刘把 mysql 的主备复制原理分解为以下流程:数据库
那么 mysql 主备复制实现原理就讲完了,你们看完这个流程,能不能猜到 Canal 的工做原理?服务器
Canal 的工做原理就是它模拟 MySQL slave 的交互协议,把本身假装为 MySQL slave,向 MySQL master 发动 dump 协议。MySQL master 收到 dump 请求后,就会开始推送 binlog 给 Canal。最后 Canal 就会解析 binlog 对象。架构
Canal,美[kəˈnæl],是这样读的,意思是水道/管道/渠道,主要用途就是用来同步 MySQL 中的增量数据(能够理解为实时数据),是阿里巴巴旗下的一款纯 Java 开发的开源项目。并发
server 表明一个 canal 运行实例,对应于一个 JVM。 instance 对应于一个数据队列,1 个 canal server 对应 1..n 个 instance instance 下的子模块:框架
到如今 Canal 的基本概念就讲完了,那接下来就要讲 Canal 如何同步 mysql 的增量数据。编辑器
咱们用 Canal 同步 mysql 增量数据的前提是 mysql 的 binlog 是开启的,阿里云的 mysql 数据库是默认开启 binlog 的,可是若是咱们是本身安装的 mysql 须要手动开启 binlog 日志功能。
先找到 mysql 的配置文件:
etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
这里有一个知识点是关于 binlog 的格式,老刘给你们讲讲。
binlog 的格式有三种:STATEMENT、ROW、MIXED
ROW 模式(通常就用它)
日志会记录每一行数据被修改的形式,不会记录执行 SQL 语句的上下文相关信息,只记录要修改的数据,哪条数据被修改了,修改为了什么样子,只有 value,不会有 SQL 多表关联的状况。
优势:它仅仅只须要记录哪条数据被修改了,修改为什么样子了,因此它的日志内容会很是清楚地记录下每一行数据修改的细节,很是容易理解。
缺点:ROW 模式下,特别是数据添加的状况下,全部执行的语句都会记录到日志中,都将以每行记录的修改来记录,这样会产生大量的日志内容。
STATEMENT 模式
每条会修改数据的 SQL 语句都会被记录下来。
缺点:因为它是记录的执行语句,因此,为了让这些语句在 slave 端也能正确执行,那他还必须记录每条语句在执行过程当中的一些相关信息,也就是上下文信息,以保证全部语句在 slave 端被执行的时候可以获得和在 master 端执行时候相同的结果。
但目前例如 step()函数在有些版本中就不能被正确复制,在存储过程当中使用了 last-insert-id()函数,可能会使 slave 和 master 上获得不一致的 id,就是会出现数据不一致的状况,ROW 模式下就没有。
MIXED 模式
以上两种模式都使用。
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,须要修改为本身的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,须要修改为本身的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
其中,canal.instance.connectionCharset 表明数据库的编码方式对应到 java 中的编码类型,好比 UTF-8,GBK,ISO-8859-1。
sh bin/startup.sh
关闭使用 bin/stop.sh
观察日志
通常使用 cat 查看 canal/canal.log、example/example.log
启动客户端
在 IDEA 中业务代码,mysql 中若是有增量数据就拉取过来,在 IDEA 控制台打印出来
在 pom.xml 文件中添加:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
添加客户端代码:
public class Demo {
public static void main(String[] args) {
//建立链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
"example", "", "");
connector.connect();
//订阅
connector.subscribe();
connector.rollback();
int batchSize = 1000;
int emptyCount = 0;
int totalEmptyCount = 100;
while (totalEmptyCount > emptyCount) {
Message msg = connector.getWithoutAck(batchSize);
long id = msg.getId();
List<CanalEntry.Entry> entries = msg.getEntries();
if(id == -1 || entries.size() == 0){
emptyCount++;
System.out.println("emptyCount : " + emptyCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
emptyCount = 0;
printEntry(entries);
}
connector.ack(id);
}
}
// batch -> entries -> rowchange - rowdata -> cols
private static void printEntry(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(entry.getHeader().getLogfileName()+" __ " +
entry.getHeader().getSchemaName() + " __ " + eventType);
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for(CanalEntry.RowData rowData : rowDatasList){
for(CanalEntry.Column column: rowData.getAfterColumnsList()){
System.out.println(column.getName() + " - " +
column.getValue() + " - " +
column.getUpdated());
}
}
}
}
}
在大数据领域不少框架都会有 HA 机制,Canal 的 HA 分为两部分,Canal server 和 Canal client 分别有对应的 HA 实现:
整个 HA 机制的控制主要是依赖了 ZooKeeper 的几个特性,ZooKeeper 这里就不讲了。
Canal Server:
Canal HA 的配置,并把数据实时同步到 kafka 中。
canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
canal.serverMode = kafka
canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
canal.instance.mysql.slaveId = 790 /两台canal server的slaveID惟一
canal.mq.topic = canal_log //指定将数据发送到kafka的topic
讲完了 Canal 工具,如今给你们简单总结下目前常见的数据采集工具,不会涉及架构知识,只是简单总结,让你们有个印象。
常见的数据采集工具备:DataX、Flume、Canal、Sqoop、LogStash 等。
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,异构数据源离线同步指的是将源端数据同步到目的端,可是端与端的数据源类型种类繁多,在没有 DataX 以前,端与端的链路将组成一个复杂的网状结构,很是零散没法把同步核心逻辑抽象出来。
为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 做为中间传输载体负责链接各类数据源。
因此,当须要接入一个新的数据源的时候,只须要将此数据源对接到 DataX,就能够跟已有的数据源作到无缝数据同步。
DataX自己做为离线数据同步框架,采用Framework+plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,归入到整个同步框架中。
DataX的核心架构以下图:
核心模块介绍:
Flume主要应用的场景是同步日志数据,主要包含三个组件:Source、Channel、Sink。
Flume最大的优势就是官网提供了丰富的Source、Channel、Sink,根据不一样的业务需求,咱们能够在官网查找相关配置。另外,Flume还提供了自定义这些组件的接口。
Logstash就是一根具有实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可让你根据本身的需求在中间加上过滤网,Logstash提供了不少功能强大的过滤网来知足各类应用场景。
Logstash是由JRuby编写,使用基于消息的简单架构,在JVM上运行。在管道内的数据流称之为event,它分为inputs阶段、filters阶段、outputs阶段。
Sqoop是Hadoop和关系型数据库之间传送数据的一种工具,它是用来从关系型数据库如MySQL到Hadoop的HDFS从Hadoop文件系统导出数据到关系型数据库。Sqoop底层用的仍是MapReducer,用的时候必定要注意数据倾斜。
老刘本篇文章主要讲述了Canal工具的核心知识点及其数据采集工具的对比,其中数据采集工具只是大体讲了讲概念和应用,目的也是让你们有个印象。老刘敢作保证看完这篇文章基本等于入门,剩下的就是练习了。
好啦,同步mysql增量数据的工具Canal的内容就讲完了,尽管当前水平可能不及各位大佬,但老刘会努力变得更加优秀,让各位小伙伴自学今后不求人!
若是有相关问题,联系公众号:努力的老刘。文章都看到这了,点赞关注支持一波!