Sqoop(四)增量导入、全量导入、减量导入

增量导入mysql

1、说明sql

  当在生产环境中,咱们可能会按期从与业务相关的关系型数据库向Hadoop导入数据,导入数仓后进行后续离线分析。这种状况下咱们不可能将全部数据从新再导入一遍,因此此时须要数据增量导入。数据库

  增量导入数据分为两种方式:app

    一是基于递增列的增量数据导入(Append方式)。oop

    二是基于时间列的数据增量导入(LastModified方式)。spa

2、增量导入3d

方式一:Append方式code

  好比:有一个订单表,里面每一个订单有一个惟一标识自增列ID,在关系型数据库中以主键形式存在,以前已经将id在1-3的编号的订单导入到了Hive中,如今一段时间后咱们须要将近期产生的新的订单数据(id为四、5的两条数据)导入Hive,供后续数仓进行分析。此时咱们只须要指定-incremental参数为append,-last-value参数为3便可。表示只从大于3后开始导入。blog

一、MYSQL建表rem

CREATE TABLE `appendTest` (
  `id` int(11) ,
  `name` varchar(255)
) 

二、导入数据

insert into appendTest(id,name) values(1,'name1');
insert into appendTest(id,name) values(2,'name2');
insert into appendTest(id,name) values(3,'name3');

三、建立一张跟mysql中的appendTest表同样的hive表appendTest

sqoop create-hive-table \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--password 010209 \
--table appendTest \
--hive-table appendTest

四、进行导入,将id>0的三条数据进行导入

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--P \
--table appendTest \
--hive-import \
-m 1  \
--hive-table appendTest \
--incremental append \ --check-column id \ --last-value 0

结果:

五、查看

 六、向mysql表appendTest再次插入数据

insert into appendTest(id,name) values(4,'name4');
insert into appendTest(id,name) values(5,'name5');

七、再次执行增量导入

因为上一次导入的时候,,将--last-value设置为0,将id>0的三条数据导入后,如今进行导入了时候须要将last-value设置为3
sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --P \ --table appendTest \ --hive-import \ -m 1 \
--hive-table appendTest \
--incremental append \ --check-column id \ --last-value 3

结果:

八、查看hive表appendTest

重要参数说明:

九、说明

说明:
增量抽取,须要指定--incremental append,同时指定按照源表中哪一个字段进行增量--check-column id,
并指定hive表appendTest当前最大值--last-value 3。建立sqoop job的目的是,每次执行job之后,sqoop会自动记录appedndTest的last-value,
下次再执行时,就会自动指定last-value,不须要手工去改了。

方式二:lastModify方式

 基于lastModify的方式,要求原表中有time字段,它能指定一个时间戳,让SQoop把该时间戳以后的数据导入至Hive,由于后续订单可能状态会发生变化,变化后time字段时间戳也会发生变化,此时SQoop依然会将相同状态更改后的订单导入Hive,固然咱们能够指定merge-key参数为id,表示将后续新的记录与原有记录合并。

 一、Mysql建表

CREATE TABLE lastModifyTest (
id INT,
name VARCHAR (20),
last_mod TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

二、导入数据

insert into lastModifyTest(id,name) values(1,'enzo');
insert into lastModifyTest(id,name) values(2,'din');
insert into lastModifyTest(id,name) values(3,'fz');
insert into lastModifyTest(id,name) values(4,'dx');
insert into lastModifyTest(id,name) values(5,'ef');

三、HIve建表

 
 
sqoop create-hive-table \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \ --password 010209 \ --table lastModifyTest \ --hive-table lastModifyTest 

四、导入数据,将时间之后的数据进行导入

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--P \
--table lastModifyTest \
--hive-import \
-m 1  \
--hive-table lastModifyTest \
--incremental lastmodified \
--check-column last_mod \
--last-value "2019-05-14 15:17:23"

结果:

五、查看数据导入结果

 六、参数说明

 

全量导入

将mysql表中所有数据都导入Hive,下面来查看实例:

一、MYSQL数据

二、一次性将mysql表im数据全量导入hive中

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--password 010209 \
--table im \
--hive-import \
--hive-table im \
-m 1

减量导入

设置where条件,经过判断条件能够判断减小的数据和增长的数据,控制更加灵活。

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--P \
--table appendTest \
--hive-import \
-m 1  \
--incremental append \
--where "age>30"
--check-column id \
--last-value 0
相关文章
相关标签/搜索