DataPipeline CTO陈肃:从ETL到ELT,AI时代数据集成的问题与解决方案

引言:2018年7月25日,DataPipeline CTO陈肃在第一期公开课上做了题为《从ETL到ELT,AI时代数据集成的问题与解决方案》的分享,本文根据陈肃分享内容整理而成。html

 

你们好!很高兴今天有机会和你们分享一些数据集成方面的见解和应用经验。先自我介绍一下。我叫陈肃,博士毕业于中国科学院大学,数据挖掘研究方向。如今北京数见科技(DataPipeline)任 CTO。以前在中国移动研究院任职算法工程师和用户行为实验室技术经理,以后做为合伙人加入过一家互联网教育公司,从事智能学习方面的研发工做。git

 

 

在毕业后工做的这多年以来,我大部分时候在作大数据和机器学习相关的应用系统研发工做,数据的整合是其中一个很是重要的环节。加入 DataPipeline 后,公司研发的是一款企业级的数据集成产品,旨在帮助企业一站式解决数据集成和元数据管理问题。github

 

ELT 和 ETL 是数据集成的两种基本方式。前者专一于大数据的实时抽取和可靠传输,后者则包含了更丰富的数据转换功能。 因为今天是和 AI 前线的朋友们一块儿探讨数据集成,我主要结合 AI 应用的场景谈谈:为何 ELT 是更适合 AI 应用场景的数据集成方案、采用 Kafka 技术栈来构建 ELT 平台所具有的优点和问题以及咱们所作的一些优化工做。但愿可以对你们的工做和学习有所帮助。算法

 

 

今天个人分享主要内容如上图:数据库

 

首先,我会介绍一下 AI 应用中数据集成的典型场景,ETL 和 ELT 两种数据集成模式的异同点,以及为何 AI 应用下更适合采用 ELT 模式。而后,我会花一些篇幅介绍数据集成中须要重点考虑的基本问题,以及咱们所采用的底层平台——Kafka Connect 在解决这些问题上的优点和局限。apache

 

接下来,我会介绍 DataPipeline 对于 Kafka Connect 一些优化。有的是从底层作的优化,例如线程池的优化。有的则是从产品特性上的优化,例如错误数据队列。api

 

最后,咱们谈一谈 Kafka Connect 和 Kafka Stream 的结合,以及咱们用 Kafka Stream 作数据质量预警方面的一个应用 Case。缓存

 

1、AI 应用场景下的数据集成安全

 

数据集成是把不一样来源、格式、特色性质的数据在逻辑上或物理上有机地集中,为企业提供全面的数据共享。AI 是典型的数据驱动应用,数据集成在其中起着关键的基础性做用。微信

 

 

以一个你们所熟悉的在线推荐服务为例,一般须要依赖三类数据:用户的属性 (年龄、性别、地域、注册时间等)、商品的属性(分类、价格、描述等)、用户产生的各种行为(登陆、点击、搜索、加购物车、购买、评论、点赞、收藏、加好友、发私信等)事件数据。

 

随着微服务框架的流行,这三类数据一般会存在于不一样的微服务中:“用户管理服务”储存着用户的属性、好友关系、登陆等数据;“商品管理服务”存储的商品信息;“订单服务”存储着用户的订单数据;“支付服务”存储用户的支付数据;“评论服务”记录着用户的评论和点赞数据。为了实现一个推荐服务,咱们首先须要让服务可以访问到这些数据。这种数据访问应该是非侵入式的,也就是说不能对原有系统的性能、稳定性、安全性形成额外的负担。所以,推荐服务不该当直接访问这些分散的数据源,而是应该经过某种方式将这些数据从各个业务子系统中提取出来,聚集到一个逻辑上集中的数据库 / 仓库,而后才能方便地使用机器学习框架(例如 Spark MLlib)来读取数据、训练和更新模型。

 

1. ETL 和 ELT 的区别与联系

 

数据集成包含三个基本的环节:Extract(抽取)、Transform(转换)、Load(加载)。

 

 

抽取是将数据从已有的数据源中提取出来,例如经过 JDBC/Binlog 方式获取 MySQL 数据库的增量数据;转换是对原始数据进行处理,例如将用户属性中的手机号替换为匿名的惟一 ID、计算每一个用户对商品的平均打分、计算每一个商品的购买数量、将 B 表的数据填充到 A 表中造成新的宽表等;加载是将数据写入目的地。

 

根据转换转换发生的顺序和位置,数据集成能够分为 ETL 和 ELT 两种模式。ETL 在数据源抽取后首先进行转换,而后将转换的结果写入目的地。ELT 则是在抽取后将结果先写入目的地,而后由下游应用利用数据库的聚合分析能力或者外部计算框架,例如 Spark 来完成转换的步骤。

 

2. 为何 ELT 更适合 AI 应用场景

 

为何说 ELT 更适合 AI 的应用场景呢?

 

 

首先这是由 AI 应用对数据转换的高度灵活性需求决定的。 绝大多数 AI 应用使用的算法模型都包括一个特征提取和变换的过程。根据算法的不一样,这个特征提取多是特征矩阵的简单的归一化或平滑处理,也能够是用 Aggregation 函数或 One-Hot 编码进行维度特征的扩充,甚至特征提取自己也须要用到其它模型的输出结果。这使得 AI 模型很难直接利用 ETL 工具内建的转换功能,来完成特征提取步骤。此外,企业如今不多会从零构建 AI 应用。当应用包括 Spark/Flink MLlib 在内的机器学习框架时,内建的模型库自己每每包含了特征提取和变换的逻辑,这使得在数据提取阶段就作复杂变换的必要性进一步下降。

 

其次,企业常常会基于一样的数据构建不一样应用。 以我以前所在的一家在线教育公司为例,咱们构建了两个 AI 的应用:其中一个是针对各种课程的推荐应用,主要用于增长用户的购买转化率。另一个是自适应学习系统,用于评估用户的知识掌握程度和题目的难度和区分度,从而为用户动态地规划学习路径。两个应用都须要用户属性、作题记录、点击行为以及学习资料文本,但采用的具体模型的特征提取和处理方式彻底不一样。若是用 ETL 模式,咱们须要从源端抽取两遍数据。而采用 ELT 模式,全部数据存储在 HBase 中,不一样的应用根据模型须要过滤提取出所需的数据子集,在 Spark 集群完成相应的特征提取和模型计算,下降了对源端的依赖和访问频次。

 

最后,主流的机器学习框架,例如 Spark MLlib 和 Flink MLlib,对于分布式、并行化和容错都有良好的支持,而且易于进行节点扩容。 采用 ELT 模式,咱们能够避免构建一个专有数据转换集群(可能还伴随着昂贵的 ETL 产品 License 费用),而是用一个通用的、易于建立和维护的分布式计算集群来完成全部的工做,有利于下降整体拥有成本,同时提高系统的可维护性和扩展性。

 

2、从 ETL 和 ELT 面临的主要问题

 

采用 ELT 模式,意味着能够较少的关注数据集成过程当中的复杂转换,而将重点放在让数据尽快地传输上。然而,一些共性的问题依然须要获得解决:

1. 数据源的异构性: 传统 ETL 方案中,企业要经过 ETL 工具或者编写脚本的方式来完成数据源到目的地同步工做。当数据源异构的时候,须要特别考虑 Schema(能够简单理解为数据字段类型)兼容性带来的影响。不管是 ETL 仍是 ELT,都须要解决这一问题。

2. 数据源的动态性: 动态性有两方面含义。一是如何获取数据源的增量;二是如何应对数据源端的 Schema 变化,例如增长列和删除列。

3. 任务的可伸缩性: 当面对少许几个数据源,数据增量不过每日几百 MB 的时候,ELT 平台的可伸缩性不是什么大问题。当 ELT 面对的是成百上千个数据源,或者数据源数据增速很快时,ELT 平台的任务水平切分和多任务并行处理就成为一个必备的要求。平台不只要支持单节点的多任务并行,还须要支持节点的水平扩展。此外,ELT 的上游一般会遇到一些吞吐能力较差的数据源,须要可以对读取进行限速,避免对现有业务产生影响。

4. 任务的容错性:ELT 平台某些节点出现故障的时候,失败的做业必须可以迁移到健康的节点上继续工做。同时,做业的恢复须要实现断点重传,至少不能出现丢失数据,最好可以作到不产生重复的数据。

 

 

3、Kafka Connect 的架构

 

1. Kafka Connect:基于 Kafka 的 ELT 框架

 

可用于构建 ELT 的开源数据集成平台方案不止一种,较普遍采用的包括 Kafka Connect、DataX 等,也有公司直接采用 Flink 等流式计算框架。DataPipeline 做为一家提供企业数据集成产品的公司,咱们在 Kafka Connect 之上踩了许多坑而且也作了许多优化。

 

 

4、踩过的坑与优化的点

 

1. Kafka Connect 应用于ELT的关键问题1

 

下面咱们聊一聊 Kafka Connect 应用过程当中的几个关键问题。

 

首先是 任务的限速和数据缓存问题。从 Kafka Connect 设计之初,就听从从源端到目的地解耦性。当 Source 的写入速度长时间大于 Sink 端的消费速度时,就会产生 Kafka 队列中消息的堆积。若是 Kafka 的 Topic Retention 参数设置不当,有可能会形成数据在消费前被回收,形成数据丢失。Kafka Connect 框架自己并无提供 Connector 级别的限速措施,须要进行二次开发。

 

 

2. Kafka Connect 应用于ELT的关键问题2

 

用户有多个数据源,或者单一数据源中有大量的表须要进行并行同步时,任务的并行化问题 就产生了。Kafka Connect 的 rebalance 是牵一发动全身,一个新任务的开始和中止都会致使全部任务的 reload。当任务数不少的时候,整个 Kafka Connect 集群可能陷入长达数分钟的 rebalance 过程。

 

解决的方法,一是用 CDC(Change Data Capture)来捕获全局的数据增量;二是 在任务内部引入多线程轮询机制,减小任务数量并提升资源利用率。

 

 

3. Kafka Connect 应用于ELT的关键问题3

 

异构数据源同步会遇到 Schema 不匹配 的问题。在须要精确同步的场景下(例如金融机构的异构数据库同步),一般须要 Case by Case 的去定义映射规则。而在 AI 应用场景下,这个问题并非很突出,模型训练对于损失一点精度一般是可容忍的,一些数据库独有的类型也不经常使用。

 

 

4. Kafka Connect 应用于ELT的关键问题4

 

Source 端须要可以检测到 Schema 的变化,从而生成具备正确 Schema 格式的 Source Record。CDC 模式下,经过解析 DDL 语句能够获取到。非 CDC 模式下,须要保存一个快照才可以获取到这种变化。

 

下面我用一些时间对 DataPipeline 所作的优化和产品特性方面的工做。

 

DataPipeline 是一个底层使用 Kafka Connect 框架的 ELT 产品。首先,咱们在底层上引入了 Manager 来进行全局化的任务管理。Manager 负责管理 Source Connector 和 Sink Connector 的生命周期,与 Kafka Connect 的管理 API 经过 REST 进行交互。

 

系统的任何运行异常,都会进行统一的处理,并由通知中心发送给任务的负责人和运维工程师。咱们还提供了一个 Dashboard,用于图形化方式对任务进行生命周期管理、检索和状态监控。用户能够告别 Kafka Connect 的命令行。

 

 

5. DataPipeline的任务并行模型

 

DataPipeline 在任务并行方面作了一些增强。在 DataPipeline Connector 中,咱们在每一个 Task 内部定义和维护一个线程池,从而可以用较少的 Task 数量达到比较高的并行度,下降了 rebalance 的开销。 而对于 JDBC 类型的 Connector,咱们额外容许配置链接池的大小,减小上游和下游资源的开销。此外,每一个 Connector 还能够定义本身限速策略,以适应不一样的应用环境需求。

 

 

6. DataPipeline 的错误队列机制

 

经过产品中错误队列预警功能,用户能够指定面对错误数据暂存和处理逻辑,好比错误队列达到某个百分比的时候任务会暂停,这样的设置能够保证任务不会因少许异常数据而中断,被完整记录下来的异常数据能够被管理员很是方便地进行追踪、排查和处理。

 

相比之前经过日志来筛查异常数据,这种错误队列可视化功能可以大大提高管理员的工做效率。

 

 

7. DataPipeline 的数据转换

 

 

DataPipeline 实现了本身的 动态加载机制。提供了两种 可视化的转换器:基本转换器和高级转换器。前者提供包括字段过滤、字段替换和字段忽略等功能;后者基于 Java,能够更加灵活地对数据处理,而且校验处理结果的 Schema 一致性。DataPipeline 还提供了数据采样和动态调试能力,方便用户进行表级别的转换规则开发。

 

 

值得注意的是,Kafka 不只仅是一个消息队列系统,自己也提供了持久化能力。一个很天然的问题就是:可否不额外引入 Sink 端的外部存储,直接从 Kafka 中获取训练数据?

 

若是模型自己要用到某个 Topic 的全量数据或者最近一段时间的数据,那么经过设置合适的 retention 参数,能够直接将 Kafka 做为训练数据的来源。Kafka 的顺序读模式能够提供很是高的读取速度;若是模型要根据消息的内容作数据筛选,那么因为 Kafka 自己并不提供检索能力,须要遍历全部消息,这样就显得比较低效了。

 

当模型用于线上时,可能还须要引入流式计算来完成实时特征的提取工做。Kafka 自己就提供了这种流式计算能力。

 

8. 流式计算在 ELT 中的做用 - 数据质量预警

 

DataPipeline 也将流式计算引入到平台的质量预警功能中。在咱们的将来版本中,用户能够定义 Topic 级别的质量预警规则模型,例如“在 5 分钟时间内,数据记录的字段 1 均值超过历史均值记录的比率超过 70%”为异常,采起策略为“告警并暂停同步”。经过这种方式,能够在 ELT 的过程当中,尽早发现数据中的异常现象,避免大量异常数据进入数据目的地。

 

 

5、总结与展望

 

最后总结一下。数据集成并非什么新的概念,在过去二十多年间已经普遍应用于各个行业的信息系统。ELT 和 ETL 相比,最大的区别是“重抽取和加载,轻转换”,从而能够用更简单的技术栈、更轻量的方案搭建起一个知足现代企业应用的数据集成平台。AI 应用内在的特色也使得 ELT 特别适合这个场景。

 

Kafka Connect 自己是一个业界被普遍采用的 ELT 框架,针对容错、分布式、Schema 一致性等方面都提供了良好的支持,同时有大量的社区和商业资源可供参考和选择。DataPipeline 基于 Kafka Connect 作了大量数据集成场景下的优化,与 Kafka Stream 相结合,可以为包括 AI 在内的各类应用场景构建起一个完整的数据层支撑方案。

 

有其它关于数据集成的技术问题,也欢迎一块儿探讨、共同提升。

 

 

参考资料

 

· How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka

https://www.confluent.io/blog/  

 

· Kafka Connect 官方文档

https://docs.confluent.io/current/connect/index.html

 

· Machine Learning + Kafka Streams Examples

https://github.com/kaiwaehner  

 

· PredictionIO- 基于 Spark 的机器学习框架

http://predictionio.apache.org

 

Q & A

Q1:DataPipeline 避开了数据处理这个过程,并以此提升性能,这个思路很承认。可是有个问题:从数据生产到数据利用的环节中,总要有一步数据处理的步骤的,这个步骤,从产品角度,DataPipeline 是如何考虑的?

A1:ELT 的核心思想就是要利用下游数据存储性能大幅提高和机器学习应用的灵活性的优点,在数据流转的过程当中不作过于复杂的计算。若是真的须要作处理,也能够基于咱们的产品能够去写转换的代码。但这种处理都是无状态的。有状态处理,建议放到下游去作。这样才更符合 ELT 的理念。

 

Q2:请问数据的落地是自动的吗?

A2: 基于原生 Kafka Connector,须要命令行启动目标端类型的 Sink Connector,指定消费的 topic 列表,经过代码完成数据落地。基于 DataPipeline 产品,经过界面配置源和目的地后,落地是彻底自动的。

 

Q3:多线程读,对源端的数据表或用户权限有没有特定的要求?

A3:JDBC 模式的 Source Connector 使用的 RDBMS 用户,须要具备选择同步表的 select 权限。CDC 模式的各不相同,参照产品内详尽的权限配置说明。

 

Q4:如何保证生产和消费的 EOS 恰好一次语义?

A4: Kafka Connect 下的 Exactly Once Semantic 依赖于具体 Connector 实现,Kafka Connect 框架自己对此只提供了必要非充分的支持。咱们先来看 Source 端:假定 Source Connector 是从 MySQL 的 Binlog 中抽取数据到 Kafka,为了实现 EOS,首先 Source Connector 在每次提交记录到 Kafka 的时候,须要原子化的记录下来对应的 binlog position,这样才能保证任务异常中断、重启后可以从这个 position 继续读取。Kafka Connect 框架在 Source 端封装了 offset storage 的存储更新逻辑。offset storage 本质上是一个 Kafka 的 topic,利用 Kafka 的事务机制,理论上能够保证 offset 的修改和消息发送的原子性。再来看 Sink 端:若是 Sink Connector 能够将数据的输出和 Offset 的记录进行原子化操做,那么同理也可以作到 EOS。但这个原子化操做须要 Sink 端本身用某种机制实现,例如 Confluent 的 HDFS Connector 就用 WAL 日志保证了写入的 EOS。

 

PS.添加DataPipeline君微信:datapipeline2018,拉你进技术讨论群。

相关文章
相关标签/搜索