上一篇 介绍了移山(数据迁移平台)实时数据同步的总体架构;
本文主要介绍移山(数据迁移平台)实时数据同步是如何保证消息的顺序性。node能够访问 这里 查看更多关于大数据平台建设的原创文章。mysql
消息生产端将消息发送给同一个MQ服务器的同一个分区,而且按顺序发送;算法
消费消费端按照消息发送的顺序进行消费。sql
在某些业务功能场景下须要保证消息的发送和接收顺序是一致的,不然会影响数据的使用。数据库
移山的实时数据同步使用
canal
组件订阅MySQL数据库的日志,并将其投递至 kafka 中(想了解移山实时同步服务架构设计的能够点 这里);
kafka 消费端再根据具体的数据使用场景去处理数据(存入 HBase、MySQL 或直接作实时分析);
因为binlog 自己是有序的,所以写入到mq以后也须要保障顺序。apache
假如如今移山建立了一个实时同步任务,而后订阅了一个业务数据库的订单表;bootstrap
上游业务,向订单表里插入了一个订单,而后对该订单又作了一个更新操做,则 binlog 里会自动写入插入操做和更新操做的数据,这些数据会被 canal server 投递至 kafka broker 里面;服务器
若是 kafka 消费端先消费到了更新日志,后消费到插入日志,则在往目标表里作操做时就会由于数据缺失致使发生异常。微信
实时同步服务消息处理总体流程以下:网络
咱们主要经过如下两个方面去保障保证消息的顺序性。
kafka 的同一个 partition 用一个write ahead log组织, 是一个有序的队列,因此能够保证FIFO的顺序;
所以生产者按照必定的顺序发送消息,broker 就会按照这个顺序把消息写入 partition,消费者也会按照相同的顺序去读取消息;
kafka 的每个 partition 不会同时被两个消费者实例消费,由此能够保证消息消费的顺序性。
要保证同一个订单的屡次修改到达 kafka 里的顺序不能乱,能够在Producer 往 kafka 插入数据时,控制同一个key (能够采用订单主键key-hash算法来实现)发送到同一 partition,这样就能保证同一笔订单都会落到同一个 partition 内。
canal 目前支持的mq有kafka/rocketmq
,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力。咱们只需在配置 instance 的时候开启以下配置便可:
1> canal.properties
# leader节点会等待全部同步中的副本确认以后再确认这条记录是否发送完成
canal.mq.acks = all
备注:
这样只要至少有一个同步副本存在,记录就不会丢失。
2> instance.properties
1 # 散列模式的分区数 2 canal.mq.partitionsNum=2 3 # 散列规则定义 库名.表名: 惟一主键,多个表之间用逗号分隔 4 canal.mq.partitionHash=test.lyf_canal_test:id
备注:
同一条数据的增删改操做 产生的 binlog 数据都会写到同一个分区内;
查看指定topic的指定分区的消息,可使用以下命令:
bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0
将同一个订单数据经过指定key的方式发送到同一个 partition 能够解决大部分状况下的数据乱序问题。
对于一个有着前后顺序的消息A、B,正常状况下应该是A先发送完成后再发送B。可是在异常状况下:
A发送失败了,B发送成功,而A因为重试机制在B发送完成以后重试发送成功了;
这时对于自己顺序为AB的消息顺序变成了BA。
移山的实时同步服务会在将订阅到的数据存入HBase以前再加一层乱序处理 。
使用 mysqlbinlog
查看 binlog:
/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001
执行时间和偏移量:
备注:
每条数据都会有执行时间和偏移量这两个重要信息,下边的校验逻辑核心正是借助了这两个值;
执行的sql 语句在 binlog 中是以base64编码格式存储的,若是想查看sql 语句,须要加上:--base64-output=decode-rows -v
参数来解码;
偏移量:
Position 就表明 binlog 写到这个偏移量的地方,也就是写了这么多字节,即当前 binlog 文件的大小;
也就是说后写入数据的 Position 确定比先写入数据的 Position 大,所以能够根据 Position 大小来判断消息的顺序。
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test'); Query OK, 1 row affected (0.00 sec) MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13; Query OK, 1 row affected (0.00 sec)
把插入,第一次更新,第二次更新这三次操做产生的 binlog 被 canal server 推送至 kafka 中的消息分别称为:消息A,消息B,消息C。
消息A:
消息B:
消息C:
假设因为不可知的网络缘由:
kafka broker收到的三条消息分别为:消息A,消息C,消息B;
则 kafka 消费端消费到的这三条消息前后顺序就是:消息A,消息C,消息B
这样就形成了消息的乱序,所以订阅到的数据在存入目标表前必须得加乱序校验处理。
咱们利用HBase的特性,将数据主键作为目标表的 rowkey。当 kafka 消费端消费到数据时,乱序处理主要流程(摘自禧云数芯大数据平台技术白皮书)以下:
demo的三条消息处理流程以下:
1> 判断消息A 的主键id作为rowkey在hbase的目标表中不存在,则将消息A的数据直接插入HBase:
2> 消息C 的主键id作为rowkey,已经在目标表中存在,则这时须要拿消息C 的执行时间和表中存储的执行时间去判断:
若是消息C 中的执行时间小于表中存储的执行时间,则证实消息C 是重复消息或乱序的消息,直接丢弃;
消息C 中的执行时间大于表中存储的执行时间,则直接更新表数据(本demo即符合该种场景):
消息C 中的执行时间等于表中存储的执行时间,则这时须要拿消息C 的偏移量和表中存储的偏移量去判断:
消息C 中的偏移量小于表中存储的偏移量,则证实消息C 是重复消息,直接丢弃;
消息C 中的偏移量大于等于表中存储的偏移量,则直接更新表数据。
3> 消息B 的主键id作为rowkey,已经在目标表中存在,则这时须要拿消息B 的执行时间和表中存储的执行时间去判断:
因为消息B中的执行时间小于表中存储的执行时间(即消息C 的执行时间),所以消息B 直接丢弃。
kafka 消费端将消费到的消息进行格式化处理和组装,并借助 HBase-client API
来完成对 HBase 表的操做。
1> 使用Put
组装单行数据
/** * 包名: org.apache.hadoop.hbase.client.Put * hbaseData 为从binlog订阅到的数据,经过循环,为目标HBase表 * 添加rowkey、列簇、列数据。 * 做用:用来对单个行执行加入操做。 */ Put put = new Put(Bytes.toBytes(hbaseData.get("id"))); // hbaseData 为从binlog订阅到的数据,经过循环,为目标HBase表添加列簇和列 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));
2> 使用 checkAndMutate
,更新HBase
表的数据
只有服务端对应rowkey的列数据与预期的值符合指望条件(大于、小于、等于)时,才会将put操做提交至服务端。
// 若是 update_info(列族) execute_time(列) 不存在值就插入数据,若是存在则返回false boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put); // 若是存在,则去比较执行时间 if (!res1) { // 若是本次传递的执行时间大于HBase中的执行时间,则插入put boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put); // 执行时间相等时,则去比较偏移量,本次传递的值大于HBase中的值则插入put if (!res2) { boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put); } }
目前移山的实时同步服务,kafka 消费端是使用一个线程去消费数据;
若是未来有版本升级需求,将消费端改成多个线程去消费数据时,要考虑到多线程消费时有序的消息会被打乱这种状况的解决办法。
欢迎你们关注个人微信公众号阅读更多文章: