简介: 获取更详细的 Databricks 数据洞察相关信息,可至产品详情页查看:https://www.aliyun.com/produc...
做者
高爽,基智科技数据中心负责人
尚子钧,数据研发工程师sql
北京基智科技有限公司是一家提供智能营销服务的科技公司。公司愿景是基于 AI 和大数据分析为 B2B 企业提供全流程的智能营销服务。公司秉承开放,挑战,专业,创新的价值观从线索挖掘到 AI 智达、CRM 客户管理覆盖客户全生命周期,实现全渠道的营销和数据分析决策,帮助企业高效引流,精准拓客,以更低的成本获取更多的商机。截至目前,基智科技已与包括房产、教育、汽车、企业服务等领域展开普遍合做。安全
在基智科技目前的离线计算任务中,大部分数据源都是来自于业务 DB(MySQL) 。业务 DB 数据接入的准确性、稳定性和及时性,决定着下游整个离线计算 pipeline 的准确性和及时性。最初咱们在 ECS 上搭建了本身的 Hadoop 集群,天天使用 Sqoop 同步 MySQL 数据,再经由 Spark ETL 任务,落表写入 Hive ,ES,MongoDB 、MySQL ,经过调用 Service API 作页签的展现。架构
咱们的 ETL 任务通常在凌晨1点开始运行,数据处理阶段约1h, Load 阶段1h+,总体执行时间为2-3h,下图为咱们的 ETL 过程:运维
上面的架构在使用的过程当中如下几个问题比较突出:dom
随着业务数据的增加,受 DB 性能瓶颈影响突出。
须要维护多套数据源,数据冗杂,容易造成数据孤岛使用不方便。
天级 ETL 任务耗时久,影响下游依赖的产出时间。
数据主要存储在 HDFS 上,随着数据的增长,须要增长集群,成本这一块也是不小的开销。
大数据平台运维成本高。oop
为了解决天级 ETL 逐渐尖锐的问题,减小资源成本、提早数据产出,咱们决定将T+1级 ETL 任务转换成T+0实时数据入库,在保证数据一致的前提下,作到数据落地便可用。性能
考虑过使用 Lambda 架构在离线、实时分别维护一份数据但在实际使用过程当中没法保证事务性,随着数据量增大查询性能低,操做较复杂维护成本比较高等问题最终没能达到理想化使用。大数据
后来咱们决定选择数据湖架构,紧接着考察了市场上主流的数据湖架构:Delta Lake(开源和商业版)& Hudi。两者都支持了 ACID 语义、Upsert、Schema 动态变动、Time Travel 等功能,但也存在差别好比:优化
Delta Lake 优点:阿里云
Delta Lake 不足:
Hudi 优点:
Hudi 不足:
综合以上指标,加上咱们以前的平台就是基于阿里云平台搭建,选型时阿里云还没有支持 Hudi ,最终咱们选择了阿里云 Databricks 数据洞察(商业版 Delta Lake 专业性更强)。同时 Databricks 数据洞察提供全托管服务,可以免去咱们的运维成本。
总体的架构如上图所示。咱们接入的数据会分为两部分,存量历史数据和实时数据,存量数据使用 Spark 将 MySQL 全量数据导入 Delta Lake 的表中, 实时数据使用 Binlog 采集实时写入到 Delta Lake 表中,这样实时数据和历史数据都同步到同一份表里面真正实现批流一体操做。
前期在阿里同事的协助下咱们完成了数据迁移的工做,实如今Databricks数据洞察架构下数据开发工做,咱们的前期作的准备以下:
天天作ETL数据清洗,作表的merge操做 ,delta表结构为:
%sql CREATE TABLE IF NOT EXISTS delta.delta_{table_name}( id bigint, uname string, dom string, email string, update timestamp, created timestamp ) USING delta LOCATION '------/delta/'
%sql MERGE INTO delta.delta_{table_name} AS A USING (SELECT * FROM rds.table_{table_name} where day= date_format (date_sub (current_date,1), 'yyyy-mm-dd') AS B ON A.id=B.id WHEN MATCHED THEN update set A.uname=B.name, A.dom=B.dom, A.email=B.email, A.updated=current_timestamp() WHEN NOT MATCHED THEN INSERT (A.uname,A.dom,A.email,A.update,A.created) values (B.name,B.dom,B.email,current_timestamp(),current_timestamp())
因为 Delta Lake 的数据仅接入实时数据,对于存量历史数据咱们是经过 SparkSQL 一次性 Sink Delta Lake 的表中,这样咱们流和批处理时只维护一张 Delta 表,因此咱们只在最初对这两部分数据作一次 merge 操做。
同时为了保证数据的高安全,咱们使用 Databricks Deep Clone 天天会定时更新来维护一张从表以备用。对于每日新增的数据,使用 Deep Clone 一样只会对新数据 Insert 对须要更新的数据 Update 操做,这样能够大大提升执行效率。
CREATE OR REPLACE TABLE delta.delta_{table_name}_clone DEEP CLONE delta.delta_{table_name};
原文连接本文为阿里云原创内容,未经容许不得转载。