深刻研究Spark SQL的Catalyst优化器(原创翻译)

Spark SQL是Spark最新和技术最为复杂的组件之一。它支持SQL查询和新的DataFrame API。Spark SQL的核心是Catalyst优化器,它以一种新颖的方式利用高级编程语言特性(例如Scala的 模式匹配quasiquotes)来构建可扩展查询优化器。
咱们最近发布了一篇关于Spark SQL的 论文,该论文将出如今SIGMOD 2015(由Davies Liu,Joseph K. Bradley,Xiangrui Meng,Tomer Kaftan,Michael J. Franklin和Ali Ghodsi合著)中。在这篇博客文章中,咱们重述该论文的部份内容,解释Catalyst优化器的内部功能以实现更普遍的应用。
为了实现Spark SQL,咱们设计了一个新的可扩展优化器Catalyst,它基于Scala中的函数式编程结构。 Catalyst的可扩展设计有两个目的。首先,咱们但愿可以轻松地为Spark SQL添加新的优化技术和功能,尤为是为了解决咱们在使用大数据时遇到的各类问题(例如,半结构化数据和高级分析)。其次,咱们但愿使外部开发人员可以扩展优化器 - 例如,经过添加数据源特定规则,能够将过滤或聚合的数据推送到外部存储系统,或者支持新的数据类型。 Catalyst支持基于规则和基于成本的优化。
Catalyst的核心是使用一个通用库生成树并使用规则操做这些树。在该框架的基础上,构建了用于关系查询处理库(例如表达式,逻辑查询计划)和处理执行查询不一样阶段的几组规则:分析、逻辑优化、物理计划和代码生成,代码生成将部分查询编译为Java字节码。对于后者,使用了Scala特性quasiquotes,它能够很容易地在运行时由组合表达式生成代码。最后,Catalyst提供了几个公共扩展点,包括外部数据源和用户定义的类型。

Catalyst中的主要数据类型是由节点对象组成的树。 每一个节点都有一个节点类型和零个或多个子节点。 新的节点类型在Scala中定义为TreeNode类的子类。 这些对象是不可变的,并可使用函数转换来操做,以下一小节所讨论的。
一个简单的例子,使用很是简单的表达式语言描述三个节点类:
  • Literal(值:Int):常数值
  • Attribute(名称:String):输入行的属性,例如“x”
  • Add(左:TreeNode,右:TreeNode):两个表达式的总和。
这些类能够用来构建树; 例如,表达式x +(1 + 2)的树将在Scala代码中表示以下:
1 Add(Attribute(x), Add(Literal(1), Literal(2)))

规则html

可使用规则来操做树,这些规则是从一棵树到另外一棵树的函数。虽然规则能够在其输入树上运行任意代码(由于该树只是一个Scala对象),但最多见的方法是使用一组模式匹配函数来查找和替换具备特定结构的子树。
模式匹配是许多函数式语言的一个特性,它容许从代数数据类型的潜在嵌套结构中提取值。在Catalyst中,树提供了一种转换方法,该方法递归地在树的全部节点上应用模式匹配函数,将每一个模式匹配转换为结果。例如,咱们能够实现一个在常量之间相加???Add操做的规则,以下所示:
1 tree.transform {
2   case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
3 }

将此应用于x +(1 + 2)的树会产生新的树x + 3。这里关键是使用了Scala的标准模式匹配语法,它可用于匹配对象的类型和为提取的值(这里为c1和c2)提供名称。node

传递给变换的模式匹配表达式是一个部分函数,​​这意味着它只须要匹配全部输入树的子集。 Catalyst将测试规则适用树的哪些部分,自动跳过并降低到不匹配的子树。这种能力意味着规则只需对给定适用优化的树进行推理,而对那些不适用的数不进行推理。所以,当新的操做符新增到系统中时,这些规则不须要修改。
规则(和通常的Scala模式匹配)能够在同一个变换调用中匹配多个模式,这使得一次实现多个转换来得很是简洁。
1 tree.transform {
2   case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
3   case Add(left, Literal(0)) => left
4   case Add(Literal(0), right) => right
5 }

实际上,规则可能须要屡次执行才能彻底转换树。Catalyst将规则造成批处理,并执行每一个批处理至固定点,该固定点是树应用其规则后不发生改变。虽然规则运行到固定点意味着每一个规则是简单且自包含,但这些规则仍会对树上产生较大的全局效果。在上面的例子中,重复的应用规则会持续折叠较大的树,好比(x + 0)+(3 + 3)。另外一个例子,第一个批处理能够分析全部属性指定类型的表达式,而第二批处理可以使用这些类型来进行常量折叠。在每批处理完毕后,开发人员还能够对新树进行规范性检查(例如,查看全部属性为指定类型),这些检查通常使用递归匹配来编写。git

最后,规则条件及其自己能够包含任意的Scala代码。这使得Catalyst比领域特定语言在优化器上更强大,同时保持简洁特性。
根据经验,对不可变树的函数转换使得整个优化器很是易于推理和调试。规则也支持在优化器中并行化,尽管该特性尚未利用这个。

在Spark SQL中使用Catalyst

Catalyst的通用树转换框架分为四个阶段,以下所示:(1)分析解决引用的逻辑计划,(2)逻辑计划优化,(3)物理计划,(4)代码生成用于编译部分查询生成Java字节码。 在物理规划阶段,Catalyst可能会生成多个计划并根据成本进行比较。 全部其余阶段彻底是基于规则的。 每一个阶段使用不一样类型的树节点; Catalyst包括用于表达式、数据类型以及逻辑和物理运算符的节点库。 这些阶段以下所示:

解析

sales”中列的类型,甚至列名是否有效,在查询表sale元数据以前这些都是未知的。若是不知道它的类型或没有将它匹配到输入表(或别名)时,那么该属性称为未解析。Spark SQL使用Catalyst规则和记录全部表元数据的Catalog对象来解析这些属性的。构建具备未绑定属性和数据类型的“未解析的逻辑计划”树后,而后执行如下规则:
一、从Catalog中查找名称关系
二、将命名属性(如col)映射到操做符的子项
三、将那些属性引用相同的值给它们一个惟一的ID(随后遇到如col=col时能够进行优化)
四、经过表达式传递和强制类型:例如,咱们没法知道1+col的返回类型,直到解析出col并将其子表达式转换为兼容类型。
通过统计,解析器的规则大约有 1000行代码

逻辑计划优化

在逻辑优化阶段,逻辑计划应用了标准的基于规则的优化。(基于成本的优化经过规则生成多个计划,而后计算其成原本执行。)这些优化包括常量折叠、谓词下推、项目裁剪、空值传播、布尔表达式简化以及其余规则。总的来讲,为各类状况添加规则很是简单。例如,当咱们将固定精度的DECIMAL类型添加到Spark SQL时,咱们想要以较低精度的方式优化DECIMAL的聚合(例如求和和平均值);只须要12行代码编写一个规则即可在SUM和AVG表达式中找到该数,而后将它们转换为未缩放的64位LONG,而后进行聚合,最后将结果转换回来。这个规则的简化版本,只能优化SUM表达式以下所示:
1 object DecimalAggregates extends Rule[LogicalPlan] {
2   /** Maximum number of decimal digits in a Long */
3   val MAX_LONG_DIGITS = 18
4   def apply(plan: LogicalPlan): LogicalPlan = {
5     plan transformAllExpressions {
6       case Sum(e @ DecimalType.Expression(prec, scale))
7           if prec + 10 <= MAX_LONG_DIGITS =>
8         MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) }
9 }

再举一个例子,一个12行代码的规则经过简单的正则表达式将LIKE表达式优化为String.startsWith或String.contains调用。在规则中使用任意Scala代码使得这些优化易于表达,而这些规则超越了子树结构的模式匹配。github

通过统计,逻辑优化规则有 800行代码

物理计划

在物理计划阶段,Spark SQL使用逻辑计划生成一个或多个物理计划,这个过程采用了匹配Spark执行引擎的物理运算符。而后使用成本模型选择计划。目前,基于成本的优化仅用于选择链接算法:对于已知很小的关系,Spark SQL使用Spark中的点对点广播工具进行广播链接。不过,该框架支持更深刻地使用基于成本的优化,由于可使用规则对整棵树进行递归估计。所以,咱们打算在将来实施更加丰富的基于成本的优化。
物理计划还执行基于规则的物理优化,例如将管道项目或过滤器合并到一个Spark映射操做中。另外,它能够将操做从逻辑计划推送到支持谓词或项目下推的数据源。咱们将在后面的章节中描述这些数据源的API。
总的来讲,物理计划规则大约有 500行代码

代码生成

查询优化的最后阶段涉及生成Java字节码用于在每台机器上运行。因为Spark SQL常常在内存数据集上运行,其中处理受CPU限制,咱们但愿Spark SQL支持代码生成以加快执行速度。尽管如此,代码生成引擎的构建一般很复杂,特别是编译器。Catalyst依靠Scala语言的特殊功能quasiquotes来简化代码生成。 Quasiquotes容许在Scala语言中对抽象语法树(AST)进行编程式构建,而后在运行时将其提供给Scala编译器以生成字节码。使用Catalyst将表示SQL表达式的树转换为Scala代码的AST用于描述表达式,而后编译并运行生成的代码。
做为一个简单的例子,参考第4.2节介绍的Add、Attribute和Literal树节点能够写成(x + y)+1表达式。若是没有使用代码生成,这些表达式必须遍历Add、Attribute和Literal节点树行走才能解释每行数据。这会引入大量的分支和虚函数调用,从而减慢执行速度。若是使用了代码生成,能够编写一个函数将特定的表达式树转换为Scala AST,以下所示:
1 def compile(node: Node): AST = node match {
2   case Literal(value) => q"$value"
3   case Attribute(name) => q"row.get($name)"
4   case Add(left, right) => q"${compile(left)} + ${compile(right)}"
5 }

以q开头的字符串是quasiquotes,虽然它们看起来像字符串,但它们在编译时由Scala编译器解析,并表明其代码的AST。 Quasiquotes用$符号表示法将变量或其余AST拼接到它们中。例如,文字(1)将成为1的Scala表达式的AST,而属性(“x”)变为row.get(“x”)。最后,相似Add(Literal(1),Attribute(“x”))的树成为像1 + row.get(“x”)这样的Scala表达式的AST。正则表达式

Quasiquotes在编译时进行类型检查,以确保只替换合适的AST或文字,使得它们比字符串链接更有用,而且直接生成Scala AST,而非在运行时运行Scala语法分析器。此外,它们是高度可组合的,由于每一个节点的代码生成规则不须要知道其子节点返回的树是如何构建的。最后,若是Catalyst缺乏表达式级别的优化,则由Scala编译器对结果代码进行进一步优化。下图显示quasiquotes生成代码其性能相似于手动优化的程序。
咱们发现quasiquotes很是接近于代码生成,而且发现即便是Spark SQL的新贡献者也能够快速为新类型的表达式添加规则。 Quasiquotes也适用于在本地Java对象上运行的目标:当从这些对象访问字段时,能够直接访问所需字段,而没必要将对象复制成Spark SQL 行,并使用行访问器方法。最后,将代码生成的评估与对还没有生成代码的表达式的解释评估结合起来很简单,由于编译的Scala代码能够直接使用到表达式解释器中。
Catalyst生成器总共有大约 700行代码
这篇博客文章介绍了Spark SQL的Catalyst优化器内部原理。 经过这种新颖、简单的设计使Spark社区可以快速创建原型、实现和扩展引擎。 你能够在这里 阅读其他的论文。 
您还能够从如下内容中找到有关Spark SQL的更多信息:
 
 
相关文章
相关标签/搜索