Spark 源码系列(九)Spark SQL 初体验之解析过程详解

很久没更新博客了,以前学了一些 R 语言和机器学习的内容,作了一些笔记,以后也会放到博客上面来给你们共享。一个月前就打算更新 Spark Sql 的内容了,由于一些别的事情耽误了,今天就简单写点,Spark1.2 立刻就要出来了,不知道变更会不会很大,听说添加了不少的新功能呢,期待中...html

首先声明一下这个版本的代码是 1.1 的,以前讲的都是 1.0 的。sql

Spark 支持两种模式,一种是在 spark 里面直接写 sql,能够经过 sql 来查询对象,相似. net 的 LINQ 同样,另一种支持 hive 的 HQL。不论是哪一种方式,下面提到的步骤都会有,不一样的是具体的执行过程。下面就说一下这个过程。数据库

Sql 解析成 LogicPlanapache

使用 Idea 的快捷键 Ctrl + Shift + N 打开 SQLQuerySuite 文件,进行调试吧。api

def sql(sqlText: String): SchemaRDD = {
    if (dialect == "sql") {
      new SchemaRDD(this, parseSql(sqlText))
    } else {
      sys.error(s"Unsupported SQL dialect: $dialect")
    }
  }
复制代码

从这里能够看出来,第一步是解析 sql,最后把它转换成一个 SchemaRDD。点击进入 parseSql 函数,发现解析 Sql 的过程在 SqlParser 这个类里面。 在 SqlParser 的 apply 方法里面,咱们能够看到 else 语句里面的这段代码。缓存

//对input进行解析,符合query的模式的就返回Success
      phrase(query)(new lexical.Scanner(input)) match {
        case Success(r, x) => r
        case x => sys.error(x.toString)
      }
复制代码

这里咱们主要关注 query 就能够。app

protected lazy val query: Parser[LogicalPlan] = (
    select * (
        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
        INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
        EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
      )
    | insert | cache
  )
复制代码

这里面有不少看不懂的操做符,请到下面这个网址里面去学习。这里能够看出来它目前支持的 sql 语句只是 select 和 insert。机器学习

www.scala-lang.org/api/2.10.4/…ide

咱们继续查看 select。函数

// ~>只保留右边的模式 opt可选的 ~按顺序合成 <~只保留左边的
  protected lazy val select: Parser[LogicalPlan] =
    SELECT ~> opt(DISTINCT) ~ projections ~
    opt(from) ~ opt(filter) ~
    opt(grouping) ~
    opt(having) ~
    opt(orderBy) ~
    opt(limit) <~ opt(";") ^^ {
      case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>
        val base = r.getOrElse(NoRelation)
        val withFilter = f.map(f => Filter(f, base)).getOrElse(base)
        val withProjection =
          g.map {g =>
            Aggregate(assignAliases(g), assignAliases(p), withFilter)
          }.getOrElse(Project(assignAliases(p), withFilter))
        val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
        val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
        val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
        val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
        withLimit
  }
复制代码

能够看得出来它对 sql 的解析是和咱们经常使用的 sql 写法是一致的,这里面再深刻下去还有递归,并非看起来那么好理解。这里就不继续讲下去了,在解析 hive 的时候我会重点讲一下,我认为目前你们使用得更可能是仍然是来源于 hive 的数据集,毕竟 hive 那么稳定。

到这里咱们能够知道第一步是经过 Parser 把 sql 解析成一个 LogicPlan。

LogicPlan 到 RDD 的转换过程

好,下面咱们回到刚才的代码,接着咱们应该看 SchemaRDD。

override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    firstParent[Row].compute(split, context).map(_.copy())

  override def getPartitions: Array[Partition] = firstParent[Row].partitions

  override protected def getDependencies: Seq[Dependency[_]] =
    List(new OneToOneDependency(queryExecution.toRdd))
复制代码

SchemaRDD 是一个 RDD 的话,那么它最重要的 3 个属性:compute 函数,分区,依赖全在这里面,其它的函数咱们就不看了。

挺奇怪的是,咱们 new 出来的 RDD,怎么会有依赖呢,这个 queryExecution 是啥,点击进去看看吧,代码跳转到 SchemaRDD 继承的 SchemaRDDLike 里面。

lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }
复制代码

把这两段很短的代码都放一块儿了,executePlan 方法就是 new 了一个 QueryExecution 出来,那咱们继续看看 QueryExecution 这个类吧。

lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
    lazy val optimizedPlan = optimizer(analyzed)
    lazy val sparkPlan = {
      SparkPlan.currentContext.set(self)
      planner(optimizedPlan).next()
    }
    // 在须要的时候加入Shuffle操做
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
    lazy val toRdd: RDD[Row] = executedPlan.execute()
复制代码

从这里能够看出来 LogicPlan 是通过了 5 个步骤的转换,要被 analyzer 和 optimizer 的处理,而后转换成 SparkPlan,在执行以前还要被 prepareForExecution 处理一下,最后调用 execute 方法转成 RDD.

下面咱们分步讲这些个东东究竟是干啥了。

首先咱们看看 Anayzer,它是继承自 RuleExecutor 的,这里插句题外话,Spark sql 的做者 Michael Armbrust 在 2013 年的 Spark Submit 上介绍 Catalyst 的时候,就说到要从总体地去优化一个 sql 的执行是很困难的,全部设计成这种基于一个一个小规则的这种优化方式,既简单又方便维护。

好,咱们接下来看看 RuleExecutor 的 apply 方法。

def apply(plan: TreeType): TreeType = {
    var curPlan = plan
    //规则还分批次的,分批对plan进行处理
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        //用batch种的小规则从左到右挨个对plan进行处理
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val result = rule(plan)
            result
        }
        iteration += 1
        //超过了规定的迭代次数就要退出的
        if (iteration > batch.strategy.maxIterations) {
              continue = false
        }
        //通过处理成功的plan是会发生改变的,若是和上一次处理接触的plan同样,这说明已经没有优化空间了,能够结束,这个就是前面提到的Fixed point
        if (curPlan.fastEquals(lastPlan)) {
          continue = false
        }
        lastPlan = curPlan
      }
    }

    curPlan
  }
复制代码

看完了 RuleExecutor,咱们继续看 Analyzer,下面我只贴出来 batches 这块的代码,剩下的要本身去看了哦。

val batches: Seq[Batch] = Seq(
    //碰到继承自MultiInstanceRelations接口的LogicPlan时,发现id之后重复的,就强制要求它们生成一个新的全局惟一的id
    //涉及到InMemoryRelation、LogicRegion、ParquetRelation、SparkLogicPlan
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    //若是大小写不敏感就把属性都变成小写
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    //这个牛逼啊,竟然想迭代100次的。
    Batch("Resolution", fixedPoint,
      //解析从子节点的操做生成的属性,通常是别名引发的,好比a.id
      ResolveReferences ::
      //经过catalog解析表名
      ResolveRelations ::
      //在select语言里,order by的属性每每在前面没写,查询的时候也须要把这些字段查出来,排序完毕以后再删除
      ResolveSortReferences ::
      //前面讲过了
      NewRelationInstances ::
      //清除被误认为别名的属性,好比sum(score) as a,其实它应该是sum(score)才对
      //它被解析的时候解析成Project(Seq(Alias(g: Generator, _)),直接返回Generator就能够了
      ImplicitGenerate ::
      //处理语句中的*,好比select *, count(*)
      StarExpansion ::
      //解析函数
      ResolveFunctions ::
      //解析全局的聚合函数,好比select sum(score) from table
      GlobalAggregates ::
      //解析having子句后面的聚合过滤条件,好比having sum(score) > 400
      UnresolvedHavingClauseAttributes ::
      //typeCoercionRules是hive的类型转换规则
      typeCoercionRules :_*),
    //检查全部节点的属性是否都已经处理完毕了,若是还有没解析出来的属性,这里就会报错!
    Batch("Check Analysis", Once,
      CheckResolution),
    //清除多余的操做符,如今是Subquery和LowerCaseSchema,
    //第一个是子查询,第二个HiveContext查询树里面把子节点所有转换成小写
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )
复制代码

能够看得出来 Analyzer 是把 Unresolved 的 LogicPlan 解析成 resolved 的,解析里面的表名、字段、函数、别名什么的。

咱们接着看 Optimizer, 从单词上看它是用来作优化的,可是从代码上来看它更多的是为了过滤咱们写的一些垃圾语句,并无作什么实际的优化。

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
      //递归合并相邻的两个limit
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      //替换null值
      NullPropagation,
      //替换一些简单的常量表达式,好比 1 in (1,2) 直接返回一个true就能够了
      ConstantFolding,
      //简化like语句,避免全表扫描,目前支持'%demo%', '%demo','demo*','demo'
      LikeSimplification,
      //简化过滤条件,好比true and score > 0 直接替换成score > 0
      BooleanSimplification,
      //简化filter,好比where 1=1 或者where 1=2,前者直接去掉这个过滤,后者这个查询就不必作了
      SimplifyFilters,
      //简化转换,好比两个比较字段的数据类型是同样的,就不须要转换了
      SimplifyCasts,
      //简化大小写转换,好比Upper(Upper('a'))转为认为是Upper('a')
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      //递归合并相邻的两个过滤条件
      CombineFilters,
      //把从表达式里面的过滤替换成,先作过滤再取表达式,而且掉过滤里面的别名属性
      //典型的例子 select * from (select a,b from table) where a=1
      //替换成select * from (select a,b from table where a=1)
      PushPredicateThroughProject,
      //把join的on条件中能够在原表当中作过滤的先作过滤
      //好比select a,b from x join y on x.id = y.id and x.a >0 and y.b >0
      //这个语句能够改写为 select a,b from x where x.a > 0 join (select * from y where y.b >0) on x.id = y.id
      PushPredicateThroughJoin,
      //去掉一些用不上的列
      ColumnPruning) :: Nil
}
复制代码

真是用心良苦啊,看来咱们写 sql 的时候仍是要注意一点的,你看人家花多大的功夫来优化咱们的烂 sql。要是我确定不优化。写得烂就慢去吧!

接下来,就改看这一句了 planner(optimizedPlan).next() 咱们先看看 SparkPlanner 吧。

protected[sql] class SparkPlanner extends SparkStrategies {
    val sparkContext: SparkContext = self.sparkContext

    val sqlContext: SQLContext = self

    def codegenEnabled = self.codegenEnabled

    def numPartitions = self.numShufflePartitions
    //把LogicPlan转换成实际的操做,具体操做类在org.apache.spark.sql.execution包下面
    val strategies: Seq[Strategy] =
      //把cache、set、expain命令转化为实际的Command
      CommandStrategy(self) ::
      //把limit转换成TakeOrdered操做
      TakeOrdered ::
      //名字有点蛊惑人,就是转换聚合操做
      HashAggregation ::
      //left semi join只显示链接条件成立的时候链接左边的表的信息
      //好比select * from table1 left semi join table2 on(table1.student_no=table2.student_no);
      //它只显示table1中student_no在表二当中的信息,它能够用来替换exist语句
      LeftSemiJoin ::
      //等值链接操做,有些优化的内容,若是表的大小小于spark.sql.autoBroadcastJoinThreshold设置的字节
      //就自动转换为BroadcastHashJoin,即把表缓存,相似hive的map join(顺序是先判断右表再判断右表)。
      //这个参数的默认值是10000
      //另外作内链接的时候还会判断左表右表的大小,shuffle取数据大表不动,从小表拉取数据过来计算
      HashJoin ::
      //在内存里面执行select语句进行过滤,会作缓存
      InMemoryScans ::
      //和parquet相关的操做
      ParquetOperations ::
      //基本的操做
      BasicOperators ::
      //没有条件的链接或者内链接作笛卡尔积
      CartesianProduct ::
      //把NestedLoop链接进行广播链接
      BroadcastNestedLoopJoin :: Nil
      ......  
}
复制代码

这一步是把逻辑计划转换成物理计划,或者说是执行计划了,里面有不少概念是我之前没听过的,网上查了一下才知道,原来数据库的执行计划还有那么多的说法,这一块须要是专门研究数据库的人比较了解了。剩下的两步就是 prepareForExecution 和 execute 操做。

prepareForExecution 操做是检查物理计划当中的 Distribution 是否知足 Partitioning 的要求,若是不知足的话,须要从新弄作分区,添加 shuffle 操做,这块暂时没咋看懂,之后还须要仔细研究。最后调用 SparkPlan 的 execute 方法,这里面稍微讲讲这块的树型结构。

img

sql 解析出来就是一个二叉树的结构,不论是逻辑计划仍是物理计划,都是这种结构,因此在代码里面能够看到 LogicPlan 和 SparkPlan 的具体实现类都是有继承上面图中的三种类型的节点的。

非 LeafNode 的 SparkPlan 的 execute 方法都会有这么一句 child.execute(),由于它须要先执行子节点的 execute 来返回数据,执行的过程是一个先序遍历。

最后把这个过程也用一个图来表示吧,方便记忆。

img

(1) 经过一个 Parser 来把 sql 语句转换成 Unresolved LogicPlan,目前有两种 Parser,SqlParser 和 HiveQl。

(2) 经过 Analyzer 把 LogicPlan 当中的 Unresolved 的内容给解析成 resolved 的,这里面包括表名、函数、字段、别名等。

(3) 经过 Optimizer 过滤掉一些垃圾的 sql 语句。

(4) 经过 Strategies 把逻辑计划转换成能够具体执行的物理计划,具体的类有 SparkStrategies 和 HiveStrategies。

(5) 在执行前用 prepareForExecution 方法先检查一下。

(6) 先序遍历,调用执行计划树的 execute 方法。

相关文章
相关标签/搜索