事情是从公司前段时间的需求提及,你们知道宜信是一家金融科技公司,咱们的不少数据与标准互联网企业不一样,大体来讲就是:jquery
玩数据的人都知道数据是很是有价值的,而后这些数据是保存在各个系统的数据库中,如何让须要数据的使用方获得一致性、实时的数据呢?git
过去的通用作法有几种,分别是:github
这些方案都不算完美。咱们在了解和考虑了不一样实现方式后,最后借鉴了 linkedin的思想,认为要想同时解决数据一致性和实时性,比较合理的方法应该是来自于log。web
(此图来自:www.confluent.io/blog/using-…sql
把增量的Log做为一切系统的基础。后续的数据使用方,经过订阅kafka来消费log。数据库
好比:json
为何使用log和kafka做为基础,而不使用Sqoop进行抽取呢? 由于:缓存
为何不使用dual write(双写)呢?,请参考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/安全
这里就很少作解释了。架构
因而咱们提出了构建一个基于log的公司级的平台的想法。
下面解释一下DWS平台,DWS平台是有3个子项目组成:
图中:
因为时间关系,我今天主要介绍DWS中的Dbus和Wormhole,在须要的时候附带介绍一下Swifts。
如前面所说,Dbus主要解决的是将日志从源端实时的抽出。 这里咱们以MySQL为例子,简单说明如何实现。
咱们知道,虽然MySQL InnoDB有本身的log,MySQL主备同步是经过binlog来实现的。以下图:
而binlog有三种模式:
他们各自的优缺点以下:
因为statement 模式的缺点,在与咱们的DBA沟经过程中了解到,实际生产过程当中都使用row 模式进行复制。这使得读取全量日志成为可能。
一般咱们的MySQL布局是采用 2个master主库(vip)+ 1个slave从库 + 1个backup容灾库 的解决方案,因为容灾库一般是用于异地容灾,实时性不高也不便于部署。
为了最小化对源端产生影响,显然咱们读取binlog日志应该从slave从库读取。
读取binlog的方案比较多,github上很多,参考https://github.com/search?utf8=%E2%9C%93&q=binlog。最终咱们选用了阿里的canal作位日志抽取方。
Canal最先被用于阿里中美机房同步, canal原理相对比较简单:
Dbus 的MySQL版主要解决方案以下:
对于增量的log,经过订阅Canal Server的方式,咱们获得了MySQL的增量日志:
在考虑使用Storm做为解决方案的时候,咱们主要是认为Storm有如下优势:
对于流水表,有增量部分就够了,可是许多表须要知道最初(已存在)的信息。这时候咱们须要initial load(第一次加载)。
对于initial load(第一次加载),一样开发了全量抽取Storm程序经过jdbc链接的方式,从源端数据库的备库进行拉取。initial load是拉所有数据,因此咱们推荐在业务低峰期进行。好在只作一次,不须要天天都作。
全量抽取,咱们借鉴了Sqoop的思想。将全量抽取Storm分为了2 个部分:
数据分片须要考虑分片列,按照配置和自动选择列将数据按照范围来分片,并将分片信息保存到kafka中。
下面是具体的分片策略:
全量抽取的Storm程序是读取kafka的分片信息,采用多个并发度并行链接数据库备库进行拉取。由于抽取的时间可能很长。抽取过程当中将实时状态写到Zookeeper中,便于心跳程序监控。
不管是增量仍是全量,最终输出到kafka中的消息都是咱们约定的一个统一消息格式,称为UMS(unified message schema)格式。
以下图所示:
消息中schema部分,定义了namespace 是由 类型+数据源名+schema名+表名+版本号+分库号+分表号 可以描述整个公司的全部表,经过一个namespace就能惟必定位。
payload是指具体的数据,一个json包里面能够包含1条至多条数据,提升数据的有效载荷。
UMS中支持的数据类型,参考了Hive类型并进行简化,基本上包含了全部数据类型。
在整个数据传输中,为了尽可能的保证日志消息的顺序性,kafka咱们使用的是1个partition的方式。在通常状况下,基本上是顺序的和惟一的。
可是咱们知道写kafka会失败,有可能重写,Storm也用重作机制,所以,咱们并不严格保证exactly once和彻底的顺序性,但保证的是at least once。
所以_ums_id_变得尤其重要。
对于全量抽取,_ums_id_是惟一的,从zk中每一个并发度分别取不一样的id片区,保证了惟一性和性能,填写负数,不会与增量数据冲突,也保证他们是早于增量消息的。
对于增量抽取,咱们使用的是MySQL的日志文件号 + 日志偏移量做为惟一id。Id做为64位的long整数,高7位用于日志文件号,低12位做为日志偏移量。
例如:000103000012345678。 103 是日志文件号,12345678 是日志偏移量。
这样,从日志层面保证了物理惟一性(即使重作也这个id号也不变),同时也保证了顺序性(还能定位日志)。经过比较_ums_id_ 消费日志就能经过比较_ums_id_知道哪条消息更新。
其实_ums_ts_与_ums_id_意图是相似的,只不过有时候_ums_ts_可能会重复,即在1毫秒中发生了多个操做,这样就得靠比较_ums_id_了。
整个系统涉及到数据库的主备同步,Canal Server,多个并发度Storm进程等各个环节。
所以对流程的监控和预警就尤其重要。
经过心跳模块,例如每分钟(可配置)对每一个被抽取的表插入一条心态数据并保存发送时间,这个心跳表也被抽取,跟随着整个流程下来,与被同步表在实际上走相同的逻辑(由于多个并发的的Storm可能有不一样的分支),当收到心跳包的时候,即使没有任何增删改的数据,也能证实整条链路是通的。
Storm程序和心跳程序将数据发送公共的统计topic,再由统计程序保存到influxdb中,使用grafana进行展现,就能够看到以下效果:
图中是某业务系统的实时监控信息。上面是实时流量状况,下面是实时延时状况。能够看到,实时性仍是很不错的,基本上1~2秒数据就已经到末端kafka中。
Granfana提供的是一种实时监控能力。
若是出现延时,则是经过dbus的心跳模块发送邮件报警或短信报警。
考虑到数据安全性,对于有脱敏需求的场景,Dbus的全量storm和增量storm程序也完成了实时脱敏的功能。脱敏方式有3种:
总结一下:简单的说,Dbus就是将各类源的数据,实时的导出,并以UMS的方式提供订阅, 支持实时脱敏,实际监控和报警。
说完Dbus,该说一下Wormhole,为何两个项目不是一个,而要经过kafka来对接呢?
其中很大一个缘由就是解耦,kafka具备自然的解耦能力,程序直接能够经过kafka作异步的消息传递。Dbus和Wornhole内部也使用了kafka作消息传递和解耦。
另一个缘由就是,UMS是自描述的,经过订阅kafka,任何有能力的使用方来直接消费UMS来使用。
虽然UMS的结果能够直接订阅,但还须要开发的工做。Wormhole解决的是:提供一键式的配置,将kafka中的数据落地到各类系统中,让没有开发能力的数据使用方经过wormhole来实现使用数据。
如图所示,Wormhole 能够将kafka中的UMS 落地到各类系统,目前用的最多的HDFS,JDBC的数据库和HBase。
在技术栈上, wormhole选择使用spark streaming来进行。
在Wormhole中,一条flow是指从一个namaspace从源端到目标端。一个spark streaming服务于多条flow。
选用Spark的理由是很充分的:
这里补充说一下Swifts的做用:
Wormhole和Swifts对好比下:
经过Wormhole Wpark Streaming程序消费kafka的UMS,首先UMS log能够被保存到HDFS上。
kafka通常只保存若干天的信息,不会保存所有信息,而HDFS中能够保存全部的历史增删改的信息。这就使得不少事情变为可能:
能够说HDFS中的日志是不少的事情基础。
介于Spark原生对parquet支持的很好,Spark SQL可以对Parquet提供很好的查询。UMS落地到HDFS上是保存到Parquet文件中的。Parquet的内容是全部log的增删改信息以及_ums_id_,_ums_ts_都存下来。
Wormhole spark streaming根据namespace 将数据分布存储到不一样的目录中,即不一样的表和版本放在不一样目录中。
因为每次写的Parquet都是小文件,你们知道HDFS对于小文件性能并很差,所以另外还有一个job,天天定时将这些的Parquet文件进行合并成大文件。
每一个Parquet文件目录都带有文件数据的起始时间和结束时间。这样在回灌数据时,能够根据选取的时间范围来决定须要读取哪些Parquet文件,没必要读取所有数据。
经常咱们遇到的需求是,将数据通过加工落地到数据库或HBase中。那么这里涉及到的一个问题就是,什么样的数据能够被更新到数据?
这里最重要的一个原则就是数据的幂等性。
不管是遇到增删改任何的数据,咱们面临的问题都是:
对于第一个问题,其实就须要定位数据要找一个惟一的键,常见的有:
对于第二个问题,就涉及到_ums_id_了,由于咱们已经保证了_ums_id_大的值更新,所以在找到对应数据行后,根据这个原则来进行替换更新。
之因此要软删除和加入_is_active_列,是为了这样一种状况:
若是已经插入的_ums_id_比较大,是删除的数据(代表这个数据已经删除了), 若是不是软删除,此时插入一个_ums_id_小的数据(旧数据),就会真的插入进去。
这就致使旧数据被插入了。不幂等了。因此被删除的数据依然保留(软删除)是有价值的,它能被用于保证数据的幂等性。
插入数据到Hbase中,至关要简单一些。不一样的是HBase能够保留多个版本的数据(固然也能够只保留一个版本)默认是保留3个版本;
所以插入数据到HBase,须要解决的问题是:
Version的选择颇有意思,利用_ums_id_的惟一性和自增性,与version自身的比较关系一致:即version较大等价于_ums_id_较大,对应的版本较新。
从提升性能的角度,咱们能够将整个Spark Streaming的Dataset集合直接插入到HBase,不须要比较。让HBase基于version自动替咱们判断哪些数据能够保留,哪些数据不须要保留。
Jdbc的插入数据:插入数据到数据库中,保证幂等的原理虽然简单,要想提升性能在实现上就变得复杂不少,总不能一条一条的比较而后在插入或更新。
咱们知道Spark的RDD/dataset都是以集合的方式来操做以提升性能,一样的咱们须要以集合操做的方式实现幂等性。
具体思路是:
A:不存在的数据,即这部分数据insert就能够;
B:存在的数据,比较_ums_id_, 最终只将哪些_ums_id_更新较大row到目标数据库,小的直接抛弃。
使用Spark的同窗都知道,RDD/dataset都是能够partition的,可使用多个worker并进行操做以提升效率。
在考虑并发状况下,插入和更新均可能出现失败,那么还有考虑失败后的策略。
好比:由于别的worker已经插入,那么由于惟一性约束插入失败,那么须要改成更新,还要比较_ums_id_看是否可以更新。
对于没法插入其余状况(好比目标系统有问题),Wormhole还有重试机制。插入到其余存储中的就很少介绍了,总的原则是:根据各自存储自身特性,设计基于集合的,并发的插入数据实现。这些都是Wormhole为了性能而作的努力,使用Wormhole的用户没必要关心 。
说了那么多,DWS有什么实际运用呢?下面我来介绍某系统使用DWS实现了的实时营销。
如上图所示:
系统A的数据都保存到本身的数据库中,咱们知道,宜信提供不少金融服务,其中包括借款,而借款过程当中很重要的就是信用审核。
借款人须要提供证实具备信用价值的信息,好比央行征信报告,是具备最强信用数据的数据。 而银行流水,网购流水也是具备较强的信用属性的数据。
借款人经过Web或手机APP在系统A中填写信用信息时,可能会某些缘由没法继续,虽然可能这个借款人是一个优质潜在客户,但之前因为没法或好久才能知道这个信息,因此实际上这样的客户是流失了。
应用了DWS之后,借款人已经填写的信息已经记录到数据库中,并经过DWS实时的进行抽取、计算和落地到目标库中。根据对客户的打分,评价出优质客户。而后马上将这个客户的信息输出到客服系统中。
客服人员在很短的时间(几分钟之内)就经过打电话的方式联系上这个借款人(潜客),进行客户关怀,将这个潜客转换为真正的客户。咱们知道借款是有时效性的,若是时间过久就没有价值了。
若是没有实时抽取/计算/落库的能力,那么这一切都没法实现。
另一个实时报表的应用以下:
咱们数据使用方的数据来自多个系统,之前是经过T+1的方式得到报表信息,而后指导次日的运营,这样时效性不好。
经过DWS,将数据从多个系统中实时抽取,计算和落地,并提供报表展现,使得运营能够及时做出部署和调整,快速应对。
做者:王东
7月25日晚8点,线上直播,【AI中台——智能聊天机器人平台】,点击了解详情。
来源:宜信技术学院