Spark SQL原理解析前言:html
Spark SQL源码剖析(一)SQL解析框架Catalyst流程概述git
Spark SQL源码解析(二)Antlr4解析Sql并生成树sql
Spark SQL源码解析(三)Analysis阶段分析数据库
前面已经介绍了SQL parse,将一条SQL语句使用antlr4解析成语法树并使用访问者模式生成Unresolved LogicalPlan,而后是Analysis阶段将Unresolved LogicalPlan转换成Resolved LogicalPlan。这一篇咱们介绍Optimization阶段,和生成Physical Planning阶段。express
通过这两个阶段后,就差很少要到最后转换成Spark的RDD任务了。apache
先来看看Logical Optimization阶段。app
上一篇咱们讨论了Analysis阶段如何生成一个真正的Logical Plan树。这一阶段听名字就知道是优化阶段,Spark SQL中有两个部分的优化,第一部分就是这里,是rule-base阶段的优化,就是根据各类关系代数的优化规则,对生成的Logical Plan适配,匹配到就进行相应的优化逻辑。这些规则大概有:投影消除,constant folding,替换null值,布尔表达式简化等等。固然大部分规则细节我也不是很清楚,仅仅能从名字推断一二。这框架
同时还能够添加本身的优化rule,也比较容易实现,论文中就给出了一段自定义优化rule的代码:ide
object DecimalAggregates extends Rule[LogicalPlan] { /** Maximum number of decimal digits in a Long */ val MAX_LONG_DIGITS = 18 def apply(plan: LogicalPlan): LogicalPlan = { plan transformAllExpressions { case Sum(e @ DecimalType.Expression(prec , scale)) if prec + 10 <= MAX_LONG_DIGITS => MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) } }
这段代码的大意是自定义了一个rule,若是匹配到SUM的表达式,那就执行相应的逻辑,论文里描述这里是找到对应的小数并将其转换为未缩放的64位LONG。具体逻辑看不是很明白不过不重要,重要的是编写本身的优化rule很方便就是。源码分析
顺便点一下另外一种优化,名字叫作cost-base优化(CBO),是发生在Physical Planning阶段的,这里就先卖个关子,后面说到的时候再讨论吧。
而后看到源码的时候,会发现Optimizer这个类也是继承自RuleExecutor,继承这个类以后的流程基本都是同样的。前面分析Analysis阶段的时候已经有详细介绍过这个流程,这里就不展开说了。
其实这优化器的重点应该是各类优化规则,这里我以为更多的是设计到关系代数表达式优化理论方面的知识,这部分我也不甚精通,因此也就不说了。对这块感兴趣的童鞋能够看看网上别人的文章,这里顺便列几个可能有帮助的博客,
下面仍是来看看最开始的例子进行Optimization阶段后会变成什么样吧,先看看以前的示例代码:
val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") val queryCaseWhen = sql("select key from src ")
而后在Optimization优化阶段后,变成了:
Project [_1#2 AS key#5] +- LocalRelation [_1#2, _2#3]
好吧,看起来没什么变化,与Analysis阶段相比,也就少了个SubqueryAlias ,符合预期。不过也对,就一条SELECT语句能优化到哪去啊。
相比较于Logical Plan,Physical plan算是Spark能够去执行的东西了,固然本质上它也是一棵树。
前面说到,Spark有一种cost-based的优化。主要就在这一阶段,在这一阶段,会生成一个或多个Physical Plan,而后使用cost model预估各个Physical Plan的处理性能,最后选择一个最优的Physical Plan。这里最主要优化的是join操做,当触发join操做的时候,会根据左右两边的数据集判断,而后决定使用Broadcast join,仍是传统的Hash join,抑或是MergeSort join,有关这几种join的区别这里就不详细解释了,有兴趣童鞋能够百度看看。
除了cost-based优化,这一阶段也依旧会有rule-based优化,因此说RuleExecutor这个类是很重要的,前面提到的Analysis阶段也好,Optimization阶段也好,包括这里的Physical Plan阶段,只要是涉及到rule-based优化,都会跟RuleExecutor这个类扯上关系。固然这样无疑是极大使用了面向对象的特性,不一样的阶段编写不一样的rule就行,一次编写,处处复用。
首先是在QueryExecution中调度,
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其余代码 lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, // but we will implement to choose the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } ......其余代码 }
这里的planner是org.apache.spark.sql.execution.SparkPlanner这个类,而这个类继承自org.apache.spark.sql.catalyst.planning.QueryPlanner,plan()方法也是在父类QueryPlanner中实现的。和RuleExecution相似,QueryPlanner中有一个返回Seq[GenericStrategy[PhysicalPlan]]的方法:def strategies: Seq[GenericStrategy[PhysicalPlan]],这个方法会在子类(也就是SparkPlanner)重写,而后被QueryPlanner的plan()方法调用。
咱们来看看SparkPlanner中strategies方法的重写,再来看QueryPlanner的plan()方法吧。
class SparkPlanner( val sparkContext: SparkContext, val conf: SQLConf, val experimentalMethods: ExperimentalMethods) extends SparkStrategies { ......其余代码 override def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( PythonEvals :: DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: Aggregation :: Window :: JoinSelection :: InMemoryScans :: BasicOperators :: Nil) ......其余代码
strategies()返回策略列表,是生成策略GenericStrategy,这是个具体的抽象类,位于org.apache.spark.sql.catalyst.planning包。所谓生成策略,就是决定若是根据Logical Plan生成Physical Plan的策略。好比上面介绍的join操做能够生成Broadcast join,Hash join,抑或是MergeSort join,就是一种生成策略,具体的类就是上面代码中的JoinSelection。每一个生成策略GenericStrategy都是object,其apply()方法返回的是Seq[SparkPlan],这里的SparkPlan就是PhysicalPlan(注意:下文会将SparkPlan和PhysicalPlan混着用)。
明白了生成策略后,就能够来看看QueryPlanner的plan()方法了。
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { ......其余代码 def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... // Collect physical plan candidates. val candidates = strategies.iterator.flatMap(_(plan)) //迭代调用并平铺,变成Iterator[SparkPlan] // The candidates may contain placeholders marked as [[planLater]], // so try to replace them by their child plans. val plans = candidates.flatMap { candidate => val placeholders = collectPlaceholders(candidate) if (placeholders.isEmpty) { // Take the candidate as is because it does not contain placeholders. Iterator(candidate) } else { // Plan the logical plan marked as [[planLater]] and replace the placeholders. placeholders.iterator.foldLeft(Iterator(candidate)) { case (candidatesWithPlaceholders, (placeholder, logicalPlan)) => // Plan the logical plan for the placeholder. val childPlans = this.plan(logicalPlan) candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => childPlans.map { childPlan => // Replace the placeholder by the child plan candidateWithPlaceholders.transformUp { case p if p.eq(placeholder) => childPlan } } } } } } val pruned = prunePlans(plans) assert(pruned.hasNext, s"No plan for $plan") pruned } ......其余代码 }
这里的流程其实不难,主要工做其实就是调用各个生成策略GenericStrategy的apply()方法,生成Iterator[SparkPlan]。后面很大部分代码是处理占位符,按个人理解,在生成Logical Plan的时候,可能有些无心义的占位符,这种须要使用子节点替换调它。倒数第三行prunePlans()方法按注释说是用来去掉bad plan的,但看实际代码只是原封不动返回。
这样最终就获得一个Iterator[SparkPlan],每一个SparkPlan就是可执行的物理操做了。
大体流程就是如此,固然具体到一些生成策略没有细说,包括输入源策略,聚合策略等等,每个都蛮复杂的,这里就不细说,有兴趣能够自行查阅。
对了,最后还要看看示例代码到这一步变成什么样了,先上示例代码:
//生成DataFrame val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") //调用spark.sql val queryCaseWhen = sql("select key from src ")
通过Physical Planning阶段后,变成以下:
Project [_1#2 AS key#5] +- LocalTableScan [_1#2, _2#3]
对比上面的optimized阶段,直观看就是LocalRelation变成LocalTableScan。变得更加具体了,但实际上,Project也变了,虽然打印名字相同,但一个的类型是Project,本质上是LogicalPlan。而一个是ProjectExec,本质上是SparkPlan(也就是PhysicalPlan)。这一点经过断点看的更清楚。
到这一步已经很解决终点了,后面再通过一个Preparations阶段就能生成RDD了,剩下的部分留待下篇介绍吧。
以上~