sqoop用法之mysql与hive数据导入导出

一. Sqoop介绍

Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,能够将一个关系型数据库(例如:MySQL、Oracle、Postgres等)中的数据导进到HadoopHDFS中,也能够将HDFS的数据导进到关系型数据库中。对于某些NoSQL数据库它也提供了链接器。Sqoop,相似于其余ETL工具,使用元数据模型来判断数据类型并在数据从数据源转移到Hadoop时确保类型安全的数据处理。Sqoop专为大数据批量传输设计,可以分割数据集并建立Hadoop任务来处理每一个区块。java

本文版本说明mysql

hadoop版本 : hadoop-2.7.2
hive版本 : hive-2.1.0
sqoop版本:sqoop-1.4.6web

二. Mysql 数据导入到 Hive

1). 将mysqlpeople_access_log表导入到hiveweb.people_access_log,而且hive中的表不存在。
mysql中表people_access_log数据为:sql

1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com'
2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com'
3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com'
4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com'
5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com'
6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com'

mysql数据导入hive的命令为:数据库

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
-m 1 \
--hive-import \
--create-hive-table \
--fields-terminated-by '\t' \
--hive-table web.people_access_log

该命令会启用一个mapreduce任务,将mysql数据导入到hive表,而且指定了hive表的分隔符为\t,若是不指定则为默认分隔符^A(ctrl+A)安全

参数说明bash

参数 说明
--connect mysql的链接信息
--username mysql的用户名
--password mysql的密码
--table 被导入的mysql源表名
-m 并行导入启用的map任务数量,与--num-mapper含义同样
--hive-import 插入数据到hive当中,使用hive默认的分隔符,可使用--fields-terminated-by参数来指定分隔符。
-- hive-table hive当中的表名

2). 也能够经过--query条件查询Mysql数据,将查询结果导入到Hiveapp

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \
--target-dir /user/hive/warehouse/web/people_access_log \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1
参数 说明
--query 后接查询语句,条件查询须要\$CONDITIONS and链接查询条件,这里的\$表示转义$,必须有.
--delete-target-dir 若是目标hive表目录存在,则删除,至关于overwrite.

三. Hive数据导入到Mysql

仍是使用上面的hiveweb.people_access_log,将其导入到mysql中的people_access_log_out表中.工具

sqoop export \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log_out \
--input-fields-terminated-by '\t' \
--export-dir /user/hive/warehouse/web.db/people_access_log \
--num-mappers 1

注意:mysqlpeople_access_log_out须要提早建好,不然报错:ErrorException: Table 'test.people_access_log_out' doesn't exist。若是有id自增列,hive表也须要有,hive表与mysql表字段必须彻底相同。oop

create table people_access_log_out like people_access_log;

执行完一个mr任务后,成功导入到mysqlpeople_access_log_out中.

四. mysql数据增量导入hive

实际中mysql数据会不断增长,这时候须要用sqoop将数据增量导入hive,而后进行海量数据分析统计。增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)。有几个核心参数:

  • –check-column:用来指定一些列,这些列在增量导入时用来检查这些数据是否做为增量数据进行导入,和关系型数据库中的自增字段及时间戳相似.注意:这些被指定的列的类型不能使任意字符类型,如char、varchar等类型都是不能够的,同时–check-column能够去指定多个列
  • –incremental:用来指定增量导入的模式,两种模式分别为AppendLastmodified
  • –last-value:指定上一次导入中检查列指定字段最大值

1. 基于递增列Append导入

接着前面的日志表,里面每行有一个惟一标识自增列ID,在关系型数据库中以主键形式存在。以前已经将id在0~6之间的编号的订单导入到Hadoop中了(这里为HDFS),如今一段时间后咱们须要将近期产生的新的订 单数据导入Hadoop中(这里为HDFS),以供后续数仓进行分析。此时咱们只须要指定–incremental 参数为append–last-value参数为6便可。表示只从id大于6后即7开始导入。

1). 建立hive

首先咱们须要建立一张与mysql结构相同的hive表,假设指定字段分隔符为\t,后面导入数据时候分隔符也须要保持一致。

2). 建立job

增量导入确定是屡次进行的,可能每隔一个小时、一天等,因此须要建立计划任务,而后定时执行便可。咱们都知道hive的数据是存在hdfs上面的,咱们建立sqoop job的时候须要指定hive的数据表对应的hdfs目录,而后定时执行这个job便可。

当前mysql中数据,hive中数据与mysql同样也有6条:

id user_id access_time ip url
1 15110101010 1577003281739 112.168.1.2 https://www.baidu.com
2 15110101011 1577003281749 112.16.1.23 https://www.baidu.com
3 15110101012 1577003281759 193.168.1.2 https://www.taobao.com
4 15110101013 1577003281769 112.18.1.2 https://www.baidu.com
5 15110101014 1577003281779 112.168.10.2 https://www.baidu.com
6 15110101015 1577003281789 11.168.1.2 https://www.taobao.com

增量导入有几个参数,保证下次同步的时候能够接着上次继续同步.

sqoop job --create mysql2hive_job -- import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
--target-dir /user/hive/warehouse/web.db/people_access_log \
--check-column id \
--incremental append \
--fields-terminated-by '\t' \
--last-value 6 \
-m 1

这里经过sqoop job --create job_name命令建立了一个名为mysql2hive_jobsqoop job

3). 执行job

建立好了job,后面只须要定时周期执行这个提早定义好的job便可。咱们先往mysql里面插入2条数据。

INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES
(7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'),
(8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com');

这样mysql里面就会多了2条数据。此时hive里面只有id1 ~ 6的数据,执行同步job使用如下命令。

sqoop job -exec mysql2hive_job

执行完成后,发现刚才mysql新加入的id7 ~ 8的两条数据已经同步到hive

hive> select * from web.people_access_log;
OK
1	15110101010	1577003281739	112.168.1.2	https://www.baidu.com
2	15110101011	1577003281749	112.16.1.23	https://www.baidu.com
3	15110101012	1577003281759	193.168.1.2	https://www.taobao.com
4	15110101013	1577003281769	112.18.1.2	https://www.baidu.com
5	15110101014	1577003281779	112.168.10.2	https://www.baidu.com
6	15110101015	1577003281789	11.168.1.2	https://www.taobao.com
7	15110101016	1577003281790	112.168.1.3	https://www.qq.com
8	15110101017	1577003281791	112.1.1.3	https://www.microsoft.com

因为实际场景中,mysql表中的数据,好比订单表等,一般是一致有数据进入的,这时候只须要将sqoop job -exec mysql2hive_job这个命令定时(好比说10分钟频率)执行一次,就能将数据10分钟同步一次到hive数据仓库。

2. Lastmodified 导入实战

append适合业务系统库,通常业务系统表会经过自增ID做为主键标识惟一性。Lastmodified适合ETL的数据根据时间戳字段导入,表示只导入比这个时间戳大,即比这个时间晚的数据。

1). 新建一张表

mysql中新建一张表people_access_log2,而且初始化几条数据:

CREATE TABLE `people_access_log2` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
  `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `ip` varchar(15) NOT NULL COMMENT '访客ip',
  `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入数据:

insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com');
insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com');
insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com');

mysql里面的数据就是这样:

id user_id access_time ip url
1 15110101010 2019-12-28 16:23:10 112.168.1.200 https://www.baidu.com
2 15110101011 2019-12-28 16:23:33 112.16.1.2 https://www.baidu.com
3 15110101012 2019-12-28 16:23:41 112.168.1.2 https://www.taobao.com
4 15110101013 2019-12-28 16:23:46 112.168.10.2 https://www.baidu.com
5 15110101014 2019-12-28 16:23:52 112.168.1.2 https://www.jd.com
6 15110101015 2019-12-28 16:23:56 112.168.12.4 https://www.qq.

2). 初始化hive表:

初始化hive数据,将mysql里面的6条数据导入hive中,而且能够自动帮助咱们建立对应hive表,何乐而不为,不然咱们须要本身手动建立,完成初始化工做。

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--create-hive-table \
--fields-terminated-by ',' \
--hive-table web.people_access_log2

能够看到执行该命令后,启动了二一个mapreduce任务,这样6条数据就进入hiveweb.people_access_log2了:

hive> select * from web.people_access_log2;
OK
1	15110101010	2019-12-28 16:23:10.0	112.168.1.200	https://www.baidu.com
2	15110101011	2019-12-28 16:23:33.0	112.16.1.2	https://www.baidu.com
3	15110101012	2019-12-28 16:23:41.0	112.168.1.2	https://www.taobao.com
4	15110101013	2019-12-28 16:23:46.0	112.168.10.2	https://www.baidu.com
5	15110101014	2019-12-28 16:23:52.0	112.168.1.2	https://www.jd.com
6	15110101015	2019-12-28 16:23:56.0	112.168.12.4	https://www.qq.com
Time taken: 0.326 seconds, Fetched: 6 row(s)

3). 增量导入数据:

咱们再次插入一条数据进入mysqlpeople_access_log2表:

insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');

此时,mysql表里面已经有7条数据了,咱们使用incremental的方式进行增量的导入到hive:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table people_access_log2 \
-m 1 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \

2019-12-28 16:23:56就是第6条数据的时间,这里须要指定。报错了:

19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.

注意:能够看到--merge-key or --append is required when using --incremental lastmodified意思是,这种基于时间导入模式,须要指定--merge-key或者--append参数,表示根据时间戳导入,数据是直接在末尾追加(append)仍是合并(merge),这里使用merge方式,根据id合并:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table web.people_access_log2 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \
--fields-terminated-by ',' \
--merge-key id

执行该命令后,与直接导入不一样,该命令启动了2个mapreduce任务,这样就把数据增量merge导入hive表了.

hive> select * from web.people_access_log2 order by id;
OK
1	15110101010	2019-12-28 16:23:10.0	112.168.1.200	https://www.baidu.com
2	15110101011	2019-12-28 16:23:33.0	112.16.1.2	https://www.baidu.com
3	15110101012	2019-12-28 16:23:41.0	112.168.1.2	https://www.taobao.com
4	15110101013	2019-12-28 16:23:46.0	112.168.10.2	https://www.baidu.com
5	15110101014	2019-12-28 16:23:52.0	112.168.1.2	https://www.jd.com
6	15110101015	2019-12-28 16:23:56.0	112.168.12.4	https://www.qq.com
6	15110101015	2019-12-28 16:23:56.0	112.168.12.4	https://www.qq.com
7	15110101016	2019-12-28 16:28:24.0	112.168.12.45	https://www.qq.com
Time taken: 0.241 seconds, Fetched: 8 row(s)

能够看到id=6的数据,有2条,它的时间恰好是--last-value指定的时间,则会导入大于等于--last-value指定时间的数据,这点须要注意。

相关文章
相关标签/搜索