数据采集介绍
ETL基本上就是数据采集的表明,包括数据的提取(Extract)、转换(Transform)和加载(Load)。数据源是整个大数据平台的上游,数据采集是数据源与数仓之间的管道。在采集过程当中针对业务场景对数据进行治理,完成数据清洗工做。html
在大数据场景下,数据源复杂、多样,包括业务数据库、日志数据、图片、视频等多媒体数据等。数据采集形式也须要更加复杂,多样,包括定时、实时、增量、全量等。常见的数据采集工具也多种多样,能够知足多种业务需求。java
一个典型的数据加载架构:
python
常见的三个数据采集场景:mysql
- 场景1:从支持FTP、SFTP、 HTTP等 协议的数据源获取数据
- 场景2:从业务数据库获取数据,数据采集录入后需支撑业务系统
- 场景3:数据源经过Kafka等消息队列,须要实时采集数据
数据采集系统需求:git
- 数据源管理与状态监控
- 定时、实时、全量、增量等多模式的数据采集及任务监控
- 元数据管理、数据补采及数据归档
经常使用数据采集工具
Sqoop
Sqoop是经常使用的关系数据库与HDFS之间的数据导入导出工具,将导入或导出命令翻译成MapReduce程序来实现。因此经常使用于在Hadoop和传统的数据库(Mysq|、Postgresq|等)进行数据的传递。github
能够经过Hadoop的MapReduce把数据从关系型数据库中导入到Hadoop集群。使用Sqoop传输大量结构化或半结构化数据的过程是彻底自动化的。sql
Sqoop数据传输示意图:
数据库
Sqoop Import流程:
apache
- 获取源数据表的MetaData信息
- 根据参数提交MapReduce任务
- 表内每行做为一条记录,按计划进行数据导入
**Sqoop Export流程:***
json
- 获取目标数据表的MetaData信息
- 根据参数提交MapReduce任务
- 对HDFS文件内每行数据按指定字符分割,导出到数据库
Apache Flume
Apache Flume本质上是一个分布式、可靠的、高可用的日志收集系统,支持多种数据来源,配置灵活。Flume能够对海量日志进行采集,聚合和传输。
Flume系统分为三个组件,分别是Source(负责数据源的读取),Sink(负责数据的输出),Channel(做为数据的暂存通道),这三个组件将构成一个Agent。Flume容许用户构建一个复杂的数据流,好比数据流经多个Agent最终落地。
Flume数据传输示意图:
Flume多数据源多Agent下的数据传输示意图:
Flume多Sink多Agent下的数据传输示意图:
关于Flume的实操内容能够参考:
DataX
官方文档:
DataX是阿里开源的异构数据源离线同步工具,致力于实现关系数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、 HBase、 FTP等各类异构数据源之间高效稳定的数据同步功能。DataX将复杂的网状的同步链路变成了星型数据同步链路,具备良好的扩展性。
网状同步链路和DataX星型数据同步链路的对比图:
DataX的架构示意图:
Datax数据采集实战
官方文档:
到GitHub上的下载地址下载DataX,或者拉取源码进行编译:
将下载好的安装包,上传到服务器:
[root@hadoop ~]# cd /usr/local/src [root@hadoop /usr/local/src]# ls |grep datax.tar.gz datax.tar.gz [root@hadoop /usr/local/src]#
将安装包解压到合适的目录下:
[root@hadoop /usr/local/src]# tar -zxvf datax.tar.gz -C /usr/local [root@hadoop /usr/local/src]# cd ../datax/ [root@hadoop /usr/local/datax]# ls bin conf job lib plugin script tmp [root@hadoop /usr/local/datax]#
执行DataX的自检脚本:
[root@hadoop /usr/local/datax]# python bin/datax.py job/job.json ... 任务启动时刻 : 2020-11-13 11:21:01 任务结束时刻 : 2020-11-13 11:21:11 任务总计耗时 : 10s 任务平均流量 : 253.91KB/s 记录写入速度 : 10000rec/s 读出记录总数 : 100000 读写失败总数 : 0
CSV文件数据导入Hive
检测没问题后,接下来简单演示一下将CSV文件中的数据导入到Hive中。咱们须要用到hdfswriter,以及txtfilereader。官方文档:
- https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
- https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md
首先,到Hive中建立一个数据库:
0: jdbc:hive2://localhost:10000> create database db01; No rows affected (0.315 seconds) 0: jdbc:hive2://localhost:10000> use db01;
而后建立一张表:
create table log_dev2( id int, name string, create_time int, creator string, info string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as orcfile;
当库、表建立完成后,在HDFS中会有对应的目录文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db Found 1 items drwxr-xr-x - root supergroup 0 2020-11-13 11:30 /user/hive/warehouse/db01.db/log_dev2 [root@hadoop ~]#
准备测试数据:
[root@hadoop ~]# cat datax/db.csv 1,建立用户,1554099545,hdfs,建立用户 test 2,更新用户,1554099546,yarn,更新用户 test1 3,删除用户,1554099547,hdfs,删除用户 test2 4,更新用户,1554189515,yarn,更新用户 test3 5,删除用户,1554199525,hdfs,删除用户 test4 6,建立用户,1554299345,yarn,建立用户 test5
DataX经过json格式的配置文件来定义ETL任务,建立一个json文件:vim csv2hive.json
,咱们要定义的ETL任务内容以下:
{ "setting":{ }, "job":{ "setting":{ "speed":{ "channel":2 } }, "content":[ { "reader":{ "name":"txtfilereader", "parameter":{ "path":[ "/root/datax/db.csv" ], "encoding":"UTF-8", "column":[ { "index":0, "type":"long" }, { "index":1, "type":"string" }, { "index":2, "type":"long" }, { "index":3, "type":"string" }, { "index":4, "type":"string" } ], "fieldDelimiter":"," } }, "writer":{ "name":"hdfswriter", "parameter":{ "defaultFS":"hdfs://192.168.243.161:8020", "fileType":"orc", "path":"/user/hive/warehouse/db01.db/log_dev2", "fileName":"log_dev2.csv", "column":[ { "name":"id", "type":"int" }, { "name":"name", "type":"string" }, { "name":"create_time", "type":"INT" }, { "name":"creator", "type":"string" }, { "name":"info", "type":"string" } ], "writeMode":"append", "fieldDelimiter":",", "compress":"NONE" } } } ] } }
- datax使用json做为配置文件,文件能够是本地的也能够是远程http服务器上面
- json配置文件最外层是一个
job
,job
包含setting
和content
两部分,其中setting
用于对整个job
进行配置,content
是数据的源和目的 setting
:用于设置全局channe|配置,脏数据配置,限速配置等,本例中只配置了channel个数1,也就是使用单线程执行数据传输content
:- reader:配置从哪里读数据
name
:插件名称,须要和工程中的插件名保持-致parameter
:插件对应的输入参数path
:源数据文件的路径encoding
:数据编码fieldDelimiter
:数据分隔符column
:源数据按照分隔符分割以后的位置和数据类型
- writer:配置将数据写到哪里去
name
:插件名称,须要和工程中的插件名保持一致parameter
:插件对应的输入参数path
:目标路径fileName
:目标文件名前缀writeMode
:写入目标目录的方式
- reader:配置从哪里读数据
经过DataX的Python脚本执行咱们定义的ETL任务:
[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/csv2hive.json ... 任务启动时刻 : 2020-11-15 11:10:20 任务结束时刻 : 2020-11-15 11:10:32 任务总计耗时 : 12s 任务平均流量 : 17B/s 记录写入速度 : 0rec/s 读出记录总数 : 6 读写失败总数 : 0
查看HDFS中是否已存在相应的数据文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev2 Found 1 items -rw-r--r-- 3 root supergroup 825 2020-11-15 11:10 /user/hive/warehouse/db01.db/log_dev2/log_dev2.csv__f19a135d_6c22_4988_ae69_df39354acb1e [root@hadoop ~]#
到Hive中验证导入的数据是否符合预期:
0: jdbc:hive2://localhost:10000> use db01; No rows affected (0.706 seconds) 0: jdbc:hive2://localhost:10000> show tables; +-----------+ | tab_name | +-----------+ | log_dev2 | +-----------+ 1 row selected (0.205 seconds) 0: jdbc:hive2://localhost:10000> select * from log_dev2; +--------------+----------------+-----------------------+-------------------+----------------+ | log_dev2.id | log_dev2.name | log_dev2.create_time | log_dev2.creator | log_dev2.info | +--------------+----------------+-----------------------+-------------------+----------------+ | 1 | 建立用户 | 1554099545 | hdfs | 建立用户 test | | 2 | 更新用户 | 1554099546 | yarn | 更新用户 test1 | | 3 | 删除用户 | 1554099547 | hdfs | 删除用户 test2 | | 4 | 更新用户 | 1554189515 | yarn | 更新用户 test3 | | 5 | 删除用户 | 1554199525 | hdfs | 删除用户 test4 | | 6 | 建立用户 | 1554299345 | yarn | 建立用户 test5 | +--------------+----------------+-----------------------+-------------------+----------------+ 6 rows selected (1.016 seconds) 0: jdbc:hive2://localhost:10000>
MySQL数据导入Hive
接下来演示一下将MySQL数据导入Hive中。为了实现该功能,咱们须要使用到mysqlreader来从MySQL中读取数据,其官方文档以下:
首先,执行以下SQL构造一些测试数据:
CREATE DATABASE datax_test; USE `datax_test`; CREATE TABLE `dev_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `create_time` int(11) DEFAULT NULL, `creator` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `info` varchar(2000) COLLATE utf8_unicode_ci DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1069 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; insert into `dev_log`(`id`,`name`,`create_time`,`creator`,`info`) values (1,'建立用户',1554099545,'hdfs','建立用户 test'), (2,'更新用户',1554099546,'yarn','更新用户 test1'), (3,'删除用户',1554099547,'hdfs','删除用户 test2'), (4,'更新用户',1554189515,'yarn','更新用户 test3'), (5,'删除用户',1554199525,'hdfs','删除用户 test4'), (6,'建立用户',1554299345,'yarn','建立用户 test5');
而后到Hive的db01
数据库中再建立一张表:
create table log_dev( id int, name string, create_time int, creator string, info string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;
建立ETL任务的配置文件:
[root@hadoop ~]# vim datax/mysql2hive.json
文件内容以下:
{ "job":{ "setting":{ "speed":{ "channel":3 }, "errorLimit":{ "record":0, "percentage":0.02 } }, "content":[ { "reader":{ "name":"mysqlreader", "parameter":{ "username":"root", "password":"123456a.", "column":[ "id", "name", "create_time", "creator", "info" ], "where":"creator='${creator}' and create_time>${create_time}", "connection":[ { "table":[ "dev_log" ], "jdbcUrl":[ "jdbc:mysql://192.168.1.11:3306/datax_test?serverTimezone=Asia/Shanghai" ] } ] } }, "writer":{ "name":"hdfswriter", "parameter":{ "defaultFS":"hdfs://192.168.243.161:8020", "fileType":"text", "path":"/user/hive/warehouse/db01.db/log_dev", "fileName":"log_dev3.csv", "column":[ { "name":"id", "type":"int" }, { "name":"name", "type":"string" }, { "name":"create_time", "type":"INT" }, { "name":"creator", "type":"string" }, { "name":"info", "type":"string" } ], "writeMode":"append", "fieldDelimiter":",", "compress":"GZIP" } } } ] } }
- mysqlreader支持传入
where
条件来过滤须要读取的数据,具体参数能够在执行datax脚本时传入,咱们能够经过这种变量替换的方式实现增量同步的支持
mysqlreader默认的驱动包是5.x的,因为我这里的MySQL版本是8.x,因此须要替换一下mysqlreader中的驱动包:
[root@hadoop ~]# cp /usr/local/src/mysql-connector-java-8.0.21.jar /usr/local/datax/plugin/reader/mysqlreader/libs/ [root@hadoop ~]# rm -rf /usr/local/datax/plugin/reader/mysqlreader/libs/mysql-connector-java-5.1.34.jar
而后执行该ETL任务:
[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/mysql2hive.json -p "-Dcreator=yarn -Dcreate_time=1554099547" ... 任务启动时刻 : 2020-11-15 11:38:14 任务结束时刻 : 2020-11-15 11:38:25 任务总计耗时 : 11s 任务平均流量 : 5B/s 记录写入速度 : 0rec/s 读出记录总数 : 2 读写失败总数 : 0
查看HDFS中是否已存在相应的数据文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev Found 1 items -rw-r--r-- 3 root supergroup 84 2020-11-15 11:38 /user/hive/warehouse/db01.db/log_dev/log_dev3.csv__d142f3ee_126e_4056_af49_b56e45dec1ef.gz [root@hadoop ~]#
到Hive中验证导入的数据是否符合预期:
0: jdbc:hive2://localhost:10000> select * from log_dev; +-------------+---------------+----------------------+------------------+---------------+ | log_dev.id | log_dev.name | log_dev.create_time | log_dev.creator | log_dev.info | +-------------+---------------+----------------------+------------------+---------------+ | 4 | 更新用户 | 1554189515 | yarn | 更新用户 test3 | | 6 | 建立用户 | 1554299345 | yarn | 建立用户 test5 | +-------------+---------------+----------------------+------------------+---------------+ 2 rows selected (0.131 seconds) 0: jdbc:hive2://localhost:10000>
数据治理简介
将数据采集到数仓后所面临的问题:
- 相比传统数仓大数据时代数据更加多样、更加复杂、数据量更大
- 随处可见的数据不统1、难以提高的数据质量、难以完成的数据模型梳理
- 多种采集工具、多种存储方式使数据仓库or数据湖逐渐变成数据沼泽
数据治理须要解决的问题:
- 数据不可知:用户不知道有哪些数据、不知道数据和业务的关系
- 数据不可控:没有统一的数据标准,数据没法集成和统一
- 数据不可取:用户不能便捷的取到数据,或者取到的数据不可用
- 数据不可联:数据之间的关系没有体现出来,数据深层价值没法体现
数据治理的目标:
- 创建统一数据标准与数据规范,保障数据质量
- 制定数据管理流程,把控数据整个生命周期
- 造成平台化工具,提供给用户使用
数据治理:
- 数据治理包括元数据管理、数据质量管理、数据血缘管理等
- 数据治理在数据采集、数据清洗、数据计算等各个环节
- 数据治理可贵不是技术,而是流程、协同和管理
元数据管理:
- 管理数据的库表结构等schema信息
- 数据存储空间、读写记录、权限归属及其余各种统计信息
数据血缘管理:
- 数据之间的血缘关系及生命周期
- B表的数据从A表汇总而来,那么B和A表就具备血缘关系
- 数据的业务属性信息和业务数据模型
数据治理步骤简述:
- 统一数据规范和数据定义,打通业务模型和技术模型
- 提高数据质量,实现数据全生命周期管理
- 挖掘数据价值,帮助业务人员便捷灵活的使用数据
数据治理与周边系统:
- ODS、DWD、DM等各层次元数据归入数据治理平台集中管理
- 数据采集及处理流程中产生的元数据归入数据治理平台,并创建血缘关系
- 提供数据管理的服务接口,数据模型变动及时通知上下游
Apache Atlas数据治理
常见的数据治理工具:
- Apache Atlas:Hortonworks主推的数据治理开源项目
- Metacat:Netflix开源的元数据管理、数据发现组件
- Navigator:Cloudera提供的数据管理的解决方案
- WhereHows:LinkedIn内部使用并开源的数据管理解决方案
Apache Altas:
- 数据分类:自动捕获、定义和注释元数据,对数据进行业务导向分类
- 集中审计:捕获全部步骤、应用及数据交互的访问信息
- 搜索与血缘:基于分类和审计关联数据与数据的关系,并经过可视化的方式展示
Apache Altas架构图:
- Type System:对须要管理的元数据对象抽象的实体,由类型构成
- Ingest\Export:元数据的自动采集和导出工具,Export能够做 为事件进行触发,使用户能够及时响应
- Graph Engine:经过图数据库和图计算弓|擎展示数据之间的关系
元数据捕获:
- Hook:来自各个组件的Hook自动捕获数据进行存储
- Entity:集成的各个系统在操做时触发事件进行写入
- 获取元数据的同时,获取数据之间的关联关系,构建血缘