美图离线ETL实践

美图收集的日志须要经过 ETL 程序清洗、规整,并持久化地落地于 HDFS / Hive,便于后续的统一分析处理。mysql

 

640

图 1sql

 

 

 

ETL?即 Extract-Transform-Load,用来描述将数据历来源端通过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL?一词较经常使用在数据仓库,但其对象并不限于数据仓库。数据库

 

在美图特有的业务环境下,ETL 须要作到如下需求:json

1.大数据量、高效地清洗落地。美图业务繁多、用户基数大、数据量庞大,除此以外业务方但愿数据采集后就能快速地查询到数据。多线程

 

2.灵活配置、知足多种数据格式。因为不断有新业务接入,当有新业务方数据接入时要作到灵活通用、增长一个配置信息就能够对新业务数据进行清洗落地;同时每一个业务方的数据格式各式各样,ETL 须要兼容多种通用数据格式,以知足不一样业务的需求(如 json、avro、DelimiterText 等)。架构

 

3.约束、规范。须要知足数据库仓库规范,数据按不一样层(STG 层、ODS 层等)、不一样库(default.db、meipai.db 等)、不一样分区(必须指定时间分区)落地。app

 

4.容错性。考虑业务日志采集可能存在必定的脏数据,须要在达到特定的阈值时进行告警;而且可能出现?Hadoop 集群故障、Kafka 故障等各类情况,所以须要支持数据重跑恢复。框架

 

ETL 有两种形式:实时流 ETL 和 离线 ETL。oop

 

如图 2 所示,实时流 ETL?一般有两种形式:一种是经过 Flume 采集服务端日志,再经过 HDFS 直接落地;另外一种是先把数据采集到 Kafka,再经过 Storm 或 Spark streaming 落地 HDFS,实时流 ETL 在出现故障的时候很难进行回放恢复。美图目前仅使用实时流 ETL 进行数据注入和清洗的工做。大数据

 

640

图 2

 

根据 Lambda 结构,若是实时流 ETL 出现故障须要离线 ETL 进行修补。离线 ETL 是从 Kafka拉取消息,通过 ETL 再从 HDFS 落地。为了提升实时性及减轻数据压力,离线 ETL 是每小时 05 分调度,清洗上一个小时的数据。为了减轻? HDFS NameNode 的压力、减小小文件,日期分区下同个 topic&partition 的数据是 append 追加到同一个日志文件。

 

 

离线 ETL 采用 MapReduce 框架处理清洗不一样业务的数据,主要是采用了分而治之的思想,可以水平扩展数据清洗的能力;

 

640

图 3:离线 ETL 架构

 

如图 3 所示,离线 ETL 分为三个模块:

 

离线 ETL 工做流程

 

640

图 4

 

如图 4 所示是离线 ETL 的基本工做流程:

1.kafka-etl 将业务数据清洗过程当中的公共配置信息抽象成一个 etl schema ,表明各个业务不一样的数据;

2.在 kafka-etl 启动时会从 zookeeper 拉取本次要处理的业务数据 topic&schema 信息;

3.kafka-etl 将每一个业务数据按 topic、partition 获取的本次要消费的 offset 数据(beginOffset、endOffset),并持久化 mysql;

4.kafka-etl 将本次须要处理的 topic&partition 的 offset 信息抽象成 kafkaEvent,而后将这些 kafkaEvent 按照必定策略分片,即每一个 mapper 处理一部分 kafkaEvent;

5.RecordReader 会消费这些 offset 信息,解析 decode 成一个个 key-value 数据,传给下游清洗处理;

6.清洗后的 key-value 统一经过 RecordWriter 数据落地 HDFS。

 

离线 ETL 的模块实现

 

数据分片(Split)

咱们从 kafka 获取当前 topic&partition 最大的 offset 以及上次消费的截止 offset ,组成本次要消费的[beginOffset、endOffset]kafkaEvent,kafkaEvent 会打散到各个 Mapper 进行处理,最终这些 offset 信息持久化到 mysql 表中。

 

640

图 5

 

那么如何保证数据不倾斜呢?首先经过配置自定义mapper个数,并建立对应个数的ETLSplit。因为kafkaEevent包含了单个topic&partition以前消费的Offset以及将要消费的最大Offset,便可得到每一个 kafkaEvent 须要消费的消息总量。最后遍历全部的 kafkaEevent,将当前 kafkaEevent 加入当前最小的 ETLSplit(经过比较须要消费的数据量总和,便可得出),经过这样生成的 ETLSplit 能尽可能保证数据均衡。

 

数据解析清洗(Read)

 

640

图 6

 

如图 6 所示,首先每一个分片会有对应的 RecordReader 去解析,RecordReade 内包含多个 KafkaConsumerReader ,就是对每一个 KafkaEevent 进行消费。每一个 KafkaEevent 会对应一个?KafkaConsumer,拉取了字节数据消息以后须要对此进行 decode 反序列化,此时就涉及到 MessageDecoder 的结构。MessageDecoder 目前支持三种格式:

 

 

MessageDecoder 接收到?Kafka 的 key 和 value 时会对它们进行反序列化,最后生成 ETLKey 和 ETLValue。同时?MessageDecoder 内包含了 Injector,它主要作了以下事情:


过程当中还有涉及到 DebugFilter,它将 SDK 调试设备的日志过滤,不落地到 HDFS。

 

多文件落地(Write)

因为 MapReduce 自己的 RecordWriter 不支持单个落地多个文件,须要进行特殊处理,而且 HDFS 文件是不支持多个进程(线程)writer、append,因而咱们将 KafkaKey+ 业务分区+ 时间分区 + Kafka partition 定义一个惟一的文件,每一个文件都是会到带上 kafka partition 信息。同时对每一个文件建立一个?RecordWriter。

 

640

图 7


如图 7 所示,每一个?RecordWriter 包含多个?Writer ,每一个 Writer 对应一个文件,这样能够避免同一个文件多线程读写。目前是经过 guava cache 维护 writer 的数量,若是 writer 太多或者太长时间没有写访问就会触发 close 动做,待下批有对应目录的 kafka 消息在建立 writer 进行 append 操做。这样咱们能够作到在同一个 map 内对多个文件进行写入追加。?

 

检测数据消费完整性 (Commit)

640

图 8

 

MapReduce Counter 为提供咱们一个窗口,观察统计 MapReduce job 运行期的各类细节数据。而且它自带了许多默认 Counter,能够检测数据是否完整消费:

reader_records: 解析成功的消息条数;

decode_records_error: 解析失败的消息条数;

writer_records: 写入成功的消息条数;

...

最后经过本次要消费 topic offset 数量、reader_records 以及 writer_records 数量是否一致,来确认消息消费是否完整。

*容许必定比例的脏数据,若超出限度会生成短信告警

 

?/ ETL 系统核心特征 /?

 

数据补跑及其优化

 

ETL 是如何实现数据补跑以及优化的呢?首先了解一下须要重跑的场景:

 

640

*当用户调用 application kill 时会经历三个阶段:1) kill SIGTERM(-15) pid;2) Sleep for 250ms;3)kill SIGKILL(-9) pid 。

 

那么有哪些重跑的方式呢?

 

640

 

如图 9 所示是第三种重跑方式的总体流程,ETL 是按照小时调度的,首先将数据按小时写到临时目录中,若是消费失败会告警通知并重跑消费当前小时。若是落地成功则合并到仓库目录的目标文件,合并失败一样会告警通知并人工重跑,将小文件合并成目标文件。

 

640

图 9

 

优化后的重跑状况分析以下表所示:

 

640

 

自动水平扩展

 

如今离线 Kafka-ETL 是每小时 05 分调度,每次调度的 ETL 都会获取每一个 topic&partition 当前最新、最大的 latest offset,同时与上个小时消费的截止 offset 组合成本地要消费的 kafkaEvent。因为每次获取的 latest offset 是不可控的,有些状况下某些 topic&partition 的消息 offset 增加很是快,同时 kafka topic 的 partition 数量来不及调整,致使 ETL 消费处理延迟,影响下游的业务处理流程:

 

 

Kafka ETL 是否能自动水平扩展不强依赖于 kafka topic partition 的个数。若是某个 topic kafkaEvent 须要处理的数据过大,评估在合理时间范围单个 mapper 能消费的最大的条数,再将 kafkaEvent 水平拆分红多个子 kafkaEvent,并分配到各个 mapper 中处理,中老年品牌这样就避免单个 mapper 单次须要处理过大 kafkaEvent 而致使延迟,提升水平扩展能力。拆分的逻辑如图 10 所示:

 

640

图 10

 

后续咱们将针对如下两点进行自动水平扩展的优化:

640?wx_fmt=png


文章来源:http://www.javashuo.com/article/p-nclfpeix-mg.html

相关文章
相关标签/搜索