DataX3.0实现Mysql数据传递至HDFS与Hive

一、须要在hive中先行创建数据库和要写入的表。python

CREATE TABLE datax

   (id  BIGINT,

   test1  VARCHAR(25),

   test2  INT,

   test3  INT) ROW FORMAT DELIMITED

   FIELDS TERMINATED BY '\t'

   STORED AS TEXTFILE;

二、配置mysql写入hdfs的做业配置文件,json格式。mysql

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.xxx.x.x(数据库地址):3306(数据库端口)/data(数据库名称)"
                                ],
                                "table": [
                                    "tb_sensorstate"
                                ]
                            }
                        ],
                        "password": "hivedb",
                        "username": "xxxxx"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "BIGINT"
                            },
                            {
                                "name": "test1",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "test2",
                                "type": "INT"
                            },
                            {
                                "name": "test3",
                                "type": "INT"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.xxx.x.xxx(HDFS地址):9000(HDFS端口)",
                        "fieldDelimiter": "\t",
                        "fileName": "dataxtest",
                        "fileType": "text",
                        "path": "/usr/local/hadoop/hive/warehouse/datax(hive仓库地址)",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}

三、运行job做业:git

   命令:python datax.py ../job/mysqlreader-hdfswriter.jsonsql

四、运行成功结果:数据库

Job配置项信息能够参看从git上下载的源码文件中的指定插件文件夹下的doc文件夹中的插件说明。json

本例中:并发

   一、mysqlreader中的column配置项中以[“*”]代替全部要从mysql指定表中读取的行。运行时:app

 您的配置文件中的列配置存在必定的风险. 由于您未配置读取数据库表的列,当您的表字段个数、类型有变更时,可能影响任务正确性甚至会运行出错。请检查您的配置并做出修改。oop

实际应用中应该依据实际状况配置相应的字段名和类型。测试

  二、本例配置项说明

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "插件名",
                    "parameter": {
                        "column": [
                            要读取的数据库表中的列的字段名和类型。
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "须要读取的mysql数据库地址"
                                ],
                                "table": [
                                    "须要读取的表名"
                                ]
                            }
                        ],
                        "password": "数据库链接密码",
                        "username": "数据库链接用户名"
                    }
                },
                "writer": {
                    "name": "插件名",
                    "parameter": {
                        "column": [
                           要写入的数据库表中的列的字段名和类型。
                        ],
                        "compress": "压缩格式",
                        "defaultFS": "HDFS的IP和端口",
                        "fieldDelimiter": "hive先行建表的时候使用的分隔符",
                        "fileName": "写入HDFS时的文件名前缀",
                        "fileType": "文件格式",
                        "path": "hadoop上hive仓库的地址",
                        "writeMode": "append"//append,写入前不作任何处理,DataX hdfswriter直接使用filename写入,并保证文件名不冲突。
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "2"//应该是设置并发传输通道数,可是datax的用户指导中未见详细说明。
            }
        }
    }
}

追加写入

Mysqlreader 中读取更新数据能够配置where或querysql配置项。

如下引用官方文档:

* **where***   
描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。
例如在作测试时,能够将where条件指定为limit 10;在实际业务场景中,每每会选择当天的数据进行同步,
能够将where条件指定为gmt_create > $bizdate 。<br />。
where条件能够有效地进行业务增量同步。若是不填写where语句,包括不提供where的key或者value,
DataX均视做同步全量数据。
* 必选:否 <br />
* 默认值:无 <br />

例:

"where":[
     "id>22"
   ]

再job中加入where项,便可实现id大于22的数据更新。

如下引用官方文档:

* **querySql*** 
描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户能够经过该配置型来自定义筛选SQL。
当用户配置了这一项以后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,
例如须要进行多表join后同步数据,
使用select a,b from table_a join table_b on table_a.id = table_b.id <br />
当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,
querySql优先级大于table、column、where选项。
* 必选:否 <br />
* 默认值:无 <br />
相关文章
相关标签/搜索