咱们在作实时数仓时数据每每都是保存到数据库中例如MySQL,当有一条数据新增或修改须要立刻将数据同步到kafka中或其余的数据库中,这时候咱们须要借助阿里开源出来的Canal
,来实现咱们功能。java
咱们看下官网的描述:node
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于
MySQL
数据库增量日志解析,提供增量数据订阅和消费
mysql
根据官网的描述咱们大约能够理解为Canal
主要是基于MySQL
作增量数据同步的例如:将数据实时同步到kafka、HBase、ES等,能够理解一个数据同步工具
。git
注意: 当前Canal支持的MySQL版本有 5.1.x
, 5.5.x
, 5.6.x
, 5.7.x
, 8.0.x
程序员
MySQL slave 工做原理github
canal 工做原理面试
我以前发过如何部署MySQL我在这就不在写一遍了,若是你的机器中没有安装MySQL那能够去看这篇—> https://blog.csdn.net/qq_43791724/article/details/108196454spring
开启MySQL的 binary log 日志sql
当咱们在安装成功MySQL成功后会有一个my.cnf
文件须要添加一下内容数据库
[mysqld]
log-bin=/var/lib/mysql/mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 重复
注意: 当咱们在开启了binary log
日志模式后会在咱们log-bin
目录下建立 mysql-bin.*
的文件。当咱们数据库中的数据发生改变时就会mysql-bin.*
文件中生成记录。
去官下载须要的版本 https://github.com/alibaba/canal/releases 我在这里使用的版本为:1.0.24
mkdir canal
tar -zxvf canal.deployer-1.0.24.tar.gz -C ../servers/canal/
canal.properties
common 属性前四个配置项:
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=
canal.id是canal的编号,在集群环境下,不一样canal的id不一样,注意它和mysql的server_id不一样
。ip这里不指定,默认为本机,好比上面是192.168.100.201,端口号是11111。zk用于canal cluster。5. 再看下canal.properties
下destinations相关的配置:
#################################################
######### destinations #############
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
这里的canal.destinations = example
能够设置多个,好比example1,example2,则须要建立对应的两个文件夹,而且每一个文件夹下都有一个instance.properties
文件。全局的canal实例管理用spring,这里的file-instance.xml
最终会实例化全部的destinations instances:\
file-instance.xml
最终会实例化全部的destinations instances:<!-- properties -->
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
<property name="ignoreResourceNotFound" value="true" />
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 容许system覆盖 -->
<property name="locationNames">
<list>
<value>classpath:canal.properties</value> <value>classpath:${canal.instance.destination:}/instance.properties</value>
</list>
</property>
</bean>
<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
<property name="propertyEditorRegistrars">
<list>
<ref bean="socketAddressEditor" />
</list>
</property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
<property name="destination" value="${canal.instance.destination}" />
<property name="eventParser">
<ref local="eventParser" />
</property>
<property name="eventSink">
<ref local="eventSink" />
</property>
<property name="eventStore">
<ref local="eventStore" />
</property>
<property name="metaManager">
<ref local="metaManager" />
</property>
<property name="alarmHandler">
<ref local="alarmHandler" />
</property>
</bean>
好比canal.instance.destination
等于example,就会加载example/instance.properties
配置文件 7. 修改instance 配置文件
## mysql serverId,这里的slaveId不能和myql集群中已有的server_id同样
canal.instance.mysql.slaveId = 1234
# 按需修改为本身的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.120:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
#################################################
sh bin/startup.sh
sh bin/stop.sh
[root@node01 ~]# jps
2133 CanalLauncher
4184 Jps
到这里说明咱们的服务就配好了,这时候咱们可使用java代码建立一个客户端来进行测试
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.24</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
package com.canal.Test;
/**
* @author 大数据老哥
* @version V1.0
* @Package com.canal.Test
* @File :CanalTest.java
* @date 2021/1/11 21:54
*/
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
/**
* 测试canal配置是否成功
*/
public class CanalTest {
public static void main(String[] args) {
//1.建立链接
CanalConnector connect = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.100.201", 11111),
"example", "", ""); //指定一次性读取的条数
int bachChSize = 1000;
// 设置转态
boolean running = true;
while (running) {
//2.创建链接
connect.connect();
//回滚上次请求的信息放置防止数据丢失
connect.rollback();
// 订阅匹配日志
connect.subscribe();
while (running) {
Message message = connect.getWithoutAck(bachChSize);
// 获取batchId
long batchId = message.getId();
// 获取binlog数据的条数
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
} else {
printSummary(message);
}
// 确认指定的batchId已经消费成功
connect.ack(batchId);
}
}
}
private static void printSummary(Message message) {
// 遍历整个batch中的每一个binlog实体
for (CanalEntry.Entry entry : message.getEntries()) {
// 事务开始
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// 获取binlog文件名
String logfileName = entry.getHeader().getLogfileName();
// 获取logfile的偏移量
long logfileOffset = entry.getHeader().getLogfileOffset();
// 获取sql语句执行时间戳
long executeTime = entry.getHeader().getExecuteTime();
// 获取数据库名
String schemaName = entry.getHeader().getSchemaName();
// 获取表名
String tableName = entry.getHeader().getTableName();
// 获取事件类型 insert/update/delete
String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();
System.out.println("logfileName" + ":" + logfileName);
System.out.println("logfileOffset" + ":" + logfileOffset);
System.out.println("executeTime" + ":" + executeTime);
System.out.println("schemaName" + ":" + schemaName);
System.out.println("tableName" + ":" + tableName);
System.out.println("eventTypeName" + ":" + eventTypeName);
CanalEntry.RowChange rowChange = null;
try {
// 获取存储数据,并将二进制字节数据解析为RowChange实体
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
// 迭代每一条变动数据
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 判断是否为删除事件
if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
System.out.println("---delete---");
printColumnList(rowData.getBeforeColumnsList());
System.out.println("---");
}
// 判断是否为更新事件
else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
System.out.println("---update---");
printColumnList(rowData.getBeforeColumnsList());
System.out.println("---");
printColumnList(rowData.getAfterColumnsList());
}
// 判断是否为插入事件
else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
System.out.println("---insert---");
printColumnList(rowData.getAfterColumnsList());
System.out.println("---");
}
}
}
}
// 打印全部列名和列值
private static void printColumnList(List<CanalEntry.Column> columnList) {
for (CanalEntry.Column column : columnList) {
System.out.println(column.getName() + "\t" + column.getValue());
}
}
}
在数据库中随便修改一条数据看看能不能使用Canal客户端能不能消费到
今天给你们分享了Canle它的主要的功能作增量数据同步,后面会使用Canle进行作实时数仓。我在这里为你们提供大数据的资源
须要的朋友能够去下面GitHub去下载,信本身,努力和汗水总会能获得回报的。我是大数据老哥,咱们下期见~~~
资源获取 获取Flink面试题,Spark面试题,程序员必备软件,hive面试题,Hadoop面试题,Docker面试题,简历模板等资源请去 GitHub自行下载 https://github.com/lhh2002/Framework-Of-BigData Gitee 自行下载 https://gitee.com/li_hey_hey/dashboard/projects