Dataset: a set of wrapper interfaces around DataFrame creation and a unified entry point for execution; DataFrames can be produced from a DataSource or from an RDD.
SparkSession: creates DataFrames through Dataset and triggers their execution.
SparkPlan: the physical execution plan.
QueryExecution: runs a query and produces the SparkPlan.
SQLExecution: manages the list of QueryExecutions and schedules their execution.
ExistingRDD: a DataFrame backed by an RDD.
DataSource: external storage formats such as csv, jdbc, txt and parquet. The format() call on SparkSession ends up referring to either ExistingRDD or a DataSource.
There are three ways to create a DataFrame: sql(), which mainly accesses Hive tables; building from an RDD, which starts from ExistingRDD; and read/format, which loads data sources such as json/txt/csv.
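A minimal sketch of the three approaches (the table name, file path and sample data are illustrative, and spark is an existing SparkSession):
// 1) sql(): mainly for tables registered in the catalog / Hive
val df1 = spark.sql("SELECT * FROM some_table")
// 2) from an RDD (the ExistingRDD path described above)
import spark.implicits._
val df2 = spark.sparkContext.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "name")
// 3) read/format: goes through DataSource
val df3 = spark.read.format("json").load("/tmp/people.json")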
Let's first trace the creation flow of the third approach.
def read: DataFrameReader = new DataFrameReader(self)
SparkSession.read() simply creates a DataFrameReader, and DataFrameReader.load() then loads the external data source. The main logic of load() is as follows:
/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
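For reference, a typical call that ends up in this load() method might look like the following (format, option and path are illustrative):
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("/data/sales.csv")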
A DataSource of the matching source type is created and resolved into a BaseRelation, and SparkSession's baseRelationToDataFrame then maps the BaseRelation into a DataFrame: a LogicalRelation is built from the BaseRelation, and Dataset.ofRows creates the DataFrame from that LogicalRelation. A DataFrame is really just a Dataset:
type DataFrame = Dataset[Row]
The definition of baseRelationToDataFrame:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
  Dataset.ofRows(self, LogicalRelation(baseRelation))
}
Dataset.ofRows wraps the logical plan in a QueryExecution (where analysis and physical planning happen) and then builds a new Dataset:
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
The key to SparkSession's execution is how the physical plan is generated from the LogicalPlan; let's trace that part of the logic.
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0)
}
Dataset's count() action triggers execution of the physical plan: it calls the plan's executeCollect method, which in turn calls doExecute() to produce data in Array[InternalRow] form. executeCollect is defined in SparkPlan:
def executeCollect(): Array[InternalRow] = {
  val byteArrayRdd = getByteArrayRdd()
  val results = ArrayBuffer[InternalRow]()
  byteArrayRdd.collect().foreach { bytes =>
    decodeUnsafeRows(bytes).foreach(results.+=)
  }
  results.toArray
}
Next, we need to trace how the physical plan (the SparkPlan) is generated from a HadoopFsRelation.
This is handled by FileSourceStrategy, which stacks operators such as Filter and Projection on top of a FileSourceScanExec. The definition of FileSourceScanExec:
/**
 * Physical plan node for scanning data from HadoopFsRelations.
 *
 * @param relation The file-based relation to scan.
 * @param output Output attributes of the scan, including data attributes and partition attributes.
 * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
 * @param partitionFilters Predicates to use for partition pruning.
 * @param dataFilters Filters on non-partition columns.
 * @param metastoreTableIdentifier identifier for the table in the metastore.
 */
case class FileSourceScanExec(
    @transient relation: HadoopFsRelation,
    output: Seq[Attribute],
    requiredSchema: StructType,
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression],
    override val metastoreTableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec with ColumnarBatchScan {
  // ...
}
Its main execution method, doExecute(), works as follows:
protected override def doExecute(): RDD[InternalRow] = {
  if (supportsBatch) {
    // in the case of fallback, this batched scan should never fail because of:
    // 1) only primitive types are supported
    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    WholeStageCodegenExec(this).execute()
  } else {
    val unsafeRows = {
      val scan = inputRDD
      if (needsUnsafeRowConversion) {
        scan.mapPartitionsWithIndexInternal { (index, iter) =>
          val proj = UnsafeProjection.create(schema)
          proj.initialize(index)
          iter.map(proj)
        }
      } else {
        scan
      }
    }
    val numOutputRows = longMetric("numOutputRows")
    unsafeRows.map { r =>
      numOutputRows += 1
      r
    }
  }
}
The inputRDD is created in one of two ways: createBucketedReadRDD or createNonBucketedReadRDD. There is no fundamental difference between them; only the file partitioning rules differ.
private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}
createNonBucketedReadRDD builds a FileScanRDD:
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
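Which branch is taken depends on how the data was written and on spark.sql.sources.bucketing.enabled — a sketch (table name, path and column are illustrative):
import spark.implicits._
// Bucketed table: scanning it goes through createBucketedReadRDD.
spark.range(100).withColumn("key", $"id" % 10)
  .write.bucketBy(4, "key").sortBy("key").saveAsTable("bucketed_tbl")
// Plain files: scanning them goes through createNonBucketedReadRDD / FileScanRDD.
spark.range(100).write.parquet("/tmp/plain_parquet")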
SparkPlan is the base class of physical execution plans. A SQL statement is analyzed and optimized into a LogicalPlan, and the LogicalPlan then produces a concrete SparkPlan depending on the data source type.
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
The SQL text is parsed into a LogicalPlan by the SQL parser's parsePlan method, and Dataset.ofRows finally turns that LogicalPlan into a DataFrame.
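The intermediate plans can be printed from any query — a quick sketch:
val df = spark.sql("SELECT 1 AS a")
println(df.queryExecution.logical)   // plan produced by sqlParser.parsePlan
println(df.queryExecution.analyzed)  // after the analyzer has resolved it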
There are two kinds of SessionState: the Hive one and the in-memory one. Here we look at the in-memory one, defined in spark/sql/internal/SessionState.scala.
The main method in SessionState is:
def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
executePlan builds a QueryExecution from the logical plan. SparkPlan is a subclass of QueryPlan; roughly speaking, QueryPlan describes the plan itself, while QueryExecution drives the plan's actual execution. Note that the createQueryExecution function used here is passed into SessionState through its constructor. The default implementation is:
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
  new QueryExecution(session, plan)
}
It constructs a QueryExecution object directly from the LogicalPlan.
The main methods of SparkPlan:
| Method | Description |
| --- | --- |
| doExecute | Builds the RDD[InternalRow]; implemented in the SparkPlan subclasses. In general, an iterator is obtained from the various data sources (or from Hive) and the corresponding RDD is built from it. |
| executeCollect() | Returns an Array[InternalRow]; this actually launches a job. |
| executeToIterator | def executeToIterator(): Iterator[InternalRow] = { getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows) } — this also actually launches a job. |
A QueryExecution is created from the LogicalPlan and represents the actual execution of the logical plan. Its main members are:
| Method | Description |
| --- | --- |
| analyzed | Analyzes the LogicalPlan and returns the analyzed LogicalPlan. |
| assertAnalyzed() | Triggers analyzed and checks the analysis (sparkSession.sessionState.analyzer.checkAnalysis(analyzed)). |
| optimizedPlan | Optimizes the LogicalPlan and returns the optimized LogicalPlan. |
| sparkPlan | Plans the best SparkPlan from optimizedPlan. |
| executedPlan | Produces the SparkPlan that is ready to run by applying prepareForExecution to sparkPlan; input and return types are the same. |
| toRdd | Produces the RDD[InternalRow]; the implementation simply calls executedPlan.execute(). |
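All of these phases are reachable from a Dataset — a sketch:
val qe = spark.range(100).filter("id % 2 = 0").queryExecution
println(qe.analyzed)       // analyzed LogicalPlan
println(qe.optimizedPlan)  // optimized LogicalPlan
println(qe.sparkPlan)      // selected physical plan
println(qe.executedPlan)   // after prepareForExecution
val rdd = qe.toRdd         // RDD[InternalRow]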
The flow of SparkSession.sql() is roughly as follows:
First, SessionState parses the SQL statement into a LogicalPlan;
then QueryExecution runs the LogicalPlan through the various preprocessing steps, including analysis and optimization, and produces a SparkPlan;
finally, execute is called on the chosen SparkPlan to produce an RDD[InternalRow], and Dataset.ofRows wraps the result into a DataFrame.
LogicalPlan is a subclass of QueryPlan.
LocalRelation, which can be seen as a local kind of LogicalPlan, represents dataset data held locally in memory.
LogicalRelation takes a BaseRelation as its argument. Reading csv, json, txt and similar files builds the logical plan through LogicalRelation; in that case csv, json, txt and so on are DataSources that construct the BaseRelation.
LogicalRelation extends LeafNode, and LeafNode extends LogicalPlan; there is little extra logic beyond that.
Note that UnaryNode also extends LogicalPlan.
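A sketch that shows the LogicalRelation leaf in the plan of a file-based read (the path is illustrative, and these internal classes can move between Spark versions):
import org.apache.spark.sql.execution.datasources.LogicalRelation
val df = spark.read.json("/tmp/people.json")
df.queryExecution.analyzed.collect {
  case lr: LogicalRelation => lr.relation.getClass.getName  // the underlying BaseRelation, e.g. HadoopFsRelation
}.foreach(println)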
These are the basic logical plan operators; resolving a LogicalPlan calls into a series of logic defined here. The classes here all produce a new LogicalPlan from a logical plan, and include UnaryNode, LeafNode and similar types (all subclasses of LogicalPlan).
| Operator | Description |
| --- | --- |
| Generate | Generates new columns, optionally keeping the original columns alongside them. |
| Filter | Filtering. Its validConstraints splits the condition with splitConjunctivePredicates(condition), filters out correlated subqueries (SubqueryExpression.hasCorrelatedSubquery), and unions the resulting predicates into the child's constraints. |
| Subquery | Subquery; little more than setting output to the child's output. |
| Project | Mainly handles aliases, adding them to the child's constraints. |
| Intersect | Intersects the left and right LogicalPlans; their constraints are combined as well. |
| Except | Excludes the right LogicalPlan; constraints are set to the left plan's constraints. |
| Union | Unions multiple LogicalPlans. |
| Sort | Sorting; carries a Seq[SortOrder]. |
| Range | A range of id values (what spark.range produces). |
| Aggregate | Aggregation. Definition: case class Aggregate(groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode |
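Most of these operators appear directly when a DataFrame pipeline is built — a sketch:
import spark.implicits._
val df = spark.range(10).toDF("id")
  .filter($"id" > 1)            // Filter
  .select(($"id" * 2).as("x"))  // Project
  .sort($"x".desc)              // Sort
println(df.queryExecution.logical)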
The SQL analyzer is initialized in BaseSessionStateBuilder and delegates to Catalyst's Analyzer:
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    new FindDataSourceTable(session) +:
      new ResolveSQLOnFile(session) +:
      customResolutionRules
  override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    PreprocessTableCreation(session) +:
      PreprocessTableInsertion(conf) +:
      DataSourceAnalysis(conf) +:
      customPostHocResolutionRules
  override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    PreWriteCheck +:
      HiveOnlyCheck +:
      customCheckRules
}
The analyzer defines a series of Rules; each Rule rewrites the LogicalPlan into a new LogicalPlan. The Rules defined in execution/datasources/rules.scala serve as examples.
When a table is created or data is appended to an existing table, these rules check whether the table schema matches the SQL and whether the SQL contains errors (for example, a table partition that does not exist).
An excerpt of the code illustrates the mechanism:
// When we append data to an existing table, check if the given provider, partition columns,
// bucket spec, etc. match the existing table, and adjust the columns order of the given query
// if necessary.
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
    if query.resolved && catalog.tableExists(tableDesc.identifier) =>
  // This is guaranteed by the parser and `DataFrameWriter`
  assert(tableDesc.provider.isDefined)
  val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
  val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
  val tableName = tableIdentWithDB.unquotedString
  val existingTable = catalog.getTableMetadata(tableIdentWithDB)
  if (existingTable.tableType == CatalogTableType.VIEW) {
    throw new AnalysisException("Saving data into a view is not allowed.")
  }
  // Check if the specified data source match the data source of the existing table.
  val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
  val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
  // TODO: Check that options from the resolved relation match the relation that we are
  // inserting into (i.e. using the same compression).
  if (existingProvider != specifiedProvider) {
    throw new AnalysisException(s"The format of the existing table $tableName is " +
      s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
      s"`${specifiedProvider.getSimpleName}`.")
  }
  if (query.schema.length != existingTable.schema.length) {
    throw new AnalysisException(
      s"The column number of the existing table $tableName" +
      s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
      s"(${query.schema.catalogString})")
  }
  val resolver = sparkSession.sessionState.conf.resolver
  val tableCols = existingTable.schema.map(_.name)
  // As we are inserting into an existing table, we should respect the existing schema, preserve
  // the case and adjust the column order of the given DataFrame according to it, or throw
  // an exception if the column names do not match.
  val adjustedColumns = tableCols.map { col =>
    query.resolve(Seq(col), resolver).map(Alias(_, col)()).getOrElse {
      val inputColumns = query.schema.map(_.name).mkString(", ")
      throw new AnalysisException(
        s"cannot resolve '$col' given input columns: [$inputColumns]")
    }
  }
If the SQL is wrong, an exception is thrown at this stage and nothing further is executed.
The insert-into analysis checks, among other things, whether the column names exist and whether the number of columns matches the number of values, and throws an exception on error.
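A sketch of the kind of mismatch these rules reject (table name is illustrative; the exact message depends on the Spark version):
spark.sql("CREATE TABLE demo_t (a INT, b STRING) USING parquet")
// Three values for two columns: the preprocessing rules raise an
// AnalysisException during analysis, before any job is started.
spark.sql("INSERT INTO demo_t VALUES (1, 'x', 2.0)")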
The SQL optimizer is initialized in BaseSessionStateBuilder and delegates to Catalyst's Optimizer:
protected def optimizer: Optimizer = {
  new SparkOptimizer(catalog, conf, experimentalMethods) {
    override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
      super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
  }
}
If a DISTINCT-style operation in a SQL statement (distinct, max, min and so on) acts on a partition column, the query is optimized so that it is answered at the partition level.
This relies on classes from spark-catalyst.
For example, select col1, max(col2) from tbl1 group by col1 would be optimized into something like select col1.partition, max(col2) from tbl1[col1 partition]; this is only an illustration.
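This appears to describe the metadata-only optimization for partitioned tables (spark.sql.optimizer.metadataOnly in Spark 2.x) — a sketch with an illustrative partitioned table:
import spark.implicits._
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
spark.range(100).withColumn("part", $"id" % 5)
  .write.partitionBy("part").saveAsTable("part_tbl")
// max() over the partition column can be answered from partition metadata
// without scanning the data files.
spark.sql("SELECT max(part) FROM part_tbl").explain(true)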
SparkPlanner is the physical planner: it generates several candidate SparkPlans from the LogicalPlan. SparkPlanner extends SparkStrategies, which extends QueryPlanner.
It is created by the following method:
protected def planner: SparkPlanner = {
  new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
    override def extraPlanningStrategies: Seq[Strategy] =
      super.extraPlanningStrategies ++ customPlanningStrategies
  }
}
The plan method combines the strategies that produce SparkPlans and decides which SparkPlan subclass is generated.
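The experimentalMethods argument is how user-defined strategies enter this list; a sketch of registering a (no-op) Strategy through the public hook:
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
object NoopStrategy extends Strategy {
  // Returning Nil means "not handled here"; the planner falls back to the built-in strategies.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}
spark.experimental.extraStrategies = Seq(NoopStrategy)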
Depending on the type of the LogicalPlan node, different UnaryExecNodes are generated. Its subclasses include:
| Node | Description |
| --- | --- |
| SortExec | Overrides doExecute(): RDD[InternalRow] using the sorter iterator UnsafeExternalRowSorter, which in turn uses UnsafeExternalSorter (a Spark Core class for sorting iterators: org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter). |
| FilterExec | Performs the filtering. |
SortExec implements the sort with a sorting iterator. FilterExec overrides the doExecute() function as follows:
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val predicate = newPredicate(condition, child.output)
    predicate.initialize(0)
    iter.filter { row =>
      val r = predicate.eval(row)
      if (r) numOutputRows += 1
      r
    }
  }
}
This is the physical execution of the filter operation.
各类类型物理执行计划。有ProjectExec、SortExec、FilterExec等等。都重写了SparkPlan的doExecute方法。
| Node | Description |
| --- | --- |
| ProjectExec | Projects the output columns. |
| FilterExec | Physical plan for filtering. |
| SampleExec | Sampling. |
| RangeExec | Range. |
| UnionExec | Union of multiple child plans. |
| CoalesceExec | Coalesces partitions; its doExecute simply calls child.execute().coalesce(numPartitions, shuffle = false). |
| SubqueryExec | Mainly executes the child plan. |
ProjectExec's overridden method:
protected override def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val project = UnsafeProjection.create(projectList, child.output,
      subexpressionEliminationEnabled)
    project.initialize(index)
    iter.map(project)
  }
}
It uses UnsafeProjection to map each InternalRow to the projected output.
For example: select id, func(name) from table1 where id > 1;
Resolving id and func(name) is the projection, and id and func(name) can be seen as Expressions. Calling expression.eval(internalRow) for each Expression is the projection: the expressions are evaluated against the current row to produce the final output.
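A sketch of that idea using Catalyst's internal classes (internal APIs, shown only to illustrate projection; they are not stable across versions):
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("score", DoubleType)))
// Evaluate the (bound) column expressions against a generic InternalRow
// and write the result into the UnsafeRow format, as the exec nodes do per row.
val proj = UnsafeProjection.create(schema)
val row = InternalRow(1L, 2.5)
val unsafeRow = proj(row)
println(unsafeRow.getLong(0) + ", " + unsafeRow.getDouble(1))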
FilterExec takes a condition Expression and a child SparkPlan as parameters:
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val predicate = newPredicate(condition, child.output)
    predicate.initialize(0)
    iter.filter { row =>
      val r = predicate.eval(row)
      if (r) numOutputRows += 1
      r
    }
  }
}
newPredicate filters each InternalRow; whenever the condition is satisfied, the output-row counter is incremented by 1.
SampleExec samples the data, taking a subset of each partition's data.
SubqueryExec executes its child plan's execute method.
RangeExec handles range queries.
UnionExec merges several queries:
protected override def doExecute(): RDD[InternalRow] =
  sparkContext.union(children.map(_.execute()))
CoalesceExec reduces the number of partitions without a shuffle:
protected override def doExecute(): RDD[InternalRow] = {
  child.execute().coalesce(numPartitions, shuffle = false)
}