在Azure 数据工程中,可使用Copy Data 活动把数据从on-premises 或云中复制到其余存储中。Copy Data 活动必须在一个IR(Integration Runtime)上运行,对于把存储在on-premises中的数据复制到其余存储时,必须建立一个self-hosted Integration Runtime。sql
建立一个Pipeline,从Activities列表中找到“Copy data”,拖放到Pipeline画布中,以下图所示:安全
在General选项卡中,设置Activity的常规属性架构
Source选项卡用于设置Copy data Activity的源属性,并发
1,Source 的常规设置app
Source dataset:设置源的dataset性能
use query:Table选项表示整个表做为一个数据源,Query或 Store procedure选项表示使用查询语句或存储过程来获取数据源。spa
Query timeout(minutes):表示查询超时的时间日志
Isolation level:设置查询隔离级别,做用于数据源。code
2,Partition optionorm
指定从SQL Server加载数据的分区选项,当启用分区选项时(不是None),从SQL Server 同时加载数据的并发度由Copy data Activity的Degree of copy parallelism属性设置。Physical Partitions Of Table表示数据工厂根据原始表的分区定义来肯定分区列和分区机制;当选择Dynamic range选项时,用户还须要设置Partition column name、Partition upper bound 和Partition lower bound三项,手动设置分区列和分区机制。
3,Additional columns
添加额外的列,Value由三种类型:Add dynamic content、$$COLUMN和Custom。
$$COLUMN:表示把源的指定列复制为另外一列
Custom:表示添加一列,列指是常量
Add dynamic content,表示添加动态上下文(Dynamic Content),动态上下文是指数据工厂的上下文,这些动态上下文由系统变量(System variables)来提供:
Sink是Copy Data Activity复制数据的目标数据集,Data Factory 使用 Sink dataset来设置目标。
1,Store procedure name
从Sink dataset中选择存储过程,该存储过程定义了如何把元数据应用于目标表。该存储过程每一个batch调用一次,对于仅运行一次且与源数据无关的操做,请使用 Pre-copy script 属性。
若是使用Pre-copy script 属性,一般意味着数据是全量更新,重写整个表,好比如下脚本:
truncate table staging_table
Copy data activity的执行过程是:每次执行Copy data activity,数据工厂首先执行Pre-copy script,而后使用最新的数据插入数据到target table。
若是使用存储过程,一般是对数据进行增量更新,要实现增量更新,其实是把数据集做为参数传递给存储过程,这就意味着存储过程的一个参数必须是表变量类型,存储过程的代码实现以下脚本所示,
CREATE PROCEDURE spOverwriteMarketing
@Marketing [dbo].[MarketingType] READONLY
, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END
2,Table option
若是设置为Auto create table,那么当目标表不存在时,数据工厂根据Source 的元数据自动建立目标表。
3,常规设置
Write batch timeout:每一个batch数据写入的超时时间
Write batch size:每一个batch的数据行数量
Max concurrent connections:访问数据存储的最大的并发链接数量
在Mapping选项卡中,主要设置Source 和 Sink之间的列映射
1,Type conversion settings用于设置类型转换
2,列映射
设置列与列之间的映射关系,用户须要点击“Import schemas”来导入架构元数据。
配置Copy data Activity的设置
1,常规的设置
2,设置Fault tolerance
当设置Fault tolerance (错误容忍)以后,用户能够忽略在复制数据过程当中出现的一些错误,能够忽略的错误类型主要有三个:
数据更新的方式主要有:全量更新、追加数据、增量更新。
1,数据的全量更新和追加更新
若是使用Pre-copy script 属性,一般意味着数据是全量更新和追加更新。
在插入数据以前,若是先清空目标表,再向目标表插入数据,这种方式是全量更新;若是不清空目标表,只是向目标表插入新的数据,那么就是追加更新,前提是保证数据是无重复的新数据。
2,经过存储过程来实现Copy data Activity的增量更新
若是Sink属性使用存储过程,那么是对数据进行增量更新。实现数据的增量更新,其实是把数据集做为参数传递给存储过程,这就意味着存储过程的一个参数必须是表变量类型。
因为存储过程在链接表变量时,性能较差,建议对分batch插入,每一个batch进行一次插入操做。
建立一个表类型,做为存储过程的参数,表的架构和输入数据的架构相同:
CREATE TYPE [dbo].[MarketingType] AS TABLE
( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL, [Category] [varchar](256) NOT NULL )
建立存储过程,第一个变量是表变量,该存储过程的做用是把表变量的数据更新到Sink指定的target table中。
CREATE PROCEDURE spOverwriteMarketing
@Marketing [dbo].[MarketingType] READONLY
, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END
3,使用临时表来实现增量更新
先把数据加载到临时表,经过merge语句把临时数据归并到product table。
参考文档:
Copy data to and from SQL Server by using Azure Data Factory