原创文章,转载请务必将下面这段话置于文章开头处。
本文转发自技术世界,原文连接 http://www.jasongj.com/spark/rbo/sql
本文所述内容均基于 2018年9月10日 Spark 最新 Release 2.3.1 版本。后续将持续更新apache
Spark SQL 的总体架构以下图所示
json
从上图可见,不管是直接使用 SQL 语句仍是使用 DataFrame,都会通过以下步骤转换成 DAG 对 RDD 的操做架构
Spark SQL 使用 Antlr 进行记法和语法解析,并生成 UnresolvedPlan。oop
当用户使用 SparkSession.sql(sqlText : String) 提交 SQL 时,SparkSession 最终会调用 SparkSqlParser 的 parsePlan 方法。该方法分两步post
如今两张表,分别定义以下性能
CREATE TABLE score ( id INT, math_score INT, english_score INT )
CREATE TABLE people ( id INT, age INT, name INT )
对其进行关联查询以下优化
SELECT sum(v) FROM ( SELECT score.id, 100 + 80 + score.math_score + score.english_score AS v FROM people JOIN score ON people.id = score.id AND people.age > 10 ) tmp
生成的 UnresolvedPlan 以下图所示。
ui
从上图可见spa
Spark SQL 解析出的 UnresolvedPlan 以下所示
== Parsed Logical Plan == 'Project [unresolvedalias('sum('v), None)] +- 'SubqueryAlias tmp +- 'Project ['score.id, (((100 + 80) + 'score.math_score) + 'score.english_score) AS v#493] +- 'Filter (('people.id = 'score.id) && ('people.age > 10)) +- 'Join Inner :- 'UnresolvedRelation `people` +- 'UnresolvedRelation `score`
从 Analyzer 的构造方法可见
class Analyzer( catalog: SessionCatalog, conf: SQLConf, maxIterations: Int) extends RuleExecutor[LogicalPlan] with CheckAnalysis {
Analyzer 包含了以下的转换规则
lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, new ResolveHints.ResolveBroadcastHints(conf), ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, LookupFunctions), Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution, EliminateUnions, new SubstituteUnresolvedOrdinals(conf)), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: ResolveRelations :: ResolveReferences :: ResolveCreateNamedStruct :: ResolveDeserializer :: ResolveNewInstance :: ResolveUpCast :: ResolveGroupingAnalytics :: ResolvePivot :: ResolveOrdinalInOrderByAndGroupBy :: ResolveAggAliasInGroupBy :: ResolveMissingReferences :: ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: ResolveAliases :: ResolveSubquery :: ResolveSubqueryColumnAliases :: ResolveWindowOrder :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: TimeWindowing :: ResolveInlineTables(conf) :: ResolveTimeZone(conf) :: ResolvedUuidExpressions :: TypeCoercion.typeCoercionRules(conf) ++ extendedResolutionRules : _*), Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*), Batch("View", Once, AliasViewChild(conf)), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, HandleNullInputsForUDF), Batch("FixNullability", Once, FixNullability), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, CleanupAliases) )
例如, ResolveRelations 用于分析查询用到的 Table 或 View。本例中 UnresolvedRelation (people) 与 UnresolvedRelation (score) 被解析为 HiveTableRelation (json.people) 与 HiveTableRelation (json.score),并列出其各自包含的字段名。
经 Analyzer 分析后获得的 Resolved Logical Plan 以下所示
== Analyzed Logical Plan == sum(v): bigint Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L] +- SubqueryAlias tmp +- Project [id#500, (((100 + 80) + math_score#501) + english_score#502) AS v#493] +- Filter ((id#496 = id#500) && (age#497 > 10)) +- Join Inner :- SubqueryAlias people : +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498] +- SubqueryAlias score +- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]
Analyzer 分析先后的 LogicalPlan 对好比下
由上图可见,分析后,每张表对应的字段集,字段类型,数据存储位置都已肯定。Project 与 Filter 操做的字段类型以及在表中的位置也已肯定。
有了这些信息,已经能够直接将该 LogicalPlan 转换为 Physical Plan 进行执行。
可是因为不一样用户提交的 SQL 质量不一样,直接执行会形成不一样用户提交的语义相同的不一样 SQL 执行效率差距甚远。换句话说,若是要保证较高的执行效率,用户须要作大量的 SQL 优化,使用体验大大下降。
为了尽量保证不管用户是否熟悉 SQL 优化,提交的 SQL 质量如何, Spark SQL 都能以较高效率执行,还需在执行前进行 LogicalPlan 优化。
Spark SQL 目前的优化主要是基于规则的优化,即 RBO (Rule-based optimization)
PushdownPredicate
PushdownPredicate 是最多见的用于减小参与计算的数据量的方法。
前文中直接对两表进行 Join 操做,而后再 进行 Filter 操做。引入 PushdownPredicate 后,可先对两表进行 Filter 再进行 Join,以下图所示。
当 Filter 可过滤掉大部分数据时,参与 Join 的数据量大大减小,从而使得 Join 操做速度大大提升。
这里须要说明的是,此处的优化是 LogicalPlan 的优化,从逻辑上保证了将 Filter 下推后因为参与 Join 的数据量变少而提升了性能。另外一方面,在物理层面,Filter 下推后,对于支持 Filter 下推的 Storage,并不须要将表的全量数据扫描出来再过滤,而是直接只扫描符合 Filter 条件的数据,从而在物理层面极大减小了扫描表的开销,提升了执行速度。
ConstantFolding
本文的 SQL 查询中,Project 部分包含了 100 + 800 + match_score + english_score 。若是不进行优化,那若是有一亿条记录,就会计算一亿次 100 + 80,很是浪费资源。所以可经过 ConstantFolding 将这些常量合并,从而减小没必要要的计算,提升执行速度。
ColumnPruning
在上图中,Filter 与 Join 操做会保留两边全部字段,而后在 Project 操做中筛选出须要的特定列。若是能将 Project 下推,在扫描表时就只筛选出知足后续操做的最小字段集,则能大大减小 Filter 与 Project 操做的中间结果集数据量,从而极大提升执行速度。
这里须要说明的是,此处的优化是逻辑上的优化。在物理上,Project 下推后,对于列式存储,如 Parquet 和 ORC,可在扫描表时就只扫描须要的列而跳过不须要的列,进一步减小了扫描开销,提升了执行速度。
通过如上优化后的 LogicalPlan 以下
== Optimized Logical Plan == Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L] +- Project [((180 + math_score#501) + english_score#502) AS v#493] +- Join Inner, (id#496 = id#500) :- Project [id#496] : +- Filter ((isnotnull(age#497) && (age#497 > 10)) && isnotnull(id#496)) : +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498] +- Filter isnotnull(id#500) +- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]
获得优化后的 LogicalPlan 后,SparkPlanner 将其转化为 SparkPlan 即物理计划。
本例中因为 score 表数据量较小,Spark 使用了 BroadcastJoin。所以 score 表通过 Filter 后直接使用 BroadcastExchangeExec 将数据广播出去,而后结合广播数据对 people 表使用 BroadcastHashJoinExec 进行 Join。再通过 Project 后使用 HashAggregateExec 进行分组聚合。
至此,一条 SQL 从提交到解析、分析、优化以及执行的完整过程就介绍完毕。
本文介绍的 Optimizer 属于 RBO,实现简单有效。它属于 LogicalPlan 的优化,全部优化均基于 LogicalPlan 自己的特色,未考虑数据自己的特色,也未考虑算子自己的代价。下文将介绍 CBO,它充分考虑了数据自己的特色(如大小、分布)以及操做算子的特色(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。