Spark SQL模块,主要就是处理跟SQL解析相关的一些内容,说得更通俗点就是怎么把一个SQL语句解析成Dataframe或者说RDD的任务。以Spark 2.4.3为例,Spark SQL这个大模块分为三个子模块,以下图所示html
其中Catalyst能够说是Spark内部专门用来解析SQL的一个框架,在Hive中相似的框架是Calcite(将SQL解析成MapReduce任务)。Catalyst将SQL解析任务分红好几个阶段,这个在对应的论文中讲述得比较清楚,本系列不少内容也会参考论文,有兴趣阅读原论文的能够到这里看:Spark SQL: Relational Data Processing in Spark。sql
而Core模块其实就是Spark SQL主要解析的流程,固然这个过程当中会去调用Catalyst的一些内容。这模块里面比较经常使用的类包括SparkSession,DataSet等。shell
至于hive模块,这个不用说,确定跟hive有关的。这个模块在本系列基本不会涉及到,就很少介绍了。数据库
值得一提的是,论文发表的时候仍是在Spark1.x阶段,那个时候SQL解析成词法树用的是scala写的一个解析工具,到2.x阶段改成使用antlr4来作这部分工做(这应该算是最大的改变)。至于为何要改,我猜是出于可读性和易用性方面的考虑,固然这个仅是我的猜想。apache
另外,这一系列会简单介绍一条SQL语句的处理流程,基于spark 2.4.3(sql这个模块在spark2.1后变化不大)。这一篇先从总体介绍Spark SQL出现的背景及解决问题,Dataframe API以及Catalyst的流程大概是怎么样,后面分阶段细说Catalyst的流程。编程
在最先的时候,大规模处理数据的技术是MapReduce,但这种框架执行效率太慢,进行一些关系型处理(如join)须要编写大量代码。后来hive这种框架可让用户输入sql语句,自动进行优化并执行。json
但在大型系统中,任然有两个主要问题,一个是ETL操做须要对接多个数据源。另外一个是用户须要执行复杂分析,好比机器学习和图计算等。但传统的关系型处理系统中较难实现。api
Spark SQL提供了两个子模块来解决这个问题,DataFrame API和Catalyst。缓存
相比于RDD,Dataframe api提供了更加丰富的关系型api,而且能和RDD相互转换,后面Spark机器学习方面的工做重心,也从以RDD为基础的mllib转移到以Dataframe为基础的Spark ML(虽然Dataframe底层也是RDD)。session
另外一个就是Catalyst,经过它能够轻松为诸如机器学习之类的域添加数据源(好比json或经过case class自定义的类型),优化规则和数据类型。
经过这两个模块,Spark SQL主要实现如下目标:
那下面就介绍Dataframe和Catalyst的流程,固然主要讨论的仍是Catalyst。
先来看看论文里面提供的一张图:
这张图能够说明不少,首先Spark的Dataframe API底层也是基于Spark的RDD。但与RDD不一样的在于,Dataframe会持有schema(这个实在很差翻译,能够理解为数据的结构吧),以及能够执行各类各样的关系型操做,好比Select,Filter,Join,Groupby等。从操做上来讲,和pandas的Dataframe有点像(连名字都是同样的)。
同时由于是基于RDD的,因此不少RDD的特性Dataframe都可以享受到,好比说分布式计算中一致性,可靠性方面的保证,以及能够经过cache缓存数据,提升计算性能啊等等。
同时图中页展现了Dataframe能够经过JDBC连接外部数据库,经过控制台操做(spark-shell),或者用户程序。说白了,就是Dataframe能够经过RDD转换而来,也能够经过外部数据表生成。
对了,这里顺便说一句,不少初次接触Spark SQL的童鞋可能会对Dataset和Dataframe这两个东西感到疑惑,在1.x时代它们确实有些差异,不过在spark2.x的时候,这两个API已经统一了。因此基本上Dataset和Dataframe能够当作是等价的东西。
最后仍是结合代码作一下实际的展现吧,以下展现生成一个RDD,而且根据这个RDD生成对应的Dataframe,从中能够看出RDD和Dataframe的区别:
//生成RDD scala> val data = sc.parallelize(Array((1,2),(3,4))) data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> data.foreach(println) (1,2) (3,4) scala> val df = data.toDF("fir","sec") df: org.apache.spark.sql.DataFrame = [fir: int, sec: int] scala> df.show() +---+---+ |fir|sec| +---+---+ | 1| 2| | 3| 4| +---+---+ //跟RDD相比,多了schema scala> df.printSchema() root |-- fir: integer (nullable = false) |-- sec: integer (nullable = false)
Catalyst在论文中被叫作优化器(Optimizer),这部分是论文里面较为核心的内容,不过其实流程仍是蛮好理解的,依旧贴下论文里面的图。
主要流程大概能够分为如下几步:
提早说一下吧,上述流程多数是在org.apache.spark.sql.execution.QueryExecution这个类里面,这个贴一下简单的代码,看看就好,先很少作深究。后面的文章会详细介绍这里的内容。
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其余代码 //analyzer阶段 lazy val analyzed: LogicalPlan = { SparkSession.setActiveSession(sparkSession) sparkSession.sessionState.analyzer.executeAndCheck(logical) } //optimizer阶段 lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData) //SparkPlan阶段 lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, // but we will implement to choose the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } //prepareForExecution阶段 // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //execute阶段 /** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = executedPlan.execute() ......其余代码 }
值得一提的是每一个阶段都使用了lazy懒加载,对这块感兴趣能够看看我以前的文章Scala函数式编程(六) 懒加载与Stream。
上述主要介绍Spark SQL模块内容,其出现的背景以及主要解决问题。然后简单介绍下Dataframe API的内容,以及Spark SQL解析SQL的内部框架Catalyst。后续主要会介绍Catalyst中各个步骤的流程,结合源码来作一些分析。
以上~