SQL Server的Bulk load默认为串行,这意味着例如,一个BULK INSERT语句将生成一个线程将数据插入表中。可是,对于并发负载,您可使用多个批量插入语句插入同一张表,前提是须要阅读多个文件。node
考虑要求所在的情景:git
在这种状况下,使用 Apache Spark是并行批量数据加载到 SQL 表的流行方法之一。github
在本文中,咱们使用 Azure Databricks spark engine使用单个输入文件将数据以并行流(多个线程将数据加载到表中)插入 SQL Server。目标表多是Heap、Clustered Index或Clustered Columnstore Index。本文旨在展现如何利用Spark提供的高度分布式框架,在加载到 SQL Server或 Azure SQL的汇集列存储索引表以前仔细对数据分区。sql
本文中分享的最有趣的观察是展现使用Spark默认配置时列存储表的行组质量下降,以及如何经过高效使用Spark分区来提升质量。从本质上讲,提升行组质量是决定查询性能的重要因素。数据库
数据集:apache
数据库:性能优化
ELT 平台:架构
存储:并发
先决条件:
在进一步浏览本文以前,请花一些时间了解此处将数据加载到汇集列存储表中的概述:Data Loading performance considerations with Clustered Columnstore indexes
在此测试中,数据从位于 Azure Data Lake Storage Gen 2的 CSV 文件中加载。CSV 文件大小为 27 GB,有 110 M 记录,有 36 列。这是一个带有随机数据的自定义数据集。
批量加载或预处理(ELT\ETL)的典型架构看起来与下图类似:
在第一次测试中,单个BULK INSERT用于将数据加载到带有汇集列存储索引的 Azure SQL 表中,这里没有意外,根据所使用的 BATCHSIZE,它花了 30 多分钟才完成。请记住,BULK INSERT是一个单一的线程操做,所以单个流会读取并将其写入表中,从而下降负载吞吐量。
为了实现写入到 SQL Server和读取ADLS (Azure Data Lake Storage) Gen 2的最大并发性和高吞吐量,Azure Databricks 被选为平台的选择,尽管咱们还有其余选择,即 Azure Data Factory或其余基于Spark引擎的平台。
使用Azure Databricks加载数据的优势是 Spark 引擎经过专用的 Spark API并行读取输入文件。这些 API将使用必定数量的分区,这些分区映射到单个或多个输入文件,映射是在文件的一部分或整个文件上完成的。数据读入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) 。在这种状况下,数据被加载到DataFrame中,而后进行转换(设置与目标表匹配的DataFrame schema),而后数据准备写入 SQL 表。
要将DataFrame中的数据写入 SQL Server中,必须使用Microsoft's Apache Spark SQL Connector。这是一个高性能的链接器,使您可以在大数据分析中使用事务数据,和持久化结果用于即席查询或报告。链接器容许您使用任何 SQL Server(本地数据库或云中)做为 Spark 做业的输入数据源或输出目标。
GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks
请注意,目标表具备汇集列存储索引,以实现高负载吞吐量,可是,您也能够将数据加载到Heap,这也将提供良好的负载性能。对于本文的相关性,咱们只讨论加载到列存储表。咱们使用不一样的 BATCHSIZE 值将数据加载到Clustered Columnstore Index中 -请参阅此文档,了解 BATCHSIZE 在批量加载到汇集列存储索引表期间的影响。
如下是Clustered Columnstore Index上的数据加载测试运行,BATCHSIZE为 102400 和 1048576:
请注意,咱们正在使用 Azure Databricks使用的默认并行和分区,并将数据直接推至 SQL Server汇集列存储索引表。咱们没有调整 Azure Databricks使用的任何默认配置。不管所定义的批次大小,咱们全部的测试都大体在同一时间完成。
将数据加载到 SQL 中的 32 个并发线程是因为上述已提供的数据砖群集的大小。该集群最多有 8 个节点,每一个节点有 4 个内核,即 8*4 = 32 个内核,最多可运行 32 个并发线程。
有关咱们使用 BATCHSIZE 1048576 插入数据的表格,如下是在 SQL 中建立的行组数:
SELECT COUNT(1) FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576') 216
SELECT * FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576')
在这种状况下,咱们只有一个delta store在OPEN状态 (total_rows = 3810) 和 215 行组处于压缩状态, 这是有道理的, 由于若是插入的批次大小是>102400 行, 数据再也不delta store存储, 而是直接插入一个压缩行组的列存储。在这种状况下,压缩状态中的全部行组都有 >102400 条记录。如今,有关行组的问题是:
为何咱们有216行组?
为何当咱们的BatchSize设置为 1048576 时,每一个行组的行数不一样?
请注意,每一个行组的数据大约等于上述结果集中的 500,000 条记录。
这两个问题的答案是 Azure Databricks Spark引擎对数据分区控制了写入汇集列存储索引表行组的数据行数。让咱们来看看 Azure Databricks为有关数据集建立的分区数:
# Get the number of partitions before re-partitioning print(df_gl.rdd.getNumPartitions()) 216
所以,咱们为数据集建立了 216 个分区。请记住,这些是分区的默认数。每一个分区都有大约 500000 条记录。
# Number of records in each partition from pyspark.sql.functions import spark_partition_id df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)
将Spark分区中的记录数与行组中的记录数进行比较,您就会发现它们是相等的。甚至分区数也等于行组数。所以,从某种意义上说,1048576 的 BATCHSIZE 正被每一个分区中的行数过分拉大。
sqldbconnection = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbconn") sqldbuser = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbuser") sqldbpwd = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbpwd") servername = "jdbc:sqlserver://" + sqldbconnection url = servername + ";" + "database_name=" + <Your Database Name> + ";" table_name = "<Your Table Name>" # Write data to SQL table with BatchSize 1048576 df_gl.write \ .format("com.microsoft.sqlserver.jdbc.spark") \ .mode("overwrite") \ .option("url", url) \ .option("dbtable", table_name) \ .option("user", sqldbuser) \ .option("password", sqldbpwd) \ .option("schemaCheckEnabled", False) \ .option("BatchSize", 1048576) \ .option("truncate", True) \ .save()
行组质量由行组数和每一个行组记录决定。因为汇集列存储索引经过扫描单行组的列段扫描表,则最大化每一个行组中的行数可加强查询性能。当行组具备大量行数时,数据压缩会改善,这意味着从磁盘中读取的数据更少。为了得到最佳的查询性能,目标是最大限度地提升汇集列索引中每一个行组的行数。行组最多可有 1048576 行。可是,须要注意的是,因为汇集列索引,行组必须至少有 102400 行才能实现性能提高。此外,请记住,行组的最大大小(100万)可能在每个状况下都达到,文件行组大小不仅是最大限制的一个因素,但受到如下因素的影响。
话虽如此,如今一个重要的考虑是让行组大小尽量接近 100 万条记录。在此测试中,因为每一个行组的大小接近 500000 条记录,咱们有两个选项能够达到约 100 万条记录的大小:
选项#1很容易在Python或Scala代码中实现,该代码将在Azure Databricks上运行,负载至关低。
选项#2是数据加载后须要采起的额外步骤,固然,这将消耗 SQL 上的额外 CPU ,并增长整个加载过程所需的时间。
为了保持本文的相关性,让咱们来讨论更多关于Spark分区,以及如何从其默认值及其在下一节的影响中更改它。
Spark 引擎最典型的输入源是一组文件,这些文件经过将每一个节点上的适当分区划分为一个或多个 Spark API来读取这些文件。这是 Spark 的自动分区,将用户从肯定分区数量的忧虑中抽象出来,若是用户想挑战,就需控制分区的配置。根据环境和环境设置计算的分区的默认数一般适用于大多数状况下。可是,在某些状况下,更好地了解分区是如何自动计算的,若是须要,用户能够更改分区计数,从而在性能上产生明显差别。
注意:大型Spark群集能够生成大量并行线程,这可能致使 Azure SQL DB 上的内存授予争议。因为内存超时,您必须留意这种可能性,以免提早修剪。请参阅本文以了解更多详细信息,了解表的模式和行数等也可能对内存授予产生影响。
spark.sql.files.maxPartitionBytes是控制分区大小的重要参数,默认设置为128 MB。它能够调整以控制分区大小,所以也会更改由此产生的分区数。
spark.default.parallelism这至关于worker nodes核心的总数。
最后,咱们有coalesce()和repartition(),可用于增长/减小分区数,甚至在数据已被读入Spark。
只有当您想要减小分区数时,才能使用coalesce() ,由于它不涉及数据的重排。请考虑此data frame的分区数为 16,而且您但愿将其增长到 32,所以您决定运行如下命令。
df = df.coalesce(32) print(df.rdd.getNumPartitions())
可是,分区数量不会增长到 32 个,而且将保持在 16 个,由于coalesce()不涉及数据重排。这是一个性能优化的实现,由于无需昂贵的数据重排便可减小分区。
若是您想将上述示例的分区数减小到 8,则会得到预期的结果。
df = df.coalesce(8) print(df.rdd.getNumPartitions())
这将合并数据并产生 8 个分区。
repartition() 是另外一个帮助调整分区的函数。对于同一示例,您可使用如下命令将数据放入 32 个分区。
df = df.repartition(32) print(df.rdd.getNumPartitions())
最后,还有其余功能能够改变分区数,其中是groupBy(), groupByKey(), reduceByKey() 和 join()。当在 DataFrame 上调用这些功能时,会致使跨机器或一般跨执行器对数据进行重排,最终在默认状况下将数据从新划分为 200 个分区。此默认 数字可使用spark.sql.shuffle.partitions配置进行控制。
如今,了解分区在 Spark 中的工做原理以及如何更改分区,是时候实施这些学习了。在上述实验中,分区数为 216(默认状况下),这是由于文件的大小为 27 GB,所以将 27 GB 除以 128 MB(默认状况下由 Spark 定义的最大分区字节)提供了216 个分区。
对 PySpark 代码的更改是从新分区数据并确保每一个分区如今有 1048576 行或接近它。为此,首先在DataFrame中获取记录数量,而后将其除以 1048576。此划分的结果将是用于加载数据的分区数,假设分区数为n。可是,可能有一些分区如今有 >=1048576 行,所以,为了确保每一个分区都<=1048576行,咱们将分区数做为n+1。使用n+1在分区结果为 0 的状况下也很重要。在这种状况下,您将有一个分区。
因为数据已加载到DataFrame中,而 Spark 默认已建立分区,咱们如今必须再次从新分区数据,分区数等于n+1。
# Get the number of partitions before re-partitioning print(df_gl.rdd.getNumPartitions()) 216 # Get the number of rows of DataFrame and get the number of partitions to be used. rows = df_gl.count() n_partitions = rows//1048576
# Re-Partition the DataFrame df_gl_repartitioned = df_gl.repartition(n_partitions+1) # Get the number of partitions after re-partitioning print(df_gl_repartitioned.rdd.getNumPartitions()) 105 # Get the partition id and count of partitions df_gl_repartitioned.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show(10000)
所以,在从新划分分区后,分区数量从216 个减小到 105 (n+1),所以每一个分区如今都有接近1048576行。
此时,让咱们将数据再次写入 SQL 表中,并验证行组质量。这一次,每一个行组的行数将接近每一个分区中的行数(略低于 1048576)。让咱们看看下面:
SELECT COUNT(1) FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576') 105
从本质上讲,此次总体数据加载比以前慢了 2 秒,但行组的质量要好得多。行组数量减小到一半,行组几乎已填满到最大容量。请注意,因为DataFrame的从新划分,将消耗额外的时间,这取决于数据帧的大小和分区数。
请注意,您不会老是得到每row_group 100 万条记录。它将取决于数据类型、列数等,以及以前讨论的因素-请参阅sys.dm_db_column_store_row_group_physical_stats
这是一篇很是好的数据ETL文章,Spark和SQL Server列存储表功能的组合。
Azure Data Factory是当前最成熟,功能最强大的ETL/ELT数据集成服务。其架构就是使用Spark做为计算引擎。
https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory