DataX-On-Hadoop即便用hadoop的任务调度器,将DataX task(Reader->Channel->Writer)调度到hadoop执行集群上执行。这样用户的hadoop数据能够经过MR任务批量上传到MaxCompute、RDS等,不须要用户提早安装和部署DataX软件包,也不须要另外为DataX准备执行集群。可是能够享受到DataX已有的插件逻辑、流控限速、鲁棒重试等等。html
DataX https://github.com/alibaba/DataX 是阿里巴巴集团内被普遍使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、MaxCompute 等各类异构数据源之间高效的数据同步功能。 DataX同步引擎内部实现了任务的切分、调度执行能力,DataX的执行不依赖Hadoop环境。java
DataX-On-Hadoop是DataX针对Hadoop调度环境实现的版本,使用hadoop的任务调度器,将DataX task(Reader->Channel->Writer)调度到hadoop执行集群上执行。这样用户的hadoop数据能够经过MR任务批量上传到MaxCompute等,不须要用户提早安装和部署DataX软件包,也不须要另外为DataX准备执行集群。可是能够享受到DataX已有的插件逻辑、流控限速、鲁棒重试等等。node
目前DataX-On-Hadoop支持将Hdfs中的数据上传到公共云MaxCompute当中。mysql
运行DataX-On-Hadoop步骤以下:git
./bin/hadoop jar datax-jar-with-dependencies.jar com.alibaba.datax.hdfs.odps.mr.HdfsToOdpsMRJob ./bvt_case/speed.json
本例子的Hdfs Reader 和Odps Writer配置信息以下:github
{ "core": { "transport": { "channel": { "speed": { "byte": "-1", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/tmp/test_datax/big_data*", "defaultFS": "hdfs://localhost:9000", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" } ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": { "name": "odpswriter", "parameter": { "project": "", "table": "", "partition": "pt=1,dt=2", "column": [ "id", "name" ], "accessId": "", "accessKey": "", "truncate": true, "odpsServer": "http://service.odps.aliyun.com/api", "tunnelServer": "http://dt.odps.aliyun.com", "accountType": "aliyun" } } } ] } }
针对上面的例子,介绍几个性能、脏数据的参数:正则表达式
做业级别的性能参数配置位置示例:算法
{ "core": { "transport": { "channel": { "speed": { "byte": "-1", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": {}, "writer": {} } ] } }
另外,介绍几个变量替换、做业命名参数:sql
"path": "/tmp/test_datax/dt=${dt}/abc.txt"
任务执行时能够配置以下传参,使得一份配置代码能够屡次使用:数据库
./bin/hadoop jar datax-jar-with-dependencies.jar com.alibaba.datax.hdfs.odps.mr.HdfsToOdpsMRJob datax.json -p "-Ddt=20170427 -Dbizdate=123" -t hdfs_2_odps_mr
-t hdfs_2_odps_mr
读写插件详细配置介绍,请见后续第二、3部分。
Hdfs Reader提供了读取分布式文件系统数据存储的能力。在底层实现上,Hdfs Reader获取分布式文件系统上文件的数据,并转换为DataX传输协议传递给Writer。
Hdfs Reader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转为DataX协议的功能。textfile是Hive建表时默认使用的存储格式,数据不作压缩,本质上textfile就是以文本的形式将数据存放在hdfs中,对于DataX而言,Hdfs Reader实现上类比TxtFileReader,有诸多类似之处。orcfile,它的全名是Optimized Row Columnar file,是对RCFile作了优化。据官方文档介绍,这种文件格式能够提供一种高效的方法来存储Hive数据。Hdfs Reader利用Hive提供的OrcSerde类,读取解析orcfile文件的数据。目前Hdfs Reader支持的功能以下:
支持textfile、orcfile、rcfile、sequence file、csv和parquet格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量。
支持递归读取、支持正则表达式("*"和"?")。
支持orcfile数据压缩,目前支持SNAPPY,ZLIB两种压缩方式。
支持sequence file数据压缩,目前支持lzo压缩方式。
多个File能够支持并发读取。
csv类型支持压缩格式有:gzip、bz二、zip、lzo、lzo_deflate、snappy。
咱们暂时不能作到:
{ "core": { "transport": { "channel": { "speed": { "byte": "-1048576", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/tmp/test_datax/*", "defaultFS": "hdfs://localhost:9000", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" } ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": {} } ] } }
path
描述:要读取的文件路径,若是要读取多个文件,可使用正则表达式"*",注意这里能够支持填写多个路径。
当指定通配符,HdfsReader尝试遍历出多个文件信息。例如: 指定/*表明读取/目录下全部的文件,指定/yixiao/*表明读取yixiao目录下全部的文件。HdfsReader目前只支持"*"和"?"做为文件通配符。
特别须要注意的是,DataX会将一个做业下同步的全部的文件视做同一张数据表。用户必须本身保证全部的File可以适配同一套schema信息。而且提供给DataX权限可读。
必选:是
默认值:无
defaultFS
fileType
描述:文件的类型,目前只支持用户配置为"text"、"orc"、"rc"、"seq"、"csv"。
text表示textfile文件格式
orc表示orcfile文件格式
rc表示rcfile文件格式
seq表示sequence file文件格式
csv表示普通hdfs文件格式(逻辑二维表)
特别须要注意的是,HdfsReader可以自动识别文件是orcfile、rcfile、sequence file仍是textfile或csv类型的文件,该项是必填项,HdfsReader在作数据同步以前,会检查用户配置的路径下全部须要同步的文件格式是否和fileType一致,若是不一致则会抛出异常
另外须要注意的是,因为textfile和orcfile是两种彻底不一样的文件格式,因此HdfsReader对这两种文件的解析方式也存在差别,这种差别致使hive支持的复杂复合类型(好比map,array,struct,union)在转换为DataX支持的String类型时,转换的结果格式略有差别,好比以map类型为例:
orcfile map类型经hdfsreader解析转换成datax支持的string类型后,结果为"{job=80, team=60, person=70}"
textfile map类型经hdfsreader解析转换成datax支持的string类型后,结果为"job:80,team:60,person:70"
从上面的转换结果能够看出,数据自己没有变化,可是表示的格式略有差别,因此若是用户配置的文件路径中要同步的字段在Hive中是复合类型的话,建议配置统一的文件格式。
若是须要统一复合类型解析出来的格式,咱们建议用户在hive客户端将textfile格式的表导成orcfile格式的表
column
描述:读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。
默认状况下,用户能够所有按照string类型读取数据,配置以下:
用户能够指定column字段信息,配置以下:
对于用户指定column信息,type必须填写,index/value必须选择其一。
"column": ["*"]
{ "type": "long", "index": 0 //从本地文件文本第一列获取int字段 }, { "type": "string", "value": "alibaba" //HdfsReader内部生成alibaba的字符串字段做为当前字段 }
必选:是
默认值:所有按照string类型读取
fieldDelimiter
另外须要注意的是,HdfsReader在读取textfile数据时,须要指定字段分割符,若是不指定默认为',',HdfsReader在读取orcfile时,用户无需指定字段分割符,hive自己的默认分隔符为 "\u0001";若你想将每一行做为目的端的一列,分隔符请使用行内容不存在的字符,好比不可见字符"\u0001" ,分隔符不能使用\n
encoding
nullFormat
描述:文本文件中没法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串能够表示为null。
例如若是用户配置: nullFormat:"\N",那么若是源头数据是"\N",DataX视做null字段。
必选:否
默认值:无
compress
csvReaderConfig
默认值:无
常见配置:
"csvReaderConfig":{ "safetySwitch": false, "skipEmptyRecords": false, "useTextQualifier": false }
全部配置项及默认值,配置时 csvReaderConfig 的map中请严格按照如下字段名字进行配置:
hadoopConfig
描述:hadoopConfig里能够配置与Hadoop相关的一些高级参数,好比HA的配置。
"hadoopConfig":{ "dfs.nameservices": "testDfs", "dfs.ha.namenodes.testDfs": "namenode1,namenode2", "dfs.namenode.rpc-address.youkuDfs.namenode1": "", "dfs.namenode.rpc-address.youkuDfs.namenode2": "", "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }
必选:否
默认值:无
minInputSplitSize
若是用户同步的hdfs文件是rcfile,因为rcfile底层存储的时候不一样的数据类型存储方式不同,而HdfsReader不支持对Hive元数据数据库进行访问查询,所以须要用户在column type里指定该column在hive表中的数据类型,好比该column是bigint型。那么type就写为bigint,若是是double型,则填写double,若是是float型,则填写float。注意:若是是varchar或者char类型,则须要填写字节数,好比varchar(255),char(30)等,跟hive表中该字段的类型保持一致,或者也能够填写string类型。
若是column配置的是*,会读取全部column,那么datax会默认以string类型读取全部column,此时要求column中的类型只能为String,CHAR,VARCHAR中的一种。
RCFile中的类型默认会转成DataX支持的内部类型,对照表以下:
RCFile在Hive表中的数据类型 | DataX 内部类型 |
---|---|
TINYINT,SMALLINT,INT,BIGINT | Long |
FLOAT,DOUBLE,DECIMAL | Double |
String,CHAR,VARCHAR | String |
BOOLEAN | Boolean |
Date,TIMESTAMP | Date |
Binary | Binary |
若是column配置的是*, 会读取全部列; 此时Datax会默认以String类型读取全部列. 若是列中出现Double等类型的话, 所有将转换为String类型。若是column配置读取特定的列的话, DataX中的类型和Parquet文件类型的对应关系以下:
Parquet格式文件的数据类型 | DataX 内部类型 |
---|---|
int32, int64, int96 | Long |
float, double | Double |
binary | Binary |
boolean | Boolean |
fixed_len_byte_array | String |
textfile,orcfile,sequencefile:
因为textfile和orcfile文件表的元数据信息由Hive维护并存放在Hive本身维护的数据库(如mysql)中,目前HdfsReader不支持对Hive元数据数据库进行访问查询,所以用户在进行类型转换的时候,必须指定数据类型,若是用户配置的column为"*",则全部column默认转换为string类型。HdfsReader提供了类型转换的建议表以下:
DataX 内部类型 | Hive表 数据类型 |
---|---|
Long | TINYINT,SMALLINT,INT,BIGINT |
Double | FLOAT,DOUBLE |
String | String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY |
Boolean | BOOLEAN |
Date | Date,TIMESTAMP |
其中:
特别提醒:
Hive在建表的时候,能够指定分区partition,例如建立分区partition(day="20150820",hour="09"),对应的hdfs文件系统中,相应的表的目录下则会多出/20150820和/09两个目录,且/20150820是/09的父目录。了解了分区都会列成相应的目录结构,在按照某个分区读取某个表全部数据时,则只需配置好json中path的值便可。
好比须要读取表名叫mytable01下分区day为20150820这一天的全部数据,则配置以下:
"path": "/user/hive/warehouse/mytable01/20150820/*"
ODPSWriter插件用于实现往ODPS(即MaxCompute)插入或者更新数据,主要提供给etl开发同窗将业务数据导入MaxCompute,适合于TB,GB数量级的数据传输。在底层实现上,根据你配置的 项目 / 表 / 分区 / 表字段 等信息,经过 Tunnel写入 MaxCompute 中。支持MaxCompute中如下数据类型:BIGINT、DOUBLE、STRING、DATATIME、BOOLEAN。下面列出ODPSWriter针对MaxCompute类型转换列表:
DataX 内部类型 | MaxCompute 数据类型 |
---|---|
Long | bigint |
Double | double |
String | string |
Date | datetime |
Boolean | bool |
在底层实现上,ODPSWriter是经过MaxCompute Tunnel写入MaxCompute系统的,有关MaxCompute的更多技术细节请参看 MaxCompute主站: https://www.aliyun.com/product/odps
{ "core": { "transport": { "channel": { "speed": { "byte": "-1048576", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": {}, "writer": { "name": "odpswriter", "parameter": { "project": "", "table": "", "partition": "pt=1,dt=2", "column": [ "col1", "col2" ], "accessId": "", "accessKey": "", "truncate": true, "odpsServer": "http://service.odps.aliyun.com/api", "tunnelServer": "http://dt.odps.aliyun.com", "accountType": "aliyun" } } } ] } }
accessId
accessKey
project
table
partition
column
truncate
描述:ODPSWriter经过配置"truncate": true,保证写入的幂等性,即当出现写入失败再次运行时,ODPSWriter将清理前述数据,并导入新数据,这样能够保证每次重跑以后的数据都保持一致。
truncate选项不是原子操做!MaxCompute SQL没法作到原子性。所以当多个任务同时向一个Table/Partition清理分区时候,可能出现并发时序问题,请务必注意!针对这类问题,咱们建议尽可能不要多个做业DDL同时操做同一份分区,或者在多个并发做业启动前,提早建立分区。
必选:是
默认值:无
odpsServer
线上公网地址为 http://service.cn.maxcompute.aliyun.com/api
tunnelServer
线上公网地址为 http://dt.cn-beijing.maxcompute.aliyun-inc.com
blockSizeInMB
原文连接 本文为云栖社区原创内容,未经容许不得转载。