Spark SQL在100TB上的自适应执行实践(转载)

Spark SQL是Apache Spark最普遍使用的一个组件,它提供了很是友好的接口来分布式处理结构化数据,在不少应用领域都有成功的生产实践,可是在超大规模集群和数据集上,Spark SQL仍然遇到很多易用性和可扩展性的挑战。为了应对这些挑战,英特尔大数据技术团队和百度大数据基础架构部工程师在Spark 社区版本的基础上,改进并实现了自适应执行引擎。本文首先讨论Spark SQL在大规模数据集上遇到的挑战,而后介绍自适应执行的背景和基本架构,以及自适应执行如何应对Spark SQL这些问题,最后咱们将比较自适应执行和现有的社区版本Spark SQL在100 TB 规模TPC-DS基准测试碰到的挑战和性能差别,以及自适应执行在Baidu Big SQL平台的使用状况。sql

挑战1:关于shuffle partition数

在Spark SQL中, shufflepartition数能够经过参数spark.sql.shuffle.partition来设置,默认值是200。这个参数决定了SQL做业每一个reduce阶段任务数量,对整个查询性能有很大影响。假设一个查询运行前申请了E个Executor,每一个Executor包含C个core(并发执行线程数),那么该做业在运行时能够并行执行的任务数就等于E x C个,或者说该做业的并发数是E x C。假设shuffle partition个数为P,除了map stage的任务数和原始数据的文件数量以及大小相关,后续的每一个reduce stage的任务数都是P。因为Spark做业调度是抢占式的,E x C个并发任务执行单元会抢占执行P个任务,“能者多劳”,直至全部任务完成,则进入到下一个Stage。但这个过程当中,若是有任务由于处理数据量过大(例如:数据倾斜致使大量数据被划分到同一个reducer partition)或者其它缘由形成该任务执行时间过长,一方面会致使整个stage执行时间变长,另外一方面E x C个并发执行单元大部分可能都处于空闲等待状态,集群资源总体利用率急剧降低。网络

那么spark.sql.shuffle.partition参数到底是多少比较合适?若是设置太小,分配给每个reduce任务处理的数据量就越多,在内存大小有限的状况下,不得不溢写(spill)到计算节点本地磁盘上。Spill会致使额外的磁盘读写,影响整个SQL查询的性能,更差的状况还可能致使严重的GC问题甚至是OOM。相反,若是shuffle partition设置过大。第一,每个reduce任务处理的数据量很小而且很快结束,进而致使Spark任务调度负担变大。第二,每个mapper任务必须把本身的shuffle输出数据分红P个hash bucket,即肯定数据属于哪个reduce partition,当shuffle partition数量太多时,hash bucket里数据量会很小,在做业并发数很大时,reduce任务shuffle拉取数据会形成必定程度的随机小数据读操做,当使用机械硬盘做为shuffle数据临时存取的时候性能降低会更加明显。最后,当最后一个stage保存数据时会写出P个文件,也可能会形成HDFS文件系统中大量的小文件。数据结构

从上,shuffle partition的设置既不能过小也不能太大。为了达到最佳的性能,每每须要经屡次试验才能肯定某个SQL查询最佳的shuffle partition值。然而在生产环境中,每每SQL以定时做业的方式处理不一样时间段的数据,数据量大小可能变化很大,咱们也没法为每个SQL查询去作耗时的人工调优,这也意味这些SQL做业很难以最佳的性能方式运行。多线程

Shuffle partition的另一个问题是,同一个shuffle partition数设置将应用到全部的stage。Spark在执行一个SQL做业时,会划分红多个stage。一般状况下,每一个stage的数据分布和大小可能都不太同样,全局的shuffle partition设置最多只能对某个或者某些stage最优,没有办法作到全局全部的stage设置最优。架构

这一系列关于shufflepartition的性能和易用性挑战,促使咱们思考新的方法:咱们可否根据运行时获取的shuffle数据量信息,例如数据块大小,记录行数等等,自动为每个stage设置合适的shuffle partition值?并发

挑战2:Spark SQL最佳执行计划

Spark SQL在执行SQL以前,会将SQL或者Dataset程序解析成逻辑计划,而后经历一系列的优化,最后肯定一个可执行的物理计划。最终选择的物理计划的不一样对性能有很大的影响。如何选择最佳的执行计划,这即是Spark SQL的Catalyst优化器的核心工做。Catalyst早期主要是基于规则的优化器(RBO),在Spark 2.2中又加入了基于代价的优化(CBO)。目前执行计划的肯定是在计划阶段,一旦确认之后便再也不改变。然而在运行期间,当咱们获取到更多运行时信息时,咱们将有可能获得一个更佳的执行计划。app

以join操做为例,在Spark中最多见的策略是BroadcastHashJoin和SortMergeJoin。BroadcastHashJoin属于map side join,其原理是当其中一张表存储空间大小小于broadcast阈值时,Spark选择将这张小表广播到每个Executor上,而后在map阶段,每个mapper读取大表的一个分片,而且和整张小表进行join,整个过程当中避免了把大表的数据在集群中进行shuffle。而SortMergeJoin在map阶段2张数据表都按相同的分区方式进行shuffle写,reduce阶段每一个reducer将两张表属于对应partition的数据拉取到同一个任务中作join。RBO根据数据的大小,尽量把join操做优化成BroadcastHashJoin。Spark中使用参数spark.sql.autoBroadcastJoinThreshold来控制选择BroadcastHashJoin的阈值,默认是10MB。然而对于复杂的SQL查询,它可能使用中间结果来做为join的输入,在计划阶段,Spark并不能精确地知道join中两表的大小或者会错误地估计它们的大小,以至于错失了使用BroadcastHashJoin策略来优化join执行的机会。可是在运行时,经过从shuffle写获得的信息,咱们能够动态地选用BroadcastHashJoin。如下是一个例子,join一边的输入大小只有600K,但Spark仍然规划成SortMergeJoin。框架

这促使咱们思考第二个问题:咱们可否经过运行时收集到的信息,来动态地调整执行计划?分布式

挑战3:数据倾斜

数据倾斜是常见的致使Spark SQL性能变差的问题。数据倾斜是指某一个partition的数据量远远大于其它partition的数据,致使个别任务的运行时间远远大于其它任务,所以拖累了整个SQL的运行时间。在实际SQL做业中,数据倾斜很常见,join key对应的hash bucket老是会出现记录数不太平均的状况,在极端状况下,相同join key对应的记录数特别多,大量的数据必然被分到同一个partition于是形成数据严重倾斜。如图2,能够看到大部分任务3秒左右就完成了,而最慢的任务却花了4分钟,它处理的数据量倒是其它任务的若干倍。ide

目前,处理join时数据倾斜的一些常见手段有: (1)增长shuffle partition数量,指望本来分在同一个partition中的数据能够被分散到多个partition中,可是对于同key的数据没有做用。(2)调大BroadcastHashJoin的阈值,在某些场景下能够把SortMergeJoin转化成BroadcastHashJoin而避免shuffle产生的数据倾斜。(3)手动过滤倾斜的key,而且对这些数据加入随机的前缀,在另外一张表中这些key对应的数据也相应的膨胀处理,而后再作join。综上,这些手段都有各自的局限性而且涉及不少的人为处理。基于此,咱们思考了第三个问题:Spark可否在运行时自动地处理join中的数据倾斜?

自适应执行背景和简介

早在2015年,Spark社区就提出了自适应执行的基本想法,在Spark的DAGScheduler中增长了提交单个map stage的接口,而且在实现运行时调整shuffle partition数量上作了尝试。但目前该实现有必定的局限性,在某些场景下会引入更多的shuffle,即更多的stage,对于三表在同一个stage中作join等状况也没法很好的处理。因此该功能一直处于实验阶段,配置参数也没有在官方文档中说起。

基于这些社区的工做,英特尔大数据技术团队对自适应执行作了从新的设计,实现了一个更为灵活的自适性执行框架。在这个框架下面,咱们能够添加额外的规则,来实现更多的功能。目前,已实现的特性包括:自动设置shuffle partition数,动态调整执行计划,动态处理数据倾斜等等。

自适应执行架构

在Spark SQL中,当Spark肯定最后的物理执行计划后,根据每个operator对RDD的转换定义,它会生成一个RDD的DAG图。以后Spark基于DAG图静态划分stage而且提交执行,因此一旦执行计划肯定后,在运行阶段没法再更新。自适应执行的基本思路是在执行计划中事先划分好stage,而后按stage提交执行,在运行时收集当前stage的shuffle统计信息,以此来优化下一个stage的执行计划,而后再提交执行后续的stage。

从图3中咱们能够看出自适应执行的工做方法,首先以Exchange节点做为分界将执行计划这棵树划分红多个QueryStage(Exchange节点在Spark SQL中表明shuffle)。每个QueryStage都是一棵独立的子树,也是一个独立的执行单元。在加入QueryStage的同时,咱们也加入一个QueryStageInput的叶子节点,做为父亲QueryStage的输入。例如对于图中两表join的执行计划来讲咱们会建立3个QueryStage。最后一个QueryStage中的执行计划是join自己,它有2个QueryStageInput表明它的输入,分别指向2个孩子的QueryStage。在执行QueryStage时,咱们首先提交它的孩子stage,而且收集这些stage运行时的信息。当这些孩子stage运行完毕后,咱们能够知道它们的大小等信息,以此来判断QueryStage中的计划是否能够优化更新。例如当咱们获知某一张表的大小是5M,它小于broadcast的阈值时,咱们能够将SortMergeJoin转化成BroadcastHashJoin来优化当前的执行计划。咱们也能够根据孩子stage产生的shuffle数据量,来动态地调整该stage的reducer个数。在完成一系列的优化处理后,最终咱们为该QueryStage生成RDD的DAG图,而且提交给DAG Scheduler来执行。

自动设置reducer个数

假设咱们设置的shufflepartition个数为5,在map stage结束以后,咱们知道每个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设咱们设置每个reducer处理的目标数据量是64MB,那么在运行时,咱们能够实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB),如图4所示。

在自适应执行的框架中,由于每一个QueryStage都知道本身全部的孩子stage,所以在调整reducer个数时,能够考虑到全部的stage输入。另外,咱们也能够将记录条数做为一个reducer处理的目标值。由于shuffle的数据每每都是通过压缩的,有时partition的数据量并不大,但解压后记录条数确远远大于其它partition,形成数据不均。因此同时考虑数据大小和记录条数能够更好地决定reducer的个数。

动态调整执行计划

目前咱们支持在运行时动态调整join的策略,在知足条件的状况下,即一张表小于Broadcast阈值,能够将SortMergeJoin转化成BroadcastHashJoin。因为SortMergeJoin和BroadcastHashJoin输出的partition状况并不相同,随意转换可能在下一个stage引入额外的shuffle操做。所以咱们在动态调整join策略时,遵循一个规则,即在不引入额外shuffle的前提下才进行转换。

将SortMergeJoin转化成BroadcastHashJoin有哪些好处呢?由于数据已经shuffle写到磁盘上,咱们仍然须要shuffle读取这些数据。咱们能够看看图5的例子,假设A表和B表join,map阶段2张表各有2个map任务,而且shuffle partition个数为5。若是作SortMergeJoin,在reduce阶段须要启动5个reducer,每一个reducer经过网络shuffle读取属于本身的数据。然而,当咱们在运行时发现B表能够broadcast,而且将其转换成BroadcastHashJoin以后,咱们只须要启动2个reducer,每个reducer读取一个mapper的整个shuffle output文件。当咱们调度这2个reducer任务时,能够优先将其调度在运行mapper的Executor上,所以整个shuffle读变成了本地读取,没有数据经过网络传输。而且读取一个文件这样的顺序读,相比原先shuffle时随机的小文件读,效率也更胜一筹。另外,SortMergeJoin过程当中每每会出现不一样程度的数据倾斜问题,拖慢总体的运行时间。而转换成BroadcastHashJoin后,数据量通常比较均匀,也就避免了倾斜,咱们能够在下文实验结果中看到更具体的信息。

动态处理数据倾斜

在自适应执行的框架下,咱们能够在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,咱们收集该stage每一个mapper 的shuffle数据大小和记录条数。若是某一个partition的数据量或者记录条数超过中位数的N倍,而且大于某个预先配置的阈值,咱们就认为这是一个数据倾斜的partition,须要进行特殊的处理。

假设咱们A表和B表作inner join,而且A表中第0个partition是一个倾斜的partition。通常状况下,A表和B表中partition 0的数据都会shuffle到同一个reducer中进行处理,因为这个reducer须要经过网络拉取大量的数据而且进行处理,它会成为一个最慢的任务拖慢总体的性能。在自适应执行框架下,一旦咱们发现A表的partition 0发生倾斜,咱们随后使用N个任务去处理该partition。每一个任务只读取若干个mapper的shuffle 输出文件,而后读取B表partition 0的数据作join。最后,咱们将N个任务join的结果经过Union操做合并起来。为了实现这样的处理,咱们对shuffle read的接口也作了改变,容许它只读取部分mapper中某一个partition的数据。在这样的处理中,B表的partition 0会被读取N次,虽然这增长了必定的额外代价,可是经过N个任务处理倾斜数据带来的收益仍然大于这样的代价。若是B表中partition 0也发生倾斜,对于inner join来讲咱们也能够将B表的partition 0分红若干块,分别与A表的partition 0进行join,最终union起来。但对于其它的join类型例如Left Semi Join咱们暂时不支持将B表的partition 0拆分。

自适应执行和Spark SQL在100TB上的性能比较

咱们使用99台机器搭建了一个集群,使用Spark2.2在TPC-DS 100TB的数据集进行了实验,比较原版Spark和自适应执行的性能。如下是集群的详细信息:

实验结果显示,在自适应执行模式下,103条SQL中有92条都获得了明显的性能提高,其中47条SQL的性能提高超过10%,最大的性能提高达到了3.8倍,而且没有出现性能降低的状况。另外在原版Spark中,有5条SQL由于OOM等缘由没法顺利运行,在自适应模式下咱们也对这些问题作了优化,使得103条SQL在TPC-DS 100TB数据集上所有成功运行。如下是具体的性能提高比例和性能提高最明显的几条SQL。

经过仔细分析了这些性能提高的SQL,咱们能够看到自适应执行带来的好处。首先是自动设置reducer个数,原版Spark使用10976做为shuffle partition数,在自适应执行时,如下SQL的reducer个数自动调整为1064和1079,能够明显看到执行时间上也提高了不少。这正是由于减小了调度的负担和任务启动的时间,以及减小了磁盘IO请求。

原版Spark:

自适应执行:

在运行时动态调整执行计划,将SortMergeJoin转化成BroadcastHashJoin在某些SQL中也带来了很大的提高。例如在如下的例子中,本来使用SortMergeJoin由于数据倾斜等问题花费了2.5分钟。在自适应执行时,由于其中一张表的大小只有2.5k因此在运行时转化成了BroadcastHashJoin,执行时间缩短为10秒。

原版Spark:

自适应执行:

100 TB的挑战及优化

成功运行TPC-DS 100 TB数据集中的全部SQL,对于Apache Spark来讲也是一大挑战。虽然SparkSQL官方表示支持TPC-DS全部的SQL,但这是基于小数据集。在100TB这个量级上,Spark暴露出了一些问题致使有些SQL执行效率不高,甚至没法顺利执行。在作实验的过程当中,咱们在自适应执行框架的基础上,对Spark也作了其它的优化改进,来确保全部SQL在100TB数据集上能够成功运行。如下是一些典型的问题。

统计map端输出数据时driver单点瓶颈的优化(SPARK-22537)

在每一个map任务结束后,会有一个表示每一个partition大小的数据结构(即下面提到的CompressedMapStatus或HighlyCompressedMapStatus)返回给driver。而在自适应执行中,当一次shuffle的map stage结束后,driver会聚合每一个mapper给出的partition大小信息,获得在各个partition上全部mapper输出的数据总大小。该统计由单线程完成,若是mapper的数量是M,shuffle partition的数量为S,那么统计的时间复杂度在O(M x S) ~ O (M x S x log(M x S)) 之间,当CompressedMapStatus被使用时,复杂度为这个区间的下限,当HighlyCompressedMapStatus被使用时,空间有所节省,时间会更长,在几乎全部的partition数据都为空时,复杂度会接近该区间的上限。

在M x S增大时,咱们会遇到driver上的单点瓶颈,一个明显的表现是UI上map stage和reduce stage之间的停顿。为了解决这个单点瓶颈,咱们将任务尽可能均匀地划分给多个线程,线程之间不相交地为scala Array中的不一样元素赋聚合值。

在这项优化中,新的spark.shuffle.mapOutput.parallelAggregationThreshold(简称threshold)被引入,用于配置使用多线程聚合的阈值,聚合的并行度由JVM中可用core数和M * S / threshold + 1中的小值决定。

Shuffle读取连续partition时的优化 (SPARK-9853)

在自适应执行的模式下,一个reducer可能会从一个mapoutput文件中读取诺干个连续的数据块。目前的实现中,它须要拆分红许多独立的getBlockData调用,每次调用分别从硬盘读取一小块数据,这样就须要不少的磁盘IO。咱们对这样的场景作了优化,使得Spark能够一次性地把这些连续数据块都读上来,这样就大大减小了磁盘的IO。在小的基准测试程序中,咱们发现shuffle read的性能能够提高3倍。

BroadcastHashJoin中避免没必要要的partition读的优化

自适应执行能够为现有的operator提供更多优化的可能。在SortMergeJoin中有一个基本的设计:每一个reducetask会先读取左表中的记录,若是左表的 partition为空,则右表中的数据咱们无需关注(对于非anti join的状况),这样的设计在左表有一些partition为空时能够节省没必要要的右表读取,在SortMergeJoin中这样的实现很天然。

BroadcastHashJoin中不存在按照join key分区的过程,因此缺失了这项优化。然而在自适应执行的一些状况中,利用stage间的精确统计信息,咱们能够找回这项优化:若是SortMergeJoin在运行时被转换成了BroadcastHashJoin,且咱们能获得各个partition key对应partition的精确大小,则新转换成的BroadcastHashJoin将被告知:无需去读那些小表中为空的partition,由于不会join出任何结果。

Baidu真实产品线试用状况

咱们将自适应执行优化应用在Baidu内部基于Spark SQL的即席查询服务BaiduBig SQL之上,作了进一步的落地验证,经过选取单日全天真实用户查询,按照原有执行顺序回放重跑和分析,获得以下几点结论:

1. 对于秒级的简单查询,自适应版本的性能提高并不明显,这主要是由于它们的瓶颈和主要耗时集中在了IO上面,而这不是自适应执行的优化点。

2. 按照查询复杂度维度考量测试结果发现:查询中迭代次数越多,多表join场景越复杂的状况下自适应执行效果越好。咱们简单按照group by, sort, join, 子查询等操做个数来将查询分类,如上关键词大于3的查询有明显的性能提高,优化比从50%~200%不等,主要优化点来源于shuffle的动态并发数调整及join优化。

3. 从业务使用角度来分析,前文所述SortMergeJoin转BroadcastHashJoin的优化在Big SQL场景中命中了多种典型的业务SQL模板,试考虑以下计算需求:用户指望从两张不一样维度的计费信息中捞取感兴趣的user列表在两个维度的总体计费。收入信息原表大小在百T级别,用户列表只包含对应用户的元信息,大小在10M之内。两张计费信息表字段基本一致,因此咱们将两张表与用户列表作inner join后union作进一步分析,SQL表达以下:

1 select t.c1, t.id, t.c2, t.c3, t.c4,  sum(t.num1), sum(t.num2), sum(t.num3) from
2 (
3 select  c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3)  as num3 from basedata.shitu_a t1 INNER JOIN basedata.user_82_1512023432000 t2  ON (t1.id = t2.id)  where  (event_day=20171107)  and flag !=  'true'  group by c1, t1.id, c2, c3, c4
4 union  all
5 select  c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3)  as num3 from basedata.shitu_b t1 INNER JOIN basedata.user_82_1512023432000 t2  ON (t1.id = t2.id)  where  (event_day=20171107)  and flag !=  'true'  group by c1, t1.id, c2, c3, c4
6 ) t group by t.c1, t.id, t.c2, t.c3, c4

 

对应的原版Spark执行计划以下:

针对于此类用户场景,能够所有命中自适应执行的join优化逻辑,执行过程当中屡次SortMergeJoin转为BroadcastHashJoin,减小了中间内存消耗及多轮sort,获得了近200%的性能提高。

结合上述3点,下一步自适应执行在Baidu内部的优化落地工做将进一步集中在大数据量、复杂查询的例行批量做业之上,并考虑与用户查询复杂度关联进行动态的开关控制。对于数千台的大规模集群上运行的复杂查询,自适应执行能够动态调整计算过程当中的并行度,能够帮助大幅提高集群的资源利用率。另外,自适应执行能够获取到多轮stage之间更完整的统计信息,下一步咱们也考虑将对应数据及Strategy接口开放给Baidu Spark平台上层用户,针对特殊做业进行进一步的定制化Strategy策略编写。

总结

随着Spark SQL普遍的使用以及业务规模的不断增加,在大规模数据集上遇到的易用性和性能方面的挑战将日益明显。本文讨论了三个典型的问题,包括调整shuffle partition数量,选择最佳执行计划和数据倾斜。这些问题在现有的框架下并不容易解决,而自适应执行能够很好地应对这些问题。咱们介绍了自适应执行的基本架构以及解决这些问题的具体方法。最后咱们在TPC-DS 100TB数据集上验证了自适应执行的优点,相比较原版Spark SQL,103个SQL查询中,90%的查询都获得了明显的性能提高,最大的提高达到3.8倍,而且原先失败的5个查询在自适应执行下也顺利完成。咱们在百度的Big SQL平台也作了进一步的验证,对于复杂的真实查询能够达到2倍的性能提高。总之,自适应执行解决了Spark SQL在大数据规模上遇到的不少挑战,而且很大程度上改善了Spark SQL的易用性和性能,提升了超大集群中多租户多并发做业状况下集群的资源利用率。未来,咱们考虑在自适应执行的框架之下,提供更多运行时能够优化的策略,而且将咱们的工做贡献回馈给社区,也但愿有更多的朋友能够参与进来,将其进一步完善。

 

转载自 http://blog.csdn.net/fl63zv9zou86950w/article/details/79049280

相关文章
相关标签/搜索