做者:
王道远,花名健身,阿里云EMR技术专家,Apache Spark活跃贡献者,主要关注大数据计算优化相关工做。sql
Hive以及Spark SQL等大数据计算引擎为咱们操做存储在HDFS上结构化数据提供了易于上手的SQL接口,大大下降了ETL等操做的门槛,也所以在实际生产中有着普遍的应用。SQL是非过程化语言,咱们写SQL的时候并不能控制具体的执行过程,它们依赖执行引擎决定。而Hive和Spark SQL做为Map-Reduce模型的分布式执行引擎,其执行过程首先就涉及到如何将输入数据切分红一个个任务,分配给不一样的Map任务。在本文中,咱们就来说解Hive和Spark SQL是如何切分输入路径的。微信
Hive
Hive是起步较早的SQL on Hadoop项目,最先也是诞生于Hadoop中,因此输入划分这部分的代码与Hadoop相关度很是高。如今Hive广泛使用的输入格式是CombineHiveInputFormat
,它继承于HiveInputFormat
,而HiveInputFormat
实现了Hadoop的InputFormat
接口,其中的getSplits
方法用来获取具体的划分结果,划分出的一份输入数据被称为一个“Split”。在执行时,每一个Split对应到一个map任务。在划分Split时,首先挑出不能合并到一块儿的目录——好比开启了事务功能的路径。这些不能合并的目录必须单独处理,剩下的路径交给私有方法getCombineSplits
,这样Hive的一个map task最多能够处理多个目录下的文件。在实际操做中,咱们通常只要经过set mapred.max.split.size=xx;
便可控制文件合并的大小。当一个文件过大时,父类的getSplits
也会帮咱们完成相应的切分工做。session
Spark SQL
Spark的表有两种:DataSource表和Hive表。另外Spark后续版本中DataSource V2也将逐渐流行,目前还在不断发展中,暂时就不在这里讨论。咱们知道Spark SQL其实底层是Spark RDD,而RDD执行时,每一个map task会处理RDD的一个Partition中的数据(注意这里的Partition是RDD的概念,要和表的Partition进行区分)。所以,Spark SQL做业的任务切分关键在于底层RDD的partition如何切分。并发
Data Source表
Spark SQL的DataSource表在最终执行的RDD类为FileScanRDD
,由FileSourceScanExec
建立出来。在建立这种RDD的时候,具体的Partition直接做为参数传给了构造函数,所以划分输入的方法也在DataSourceScanExec.scala
文件中。具体分两步:首先把文件划分为PartitionFile
,再将较小的PartitionFile
进行合并。分布式
第一步部分代码以下:ide
if (fsRelation.fileFormat.isSplitable( fsRelation.sparkSession, fsRelation.options, file.getPath)) { (0L until file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining val hosts = getBlockHosts(blockLocations, offset, size) PartitionedFile( partition.values, file.getPath.toUri.toString, offset, size, partitionDeleteDeltas, hosts) } } else { val hosts = getBlockHosts(blockLocations, 0, file.getLen) Seq(PartitionedFile(partition.values, file.getPath.toUri.toString, 0, file.getLen, partitionDeleteDeltas, hosts)) }
咱们能够看出,Spark SQL首先根据文件类型判断单个文件是否可以切割,若是能够则按maxSplitBytes
进行切割。若是一个文件剩余部分没法填满maxSplitBytes
,也单独做为一个Partition。函数
第二部分代码以下所示:oop
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
这样咱们就能够依次遍历第一步切好的块,再按照maxSplitBytes
进行合并。注意合并文件时还需加上打开文件的预估代价openCostInBytes
。那么maxSplitBytes
和openCostInBytes
这两个关键参数怎么来的呢?布局
val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
不难看出,主要是spark.sql.files.maxPartitionBytes
、spark.sql.files.openCostInBytes
、调度器默认并发度以及全部输入文件实际大小所控制。大数据
Hive表
Spark SQL中的Hive表底层的RDD类为HadoopRDD
,由HadoopTableReader
类实现。不过此次,具体的Partition划分仍是依赖HadoopRDD
的getPartitions
方法,具体实现以下:
override def getPartitions: Array[Partition] = { ... try { val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { ... } }
不难看出,在处理Hive表的时候,Spark SQL把任务划分又交给了Hadoop的InputFormat那一套。不过须要注意的是,并非全部Hive表都归为这一类,Spark SQL会默认对ORC和Parquet的表进行转化,用本身的Data Source实现OrcFileFormat
和ParquetFileFormat
来把这两种表做为Data Source表来处理。
总结
切分输入路径只是大数据处理的第一步,虽然不起眼,可是也绝对不可或缺。低效的文件划分可能会给端到端的执行速度带来巨大的负面影响,更有可能影响到输出做业的文件布局,从而影响到整个数据流水线上全部做业的执行效率。万事开头难,为程序输入选择合适的配置参数,能够有效改善程序执行效率。
留个思考题给读者们:如何设置参数完全关闭Spark SQL data source表的文件合并?
本文分享自微信公众号 - Apache Spark技术交流社区(E-MapReduce_Spark)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。