最近又能挤一挤时间,来聊一聊前一段时间接手的一个大数据系统项目。
随着云计算的普及,大部分互联网公司的系统都是基于云原生的产品和体系来搭建的,我接手的系统也不例外。数据处理部分从底层存储,到中间层数据处理系统,再到上层的ETL系统第一版都是基于Google Cloud Platform来搭建的,GKE + PubSub + DataFlow + FireStore,数据服务部分,负载均衡也是Google App Engine的体系。
GCP的产品在接入的环节做的都很便捷,门槛低,对应产品主流开发语言的SDK一应俱全,在项目排期紧张的情况下,整套系统架构基于GCP进行开发、部署与使用都很方便。
那为什么要做一次迁移呢?经过这段时间的折腾,回头想想主要原因有以下几点:
那么我们在迁移中遇到了什么挑战呢?我认为最有挑战性的有两块:云计算圈子有一句话是存储在哪里,用户就在哪里,由此可见存储做的好对用户的粘性有多大了吧。那遇到的最大的挑战就是整体存储的迁移,从原来的Firestore迁移到mongodb,批处理数据导入hbase;其次是整套数据处理框架的改造,由原来的Apache beam改为Flink framework。
接下来结合迁移计划,我将对v1与v2两个版本的大数据系统做更详细的介绍与分析。
###迁移计划
从架构图可以看出ETL的链路比较长,从源数据队列消费后需要上传一次阿里云OSS,再依赖OSS callback功能触发GCP上的pipeline程序进行GetObject的操作,最后再把数据导入GCP上的ETL系统做数据清洗、处理和聚合入库。
团队历史原因导致的整体链路冗长,这里忽略1w字。存储的选型上,我们选择了Firestore,这里firestore可能对国内的同学比较陌生,其实可以当其为分布式分片存储的mongodb集群,但是Firestore的subcollection功能特别符合我们的场景,我们的数据是基于用户id存储的document,subcollection可以让我们方便的在doc内创建子collection,这样就很容易基于某个用户id做用户维度下的二级索引,做用户维度下的二级甚至N级维度的数据聚合。
数据处理模型上我们选用是Dataflow进行流处理,因为我们的使用场景里,一个时间窗口内不会有重复的用户id出现,所以处理方式是获取到数据后,做1次IO+中间值处理。中间的operator也比较简单,第一版的设计只是依赖Dataflow的worker管理能力,我们的业务层逻辑只有一个worker,因此整体的逻辑处理模型如下:
整个项目上线10天左右的时间,数据处理层在没有过多优化的情况下,在流量高峰时是要做横向扩容才能保证处理速度的。那在第一版里最大的挑战是在于整个链路如何保障速度和稳定性,特别是在跨洋处理的这一段。
我们做了以下几个处理:
迁移后的数据系统,主要的变化有三个:
存储选型的一些思考:
一般使用Shard Cluster MongoDB, 主要考虑几个因素:
设计shard key主要目标有:
一般通过document里的某个字段作hash来完成sharding,这里需要考虑到jumbo chunk的问题。如果只是基于某个字段进行分片,逻辑中再根据时间进行query,这样还是会导致基于某个字段的数据都集中在一个分片中,因此根据请求和业务逻辑,可以通过(key1, key2, key3…)这样的方式进行shard key设计。
chunk达到size之后会分裂,除了jumbo chunk的情况。底层的balancer程序会根据shard的情况迁移chunk。具体mongo分片的逻辑可以参考https://docs.mongodb.com/manual/core/sharding-shard-key/
根据数据的访问频次和处理方式,我们还选择了HBase用于批处理的数据存储,按用户一天、一周与一个月的数据进行聚合;OSS存储原始数据考虑到部分数据并非热点数据,并且是原始数据格式的format后直接显示,所以不用转存一份,这里我们是使用MongoDB来做二级索引,比如某个用户ID下的所有Match ID。另外OSS上的数据也是按天存储,批处理模型也直接是从OSS读数据作为datasource进行批处理计算。
那么之前Firestore SubCollection的数据怎么处理?
V1的架构中,我们的处理模型是一个wokrer处理所有的数据在分别写Firestore collection里的documents。
在V2的架构里,我们从Dataflow迁移到了Flink的集群,所以在operator的拆分上也做了一些优化,即读取原始数据后写入不同的flatmap算子给到不用维度的数据使用,在分别聚合和sink。
这么做的好处是我们在基于flink自身check points和自己做savepoints时可以更细粒度的做恢复,且之前dataflow里如果要更新集群就需要drain dataflow,屏蔽source读取,把已有的数据处理完,再开启新的集群处理新的数据。
搭建完系统后,两套系统同时在线进行双写,之后再开始数据迁移的工作,这里我们的做法还是覆盖overview数据并记录迁移时间,在API层通过全局的overview已聚合数据再聚合迁移时间之后产生的数据。
所以迁移过程没有那么复杂,需要注意的是迁移过程因为也是跨洋迁移,做了以下几点优化:
是实现的逻辑是遍历collection的doc一个个删除,还要保持在线,稳定性差,耗时也很长;第三删除之后的doc index还会存在console上,因此无法判断是删除成功还是失败。