Sqoop 是 Cloudera 公司创造的一个数据同步工具,如今已经彻底开源了。 html
目前已是 hadoop 生态环境中数据迁移的首选,另外还有 ali 开发的 DataX 属于同类型工具,因为社区的普遍使用和文档的健全,调研以后决定使用 Sqoop 来作咱们以后数据同步的工具。java
咱们首先来看下 Sqoop 的工做流mysql
他将咱们传统的关系型数据库 | 文件型数据库 | 企业数据仓库 同步到咱们的 hadoop 生态集群中。sql
同时也能够将 hadoop 生态集群中的数据导回到传统的关系型数据库 | 文件型数据库 | 企业数据仓库中。shell
那么 Sqoop 如何抽取数据呢数据库
1. 首先 Sqoop 去 rdbms 抽取元数据。segmentfault
2. 当拿到元数据以后将任务切成多个任务分给多个 map。api
3. 而后再由每一个 map 将本身的任务完成以后输出到文件。缓存
Sqoop import Command:服务器
先从最简单的任务开始
sqoop import\
--connect jdbc:mysql://10.66.38.125:3306/user_db \
--username cloudera \
--password secretkey \
--table department \
--target-dir /sqoopdata/departments \ # HDFS 的目标存储位置
--where "department_id = 1000" \ # 指定条件,只有达成这个条件的才会被 import 进来
-- m 1
就这个语句就能够将咱们关系型数据库中的某个表 import 进 HDFS 的某个位置。
一样咱们能够 import 某些字段进来生成文件
sqoop import \ --connect jdbc:mysql://localhost:3306/retry_db \ --username cloudera \ --password secret \ --table departments \ --columns "dept_id, name" \ # 指定须要的字段 --as-avrodatafile # 指定存成 avro 数据文件
若是咱们要 import 一个库里面的全部表可使用
sqoop import-all-tables \ --connect jdbc:mysql://localhost:3306/retry_db \ --username cloudera \ --password secret \ --warehouse-dir /mydata # HDFS parent for table 这个会将全部这些表都放到 HDFS 这个文件夹下面
Sqoop import Command:
咱们将数据从 Hadooop HDFS 导出向 RDBMS
sqoop export \ --connect jdbc:mysql://localhost:3306/retry_db \ --username cloudera \ --password departments \ --export-dir /sqoopdata/departments \ # HDFS source path for the export --table departments
Sqoop Job:
Sqoop 提供一种能力,能够把咱们常常会执行的任务存储成 jobs. 这些 jobs 能够在将来任何一个时间点被咱们拿来使用。
sqoop job \ --create job_name \ --import \ --connect jdbc:mysql://localhost:3306/retry_db \ --username cloudera \ --password departments
经常使用姿式上面就介绍完了,当咱们须要将 MySQL 数据同步到 Hive 去的时候若是表尚未建立咱们只须要执行:
sudo-u hive sqoop import \ --connect jdbc:mysql://10.66.38.15:3306/user \ # 链接须要被同步的 MySQL --username xxx \ --password xxx \ --table user \ # 须要被同步的表 --delete-target-dir \ # 以前有同步的文件已经存在删除掉- m 1 \ # 开一个 map 这个值得注意,不是每一个 source 表均可以轻松被分为多个 map 的。若是你这里要填写非 1 的数最好去熟悉一些细节 --hive-import \ --hive-tableuser.user \
--create-hive-table \ # 建立 hive 表
--hive-drop-import-delims # Drops \n, \r, and \01 from string fields when importing to Hive.
若是是表已经建立好而须要全量同步数据执行:
sudo -u hive sqoop import\ --connect jdbc:mysql://10.66.38.125:16033/user \ --username xxx \ --password xxx \ --table user \ --delete-target-dir \ --hive-overwrite \ # 全量重写数据 - m 1 \ --hive-import \ --hive-table user.user \ --hive-drop-import-delims
一样的 Sqoop 还支持 Hive 的增量同步。可是基于 mapreduce 的全量同步速度也快得超出想象。实测在三机集群上(12核 | 32内存)机器上1分钟基本能完成对 20 个字段左右的表 250w 条记录的抽取。而且对目标数据库机器的压力不算大。是很是理想的大数据同步工具。
Sqoop 的配置参数很是之多,在使用的时候建议先撸一遍文档(文档不长大概撸一次 2 3 个小时左右),找到本身须要注意的地方和更多适合本身的功能在使用的时候就能避免踩坑。好比上面使用的 hive-drop-import-delims 参数的使用就是还没看完文档就使用形成的坑。咱们数据库中有字段没有过滤 \n 。有个用户的名字被误操做使用 \n 开头致使 hive 误觉得遇到了换行符,该数据不只错乱并且后面字段所有被置为 NULL。要避免这种问题一方面要对这个使用链上各个组件有所了解,更是应该读一读文档能够最大程度的避免踩坑。
----------------------------------------------------------分割线----------------------------------------------------------
下面将纪录一下我全量阅读 Sqoop 文档以为须要纪录的一些东西。
首先咱们上面看到命令 Sqoop Command 这个 Command 实际上是指定 Sqoop 使用哪一种 Tool 。
$ sqoop help usage: sqoop COMMAND [ARGS] Available commands: codegen Generate code to interact with database records create-hive-table Import a table definition into Hive eval Evaluate a SQL statement and display the results export Export an HDFS directory to a database table help List available commands import Import a table from a database to HDFS import-all-tables Import tables from a database to HDFS import-mainframe Import mainframe datasets to HDFS list-databases List available databases on a server list-tables List available tables in a database version Display version information See 'sqoop help COMMAND' for information on a specific command.
能够看到我上面举例的全部内容都只是简单的使用到了 export 和 import 还有 import-all-tables 工具。 还有很是多的工具没有使用到。
由于 sqoop 是依赖 hadoop 生态的关系,因此也有响应的查找链,由于使用了 CDH 大礼包,因此我只是简单的安装了一下,相关的依赖都已经被配置好了包括 path
lrwxrwxrwx 1 root root 23 Nov 13 20:55 /usr/bin/sqoop -> /etc/alternatives/sqoop
下面咱们在使用 import tool 的时候遵循这个原则:
sqoop import (generic-args) (import-args) sqoop-import (generic-args) (import-args) While the Hadoop generic arguments must precede any import arguments, you can type the import arguments in any order with respect to one another.
当咱们在写语句的时候应该首先使用了 generic-args 参数能够是如下的参数。
Argument | Description |
---|---|
--connect <jdbc-uri> |
Specify JDBC connect string |
--connection-manager <class-name> |
Specify connection manager class to use |
--driver <class-name> |
Manually specify JDBC driver class to use |
--hadoop-mapred-home <dir> |
Override $HADOOP_MAPRED_HOME |
--help |
Print usage instructions |
--password-file |
Set path for a file containing the authentication password |
-P |
Read password from console |
--password <password> |
Set authentication password |
--username <username> |
Set authentication username |
--verbose |
Print more information while working |
--connection-param-file <filename> |
Optional properties file that provides connection parameters |
--relaxed-isolation |
Set connection transaction isolation to read uncommitted for the mappers. |
后面的 import args 可选项就很是丰富。
好比能够导入校验使用的 class 删除控制参数啥的。
Argument | Description |
---|---|
--validate |
Enable validation of data copied, supports single table copy only. |
--validator <class-name> |
Specify validator class to use. |
--validation-threshold <class-name> |
Specify validation threshold class to use. |
--validation-failurehandler <class-name> |
Specify validation failure handler class to use. |
Argument | Description |
---|---|
--append |
Append data to an existing dataset in HDFS |
--as-avrodatafile |
Imports data to Avro Data Files |
--as-sequencefile |
Imports data to SequenceFiles |
--as-textfile |
Imports data as plain text (default) |
--as-parquetfile |
Imports data to Parquet Files |
--boundary-query <statement> |
Boundary query to use for creating splits |
--columns <col,col,col…> |
Columns to import from table |
--delete-target-dir |
Delete the import target directory if it exists |
--direct |
Use direct connector if exists for the database |
--fetch-size <n> |
Number of entries to read from database at once. |
--inline-lob-limit <n> |
Set the maximum size for an inline LOB |
-m,--num-mappers <n> |
Use n map tasks to import in parallel |
-e,--query <statement> |
Import the results of statement . |
--split-by <column-name> |
Column of the table used to split work units. Cannot be used with --autoreset-to-one-mapper option. |
--split-limit <n> |
Upper Limit for each split size. This only applies to Integer and Date columns. For date or timestamp fields it is calculated in seconds. |
--autoreset-to-one-mapper |
Import should use one mapper if a table has no primary key and no split-by column is provided. Cannot be used with --split-by <col> option. |
--table <table-name> |
Table to read |
--target-dir <dir> |
HDFS destination dir |
--temporary-rootdir <dir> |
HDFS directory for temporary files created during import (overrides default "_sqoop") |
--warehouse-dir <dir> |
HDFS parent for table destination |
--where <where clause> |
WHERE clause to use during import |
-z,--compress |
Enable compression |
--compression-codec <c> |
Use Hadoop codec (default gzip) |
--null-string <null-string> |
The string to be written for a null value for string columns |
--null-non-string <null-string> |
The string to be written for a null value for non-string columns |
包括支持 free-form query .使用 --query 参数而后写一个 sql 来过滤本身想要 import 的数据 just like
$ sqoop import \ --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' \ --split-by a.id --target-dir /user/foo/joinresults
这个使用方法必需要使用 --target-dir
若是须要控制并行操做广泛使用的是 -m 参数,--num-mapers参数。咱们能够显示的指定使用的用来并行分配的键,使用例如 --split-by employee_id 达到目标。
若是说咱们没有使用 --split-by 参数主键也不是 int 型,可能会致使指定 -m 大于 1 的时候出问题。由于程序没有办法知道应该根据哪一个键来分配 map 任务。
另外咱们可使用 --autoreset-to-one-mapper
选项 --autoreset-to-one-mapper
Import should use one mapper if a table has no primary key and no split-by column is provided. Cannot be used with --split-by <col>
option.
使用 Oozie 调起 Sqoop job 执行任务的时候要注意一个 Controlling Distributed Cache 的问题。在第一个Sqoop做业期间,Oozie只会在每一个工做节点上对Sqoop依赖项进行一次本地化,并会在工做节点上重用jar来执行子节点做业。在Oozie启动Sqoop命令时使用option - skip-dist-cache,能够跳过Sqoop将依赖项复制到做业缓存并保存大量I/O的步骤。达到优化的目的。
在控制导入的过程当中也有不少优化的地方能够作,例如咱们在对关系行数据库 MySQL 进行导入的时候,能够经过使用关键字 --direct 加速导入的速度。他的原理是默认状况下咱们会使用 JDBC 对数据库进行链接,可是有一些数据库提供了更高性能能够指定数据库进行转移的工具。好比 MySQL 提供的 MySQL 提供的工具 mysqldump 使用 --direct 参数就能够尝试让 Sqoop 使用这种方式去导出数据,可能会获得更高的效能。当咱们在使用 --direct option 的时候还能够传递一些潜在的参数给这个命令相似于这样 将命令跟在 -- 后面
$ sqoop import --connect jdbc:mysql://server.foo.com/db --table bar \ --direct -- --default-character-set=latin1
就能够将后面的 --default-character-set=latin1 传递给 mysqldump 。
在 import 表的时候有两个指定路径的参数是冲突的 --warehouse-dir 和 --target-dir 都用于指定将生成的表放到指定的这个目录下面。他们俩是冲突的,指定其中一个 option 便可。
在默认状况下 import 这个工具都会将表导到一个新的路径下面。若是路径下面已经有相同名字的文件存在了,将会被拒绝导入。
若是使用 --append 参数 Sqoop将会将文件导入到临时的文件目录,而后重命名该文件成不与目标文件夹里面名字冲突的名字。
7.2.7 掌控事务隔离级别(Controlling transaction isolation)
Sqoop 提供读取数据库 read-uncommitted 事务的能力,只须要带上参数 --relaxed-isolation 便可。这个操做真是很是骚啊,通常应该不会用到并且也不是全部数据库都支持,好比官方文档说 ORACLE 就是不支持的。
7.2.8 掌控 mapping 时候的字段类型(Controlling type mapping)
能够对指定同步的表进行 schema 的映射转换,而且能够指定经过 java 或者 hive 类型的转换。例如:
Argument Description --map-column-java <mapping> Override mapping from SQL to Java type for configured columns. --map-column-hive <mapping> Override mapping from SQL to Hive type for configured columns. Sqoop is expecting comma separated list of mapping in form <name of column>=<new type>. For example: $ sqoop import ... --map-column-java id=String,value=Integer
另外须要注意的是 --map-column-hive 使用该参数须要使用 urlencode 对转换 key value 进行转换。例如
use DECIMAL(1%2C%201) instead of DECIMAL(1, 1)
若是转换不正确,Sqoop 会 raise exception
7.2.10 增量更新(Incremental Imports)
关于使用 Sqoop 进行增量更新处理, Sqoop 提供了三个字段来处理增量更新相关的内容
Argument | Description |
---|---|
--check-column (col) |
Specifies the column to be examined when determining which rows to import. (the column should not be of type CHAR/NCHAR/VARCHAR/VARNCHAR/ LONGVARCHAR/LONGNVARCHAR) |
--incremental (mode) |
Specifies how Sqoop determines which rows are new. Legal values for mode include append and lastmodified . |
--last-value (value) |
Specifies the maximum value of the check column from the previous import. |
Sqoop 自己支持两种不一样的方式进行增量更新,分别是 append 和 lastmodified 咱们使用 --incremental 参数去指定要使用的增量更新类型。
增量更新的文章有不少基本上创建在两个基础上。(以前的数据若是被 update 没有办法经过这两种增量更新机制被更新)
1. 能够提供相似于自增 id 这样的字段,而且小于这个点的字段能够从上次这个点位继续日后增长。使用 --last-value 须要注意的是可使用 Sqoop job 在第一次指定了开始的 last-value 值以后 Sqoop 会保存下来此次执行完以后 last-value 值的节点,下次执行的时候会基于这个继续执行。
2. 能够提供一个最后修改的字段,例如 update_time 这样的字段,全部大于这个 update_time 时间的字段将在下个节点被增量追加到后面。--check-column update_time
咱们一般导入两种格式的文件形式,一种是 textfile 也是默认类型。还有一种是 SequenceFiles
咱们能够经过指定 --as-textfile 参数显示指定使用 textfile 导入。textfile 又称 delimited text 在非二进制数据状况下很是通用,并且很容易支持相似于像 Hive 这种数据库表的生成。
SequenceFiels 是一种二进制格式用于往自定义的记录指定的 data types 中存储独立的记录。这些 data types 表现为 java 的类。
另外咱们也可使用表协议 好比咱们可使用 Apache Avro。
默认状况下 Sqoop 不会帮咱们压缩文件使用 -z 或者 --compress 参数或者使用其余压缩参数好比 --compression-codec 对 SequenceFile text 或者 Avro 文件进行压缩。
Sqoop 对 blob 和 clob columns 都有特别的处理方式。他们尽可能不要像常规字段这样所有 load 进内存进行操做。而是使用流式的方法来进行处理,而且和其余数据进行内联存储。(这一块我彻底没有看懂是什么意思,水平不够能够自行前往官方文档查看。。。。。。)
Table 6. Output line formatting arguments:
Argument | Description |
---|---|
--enclosed-by <char> |
Sets a required field enclosing character |
--escaped-by <char> |
Sets the escape character |
--fields-terminated-by <char> |
Sets the field separator character |
--lines-terminated-by <char> |
Sets the end-of-line character |
--mysql-delimiters |
Uses MySQL’s default delimiter set: fields: , lines: \n escaped-by: \ optionally-enclosed-by: ' |
--optionally-enclosed-by <char> |
Sets a field enclosing character |
默认状况下 Sqoop 会使用逗号 comma(,) 来做为字段之间的分隔符,使用换行符 \n 来区别每一条记录。
Sqoop 官方文档推荐咱们使用 unambiguous 也就是显示清晰的去指定字段分隔符和行分隔符。好比直接使用 --mysql-delimiters
下面的叙述我想了好久想翻译成中文我都以为不是很直接 因此仍是直接贴文档吧。
If unambiguous delimiters cannot be presented, then use enclosing and escaping characters. The combination of (optional) enclosing and escaping characters will allow unambiguous parsing of lines. For example, suppose one column of a dataset contained the following values:
Some string, with a comma. Another "string with quotes"The following arguments would provide delimiters which can be unambiguously parsed:
$ sqoop import --fields-terminated-by , --escaped-by \\ --enclosed-by '\"' ...(Note that to prevent the shell from mangling the enclosing character, we have enclosed that argument itself in single-quotes.)
The result of the above arguments applied to the above dataset would be:
"Some string, with a comma.","1","2","3"... "Another \"string with quotes\"","4","5","6"...Here the imported strings are shown in the context of additional columns (
"1","2","3"
, etc.) to demonstrate the full effect of enclosing and escaping. The enclosing character is only strictly necessary when delimiter characters appear in the imported text. The enclosing character can therefore be specified as optional:$ sqoop import --optionally-enclosed-by '\"' (the rest as above)...Which would result in the following import:
"Some string, with a comma.",1,2,3... "Another \"string with quotes\"",4,5,6...
Note Even though Hive supports escaping characters, it does not handle escaping of new-line character. Also, it does not support the notion of enclosing characters that may include field delimiters in the enclosed string. It is therefore recommended that you choose unambiguous field and record-terminating delimiters without the help of escaping and enclosing characters when working with Hive; this is due to limitations of Hive’s input parsing abilities.
The
--mysql-delimiters
argument is a shorthand argument which uses the default delimiters for themysqldump
program. If you use themysqldump
delimiters in conjunction with a direct-mode import (with--direct
), very fast imports can be achieved.While the choice of delimiters is most important for a text-mode import, it is still relevant if you import to SequenceFiles with
--as-sequencefile
. The generated class'toString()
method will use the delimiters you specify, so subsequent formatting of the output data will rely on the delimiters you choose.
Table 8. Hive arguments:
Argument | Description |
---|---|
--hive-home <dir> |
Override $HIVE_HOME |
--hive-import |
Import tables into Hive (Uses Hive’s default delimiters if none are set.) |
--hive-overwrite |
Overwrite existing data in the Hive table. |
--create-hive-table |
If set, then the job will fail if the target hive |
table exists. By default this property is false. | |
--hive-table <table-name> |
Sets the table name to use when importing to Hive. |
--hive-drop-import-delims |
Drops \n, \r, and \01 from string fields when importing to Hive. |
--hive-delims-replacement |
Replace \n, \r, and \01 from string fields with user defined string when importing to Hive. |
--hive-partition-key |
Name of a hive field to partition are sharded on |
--hive-partition-value <v> |
String-value that serves as partition key for this imported into hive in this job. |
--map-column-hive <map> |
Override default mapping from SQL type to Hive type for configured columns. If specify commas in this argument, use URL encoded keys and values, for example, use DECIMAL(1%2C%201) instead of DECIMAL(1, 1). |
咱们想要使用 Sqoop 抽取 RDBMS 的数据到 Hive 多是再常见不过的情形了,因此这一部分很重要也多是咱们最常使用的部分。
Sqoop 抽取 RDBMS 的数据到 Hive 会先将数据抽取出来在 HDFS 上的指定路径上放一下。若是指定路径上已经有文件,可是 Hive 里面却没有你的表你还须要指定 --delete-target-dir 来删除 HDFS 的文件,而后从新上传一份。当上传到 HDFS 结束以后,会使用 Hive 的命令 LOAD DATA INPATH 将文件移动到 Hive 的 warehouse 目录下面若是指定了 Hive 表的建立表参数会直接建立 Hive 表而且映射上数据。
若是表已经存在了 可使用 --hive-overwrite 将数据直接覆盖。虽然Hive支持转义字符,但它不处理换行字符。此外,它不支持在封闭字符串中包含字段分隔符的封闭字符的概念。所以,在使用Hive时,建议您选择明确的字段和记录终止分隔符,而无需转义和包围字符;这是因为Hive的输入解析能力的限制。若是您在将数据导入到Hive时使用了--escapby,--enclosed-by, or -optionally-enclosed-by, Sqoop将打印一条警告消息。
Hive 默认会使用 \n 分割行,使用\01 分割字段。若是说咱们的数据里面有这些字段就可能会有冲突,咱们须要使用 --hive-drop-import-delims 把这些都替换掉。上面表能够参照这个 option 的意义。另外也可使用 --hive-delims-replacement 将冲突的字段给替换掉。
另外还有一个值得注意的地方 Hive 表默认将从 RDBMS 里面抽取出来的 NULL value 数据转换成 null string 。这个在使用的时候就会出现问题,由于以前是一个空,如今却变成了一个 null 字符串。因此咱们须要处理一下, Hive在本身的体系里面使用 \N 来表示 NULL 咱们使用 --null-string 和 --null-non-string 参数处理 import job 使用 --input-null-string 和 --input-null-non-string 处理 export job 。举个🌰
$ sqoop import ... --null-string '\\N' --null-non-string '\\N'
另外咱们可使用 --hive-partition-key 和 --hive-partition-value 参数来指定分区键提高 hive 的处理能力。
这一块 sqoop 只支持单分区导入。
这一块更详细的能够参考一下 hive 文档。
Problem: Sqoop is treating TINYINT(1) columns as booleans, which is for example causing issues with HIVE import. This is because by default the MySQL JDBC connector maps the TINYINT(1) to java.sql.Types.BIT, which Sqoop by default maps to Boolean.
Solution: A more clean solution is to force MySQL JDBC Connector to stop converting TINYINT(1) to java.sql.Types.BIT by adding tinyInt1isBit=false
into your JDBC path (to create something like jdbc:mysql://localhost/test?tinyInt1isBit=false
). Another solution would be to explicitly override the column mapping for the datatype TINYINT(1) column. For example, if the column name is foo, then pass the following option to Sqoop during import: --map-column-hive foo=tinyint. In the case of non-Hive imports to HDFS, use --map-column-java foo=integer.
不知不觉感受写了不少。。。发现才把 import 工具写完。其实 Sqoop 使用最多的场景也就是 import 工具的场景,其余场景使用的频率应该不高,我找时间再整理一篇文章来写别的工具的功能hh 这篇就到这里吧!
Reference:
https://archive.cloudera.com/cdh6/6.0.1/docs/sqoop-1.4.7-cdh6.0.1/SqoopUserGuide.html Sqoop User Guide (v1.4.7-cdh6.0.1)
https://blog.csdn.net/Gavin_chun/article/details/78314065 SQOOP从MySQL导入数据到Hive
https://segmentfault.com/a/1190000002532293 sqoop 导入关系数据库到 hive
https://blog.csdn.net/myrainblues/article/details/43673129 sqoop使用中文手册
https://blog.csdn.net/lyp5257918/article/details/53820690 sqoop抽取文本数据到hive因为存在空字符致使字段错位和丢失错误
https://www.youtube.com/watch?v=72M5lMP8dMg COSO IT Sqoop Tutorial
https://blog.csdn.net/taisenki/article/details/78974121 Sqoop 数据导入多分区Hive解决方法