使用Spark加载数据到SQL Server列存储表

原文地址https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-index/html

介绍

SQL Server的Bulk load默认为串行,这味着例如,一个BULK INSERT语句将生成一个线程将数据插入表中。可是,对于并发负载,您可使用多个批量插入语句插入同一张表,前提是须要阅读多个文件。node

考虑要求所在的情景:git

  • 从大文件加载数据(好比,超过 20 GB)
  • 拆分文件不是一个选项,由于它将是整个大容量负载操做中的一个额外步骤。
  • 每一个传入的数据文件大小不一样,所以很难识别大块数(将文件拆分为)并动态定义为每一个大块执行的批量插入语句。
  • 要加载的多个文件跨越多个 GB(例如超过 20 GB 及以上),每一个GB 包含数百万条记录。

在这种状况下,使用 Apache Spark是并行批量数据加载到 SQL 表的流行方法之一。github

在本文中,咱们使用 Azure Databricks spark engine使用单个输入文件将数据以并行流(多个线程将数据加载到表中)插入 SQL Server。目标表多是HeapClustered IndexClustered Columnstore Index。本文旨在展现如何利用Spark提供的高度分布式框架,在加载到 SQL Server或 Azure SQL的汇集列存储索引表以前仔细对数据分区。sql

本文中分享的最有趣的观察是展现使用Spark默认配置时列存储表的行组质量下降,以及如何经过高效使用Spark分区来提升质量。从本质上讲,提升行组质量是决定查询性能的重要因素。数据库

 

环境设置

数据集:apache

  • 单张表的一个自定义数据集。一个 27 GB 的 CSV 文件,110 M 记录,共 36 列。其中列的类型有int, nvarchar, datetime等。

数据库:性能优化

  • Azure SQL Database – Business Critical, Gen5 80vCores

ELT 平台:架构

  • Azure Databricks – 6.6 (includes Apache Spark 2.4.5, Scala 2.11)
  • Standard_DS3_v2 14.0 GB Memory, 4 Cores, 0.75 DBU (8 Worker Nodes Max)

存储:并发

  • Azure Data Lake Storage Gen2

先决条件:

在进一步浏览本文以前,请花一些时间了解此处将数据加载到汇集列存储表中的概述:Data Loading performance considerations with Clustered Columnstore indexes

在此测试中,数据从位于 Azure Data Lake Storage Gen 2的 CSV 文件中加载。CSV 文件大小为 27 GB,有 110 M 记录,有 36 列。这是一个带有随机数据的自定义数据集。

批量加载或预处理(ELT\ETL)的典型架构看起来与下图类似:

使用BULK INSERTS    

在第一次测试中,单个BULK INSERT用于将数据加载到带有汇集列存储索引的 Azure SQL 表中,这里没有意外,根据所使用的 BATCHSIZE,它花了 30 多分钟才完成。请记住,BULK INSERT是一个单一的线程操做,所以单个流会读取并将其写入表中,从而下降负载吞吐量

 

使用Azure Databricks

为了实现写入到 SQL Server和读取ADLS (Azure Data Lake Storage) Gen 2的最大并发性和高吞吐量,Azure Databricks 被选为平台的选择,尽管咱们还有其余选择,即 Azure Data Factory或其余基于Spark引擎的平台。

使用Azure Databricks加载数据的优势是 Spark 引擎经过专用的 Spark API并行读取输入文件。这些 API将使用必定数量的分区,这些分区映射到单个或多个输入文件,映射是在文件的一部分或整个文件上完成的。数据读入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) 。在这种状况下,数据被加载到DataFrame中,而后进行转换(设置与目标表匹配的DataFrame schema),而后数据准备写入 SQL 表。

要将DataFrame中的数据写入 SQL Server中,必须使用Microsoft's Apache Spark SQL Connector。这是一个高性能的链接器,使您可以在大数据分析中使用事务数据,和持久化结果用于即席查询或报告。链接器容许您使用任何 SQL Server(本地数据库或云中)做为 Spark 做业的输入数据源或输出目标。

  GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks

请注意,目标表具备汇集列存储索引,以实现高负载吞吐量,可是,您也能够将数据加载到Heap,这也将提供良好的负载性能。对于本文的相关性,咱们只讨论加载到列存储表。咱们使用不一样的 BATCHSIZE 值将数据加载到Clustered Columnstore Index中 -请参阅此文档,了解 BATCHSIZE 在批量加载到汇集列存储索引表期间的影响。

如下是Clustered Columnstore Index上的数据加载测试运行,BATCHSIZE为 102400 和 1048576:

 

 

 

 

请注意,咱们正在使用 Azure Databricks使用的默认并行和分区,并将数据直接推至 SQL Server汇集列存储索引表。咱们没有调整 Azure Databricks使用的任何默认配置。不管所定义的批次大小,咱们全部的测试都大体在同一时间完成。

将数据加载到 SQL 中的 32 个并发线程是因为上述已提供的数据砖群集的大小。该集群最多有 8 个节点,每一个节点有 4 个内核,即 8*4 = 32 个内核,最多可运行 32 个并发线程。

查看行组(Row Groups)

有关咱们使用 BATCHSIZE 1048576 插入数据的表格,如下是在 SQL 中建立的行组数:

行组总数:

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')
216 

行组的质量:

SELECT *
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')

在这种状况下,咱们只有一个delta store在OPEN状态 (total_rows = 3810) 和 215 行组处于压缩状态, 这是有道理的, 由于若是插入的批次大小是>102400 行, 数据再也不delta store存储, 而是直接插入一个压缩行组的列存储。在这种状况下,压缩状态中的全部行组都有 >102400 条记录。如今,有关行组的问题是:

为何咱们有216行组?

为何当咱们的BatchSize设置为 1048576 时,每一个行组的行数不一样?

请注意,每一个行组的数据大约等于上述结果集中的 500000 条记录。

这两个问题的答案是 Azure Databricks Spark引擎对数据分区控制了写入汇集列存储索引表行组的数据行数。让咱们来看看 Azure Databricks为有关数据集建立的分区数:

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216

所以,咱们为数据集建立了 216 个分区。请记住,这些是分区的默认数。每一个分区都有大约 500000 条记录。

# Number of records in each partition
from pyspark.sql.functions
import spark_partition_id
df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)

将Spark分区中的记录数与行组中的记录数进行比较,您就会发现它们是相等的。甚至分区数也等于行组数。所以,从某种意义上说,1048576 的 BATCHSIZE 正被每一个分区中的行数过分拉大。

sqldbconnection = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbconn")
sqldbuser = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbuser")
sqldbpwd = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbpwd")

servername = "jdbc:sqlserver://" + sqldbconnection url = servername + ";" + "database_name=" + <Your Database Name> + ";"
table_name = "<Your Table Name>"

# Write data to SQL table with BatchSize 1048576
df_gl.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", sqldbuser) \
.option("password", sqldbpwd) \
.option("schemaCheckEnabled", False) \
.option("BatchSize", 1048576) \
.option("truncate", True) \
.save()

行组质量

行组质量由行组数和每一个行组记录决定。因为汇集列存储索引经过扫描单行组的列段扫描表,则最大化每一个行组中的行数可加强查询性能。当行组具备大量行数时,数据压缩会改善,这意味着从磁盘中读取的数据更少。为了得到最佳的查询性能,目标是最大限度地提升汇集列索引中每一个行组的行数。行组最多可有 1048576 行。可是,须要注意的是,因为汇集列索引,行组必须至少有 102400 行才能实现性能提高。此外,请记住,行组的最大大小(100万)可能在每个状况下都达到,文件行组大小不仅是最大限制的一个因素,但受到如下因素的影响。

  • 字典大小限制,即 16 MB
  • 插入指定的批次大小
  • 表的分区方案,由于行组不跨分区
  • 内存压力致使行组被修剪
  • 索引重组,重建

话虽如此,如今一个重要的考虑是让行组大小尽量接近 100 万条记录。在此测试中,因为每一个行组的大小接近 500000 条记录,咱们有两个选项能够达到约 100 万条记录的大小:

  • 在Spark中,更改分区数,使每一个分区尽量接近 1048576 条记录,
  • 保持Spark分区(默认值),一旦数据加载到表中,就运行 ALTER INDEX REORG,将多个压缩行组组合成一组。

选项#1很容易在Python或Scala代码中实现,该代码将在Azure Databricks上运行,负载至关低。

选项#2是数据加载后须要采起的额外步骤,固然,这将消耗 SQL 上的额外 CPU ,并增长整个加载过程所需的时间。

为了保持本文的相关性,让咱们来讨论更多关于Spark分区,以及如何从其默认值及其在下一节的影响中更改它。

 

Spark Partitioning

Spark 引擎最典型的输入源是一组文件,这些文件经过将每一个节点上的适当分区划分为一个或多个 Spark API来读取这些文件。这是 Spark 的自动分区,将用户从肯定分区数量的忧虑中抽象出来,若是用户想挑战,就需控制分区的配置。根据环境和环境设置计算的分区的默认数一般适用于大多数状况下。可是,在某些状况下,更好地了解分区是如何自动计算的,若是须要,用户能够更改分区计数,从而在性能上产生明显差别。

注意:大型Spark群集能够生成大量并行线程,这可能致使 Azure SQL DB 上的内存授予争议。因为内存超时,您必须留意这种可能性,以免提早修剪。请参阅本文以了解更多详细信息,了解表的模式和行数等也可能对内存授予产生影响。

spark.sql.files.maxPartitionBytes是控制分区大小的重要参数,默认设置为128 MB。它能够调整以控制分区大小,所以也会更改由此产生的分区数。

spark.default.parallelism这至关于worker nodes核心的总数。

最后,咱们有coalesce()repartition(),可用于增长/减小分区数,甚至在数据已被读入Spark。

只有当您想要减小分区数时,才能使用coalesce() 由于它不涉及数据的重排。请考虑此data frame的分区数为 16,而且您但愿将其增长到 32,所以您决定运行如下命令。

df = df.coalesce(32)
print(df.rdd.getNumPartitions())

可是,分区数量不会增长到 32 个,而且将保持在 16 个,由于coalesce()不涉及数据重排。这是一个性能优化的实现,由于无需昂贵的数据重排便可减小分区。

若是您想将上述示例的分区数减小到 8,则会得到预期的结果。

df = df.coalesce(8)
print(df.rdd.getNumPartitions())

这将合并数据并产生 8 个分区。

repartition() 另外一个帮助调整分区的函数。对于同一示例,您可使用如下命令将数据放入 32 个分区。

df = df.repartition(32)
print(df.rdd.getNumPartitions())

最后,还有其余功能能够改变分区数,其中是groupBy(), groupByKey(), reduceByKey() join()。当在 DataFrame 上调用这些功能时,会致使跨机器或一般跨执行器对数据进行重排,最终在默认状况下将数据从新划分为 200 个分区。此默认 数字可使用spark.sql.shuffle.partitions配置进行控制。

 

数据加载

如今,了解分区在 Spark 中的工做原理以及如何更改分区,是时候实施这些学习了。在上述实验中,分区数为 216默认状况下),这是由于文件的大小为 27 GB,所以将 27 GB 除以 128 MB(默认状况下由 Spark 定义的最大分区字节)提供了216 个分区

Spark从新分区的影响

对 PySpark 代码的更改是从新分区数据并确保每一个分区如今有 1048576 行或接近它。为此,首先在DataFrame中获取记录数量,而后将其除以 1048576。此划分的结果将是用于加载数据的分区数,假设分区数为n可是,可能有一些分区如今有 >=1048576 行,所以,为了确保每一个分区都<=1048576行,咱们将分区数做为n+1使用n+1在分区结果为 0 的状况下也很重要。在这种状况下,您将有一个分区。

因为数据已加载到DataFrame中,而 Spark 默认已建立分区,咱们如今必须再次从新分区数据,分区数等于n+1。

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216
 
# Get the number of rows of DataFrame and get the number of partitions to be used.
rows = df_gl.count()
n_partitions = rows//1048576
# Re-Partition the DataFrame df_gl_repartitioned = df_gl.repartition(n_partitions+1) # Get the number of partitions after re-partitioning print(df_gl_repartitioned.rdd.getNumPartitions()) 105 # Get the partition id and count of partitions df_gl_repartitioned.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show(10000)

所以,在从新划分分区后,分区数量从216 个减小到 105 (n+1),所以每一个分区如今都有接近1048576行。

此时,让咱们将数据再次写入 SQL 表中,并验证行组质量。这一次,每一个行组的行数将接近每一个分区中的行数(略低于 1048576)。让咱们看看下面:

从新分区后的行组

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats 
WHERE object_id = OBJECT_ID('largetable110M_1048576')
105

从新分区后的行组质量

从本质上讲,此次总体数据加载比以前慢了 2 秒,但行组的质量要好得多。行组数量减小到一半,行组几乎已填满到最大容量。请注意,因为DataFrame的从新划分,将消耗额外的时间,这取决于数据帧的大小和分区数。

请注意,您不会老是得到每row_group 100 万条记录。它将取决于数据类型、列数等,以及以前讨论的因素-请参阅sys.dm_db_column_store_row_group_physical_stats

 

关键点

  1. 建议在将数据批量加载到 SQL Server时使用BatchSize(不管是 CCI 仍是Heap)。可是,若是 Azure Databricks 或任何其余 Spark 引擎用于加载数据,则数据分区在肯定汇集列存储索引中的行组质量方面起着重要做用。
  2. 使用BULK INSERT命令加载数据将遵照命令中提到的BATCHSIZE,除非其余因素影响插入行组的行数。
  3. Spark 中的数据分区不该基于某些随机数,最好动态识别分区数,并将n+1 用做分区数
  4. 因为汇集列存储索引经过扫描单行组的列段扫描表,则最大化每一个行组中的记录数可加强查询性能。为了得到最佳的查询性能,目标是最大限度地提升汇集列存储索引中每一个行组的行数。
  5. Azure Databricks的数据加载速度在很大程度上取决于选择的集群类型及其配置。此外,请注意,到目前为止,Azure Databricks链接器仅支持Apache Spark 2.4.5。微软已经发布了对Spark 3.0的支持,它目前在预览版中,咱们建议您在开发测试环境中完全测试此链接器。
  6. 根据data frame的大小、列数、数据类型等,进行从新划分的时间会有所不一样,所以您必须从端端角度考虑此次对总体数据加载的考虑。

 

Azure Data Factory

这是一篇很是好的数据ETL文章,Spark和SQL Server列存储表功能的组合。

Azure Data Factory是当前最成熟,功能最强大的ETL/ELT数据集成服务。其架构就是使用Spark做为计算引擎。

https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory

相关文章
相关标签/搜索