美团DB数据同步到数据仓库的架构与实践

背景

在数据仓库建模中,未经任何加工处理的原始业务层数据,咱们称之为ODS(Operational Data Store)数据。在互联网企业中,常见的ODS数据有业务日志数据(Log)和业务DB数据(DB)两类。对于业务DB数据来讲,从MySQL等关系型数据库的业务数据进行采集,而后导入到Hive中,是进行数据仓库生产的重要环节。正则表达式

如何准确、高效地把MySQL数据同步到Hive中?通常经常使用的解决方案是批量取数并Load:直连MySQL去Select表中的数据,而后存到本地文件做为中间存储,最后把文件Load到Hive表中。这种方案的优势是实现简单,可是随着业务的发展,缺点也逐渐暴露出来:数据库

  • 性能瓶颈:随着业务规模的增加,Select From MySQL -> Save to Localfile -> Load to Hive这种数据流花费的时间愈来愈长,没法知足下游数仓生产的时间要求。
  • 直接从MySQL中Select大量数据,对MySQL的影响很是大,容易形成慢查询,影响业务线上的正常服务。
  • 因为Hive自己的语法不支持更新、删除等SQL原语,对于MySQL中发生Update/Delete的数据没法很好地进行支持。

为了完全解决这些问题,咱们逐步转向CDC (Change Data Capture) + Merge的技术方案,即实时Binlog采集 + 离线处理Binlog还原业务数据这样一套解决方案。Binlog是MySQL的二进制日志,记录了MySQL中发生的全部数据变动,MySQL集群自身的主从同步就是基于Binlog作的。服务器

本文主要从Binlog实时采集和离线处理Binlog还原业务数据两个方面,来介绍如何实现DB数据准确、高效地进入数仓。架构

总体架构

图片0

总体的架构如上图所示。在Binlog实时采集方面,咱们采用了阿里巴巴的开源项目Canal,负责从MySQL实时拉取Binlog并完成适当解析。Binlog采集后会暂存到Kafka上供下游消费。总体实时采集部分如图中红色箭头所示。负载均衡

离线处理Binlog的部分,如图中黑色箭头所示,经过下面的步骤在Hive上还原一张MySQL表:框架

  1. 采用Linkedin的开源项目Camus,负责每小时把Kafka上的Binlog数据拉取到Hive上。
  2. 对每张ODS表,首先须要一次性制做快照(Snapshot),把MySQL里的存量数据读取到Hive上,这一过程底层采用直连MySQL去Select数据的方式。
  3. 对每张ODS表,天天基于存量数据和当天增量产生的Binlog作Merge,从而还原出业务数据。

咱们回过头来看看,背景中介绍的批量取数并Load方案遇到的各类问题,为何用这种方案能解决上面的问题呢?性能

  • 首先,Binlog是流式产生的,经过对Binlog的实时采集,把部分数据处理需求由天天一次的批处理分摊到实时流上。不管从性能上仍是对MySQL的访问压力上,都会有明显地改善。
  • 第二,Binlog自己记录了数据变动的类型(Insert/Update/Delete),经过一些语义方面的处理,彻底可以作到精准的数据还原。

Binlog实时采集

对Binlog的实时采集包含两个主要模块:一是CanalManager,主要负责采集任务的分配、监控报警、元数据管理以及和外部依赖系统的对接;二是真正执行采集任务的Canal和CanalClient。优化

图片1

当用户提交某个DB的Binlog采集请求时,CanalManager首先会调用DBA平台的相关接口,获取这一DB所在MySQL实例的相关信息,目的是从中选出最适合Binlog采集的机器。而后把采集实例(Canal Instance)分发到合适的Canal服务器上,即CanalServer上。在选择具体的CanalServer时,CanalManager会考虑负载均衡、跨机房传输等因素,优先选择负载较低且同地域传输的机器。spa

CanalServer收到采集请求后,会在ZooKeeper上对收集信息进行注册。注册的内容包括:设计

  • 以Instance名称命名的永久节点。
  • 在该永久节点下注册以自身ip:port命名的临时节点。

这样作的目的有两个:

  • 高可用:CanalManager对Instance进行分发时,会选择两台CanalServer,一台是Running节点,另外一台做为Standby节点。Standby节点会对该Instance进行监听,当Running节点出现故障后,临时节点消失,而后Standby节点进行抢占。这样就达到了容灾的目的。
  • 与CanalClient交互:CanalClient检测到本身负责的Instance所在的Running CanalServer后,便会进行链接,从而接收到CanalServer发来的Binlog数据。

对Binlog的订阅以MySQL的DB为粒度,一个DB的Binlog对应了一个Kafka Topic。底层实现时,一个MySQL实例下全部订阅的DB,都由同一个Canal Instance进行处理。这是由于Binlog的产生是以MySQL实例为粒度的。CanalServer会抛弃掉未订阅的Binlog数据,而后CanalClient将接收到的Binlog按DB粒度分发到Kafka上。

离线还原MySQL数据

完成Binlog采集后,下一步就是利用Binlog来还原业务数据。首先要解决的第一个问题是把Binlog从Kafka同步到Hive上。

图片2

Kafka2Hive

整个Kafka2Hive任务的管理,在美团数据平台的ETL框架下进行,包括任务原语的表达和调度机制等,都同其余ETL相似。而底层采用LinkedIn的开源项目Camus,并进行了有针对性的二次开发,来完成真正的Kafka2Hive数据传输工做。

对Camus的二次开发

Kafka上存储的Binlog未带Schema,而Hive表必须有Schema,而且其分区、字段等的设计,都要便于下游的高效消费。对Camus作的第一个改造,即是将Kafka上的Binlog解析成符合目标Schema的格式。

对Camus作的第二个改造,由美团的ETL框架所决定。在咱们的任务调度系统中,目前只对同调度队列的任务作上下游依赖关系的解析,跨调度队列是不能创建依赖关系的。而在MySQL2Hive的整个流程中,Kafka2Hive的任务须要每小时执行一次(小时队列),Merge任务天天执行一次(天队列)。而Merge任务的启动必需要严格依赖小时Kafka2Hive任务的完成。

为了解决这一问题,咱们引入了Checkdone任务。Checkdone任务是天任务,主要负责检测前一天的Kafka2Hive是否成功完成。若是成功完成了,则Checkdone任务执行成功,这样下游的Merge任务就能够正确启动了。

Checkdone的检测逻辑

Checkdone是怎样检测的呢?每一个Kafka2Hive任务成功完成数据传输后,由Camus负责在相应的HDFS目录下记录该任务的启动时间。Checkdone会扫描前一天的全部时间戳,若是最大的时间戳已经超过了0点,就说明前一天的Kafka2Hive任务都成功完成了,这样Checkdone就完成了检测。

此外,因为Camus自己只是完成了读Kafka而后写HDFS文件的过程,还必须完成对Hive分区的加载才能使下游查询到。所以,整个Kafka2Hive任务的最后一步是加载Hive分区。这样,整个任务才算成功执行。

每一个Kafka2Hive任务负责读取一个特定的Topic,把Binlog数据写入original_binlog库下的一张表中,即前面图中的original_binlog.db,其中存储的是对应到一个MySQL DB的所有Binlog。

图片3

上图说明了一个Kafka2Hive完成后,文件在HDFS上的目录结构。假如一个MySQL DB叫作user,对应的Binlog存储在original_binlog.user表中。ready目录中,按天存储了当天全部成功执行的Kafka2Hive任务的启动时间,供Checkdone使用。每张表的Binlog,被组织到一个分区中,例如userinfo表的Binlog,存储在table_name=userinfo这一分区中。每一个table_name一级分区下,按dt组织二级分区。图中的xxx.lzo和xxx.lzo.index文件,存储的是通过lzo压缩的Binlog数据。

Merge

Binlog成功入仓后,下一步要作的就是基于Binlog对MySQL数据进行还原。Merge流程作了两件事,首先把当天生成的Binlog数据存放到Delta表中,而后和已有的存量数据作一个基于主键的Merge。Delta表中的数据是当天的最新数据,当一条数据在一天内发生屡次变动时,Delta表中只存储最后一次变动后的数据。

把Delta数据和存量数据进行Merge的过程当中,须要有惟一键来断定是不是同一条数据。若是同一条数据既出如今存量表中,又出如今Delta表中,说明这一条数据发生了更新,则选取Delta表的数据做为最终结果;不然说明没有发生任何变更,保留原来存量表中的数据做为最终结果。Merge的结果数据会Insert Overwrite到原表中,即图中的origindb.table

Merge流程举例

下面用一个例子来具体说明Merge的流程。

图片4

数据表共id、value两列,其中id是主键。在提取Delta数据时,对同一条数据的屡次更新,只选择最后更新的一条。因此对id=1的数据,Delta表中记录最后一条更新后的值value=120。Delta数据和存量数据作Merge后,最终结果中,新插入一条数据(id=4),两条数据发生了更新(id=1和id=2),一条数据未变(id=3)。

默认状况下,咱们采用MySQL表的主键做为这一判重的惟一键,业务也能够根据实际状况配置不一样于MySQL的惟一键。

上面介绍了基于Binlog的数据采集和ODS数据还原的总体架构。下面主要从两个方面介绍咱们解决的实际业务问题。

实践一:分库分表的支持

随着业务规模的扩大,MySQL的分库分表状况愈来愈多,不少业务的分表数目都在几千个这样的量级。而通常数据开发同窗须要把这些数据聚合到一块儿进行分析。若是对每一个分表都进行手动同步,再在Hive上进行聚合,这个成本很难被咱们接受。所以,咱们须要在ODS层就完成分表的聚合。

图片5

首先,在Binlog实时采集时,咱们支持把不一样DB的Binlog写入到同一个Kafka Topic。用户能够在申请Binlog采集时,同时勾选同一个业务逻辑下的多个物理DB。经过在Binlog采集层的聚集,全部分库的Binlog会写入到同一张Hive表中,这样下游在进行Merge时,依然只须要读取一张Hive表。

第二,Merge任务的配置支持正则匹配。经过配置符合业务分表命名规则的正则表达式,Merge任务就能了解本身须要聚合哪些MySQL表的Binlog,从而选取相应分区的数据来执行。

这样经过两个层面的工做,就完成了分库分表在ODS层的合并。

这里面有一个技术上的优化,在进行Kafka2Hive时,咱们按业务分表规则对表名进行了处理,把物理表名转换成了逻辑表名。例如userinfo123这张表名会被转换为userinfo,其Binlog数据存储在original_binlog.user表的table_name=userinfo分区中。这样作的目的是防止过多的HDFS小文件和Hive分区形成的底层压力。

实践二:删除事件的支持

Delete操做在MySQL中很是常见,因为Hive不支持Delete,若是想把MySQL中删除的数据在Hive中删掉,须要采用“迂回”的方式进行。

对须要处理Delete事件的Merge流程,采用以下两个步骤:

  • 首先,提取出发生了Delete事件的数据,因为Binlog自己记录了事件类型,这一步很容易作到。将存量数据(表A)与被删掉的数据(表B)在主键上作左外链接(Left outer join),若是可以所有join到双方的数据,说明该条数据被删掉了。所以,选择结果中表B对应的记录为NULL的数据,便是应当被保留的数据。
  • 而后,对上面获得的被保留下来的数据,按照前面描述的流程作常规的Merge。

图片6

总结与展望

做为数据仓库生产的基础,美团数据平台提供的基于Binlog的MySQL2Hive服务,基本覆盖了美团内部的各个业务线,目前已经可以知足绝大部分业务的数据同步需求,实现DB数据准确、高效地入仓。在后面的发展中,咱们会集中解决CanalManager的单点问题,并构建跨机房容灾的架构,从而更加稳定地支撑业务的发展。

本文主要从Binlog流式采集和基于Binlog的ODS数据还原两方面,介绍了这一服务的架构,并介绍了咱们在实践中遇到的一些典型问题和解决方案。但愿可以给其余开发者一些参考价值,同时也欢迎你们和咱们一块儿交流。

招聘

若是你对咱们的工做内容比较感兴趣,欢迎发送简历给 wangmengmeng05@meituan.com,和咱们一块儿致力于解决海量数据采集和传输的问题中来吧!

相关文章
相关标签/搜索