上一篇介绍了移山(数据迁移平台)实时数据同步的总体架构;
本文主要介绍移山(数据迁移平台)实时数据同步是如何保证消息的顺序性。
能够访问 这里 查看更多关于 大数据平台建设的原创文章。
- 消息生产端将消息发送给同一个MQ服务器的同一个分区,而且按顺序发送;
- 消费消费端按照消息发送的顺序进行消费。
在某些业务功能场景下须要保证消息的发送和接收顺序是一致的,不然会影响数据的使用。mysql
移山的实时数据同步使用canal
组件订阅MySQL数据库的日志,并将其投递至 kafka 中(想了解移山实时同步服务架构设计的能够点 这里);
kafka 消费端再根据具体的数据使用场景去处理数据(存入 HBase、MySQL 或直接作实时分析);
因为binlog 自己是有序的,所以写入到mq以后也须要保障顺序。
实时同步服务消息处理总体流程以下:算法
咱们主要经过如下两个方面去保障保证消息的顺序性。sql
要保证同一个订单的屡次修改到达 kafka 里的顺序不能乱,能够在Producer 往 kafka 插入数据时,控制同一个key (能够采用订单主键key-hash算法来实现)发送到同一 partition,这样就能保证同一笔订单都会落到同一个 partition 内。数据库
canal 目前支持的mq有kafka/rocketmq
,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力。咱们只需在配置 instance 的时候开启以下配置便可:apache
1> canal.propertiesbootstrap
# leader节点会等待全部同步中的副本确认以后再确认这条记录是否发送完成 canal.mq.acks = all
备注:服务器
2> instance.properties微信
# 散列模式的分区数 canal.mq.partitionsNum=2 # 散列规则定义 库名.表名: 惟一主键,多个表之间用逗号分隔 canal.mq.partitionHash=test.lyf_canal_test:id
备注:网络
查看指定topic的指定分区的消息,可使用以下命令:多线程
bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0
将同一个订单数据经过指定key的方式发送到同一个 partition 能够解决大部分状况下的数据乱序问题。
对于一个有着前后顺序的消息A、B,正常状况下应该是A先发送完成后再发送B。可是在异常状况下:
移山的实时同步服务会在将订阅到的数据存入HBase以前再加一层乱序处理 。
使用 mysqlbinlog
查看 binlog:
/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001
执行时间和偏移量:
备注:
--base64-output=decode-rows -v
参数来解码;偏移量:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test'); Query OK, 1 row affected (0.00 sec) MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13; Query OK, 1 row affected (0.00 sec)
把插入,第一次更新,第二次更新这三次操做产生的 binlog 被 canal server 推送至 kafka 中的消息分别称为:消息A,消息B,消息C。
假设因为不可知的网络缘由:
咱们利用HBase的特性,将数据主键作为目标表的 rowkey。当 kafka 消费端消费到数据时,乱序处理主要流程(摘自禧云数芯大数据平台技术白皮书)以下:
demo的三条消息处理流程以下:
1> 判断消息A 的主键id作为rowkey在hbase的目标表中不存在,则将消息A的数据直接插入HBase:
2> 消息C 的主键id作为rowkey,已经在目标表中存在,则这时须要拿消息C 的执行时间和表中存储的执行时间去判断:
消息C 中的执行时间等于表中存储的执行时间,则这时须要拿消息C 的偏移量和表中存储的偏移量去判断:
3> 消息B 的主键id作为rowkey,已经在目标表中存在,则这时须要拿消息B 的执行时间和表中存储的执行时间去判断:
kafka 消费端将消费到的消息进行格式化处理和组装,并借助 HBase-client API
来完成对 HBase 表的操做。
1> 使用Put
组装单行数据
/** * 包名: org.apache.hadoop.hbase.client.Put * hbaseData 为从binlog订阅到的数据,经过循环,为目标HBase表 * 添加rowkey、列簇、列数据。 * 做用:用来对单个行执行加入操做。 */ Put put = new Put(Bytes.toBytes(hbaseData.get("id"))); // hbaseData 为从binlog订阅到的数据,经过循环,为目标HBase表添加列簇和列 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));
2> 使用 checkAndMutate
,更新HBase
表的数据
只有服务端对应rowkey的列数据与预期的值符合指望条件(大于、小于、等于)时,才会将put操做提交至服务端。
// 若是 update_info(列族) execute_time(列) 不存在值就插入数据,若是存在则返回false boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put); // 若是存在,则去比较执行时间 if (!res1) { // 若是本次传递的执行时间大于HBase中的执行时间,则插入put boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"),Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put); // 执行时间相等时,则去比较偏移量,本次传递的值大于HBase中的值则插入put if (!res2) { boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put); } }
欢迎你们关注个人微信公众号阅读更多文章: