Many of Spark SQL's downstream operations, such as select() and filter(), are defined on Dataset. For example, select() produces a new Project logical plan and filter() produces a Filter logical plan. A Dataset gets its data from two broad kinds of sources: one is the format() path, which reads through a DataSource subclass (csv, json, txt, and other formats); the other is the sql() path, which starts from a SQL statement and is resolved into a SparkPlan. In the end both retrieve their data through an iterator over RDD[InternalRow].
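As a quick illustration (a minimal sketch, assuming a local SparkSession named spark with import spark.implicits._; not taken from the original article), the Project and Filter nodes can be inspected through queryExecution:

val ds = Seq((1, "a"), (2, "b"), (3, "c")).toDS()
// select() adds a Project node, filter() adds a Filter node on top of the logical plan
val q = ds.select($"_1").filter($"_1" > 1)
println(q.queryExecution.logical)   // logical plan containing Project and Filter
println(q.queryExecution.sparkPlan) // physical plan containing ProjectExec and FilterExec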
format() returns a DataFrameReader, and its load() method then returns a DataFrame. The core implementation of load():
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
In SparkSession:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
}
BaseRelation is an abstract class representing a basic data-source relation; subclasses generally return their data as RDD[Row] through a scan method. Data sources such as json, csv, and txt each have a corresponding BaseRelation subclass.
Examples of the scan traits that subclasses can mix in, defined in sql/sources/interfaces.scala (a sketch of a custom relation follows these trait definitions):
/**
* A BaseRelation that can produce all of its tuples as an RDD of Row objects.
*
* @since 1.3.0
*/
@InterfaceStability.Stable
trait TableScan {
def buildScan(): RDD[Row]
}
/**
* A BaseRelation that can eliminate unneeded columns before producing an RDD
* containing all of its tuples as Row objects.
*
* @since 1.3.0
*/
@InterfaceStability.Stable
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
/**
* A BaseRelation that can eliminate unneeded columns and filter using selected
* predicates before producing an RDD containing all matching tuples as Row objects.
*
* The actual filter should be the conjunction of all `filters`,
* i.e. they should be "and" together.
*
* The pushed down filters are currently purely an optimization as they will all be evaluated
* again. This means it is safe to use them with methods that produce false positives such
* as filtering partitions based on a bloom filter.
*
* @since 1.3.0
*/
@InterfaceStability.Stable
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
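A minimal custom relation might look like the following sketch (the class name OneColumnRelation is made up for illustration): it mixes TableScan into BaseRelation and returns all of its tuples from buildScan.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical relation exposing the integers 0..99 as a one-column table.
class OneColumnRelation(@transient override val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // The schema Spark SQL sees for this data source.
  override def schema: StructType = StructType(StructField("value", IntegerType) :: Nil)

  // TableScan: produce every tuple as a Row; pruning and filtering happen higher in the plan.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until 100).map(Row(_))
}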
LogicalRelation is the logical query relation, a basic LeafNode; it is generated from a BaseRelation.
The LogicalRelation built from the BaseRelation then goes, as part of the LogicalPlan, through SparkPlan generation. A series of strategies is defined here that decide how the various data sources are handled.
The most central and basic one is BasicOperators, which defines how each kind of LogicalPlan is executed (a small planning example follows the code):
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
case logical.Distinct(child) =>
throw new IllegalStateException(
"logical distinct operator should have been replaced by aggregate in the optimizer")
case logical.Intersect(left, right) =>
throw new IllegalStateException(
"logical intersect operator should have been replaced by semi-join in the optimizer")
case logical.Except(left, right) =>
throw new IllegalStateException(
"logical except operator should have been replaced by anti-join in the optimizer")
case logical.DeserializeToObject(deserializer, objAttr, child) =>
execution.DeserializeToObjectExec(deserializer, objAttr, planLater(child)) :: Nil
case logical.SerializeFromObject(serializer, child) =>
execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil
case logical.MapPartitions(f, objAttr, child) =>
execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil
case logical.MapPartitionsInR(f, p, b, is, os, objAttr, child) =>
execution.MapPartitionsExec(
execution.r.MapPartitionsRWrapper(f, p, b, is, os), objAttr, planLater(child)) :: Nil
case logical.FlatMapGroupsInR(f, p, b, is, os, key, value, grouping, data, objAttr, child) =>
execution.FlatMapGroupsInRExec(f, p, b, is, os, key, value, grouping,
data, objAttr, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.AppendColumns(f, _, _, in, out, child) =>
execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
execution.AppendColumnsWithObjectExec(f, childSer, newSer, planLater(child)) :: Nil
case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>
execution.MapGroupsExec(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil
case logical.FlatMapGroupsWithState(
f, key, value, grouping, data, output, _, _, _, timeout, child) =>
execution.MapGroupsExec(
f, key, value, grouping, data, output, timeout, planLater(child)) :: Nil
case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) =>
execution.CoGroupExec(
f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
planLater(left), planLater(right)) :: Nil
case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
case logical.Sort(sortExprs, global, child) =>
execution.SortExec(sortExprs, global, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
execution.ProjectExec(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.FilterExec(condition, planLater(child)) :: Nil
case f: logical.TypedFilter =>
execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>
execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
LocalTableScanExec(output, data) :: Nil
case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.Union(unionChildren) =>
execution.UnionExec(unionChildren.map(planLater)) :: Nil
case g @ logical.Generate(generator, join, outer, _, _, child) =>
execution.GenerateExec(
generator, join = join, outer = outer, g.qualifiedGeneratorOutput,
planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
case r: logical.Range =>
execution.RangeExec(r) :: Nil
case logical.RepartitionByExpression(expressions, child, numPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
expressions, numPartitions), planLater(child)) :: Nil
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
case h: ResolvedHint => planLater(h.child) :: Nil
case _ => Nil
}
}
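To see BasicOperators at work, it is enough to look at the physical plan of a simple query (a sketch, assuming a local spark session): the logical Filter, Project, and Sort nodes come out as FilterExec, ProjectExec, and SortExec.

val df = spark.range(100).toDF("id")
// BasicOperators maps Filter -> FilterExec, Project -> ProjectExec, Sort -> SortExec
df.filter("id > 10").select("id").sort("id").explain()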
DataSource is the base class for data sources; csv, json, txt, and so on are its extension subclasses, accessed through a uniform set of methods.
The physical scan side defines how a DataSource is executed to produce InternalRow data.
A number of subclasses / implementation classes:

Name | Description
FileSourceScanExec | Reads files and produces an RDD[InternalRow]. Internally it calls the concrete DataSource's (or BaseRelation's) readFile method to iterate over the data source and build an iterator of InternalRow; partitioned reads are done through FileScanRDD.
RowDataSourceScanExec | Applies row-level operations, such as Project-style selection of the specified output columns.

These are the two main implementation classes.
For JDBC, JDBCRelation extends BaseRelation and overrides buildScan, which returns an RDD[Row] by creating a JDBCRDD. JDBCRDD uses the JDBC driver to build the query SQL and run it against the database; when generating the statement it assembles the SQL itself, including the translation of the pushed-down filter conditions.
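A typical read that ends up in JDBCRelation/JDBCRDD looks roughly like the following sketch (the URL, table name, and credentials are placeholders); a filter such as the one at the end may be pushed down into the WHERE clause of the SQL that JDBCRDD generates.

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb") // placeholder connection string
  .option("dbtable", "employees")                      // placeholder table name
  .option("user", "user")
  .option("password", "password")
  .load()

// This condition may be translated into "WHERE age > 30" in the generated SQL.
jdbcDF.filter("age > 30").show()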
JDBCRDD defines a compute() method:
override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow]
Its last part is:
CompletionIterator[InternalRow, Iterator[InternalRow]](
new InterruptibleIterator(context, rowsIterator), close())
Its purpose is to wrap the iterator produced by the JDBC driver in a new iterator that runs close() once the last row has been processed; close() closes the JDBC connection and releases its resources.
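The idea behind that wrapping can be sketched with a plain Scala iterator (a simplified stand-in, not Spark's CompletionIterator itself): run a completion callback exactly once when the underlying iterator is exhausted.

// Simplified illustration: invoke onComplete (e.g. closing a JDBC connection)
// once the wrapped iterator runs out of elements.
class CompletingIterator[A](underlying: Iterator[A], onComplete: () => Unit) extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onComplete() // release resources exactly once
    }
    more
  }

  override def next(): A = underlying.next()
}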
UnsafeRow is a subclass of InternalRow that represents one row of data as a byte array. If a field is numeric (integer, floating point, and so on), the corresponding slot in the UnsafeRow holds the value directly; for large values such as strings and arrays, the slot holds the location of the data.
An excerpt from the UnsafeRow class documentation (a small usage sketch follows it):
/**
* An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
*
* Each tuple has three parts: [null bit set] [values] [variable length portion]
*
* The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores
* one bit per field.
*
* In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length
* primitive types, such as long, double, or int, we store the value directly in the word. For
* fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
* base address of the row) that points to the beginning of the variable-length field, and length
* (they are combined into a long).
*
* Instances of `UnsafeRow` act as pointers to row data stored in this format.
*/
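To see this format in practice, an InternalRow can be converted into an UnsafeRow with an UnsafeProjection (a minimal sketch using Catalyst's internal API, which is not a stable public interface):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val toUnsafe = UnsafeProjection.create(schema)

// The fixed-length id is stored inline in its 8-byte word; the variable-length name
// goes into the variable-length region, with its offset and length packed into the word.
val row: UnsafeRow = toUnsafe(InternalRow(42L, UTF8String.fromString("spark")))
println(row.getLong(0) + " " + row.getUTF8String(1))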
All implementations that read a file-based DataSource go through HadoopFsRelation, which extends BaseRelation.
Ordinary SQL operations work on a single row at a time, so it is enough to nest operators layer by layer on top of the iterator. Ordinary aggregations such as sum and average are also easy: adding a global accumulator variable on top of the iterator does the job. Some aggregations are not that simple, for example sorting and joins. It is worth looking closely at the data pipeline of operations like these to see how they are implemented internally.
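The layered-iterator intuition can be shown with plain Scala iterators (a simplified sketch, not Spark code): per-row operators compose by wrapping the upstream iterator, and a simple aggregate like sum only needs a single accumulator over one pass.

val source: Iterator[Int] = (1 to 10).iterator
// Per-row operators: each layer wraps the iterator below it, like FilterExec over ProjectExec.
val projected = source.map(_ * 2)       // "Project"
val filtered  = projected.filter(_ > 5) // "Filter"
// A simple aggregate: one accumulator while consuming the iterator once.
var sum = 0L
filtered.foreach(sum += _)
println(sum)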
Deduplication removes duplicate rows with respect to the specified columns, keeping only one copy.
The Deduplicate node is defined with the columns (keys) that decide whether rows count as duplicates:
case class Deduplicate(
keys: Seq[Attribute],
child: LogicalPlan,
streaming: Boolean) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
The corresponding physical plan is produced in SparkStrategies (a usage example follows the code):
/**
* Used to plan the streaming deduplicate operator.
*/
object StreamingDeduplicationStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case Deduplicate(keys, child, true) =>
StreamingDeduplicateExec(keys, planLater(child)) :: Nil
case _ => Nil
}
}
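For context, this strategy fires when dropDuplicates is used on a streaming Dataset, for example (a sketch; the socket source and port are placeholders):

val lines = spark.readStream
  .format("socket")            // placeholder streaming source
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// Deduplicate is planned as StreamingDeduplicateExec in the streaming case.
val deduped = lines.dropDuplicates("value")

deduped.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()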
Following the thread, here is the key function of StreamingDeduplicateExec; the code below lives in statefulOperators.scala under sql/execution/streaming:
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
getStateId.operatorId,
getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val numOutputRows = longMetric("numOutputRows")
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter(row => !predicate.eval(row))
case None => iter
}
val result = baseIterator.filter { r =>
val row = r.asInstanceOf[UnsafeRow]
val key = getKey(row)
val value = store.get(key)
if (value.isEmpty) {
store.put(key.copy(), StreamingDeduplicateExec.EMPTY_ROW)
numUpdatedStateRows += 1
numOutputRows += 1
true
} else {
// Drop duplicated rows
false
}
}
CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
watermarkPredicateForKeys.foreach(f => store.remove(f.eval _))
store.commit()
numTotalStateRows += store.numKeys()
})
}
}
Let's look at the key classes and methods used there:
(1) mapPartitionsWithStateStore
This brings in the StateStore. Let's follow the code to see what StateStore is for.
In sql/execution/streaming/state/package.scala, the class StateStoreOps defines the mapPartitionsWithStateStore method; its code is:
/** Map each partition of an RDD along with data in a [[StateStore]]. */
def mapPartitionsWithStateStore[U: ClassTag](
sqlContext: SQLContext,
checkpointLocation: String,
operatorId: Long,
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType)(
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
mapPartitionsWithStateStore(
checkpointLocation,
operatorId,
storeVersion,
keySchema,
valueSchema,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator))(
storeUpdateFunction)
}
/** Map each partition of an RDD along with data in a [[StateStore]]. */
private[streaming] def mapPartitionsWithStateStore[U: ClassTag](
checkpointLocation: String,
operatorId: Long,
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType,
sessionState: SessionState,
storeCoordinator: Option[StateStoreCoordinatorRef])(
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
val wrappedF = (store: StateStore, iter: Iterator[T]) => {
// Abort the state store in case of error
TaskContext.get().addTaskCompletionListener(_ => {
if (!store.hasCommitted) store.abort()
})
cleanedF(store, iter)
}
new StateStoreRDD(
dataRDD,
wrappedF,
checkpointLocation,
operatorId,
storeVersion,
keySchema,
valueSchema,
sessionState,
storeCoordinator)
}
}
sql/execution/streaming/state/StateStore.scala defines StateStore. It is used for processing streaming iterators, for example for handling duplicate rows and similar operations.
An implementation of StateStore.
The implementation of the deduplication operation.
I have not read through all of it yet, so let me first describe the programming idea.
A StateStore object is defined to manage the intermediate storage for the data source. Each row can be deduplicated as it is saved into the StateStore, and an iterator is then produced from the StateStore as the result data, which completes aggregation-like operations such as Distinct. Performance optimization and extension are done inside the StateStore, which keeps the framework itself stable.
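The core idea can be sketched with a plain mutable set standing in for the StateStore (a simplified illustration, not the Spark implementation): each incoming key is looked up in the store, kept only if it has not been seen before, and recorded for later batches.

import scala.collection.mutable

// Stand-in for the StateStore: keys seen so far across the "stream".
val store = mutable.HashSet.empty[String]

def dedupe(batch: Iterator[(String, String)]): Iterator[(String, String)] =
  batch.filter { case (key, _) =>
    if (store.contains(key)) {
      false          // drop duplicated rows
    } else {
      store += key   // remember the key, analogous to store.put(key, EMPTY_ROW)
      true
    }
  }

println(dedupe(Iterator("a" -> "1", "b" -> "2", "a" -> "3")).toList) // List((a,1), (b,2))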
A mechanism for keeping <key, value> data in memory, operating at the byte-array level (the class documentation quoted below is BytesToBytesMap's):
/**
* An append-only hash map where keys and values are contiguous regions of bytes.
*
* This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
* which is guaranteed to exhaust the space.
*
* The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
* probably be using sorting instead of hashing for better cache locality.
*
* The key and values under the hood are stored together, in the following format:
* Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4
* Bytes 4 to 8: len(k)
* Bytes 8 to 8 + len(k): key data
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
* Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
*
* This means that the first four bytes store the entire record (key + value) length. This format
* is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* so we can pass records from this map directly into the sorter to sort records in place.
*/
UnsafeExternalSorter is the core engine's external (memory plus spill) sorting class, called wherever a sort is needed.
A comparator is passed in, and the sorter maintains a memory region holding the data to be sorted. insertRecord handles one record at a time, inserting it into memory; UnsafeExternalSorter keeps the data ordered as it inserts, and finally returns an iterator representing the sorted data. In memory the data is stored as <key, value> byte arrays.
Its main job is memory management; the sorting itself reuses the logic in Sorter.java and TimSort.java.
The in-memory sort algorithm works on sort-key prefixes; it first goes through a SortComparator.
The memory manager holds the two objects being compared; each object's base object and offset are handed to the sorter, and a comparison result of 1, 0, or -1 means greater than, equal to, or less than.
What gets compared is the object's memory location together with its prefix. If the prefixes differ, the prefix comparison alone decides the result; if they are equal, the memory block's base address and the object's offset within the block are obtained from the memory manager, and the two memory blocks are compared. The concrete record format inside a memory block is user-defined, and how to interpret it is supplied through the recordComparator parameter.
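The prefix trick can be illustrated with a simplified sketch (plain Scala, not the Spark classes): compare cheap fixed-size prefixes first, and fall back to the full record comparison only when the prefixes are equal.

// Each record carries a cheap 8-byte sort prefix (for example, the leading bytes of a string key).
case class Record(prefix: Long, payload: String)

// The full comparison runs only when prefixes tie, mirroring how the sorter consults
// the record comparator only on prefix equality.
def compare(a: Record, b: Record): Int = {
  val byPrefix = java.lang.Long.compare(a.prefix, b.prefix)
  if (byPrefix != 0) byPrefix else a.payload.compareTo(b.payload)
}

val sorted = Seq(Record(2, "bb"), Record(1, "aa"), Record(2, "ba")).sortWith(compare(_, _) < 0)
println(sorted.map(_.payload)) // List(aa, ba, bb)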
Next, the implementation of the family of aggregation operations.
Dataset.agg is mostly implemented by delegating to dataset.groupBy().agg.
First, look at the groupByKey method:
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
val inputPlan = logicalPlan
val withGroupingKey = AppendColumns(func, inputPlan)
val executed = sparkSession.sessionState.executePlan(withGroupingKey)
new KeyValueGroupedDataset(
encoderFor[K],
encoderFor[T],
executed,
inputPlan.output,
withGroupingKey.newColumns)
}
It appends a column computed by the grouping function to the plan (via AppendColumns) and then builds a KeyValueGroupedDataset, which represents the dataset after grouping (a usage sketch follows).
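A typical usage looks like this (a sketch assuming a local spark session with import spark.implicits._); the result of the grouping function becomes the appended key column:

val ds = Seq(("a", 10.0), ("b", 5.0), ("a", 7.5)).toDS()
// groupByKey appends the computed key column and exposes typed aggregations
// on the resulting KeyValueGroupedDataset.
val counts = ds.groupByKey(_._1).count()
counts.show()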
The grouping method:
@scala.annotation.varargs
def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
val colNames: Seq[String] = col1 +: cols
RelationalGroupedDataset(
toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.GroupByType)
}
All groupBy operations convert the Dataset into a RelationalGroupedDataset and manipulate the data through it. Let's look at the key definitions and methods of RelationalGroupedDataset.
For a groupBy of type GroupByType, RelationalGroupedDataset executes it by converting it into an Aggregate logical plan. The conversion code is:
private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
groupingExprs ++ aggExprs
} else {
aggExprs
}
val aliasedAgg = aggregates.map(alias)
groupType match {
case RelationalGroupedDataset.GroupByType =>
Dataset.ofRows(
df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.RollupType =>
Dataset.ofRows(
df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.CubeType =>
Dataset.ofRows(
df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
Dataset.ofRows(
df.sparkSession, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
}
}
Next, look at the implementation of one of its key methods, agg:
/**
* (Scala-specific) Compute aggregates by specifying the column names and
* aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
*
* The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
* {{{
* // Selects the age of the oldest employee and the aggregate expense for each department
* df.groupBy("department").agg(
* "age" -> "max",
* "expense" -> "sum"
* )
* }}}
*
* @since 1.3.0
*/
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
toDF((aggExpr +: aggExprs).map { case (colName, expr) =>
strToExpr(expr)(df(colName).expr)
})
}
toDF converts this DataFrame into the target DataFrame, with the aggregation expressions folded into the conversion. The other aggregations, such as count, avg, min, and max, are likewise executed by calling toDF. toDF describes the logical plan (LogicalPlan) with an Aggregate node, so studying the aggregate functions really comes down to studying how Aggregate is implemented.
Let's look at Aggregate's main structure and logic. Its class is org.apache.spark.sql.catalyst.plans.logical.Aggregate, defined in catalyst/plans/logical/basicLogicalOperators.scala.
It is defined as follows:
case class Aggregate(
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
child: LogicalPlan)
extends UnaryNode {
// ...
}
Now let's see how the corresponding physical plan for Aggregate is defined:
object Aggregation extends Strategy
It is defined in SparkStrategies and decides how an Aggregate logical plan is handled:
val aggregateOperator =
if (functionsWithDistinct.isEmpty) {
aggregate.AggUtils.planAggregateWithoutDistinct(
groupingExpressions,
aggregateExpressions,
resultExpressions,
planLater(child))
} else {
aggregate.AggUtils.planAggregateWithOneDistinct(
groupingExpressions,
functionsWithDistinct,
functionsWithoutDistinct,
resultExpressions,
planLater(child))
}
aggregateOperator
The SparkPlan physical execution plan is built through AggUtils.planAggregateWithoutDistinct (or planAggregateWithOneDistinct). The main job of these methods is to decide which kind of aggregation operator should be used: HashAggregateExec, ObjectHashAggregateExec, or SortAggregateExec.
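The choice can be observed directly (a sketch assuming a local spark session): summing a Long column uses a fixed-width aggregation buffer, so the planner normally picks HashAggregateExec, which shows up as HashAggregate nodes in the explain output.

import org.apache.spark.sql.functions._

val df = spark.range(1000).toDF("id").withColumn("bucket", expr("id % 10"))
// planAggregateWithoutDistinct typically chooses HashAggregateExec here.
df.groupBy("bucket").agg(sum("id")).explain()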
Looking at one example, HashAggregateExec, is enough; the others are very similar.
The definition and main methods of HashAggregateExec:
HashAggregateExec uses TungstenAggregationIterator.
An excerpt from its source documentation:
/**
* An iterator used to evaluate aggregate functions. It operates on [[UnsafeRow]]s.
*
* This iterator first uses hash-based aggregation to process input rows. It uses
* a hash map to store groups and their corresponding aggregation buffers. If
* this map cannot allocate memory from memory manager, it spills the map into disk
* and creates a new one. After processed all the input, then merge all the spills
* together using external sorter, and do sort-based aggregation.
*
* The process has the following step:
* - Step 0: Do hash-based aggregation.
* - Step 1: Sort all entries of the hash map based on values of grouping expressions and
* spill them to disk.
* - Step 2: Create an external sorter based on the spilled sorted map entries and reset the map.
* - Step 3: Get a sorted [[KVIterator]] from the external sorter.
* - Step 4: Repeat step 0 until no more input.
* - Step 5: Initialize sort-based aggregation on the sorted iterator.
* Then, this iterator works in the way of sort-based aggregation.
*
* The code of this class is organized as follows:
* - Part 1: Initializing aggregate functions.
* - Part 2: Methods and fields used by setting aggregation buffer values,
* processing input rows from inputIter, and generating output
* rows.
* - Part 3: Methods and fields used by hash-based aggregation.
* - Part 4: Methods and fields used when we switch to sort-based aggregation.
* - Part 5: Methods and fields used by sort-based aggregation.
* - Part 6: Loads input and process input rows.
* - Part 7: Public methods of this iterator.
* - Part 8: A utility function used to generate a result when there is no
* input and there is no grouping expression.
*
* @param groupingExpressions
* expressions for grouping keys
* @param aggregateExpressions
* [[AggregateExpression]] containing [[AggregateFunction]]s with mode [[Partial]],
* [[PartialMerge]], or [[Final]].
* @param aggregateAttributes the attributes of the aggregateExpressions'
* outputs when they are stored in the final aggregation buffer.
* @param resultExpressions
* expressions for generating output rows.
* @param newMutableProjection
* the function used to create mutable projections.
* @param originalInputAttributes
* attributes of representing input rows from `inputIter`.
* @param inputIter
* the iterator containing input [[UnsafeRow]]s.
*/
In the end, execution always comes down to a subclass either producing an RDD[Row] through buildScan or producing an iterator of InternalRow through readFile.