数据同步通常分为两种方式:全量和增量。增量数据是一类典型的流数据,基于日志的增量同步几乎已是全部数据库的标配,它能够减小常规ETL工做对系统带来的影响,并大大下降数据的延迟。做为Greenplum的流计算引擎,Greenplum Stream Server(gpss)能将不一样源端的增量数据同步到Greenplum中。为更好的支持这一应用场景,即将发布的gpss 1.3.6 对增量同步的功能作了加强。mysql
Greenplum Stream Server(简称gpss),是Greenplum的下一代数据加载解决方案,相比于gpfdist,GPSS会提供流数据支持及API接口,有更好的扩展性,支持更丰富的功能,并开放更细粒度的任务控制接口。在即将发布gpss 1.3.6 中,对增量同步所作的的功能加强包括:sql
本文将以MySQL为例,简要介绍下gpss如何实现向Greenplum的增量同步。数据库
咱们要完成的工做是:json
MySQSL和Maxwell的配置和使用,本文将不作深刻介绍,你们能够自行访问文章连接阅读学习,访问相关文章请点击文章底部的“阅读原文”。segmentfault
测试使用的表在MySQL中定义以下:并发
create table t_update_delete_0 (k1 decimal, k2 text, v1 decimal, v2 decimal, v3 text, c1 decimal, c2 text);
其中 k1 和 k2 列为键,用来惟一标识一条记录, v1, v2, v3 为每次更新的数据。学习
在源端分别对这个表进行了insert,update和delete操做,每一个语句为单独的transaction。 测试
Insert语句为:网站
insert into t_update_delete_0 (k1,k2,v1,v2,v3,c1,c2) values (1,'k_1', 1, 3, 'v_1', 1, 'c1');
Update语句为:spa
update t_update_delete_0 set v1=100,v2=300,v3='v_100' where k1='1' and k2='k_1';
Delete语句为:
delete from t_update_delete_0 where k1='1' and k2='k_1';
Maxwell能够将捕获到binlog解析为json格式并发送到kafka,不一样的操做生成的Kafka消息有细微的区别。为了将这些消息正确的恢复到Greenplum中,咱们先对这三种类型的消息进行简单的分析。
Insert时生成的消息示例以下:
{ "database": "test", "table": "t_update_delete_0", "type": "insert", "ts": 1586956781, "xid": 1398209, "commit": true, "data": { "k1": 41, "k2": "k_41", "v1": 818, "v2": 2454, "v3": "v_818", "c1": 41, "c2": "c_41" } }
database和table表示源表的表名,ts和xid字段用于表示消息的顺序,type和data表示执行的操做及对应的数据。这些是全部消息类型通用的。
Delete生成的消息以下,type为"delete",同时data中包含了完整的内容。
{ "database": "test", "table": "t_update_delete_0", "type": "delete", "ts": 1586956781, "xid": 1398195, "commit": true, "data": { "k1": 44, "k2": "k_44", "v1": 744, "v2": 2232, "v3": "v_744", "c1": 44, "c2": "c_44" } }
Update除了包含新数据外,还包含了更新以前的数据(old),这里咱们只须要新数据就够了。
{ "database": "test", "table": "t_update_delete_0", "type": "update", "ts": 1586956707, "xid": 1281915, "commit": true, "data": { "k1": 99, "k2": "k_99", "v1": 798, "v2": 2394, "v3": "v_798", "c1": 99, "c2": "c_99" }, "old": { "v1": 800, "v2": 2400, "v3": "v_800" } }
根据生成的消息,咱们须要执行以下操做:
Greenplum中的定义包含了排序的字段,用来区分消息更新的前后顺序,定义以下:
create table t_update_delete_0 (k1 decimal, k2 text, v1 decimal, v2 decimal, v3 text, c1 decimal, c2 text, ts decimal, xid decimal, del_mark boolean);
根据数据同步的需求,gpss须要的yaml配置文件以下:
DATABASE: test USER: gpadmin HOST: mdw PORT: 5432 VERSION: 2 KAFKA: INPUT: SOURCE: BROKERS: kafkahost:9092 TOPIC: test VALUE: COLUMNS: - NAME: c1 TYPE: json FORMAT: json ERROR_LIMIT: 100 OUTPUT: MODE: MERGE MATCH_COLUMNS: - k1 - k2 UPDATE_COLUMNS: - v1 - v2 - v3 ORDER_COLUMNS: - ts - xid DELETE_CONDITION: del_mark TABLE: t_update_delete_0 MAPPING: k1 : (c1->'data'->>'k1')::decimal k2 : (c1->'data'->>'k2')::text v1 : (c1->'data'->>'v1')::decimal v2 : (c1->'data'->>'v2')::decimal v3 : (c1->'data'->>'v3')::text c1 : (c1->'data'->>'c1')::decimal c2 : (c1->'data'->>'c2')::text ts : (c1->>'ts')::decimal xid: (c1->>'xid')::decimal del_mark: (c1->>'type')::text = 'delete' COMMIT: MINIMAL_INTERVAL: 2000
几个主要的配置含义以下:
归纳下来,gpss执行的步骤为:
(因为有去重操做,为保证不丢失数据,在UPDATE时,Kafka的消息中须要包含整行的数据,而不单单是更新部分的数据。)
配置文件准备好后,咱们经过gpkafka来执行加载:
gpkafka load mysql.yaml
gpkafka便会从kafka中拉取对应的消息,按照设定的操做将Kafka中的增量数据同步到目标表中。
本文简单介绍了如何用gpss从MySQL进行增量同步,其它数据库(例如Oracle,SQL Server等)也均可以利用相似的方案实现同步。不一样的消息类型须要不一样的处理逻辑,gpss的配置文件中有不少能够进行后处理的部分,更详细的内容能够参考官方文档:https://gpdb.docs.pivotal.io/...。因为源端系统的多样性,gpss的增量复制仍有不少须要完善的地方。在gpss后续版本咱们会持续加强相关功能,例如一对多(一个topic到多个目标表)的同步,自动依据topic offset排序等;欢迎你们使用,反馈,指导。也欢迎你们前往askGP(ask.greenplum.cn)交流。
得到Greenplum更多干货内容,欢迎前往Greenplum中文社区网站