canal实现mysql与redis数据同步

一、canal

1.简介

     早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

2.工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

二、部署

1.mysql

a) docker安装mysql

docker run -p 3306:3306 --name mysql --privileged --restart=always  -v /home/mysql/conf:/etc/mysql/conf.d -v /home/mysql/logs:/var/log  -v /home/mysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --lower_case_table_names=1

b) 开启binlog记录功能,修改/home/mysql/conf/mysqld.cnf,在[mysqld]下加入

  server-id=1                                #主从复制时主机的服务id
  log-bin=mysql-bin                     #开启nbinlog日志记录
  binlog_format = ROW              #日志记录格式

c) 验证是否开启binlog

  docker restart mysql                #重启mymsql

  docekr exec -it mysql bash     #进入mysql容器

  mysql  -uroot -p123456          #登录mysql

  show variables like 'log_bin';                 #查询binlong是否开启

  show variables like 'binlog_format';       #查询binlong日志记录格式

  

d) 创建数据同步账号

  use mysql;

  grant all privileges on *.* to [email protected]'%' identified by 'canal';

  flush privileges;

2.redis

a) docker安装redis

docker run -p 6379:6379 --name redis --privileged --restart=always  -v /home/redis/conf/redis.conf:/etc/redis/redis.conf -v /home/redis/data:/data -d redis redis-server  --appendonly yes

3.canal

a) docker安装canal

docker run -p 11111:11111 --name canal -v /home/canal/conf/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties  -v /home/canal/logs:/home/admin/canal-server/logs -d canal/canal-server

b) 修改/home/canal/conf/example/instance.properties配置

  canal.instance.mysql.slaveId=1                              #与server-id一致
  canal.instance.master.address=172.17.0.1:3306      #mysql服务ip和端口
  canal.instance.dbUsername=canal                        #mysql登录账号
  canal.instance.dbPassword=canal                         #mysql登录账号密码

三、开发

a) 引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>

b) 开发测试代码

public static void main(String[] args) {
    //创建单链接的客户端链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
            "example",
            "canal",
            "canal");
    int batchSize = 1000;
    //建立链接
    connector.connect();
    //客户端订阅,重复订阅时会更新对应的filter信息
    connector.subscribe("shed-reform\\..*");
    //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
    connector.rollback();
    try {
        while(true) {
            /**  
             * 获取数据,自动进行确认  * 该方法返回的条件:  
             *  a. 拿够batchSize条记录或者超过timeout时间  
             *  b. 如果timeout=0,则阻塞至拿到batchSize记录才返回  
             */
            Message message = connector.get(batchSize, 5000L, TimeUnit.MILLISECONDS);
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                continue;
            } else {
                dataHandle(message.getEntries());
            }
        }
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    } finally {
        connector.unsubscribe();
        connector.disconnect();
    }
}
public static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChange.getEventType();
            if (eventType == EventType.INSERT) {
                System.out.println("insert");
            } else if (eventType == EventType.UPDATE) {
                System.out.println("update");
                update(entry);
            } else if (eventType == EventType.DELETE) {
                System.out.println("delete");
            } else if (eventType == EventType.QUERY) {
                System.out.println("query");
            }
        }
    }
}
public static void update(Entry entry) {
    try {
        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
        List<RowData> rowDatasList = rowChange.getRowDatasList();
        for (RowData rowData : rowDatasList) {
            List<Column> newColumnList = rowData.getAfterColumnsList();
            JSONObject json = new JSONObject();
            for (Column column : newColumnList) {
                json.put(column.getName(), column.getValue());
            }
            System.out.println(json.toJSONString());
        }
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }
}