大数据总线平台架构

写在前面

研发团队,研发规模发展到必定阶段,各类平台化,中台化的方案就走上了日程。见多了业务架构的平台化方案,今天咱们来拆解下数据总线平台的架构。mysql

数据总线平台架构

数据平台的数据源主要来自于两个渠道:nginx

  1. 关系数据库
  2. 日志数据

先看一张通用的数据总线平台架构图:正则表达式

数据采集

关系数据库源数据采集,通常采用模拟mysql的slave方式接收binlog信息以实现数据抽取,同时须要对日志信息进行信息转换,转换后数据入kafka进行平滑流控传输,下游消费者进行数据消费,写入数据管理平台。redis

日志数据来自于各类中间件数据,好比redis日志,nginx日志,rpc日志,es日志,文件系统日志等,经过filebeat或者socket方式到服务器节点agent,经过agent采集并统一发往kafka系统,以后写入数据管理平台。sql

关系数据库采集

采集流程分为三个部分:数据库

  1. 日志抽取模块
  2. 增量转换模块
  3. 全量拉取模块

日志抽取模块由两部分组成:json

  • canal server:负责从mysql拉取增量日志
  • mysql-extractor storm:负责将增量日志输出到kafka,过滤掉不须要的表数据,保证at least one和高可用

mysql主备是经过binlog实现的。binlog同步有三种模式:服务器

  • Row模式
  • Statement模式
  • Mixed模式

通常采用Row模式进行复制,能够读取全量日志。架构

部署上能够采用2个master(vip)+1个slave+1个backup做为容灾,读取binlog日志从slave读取。并发

binlog采集工具比较多,有dbus和阿里的canal均可以进行增量数据读取。

日志抽取模块将目标数据从canal server读取,放到kafka中。

能够基于zk的canal server高可用模式,不出现单点问题,日志抽取模块能够用storm程序,一样作好高可用。

增量日志抽取流程以下:

分发模块:

  • 未来自数据源的日志按照不一样的schema分发到不一样topic上,这样为了数据隔离,通常不一样到schema对应不一样的数据库
  • 同时为了分离转换模块的计算的压力,转换模块计算量较大,能够多节点部署,每一个schema一个,以便提高效率

转换模块:

  • 实时数据格式转换模块,canal数据是pb编码格式,须要转换成业务要求的格式,并生成相关id信息
  • 实时数据脱敏,对指定列信息进行脱敏,编码,加盐等
  • 响应全量事件的能力,当收到须要响应全量数据的需求时,为保证数据顺序,会暂停拉取增量数据,等全量完成以后,再继续
  • 监控数据,分发模块和转换模块都会响应event,统计每一张表在两次心跳中的数据和延迟状况,发到统计系统进行监控数据
  • 分发模块和转换模块,均可以执行reload事件,对zk上的源数据进行加载配置

全量拉取:

全量拉取借鉴sqoop思想,整个全量过程分为两个部分:

  1. 数据分片
  2. 实际拉取

数据分片

分片获取max,min,count等信息,根据片大小计算分片数,生成分片信息保存到split topic中。

关系数据采用主键索引进行分片,高效,且主键和数据存储顺序一致。

实际拉取

每一个分片表明一个小任务,由拉取转换模块经过多个并发度到方式链接slave从库拉取,完成状况汇报到zk中,便于监控。

因为全量拉取对于源数据库有必定的压力,作法以下:

  1. 从slave从库拉取数据
  2. 并发度6~8
  3. 推荐在业务低峰期进行

全量拉取不常常发生,通常作初始化拉取一次,某种状况下须要全量时能够触发一次。

一致性处理

为保证日志消息顺序性,kafka咱们使用一个partition方式,基本上顺序的和惟一的。若是出现写kafka异步写入失败,storm有重作机制,所以并非严格保证exactly once和彻底顺序性,保证的是at least once。

所以ums_id_变得尤其重要。 对于全量抽取,ums_id是一个值,该值为全量拉取event的ums_id号,表示该批次的全部数据是一批的,由于数据都是不一样的能够共享一个ums_id_号。ums_uid_流水号从zk中生成,保证了数据的惟一性。 对于增量抽取,咱们使用的是 mysql的日志文件号 + 日志偏移量做为惟一id。Id做为64位的long整数,高6位用于日志文件号,低13位做为日志偏移量。 例如:000103000012345678。 103 是日志文件号,12345678 是日志偏移量。 这样,从日志层面保证了物理惟一性(即使重作也这个id号也不变),同时也保证了顺序性(还能定位日志)。经过比较ums_id_就能知道哪条消息更新。

ums_ts_的价值在于从时间维度上能够准确知道event发生的时间。好比:若是想获得一个某时刻的快照数据。能够经过ums_ts 来知道截断时间点。

日志数据采集

业界日志收集、结构化、分析工具方案不少,例如:Logstash、Filebeat、Flume、Fluentd、Chukwa. scribe、Splunk等,各有所长。

在结构化日志这个方面,大多采用配置正则表达式模板:用于提取日志中模式比较固定、通用的部分,例如日志时间、日志类型、行号等。对于真正的和业务比较相关的信息,这边部分是最重要的,称为message部分,咱们但愿使用可视化的方式来进行结构化。

log4j的日志以下:

若是想将上述日志转换成结构化数据:

DBUS设计的数据日志同步方案以下:

日志抓取端采用业界流行的组件(例如Logstash、Flume、Filebeat等)。一方面便于用户和业界统一标准,方便用户的整合;另外一方面也避免无谓的重造轮子。抓取数据称为原始数据日志(raw data log)放进Kafka中,等待处理。

提供可视化界面,配置规则来结构化日志。用户可配置日志来源和目标。同一个日志来源能够输出到多个目标。每一条“日志源-目标”线,中间数据通过的规则处理用户根据本身的需求来自由定义。最终输出的数据是结构化的,即:有schema约束,能够理解为相似数据库中的表。

所谓规则,在DBUS中,即“规则算子”。DBUS设计了丰富易用的过滤、拆分、合并、替换等算子供用户使用。用户对数据的处理可分多个步骤进行,每一个步骤的数据处理结果可即时查看、验证;可重复使用不一样算子,直到转换、裁剪获得本身须要的数据。

将配置好的规则算子组运用到执行引擎中,对目标日志数据进行预处理,造成结构化数据,输出到Kafka,供下游数据使用方使用。

流程以下:

根据配置,咱们支持同一条原始日志,能提取为一个表数据,或者能够提取为多个表数据。

每一个表是结构化的,知足相同的schema。

每一个表是一个规则 算子组的合集,能够配置1个到多个规则算子组 每一个规则算子组,由一组规则算子组合而成 拿到一条原始数据日志, 它最终应该属于哪张表呢?

每条日志须要与规则算子组进行匹配:

符合条件的进入规则算子组的,最终被规则组转换为结构化的表数据。 不符合的尝试下一个规则算子组。 都不符合的,进入unknown_table表。

自定义统一消息格式

不管是增量、全量仍是日志,最终输出到结果kafka中的消息都是咱们约定的统一消息格式,称为UMS(unified message schema)格式。以下图所示:

Protocol

数据的类型,被UMS的版本号

schema 1)namespace 由:类型. 数据源名.schema名 .表名.表版本号. 分库号 .分表号 组成,可以描述全部表。

例如:mysql.db1.schema1.testtable.5.0.0

2)fields是字段名描述。

ums_id_ 消息的惟一id,保证消息是惟一的 ums_ts_ canal捕获事件的时间戳; ums_op_ 代表数据的类型是I (insert),U (update),B (before Update),D(delete) ums_uid_ 数据流水号,惟一值 3)payload是指具体的数据。

一个json包里面能够包含1条至多条数据,提升数据的有效载荷。

心跳监控和预警

RDBMS类系统涉及到数据库的主备同步,日志抽取,增量转换等多个模块等。

日志类系统涉及到日志抽取端,日志转换模模块等。

如何知道系统正在健康工做,数据是否可以实时流转? 所以对流程的监控和预警就尤其重要。

对于RDBMS类系统

心跳模块从dbusmgr库中得到须要监控的表列表,以固定频率(好比每分钟)向源端dbus库的心跳表插入心跳数据(该数据中带有发送时间),该心跳表也做为增量数据被实时同步出来,而且与被同步表走相同的逻辑和线程(为了保证顺序性,当遇到多并发度时是sharding by table的,心跳数据与table数据走一样的bolt),这样当收到心跳数据时,即使没有任何增删改的数据,也能证实整条链路是通的。

增量转换模块和心跳模块在收到心跳包数据后,就会发送该数据到influxdb中做为监控数据,经过grafana进行展现。 心跳模块还会监控延时状况,根据延时状况给以报警。

对于日志类系统

从源端就会自动产生心跳包,相似RDBMS系统,将心跳包经过抽取模块,和算子转换模块同步到末端,由心跳模块负责监控和预警。

相关文章
相关标签/搜索